diff --git a/src/config.rs b/src/config.rs index 4e85044f1..2e14a33e6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,7 +48,7 @@ struct Inner { client_timeout: u64, client_disconnect: u64, ka_enabled: bool, - date: UnsafeCell<(bool, Date)>, + timer: DateService, } impl Clone for ServiceConfig { @@ -78,7 +78,7 @@ impl ServiceConfig { ka_enabled, client_timeout, client_disconnect, - date: UnsafeCell::new((false, Date::new())), + timer: DateService::with(Duration::from_millis(500)), })) } @@ -99,17 +99,14 @@ impl ServiceConfig { self.0.ka_enabled } - fn update_date(&self) { - // Unsafe: WorkerSetting is !Sync and !Send - unsafe { (*self.0.date.get()).0 = false }; - } - #[inline] /// Client timeout for first request. pub fn client_timer(&self) -> Option { let delay = self.0.client_timeout; if delay != 0 { - Some(Delay::new(self.now() + Duration::from_millis(delay))) + Some(Delay::new( + self.0.timer.now() + Duration::from_millis(delay), + )) } else { None } @@ -119,7 +116,7 @@ impl ServiceConfig { pub fn client_timer_expire(&self) -> Option { let delay = self.0.client_timeout; if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) + Some(self.0.timer.now() + Duration::from_millis(delay)) } else { None } @@ -129,7 +126,7 @@ impl ServiceConfig { pub fn client_disconnect_timer(&self) -> Option { let delay = self.0.client_disconnect; if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) + Some(self.0.timer.now() + Duration::from_millis(delay)) } else { None } @@ -139,7 +136,7 @@ impl ServiceConfig { /// Return keep-alive timer delay is configured. pub fn keep_alive_timer(&self) -> Option { if let Some(ka) = self.0.keep_alive { - Some(Delay::new(self.now() + ka)) + Some(Delay::new(self.0.timer.now() + ka)) } else { None } @@ -148,57 +145,23 @@ impl ServiceConfig { /// Keep-alive expire time pub fn keep_alive_expire(&self) -> Option { if let Some(ka) = self.0.keep_alive { - Some(self.now() + ka) + Some(self.0.timer.now() + ka) } else { None } } - pub(crate) fn set_date(&self, dst: &mut BytesMut, full: bool) { - // Unsafe: WorkerSetting is !Sync and !Send - let date_bytes = unsafe { - let date = &mut (*self.0.date.get()); - if !date.0 { - date.1.update(); - date.0 = true; - - // periodic date update - let s = self.clone(); - spawn(sleep(Duration::from_millis(500)).then(move |_| { - s.update_date(); - future::ok(()) - })); - } - &date.1.bytes - }; - if full { - let mut buf: [u8; 39] = [0; 39]; - buf[..6].copy_from_slice(b"date: "); - buf[6..35].copy_from_slice(date_bytes); - buf[35..].copy_from_slice(b"\r\n\r\n"); - dst.extend_from_slice(&buf); - } else { - dst.extend_from_slice(date_bytes); - } - } - #[inline] pub(crate) fn now(&self) -> Instant { - unsafe { - let date = &mut (*self.0.date.get()); - if !date.0 { - date.1.update(); - date.0 = true; + self.0.timer.now() + } - // periodic date update - let s = self.clone(); - spawn(sleep(Duration::from_millis(500)).then(move |_| { - s.update_date(); - future::ok(()) - })); - } - date.1.current - } + pub(crate) fn set_date(&self, dst: &mut BytesMut) { + let mut buf: [u8; 39] = [0; 39]; + buf[..6].copy_from_slice(b"date: "); + buf[6..35].copy_from_slice(&self.0.timer.date().bytes); + buf[35..].copy_from_slice(b"\r\n\r\n"); + dst.extend_from_slice(&buf); } } @@ -311,7 +274,6 @@ impl ServiceConfigBuilder { } struct Date { - current: Instant, bytes: [u8; DATE_VALUE_LENGTH], pos: usize, } @@ -319,7 +281,6 @@ struct Date { impl Date { fn new() -> Date { let mut date = Date { - current: Instant::now(), bytes: [0; DATE_VALUE_LENGTH], pos: 0, }; @@ -328,7 +289,6 @@ impl Date { } fn update(&mut self) { self.pos = 0; - self.current = Instant::now(); write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap(); } } @@ -342,6 +302,68 @@ impl fmt::Write for Date { } } +#[derive(Clone)] +struct DateService(Rc); + +struct DateServiceInner { + interval: Duration, + current: UnsafeCell>, +} + +impl DateServiceInner { + fn new(interval: Duration) -> Self { + DateServiceInner { + interval, + current: UnsafeCell::new(None), + } + } + + fn get_ref(&self) -> &Option<(Date, Instant)> { + unsafe { &*self.current.get() } + } + + fn reset(&self) { + unsafe { (&mut *self.current.get()).take() }; + } + + fn update(&self) { + let now = Instant::now(); + let date = Date::new(); + *(unsafe { &mut *self.current.get() }) = Some((date, now)); + } +} + +impl DateService { + fn with(resolution: Duration) -> Self { + DateService(Rc::new(DateServiceInner::new(resolution))) + } + + fn check_date(&self) { + if self.0.get_ref().is_none() { + self.0.update(); + + // periodic date update + let s = self.clone(); + spawn(sleep(Duration::from_millis(500)).then(move |_| { + s.0.reset(); + future::ok(()) + })); + } + } + + fn now(&self) -> Instant { + self.check_date(); + self.0.get_ref().as_ref().unwrap().1 + } + + fn date(&self) -> &Date { + self.check_date(); + + let item = self.0.get_ref().as_ref().unwrap(); + &item.0 + } +} + #[cfg(test)] mod tests { use super::*; @@ -360,9 +382,9 @@ mod tests { let _ = rt.block_on(future::lazy(|| { let settings = ServiceConfig::new(KeepAlive::Os, 0, 0); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, true); + settings.set_date(&mut buf1); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, true); + settings.set_date(&mut buf2); assert_eq!(buf1, buf2); future::ok::<_, ()>(()) })); diff --git a/src/h1/codec.rs b/src/h1/codec.rs index 247b0f01c..d0faad43f 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -7,6 +7,7 @@ use tokio_codec::{Decoder, Encoder}; use super::decoder::{PayloadDecoder, PayloadItem, RequestDecoder}; use super::encoder::{ResponseEncoder, ResponseLength}; use body::Body; +use config::ServiceConfig; use error::ParseError; use helpers; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; @@ -48,6 +49,7 @@ pub enum InMessage { /// HTTP/1 Codec pub struct Codec { + config: ServiceConfig, decoder: RequestDecoder, payload: Option, version: Version, @@ -62,20 +64,19 @@ impl Codec { /// Create HTTP/1 codec. /// /// `keepalive_enabled` how response `connection` header get generated. - pub fn new(keepalive_enabled: bool) -> Self { - Codec::with_pool(RequestPool::pool(), keepalive_enabled) + pub fn new(config: ServiceConfig) -> Self { + Codec::with_pool(RequestPool::pool(), config) } /// Create HTTP/1 codec with request's pool - pub(crate) fn with_pool( - pool: &'static RequestPool, keepalive_enabled: bool, - ) -> Self { - let flags = if keepalive_enabled { + pub(crate) fn with_pool(pool: &'static RequestPool, config: ServiceConfig) -> Self { + let flags = if config.keep_alive_enabled() { Flags::KEEPALIVE_ENABLED } else { Flags::empty() }; Codec { + config, decoder: RequestDecoder::with_pool(pool), payload: None, version: Version::HTTP_11, @@ -217,7 +218,7 @@ impl Codec { // optimized date header, set_date writes \r\n if !has_date { - // self.settings.set_date(&mut buffer, true); + self.config.set_date(buffer); buffer.extend_from_slice(b"\r\n"); } else { // msg eof diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index d44e687d1..c8ce7d65e 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -97,7 +97,7 @@ where } else { Flags::FLUSHED }; - let framed = Framed::new(stream, Codec::new(keepalive)); + let framed = Framed::new(stream, Codec::new(config.clone())); let (ka_expire, ka_timer) = if let Some(delay) = timeout { (delay.deadline(), Some(delay)) diff --git a/src/lib.rs b/src/lib.rs index 8a7bcfa43..85bf9c2ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,17 +48,10 @@ //! //! ## Features //! -//! * Supported *HTTP/1.x* and *HTTP/2.0* protocols +//! * Supported *HTTP/1.x* protocol //! * Streaming and pipelining //! * Keep-alive and slow requests handling //! * `WebSockets` server/client -//! * Transparent content compression/decompression (br, gzip, deflate) -//! * Configurable request routing -//! * Graceful server shutdown -//! * Multipart streams -//! * SSL support with OpenSSL or `native-tls` -//! * Middlewares (`Logger`, `Session`, `CORS`, `CSRF`, `DefaultHeaders`) -//! * Built on top of [Actix actor framework](https://github.com/actix/actix) //! * Supported Rust version: 1.26 or later //! //! ## Package feature