diff --git a/src/client/connector.rs b/src/client/connector.rs index 8d71913fe..32426e0ac 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -50,8 +50,8 @@ type SslConnector = Arc; feature = "alpn", feature = "ssl", feature = "tls", - feature = "rust-tls", -),))] + feature = "rust-tls" +)))] type SslConnector = (); use server::IoStream; diff --git a/src/server/h1.rs b/src/server/h1.rs index b5ee93e66..76c0d4b6e 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; use std::net::SocketAddr; -use std::time::{Duration, Instant}; +use std::time::Instant; use bytes::BytesMut; use futures::{Async, Future, Poll}; @@ -49,7 +49,14 @@ pub(crate) struct Http1 { payload: Option, buf: BytesMut, tasks: VecDeque>, - keepalive_timer: Option, + ka_enabled: bool, + ka_expire: Instant, + ka_timer: Option, +} + +struct Entry { + pipe: EntryPipe, + flags: EntryFlags, } enum EntryPipe { @@ -78,11 +85,6 @@ impl EntryPipe { } } -struct Entry { - pipe: EntryPipe, - flags: EntryFlags, -} - impl Http1 where T: IoStream, @@ -92,6 +94,15 @@ where settings: WorkerSettings, stream: T, addr: Option, buf: BytesMut, is_eof: bool, keepalive_timer: Option, ) -> Self { + let ka_enabled = settings.keep_alive_enabled(); + let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { + (delay.deadline(), Some(delay)) + } else if let Some(delay) = settings.keep_alive_timer() { + (delay.deadline(), Some(delay)) + } else { + (settings.now(), None) + }; + Http1 { flags: if is_eof { Flags::READ_DISCONNECTED @@ -105,7 +116,9 @@ where addr, buf, settings, - keepalive_timer, + ka_timer, + ka_expire, + ka_enabled, } } @@ -143,9 +156,6 @@ where for task in &mut self.tasks { task.pipe.disconnected(); } - - // kill keepalive - self.keepalive_timer.take(); } fn read_disconnected(&mut self) { @@ -163,16 +173,9 @@ where #[inline] pub fn poll(&mut self) -> Poll<(), ()> { - // keep-alive timer - if let Some(ref mut timer) = self.keepalive_timer { - match timer.poll() { - Ok(Async::Ready(_)) => { - trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); - } - Ok(Async::NotReady) => (), - Err(_) => unreachable!(), - } + // check connection keep-alive + if !self.poll_keep_alive() { + return Ok(Async::Ready(())); } // shutdown @@ -203,11 +206,70 @@ where self.flags.insert(Flags::SHUTDOWN); return self.poll(); } - Async::NotReady => return Ok(Async::NotReady), + Async::NotReady => { + // deal with keep-alive and steam eof (client-side write shutdown) + if self.tasks.is_empty() { + // handle stream eof + if self.flags.contains(Flags::READ_DISCONNECTED) { + self.flags.insert(Flags::SHUTDOWN); + return self.poll(); + } + // no keep-alive + if self.flags.contains(Flags::ERROR) + || (!self.flags.contains(Flags::KEEPALIVE) + || !self.ka_enabled) + && self.flags.contains(Flags::STARTED) + { + self.flags.insert(Flags::SHUTDOWN); + return self.poll(); + } + } + return Ok(Async::NotReady); + } } } } + /// keep-alive timer. returns `true` is keep-alive, otherwise drop + fn poll_keep_alive(&mut self) -> bool { + let timer = if let Some(ref mut timer) = self.ka_timer { + match timer.poll() { + Ok(Async::Ready(_)) => { + if timer.deadline() >= self.ka_expire { + // check for any outstanding request handling + if self.tasks.is_empty() { + // if we get timer during shutdown, just drop connection + if self.flags.contains(Flags::SHUTDOWN) { + return false; + } else { + trace!("Keep-alive timeout, close connection"); + self.flags.insert(Flags::SHUTDOWN); + None + } + } else { + self.settings.keep_alive_timer() + } + } else { + Some(Delay::new(self.ka_expire)) + } + } + Ok(Async::NotReady) => None, + Err(e) => { + error!("Timer error {:?}", e); + return false; + } + } + } else { + None + }; + + if let Some(mut timer) = timer { + let _ = timer.poll(); + self.ka_timer = Some(timer); + } + true + } + #[inline] /// read data from stream pub fn poll_io(&mut self) { @@ -283,6 +345,11 @@ where } // no more IO for this iteration Ok(Async::NotReady) => { + // check if we need timer + if self.ka_timer.is_some() && self.stream.upgrade() { + self.ka_timer.take(); + } + // check if previously read backpressure was enabled if self.can_read() && !retry { return Ok(Async::Ready(true)); @@ -348,32 +415,6 @@ where } } - // deal with keep-alive and steam eof (client-side write shutdown) - if self.tasks.is_empty() { - // handle stream eof - if self.flags.contains(Flags::READ_DISCONNECTED) { - return Ok(Async::Ready(false)); - } - // no keep-alive - if self.flags.contains(Flags::ERROR) - || (!self.flags.contains(Flags::KEEPALIVE) - || !self.settings.keep_alive_enabled()) - && self.flags.contains(Flags::STARTED) - { - return Ok(Async::Ready(false)); - } - - // start keep-alive timer - let keep_alive = self.settings.keep_alive(); - if self.keepalive_timer.is_none() && keep_alive > 0 { - trace!("Start keep-alive timer"); - let mut timer = - Delay::new(Instant::now() + Duration::from_secs(keep_alive)); - // register timer - let _ = timer.poll(); - self.keepalive_timer = Some(timer); - } - } Ok(Async::NotReady) } @@ -385,9 +426,12 @@ where } pub fn parse(&mut self) { + let mut updated = false; + 'outer: loop { match self.decoder.decode(&mut self.buf, &self.settings) { Ok(Some(Message::Message { mut msg, payload })) => { + updated = true; self.flags.insert(Flags::STARTED); if payload { @@ -403,9 +447,6 @@ where // set remote addr msg.inner_mut().addr = self.addr; - // stop keepalive timer - self.keepalive_timer.take(); - // search handler for request match self.settings.handler().handle(msg) { Ok(mut pipe) => { @@ -430,7 +471,7 @@ where } continue 'outer; } - Ok(Async::NotReady) => {} + Ok(Async::NotReady) => (), Err(err) => { error!("Unhandled error: {}", err); self.flags.insert(Flags::ERROR); @@ -460,6 +501,7 @@ where self.push_response_entry(StatusCode::NOT_FOUND); } Ok(Some(Message::Chunk(chunk))) => { + updated = true; if let Some(ref mut payload) = self.payload { payload.feed_data(chunk); } else { @@ -470,6 +512,7 @@ where } } Ok(Some(Message::Eof)) => { + updated = true; if let Some(mut payload) = self.payload.take() { payload.feed_eof(); } else { @@ -489,6 +532,7 @@ where break; } Err(e) => { + updated = false; self.flags.insert(Flags::ERROR); if let Some(mut payload) = self.payload.take() { let e = match e { @@ -504,6 +548,12 @@ where } } } + + if self.ka_timer.is_some() && updated { + if let Some(expire) = self.settings.keep_alive_expire() { + self.ka_expire = expire; + } + } } } @@ -512,7 +562,9 @@ mod tests { use std::net::Shutdown; use std::{cmp, io, time}; + use actix::System; use bytes::{Buf, Bytes, BytesMut}; + use futures::future; use http::{Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -647,15 +699,19 @@ mod tests { #[test] fn test_req_parse_err() { - let buf = Buffer::new("GET /test HTTP/1\r\n\r\n"); - let readbuf = BytesMut::new(); - let settings = wrk_settings(); + let mut sys = System::new("test"); + sys.block_on(future::lazy(|| { + let buf = Buffer::new("GET /test HTTP/1\r\n\r\n"); + let readbuf = BytesMut::new(); + let settings = wrk_settings(); - let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, false, None); - h1.poll_io(); - h1.poll_io(); - assert!(h1.flags.contains(Flags::ERROR)); - assert_eq!(h1.tasks.len(), 1); + let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, false, None); + h1.poll_io(); + h1.poll_io(); + assert!(h1.flags.contains(Flags::ERROR)); + assert_eq!(h1.tasks.len(), 1); + future::ok::<_, ()>(()) + })); } #[test] diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 15451659d..3036aa089 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -66,6 +66,10 @@ impl H1Writer { self.flags.insert(Flags::DISCONNECTED); } + pub fn upgrade(&self) -> bool { + self.flags.contains(Flags::UPGRADE) + } + pub fn keepalive(&self) -> bool { self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) } diff --git a/src/server/h2.rs b/src/server/h2.rs index f31c2db38..d9ca2f64a 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use std::io::{Read, Write}; use std::net::SocketAddr; use std::rc::Rc; -use std::time::{Duration, Instant}; +use std::time::Instant; use std::{cmp, io, mem}; use bytes::{Buf, Bytes}; @@ -232,16 +232,15 @@ where // start keep-alive timer if self.tasks.is_empty() { if self.settings.keep_alive_enabled() { - let keep_alive = self.settings.keep_alive(); - if keep_alive > 0 && self.keepalive_timer.is_none() { - trace!("Start keep-alive timer"); - let mut timeout = Delay::new( - Instant::now() - + Duration::new(keep_alive, 0), - ); - // register timeout - let _ = timeout.poll(); - self.keepalive_timer = Some(timeout); + if self.keepalive_timer.is_none() { + if let Some(ka) = self.settings.keep_alive() { + trace!("Start keep-alive timer"); + let mut timeout = + Delay::new(Instant::now() + ka); + // register timeout + let _ = timeout.poll(); + self.keepalive_timer = Some(timeout); + } } } else { // keep-alive disable, drop connection diff --git a/src/server/settings.rs b/src/server/settings.rs index db5f6c57b..5ca777290 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -137,7 +137,7 @@ pub struct WorkerSettings(Rc>); struct Inner { handler: H, - keep_alive: u64, + keep_alive: Option, client_timeout: u64, ka_enabled: bool, bytes: Rc, @@ -161,6 +161,11 @@ impl WorkerSettings { KeepAlive::Os | KeepAlive::Tcp(_) => (0, true), KeepAlive::Disabled => (0, false), }; + let keep_alive = if ka_enabled && keep_alive > 0 { + Some(Duration::from_secs(keep_alive)) + } else { + None + }; WorkerSettings(Rc::new(Inner { handler, @@ -183,17 +188,7 @@ impl WorkerSettings { } #[inline] - pub fn keep_alive_timer(&self) -> Option { - let ka = self.0.keep_alive; - if ka != 0 { - Some(Delay::new(Instant::now() + Duration::from_secs(ka))) - } else { - None - } - } - - #[inline] - pub fn keep_alive(&self) -> u64 { + pub fn keep_alive(&self) -> Option { self.0.keep_alive } @@ -202,16 +197,6 @@ impl WorkerSettings { self.0.ka_enabled } - #[inline] - pub fn client_timer(&self) -> Option { - let delay = self.0.client_timeout; - if delay != 0 { - Some(Delay::new(Instant::now() + Duration::from_millis(delay))) - } else { - None - } - } - pub(crate) fn get_bytes(&self) -> BytesMut { self.0.bytes.get_bytes() } @@ -231,6 +216,34 @@ impl WorkerSettings { } impl WorkerSettings { + #[inline] + pub fn client_timer(&self) -> Option { + let delay = self.0.client_timeout; + if delay != 0 { + Some(Delay::new(self.now() + Duration::from_millis(delay))) + } else { + None + } + } + + #[inline] + pub fn keep_alive_timer(&self) -> Option { + if let Some(ka) = self.0.keep_alive { + Some(Delay::new(self.now() + ka)) + } else { + None + } + } + + /// Keep-alive expire time + pub fn keep_alive_expire(&self) -> Option { + if let Some(ka) = self.0.keep_alive { + Some(self.now() + ka) + } else { + None + } + } + pub(crate) fn set_date(&self, dst: &mut BytesMut, full: bool) { // Unsafe: WorkerSetting is !Sync and !Send let date_bytes = unsafe { @@ -258,9 +271,29 @@ impl WorkerSettings { dst.extend_from_slice(date_bytes); } } + + #[inline] + pub(crate) fn now(&self) -> Instant { + unsafe { + let date = &mut (*self.0.date.get()); + if !date.0 { + date.1.update(); + date.0 = true; + + // periodic date update + let s = self.clone(); + spawn(sleep(Duration::from_secs(1)).then(move |_| { + s.update_date(); + future::ok(()) + })); + } + date.1.current + } + } } struct Date { + current: Instant, bytes: [u8; DATE_VALUE_LENGTH], pos: usize, } @@ -268,6 +301,7 @@ struct Date { impl Date { fn new() -> Date { let mut date = Date { + current: Instant::now(), bytes: [0; DATE_VALUE_LENGTH], pos: 0, }; @@ -276,6 +310,7 @@ impl Date { } fn update(&mut self) { self.pos = 0; + self.current = Instant::now(); write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap(); } }