diff --git a/CHANGES.md b/CHANGES.md index 77cac1fe..c764a592 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.7.8] - 2018-09-xx + +### Added + +* Use server `Keep-Alive` setting as slow request timeout. + + ## [0.7.7] - 2018-09-11 ### Fixed diff --git a/src/scope.rs b/src/scope.rs index 4ce4901a..bd3daf16 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -821,11 +821,9 @@ mod tests { scope .route("/path1", Method::GET, |_: HttpRequest<_>| { HttpResponse::Ok() - }).route( - "/path1", - Method::DELETE, - |_: HttpRequest<_>| HttpResponse::Ok(), - ) + }).route("/path1", Method::DELETE, |_: HttpRequest<_>| { + HttpResponse::Ok() + }) }).finish(); let req = TestRequest::with_uri("/app/path1").request(); diff --git a/src/server/accept.rs b/src/server/accept.rs index d642c40f..307a2a2f 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -451,10 +451,13 @@ impl Accept { Delay::new( Instant::now() + Duration::from_millis(510), ).map_err(|_| ()) - .and_then(move |_| { - let _ = r.set_readiness(mio::Ready::readable()); - Ok(()) - }), + .and_then( + move |_| { + let _ = + r.set_readiness(mio::Ready::readable()); + Ok(()) + }, + ), ); Ok(()) }, diff --git a/src/server/channel.rs b/src/server/channel.rs index 7b63125e..5119eb5f 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -5,6 +5,7 @@ use std::{io, ptr, time}; use bytes::{Buf, BufMut, BytesMut}; use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; use super::settings::WorkerSettings; use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; @@ -30,6 +31,7 @@ where { proto: Option>, node: Option>>, + ka_timeout: Option, _tag: ConnectionTag, } @@ -42,9 +44,11 @@ where settings: Rc>, io: T, peer: Option, ) -> HttpChannel { let _tag = settings.connection(); + let ka_timeout = settings.keep_alive_timer(); HttpChannel { _tag, + ka_timeout, node: None, proto: Some(HttpProtocol::Unknown( settings, @@ -77,6 +81,21 @@ where type Error = (); fn poll(&mut self) -> Poll { + // keep-alive timer + if let Some(ref mut timer) = self.ka_timeout { + match timer.poll() { + Ok(Async::Ready(_)) => { + trace!("Slow request timed out, close connection"); + if let Some(n) = self.node.as_mut() { + n.remove() + }; + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => (), + Err(_) => panic!("Something is really wrong"), + } + } + if self.node.is_none() { let el = self as *mut _; self.node = Some(Node::new(el)); @@ -161,7 +180,12 @@ where match kind { ProtocolKind::Http1 => { self.proto = Some(HttpProtocol::H1(h1::Http1::new( - settings, io, addr, buf, is_eof, + settings, + io, + addr, + buf, + is_eof, + self.ka_timeout.take(), ))); return self.poll(); } @@ -171,6 +195,7 @@ where io, addr, buf.freeze(), + self.ka_timeout.take(), ))); return self.poll(); } diff --git a/src/server/h1.rs b/src/server/h1.rs index dc88cac9..d6e13e22 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -91,7 +91,7 @@ where { pub fn new( settings: Rc>, stream: T, addr: Option, - buf: BytesMut, is_eof: bool, + buf: BytesMut, is_eof: bool, keepalive_timer: Option, ) -> Self { Http1 { flags: if is_eof { @@ -103,10 +103,10 @@ where decoder: H1Decoder::new(), payload: None, tasks: VecDeque::new(), - keepalive_timer: None, addr, buf, settings, + keepalive_timer, } } @@ -364,7 +364,7 @@ where if self.keepalive_timer.is_none() && keep_alive > 0 { trace!("Start keep-alive timer"); let mut timer = - Delay::new(Instant::now() + Duration::new(keep_alive, 0)); + Delay::new(Instant::now() + Duration::from_secs(keep_alive)); // register timer let _ = timer.poll(); self.keepalive_timer = Some(timer); @@ -632,7 +632,7 @@ mod tests { let readbuf = BytesMut::new(); let settings = Rc::new(wrk_settings()); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); + let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None); h1.poll_io(); h1.poll_io(); assert_eq!(h1.tasks.len(), 1); @@ -645,7 +645,7 @@ mod tests { BytesMut::from(Vec::::from(&b"GET /test HTTP/1.1\r\n\r\n"[..])); let settings = Rc::new(wrk_settings()); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true); + let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None); h1.poll_io(); assert_eq!(h1.tasks.len(), 1); } @@ -656,7 +656,7 @@ mod tests { let readbuf = BytesMut::new(); let settings = Rc::new(wrk_settings()); - let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); + let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None); h1.poll_io(); h1.poll_io(); assert!(h1.flags.contains(Flags::ERROR)); diff --git a/src/server/h2.rs b/src/server/h2.rs index 986888ff..913e2cd7 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -59,6 +59,7 @@ where { pub fn new( settings: Rc>, io: T, addr: Option, buf: Bytes, + keepalive_timer: Option, ) -> Self { let extensions = io.extensions(); Http2 { @@ -68,10 +69,10 @@ where unread: if buf.is_empty() { None } else { Some(buf) }, inner: io, })), - keepalive_timer: None, addr, settings, extensions, + keepalive_timer, } } diff --git a/src/server/settings.rs b/src/server/settings.rs index e9ca0f85..fc0d931f 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -13,7 +13,7 @@ use http::StatusCode; use lazycell::LazyCell; use parking_lot::Mutex; use time; -use tokio_timer::Interval; +use tokio_timer::{Delay, Interval}; use super::channel::Node; use super::message::{Request, RequestPool}; @@ -197,6 +197,16 @@ impl WorkerSettings { &self.h } + pub fn keep_alive_timer(&self) -> Option { + if self.keep_alive != 0 { + Some(Delay::new( + Instant::now() + Duration::from_secs(self.keep_alive), + )) + } else { + None + } + } + pub fn keep_alive(&self) -> u64 { self.keep_alive } diff --git a/src/test.rs b/src/test.rs index 64aef663..c068086d 100644 --- a/src/test.rs +++ b/src/test.rs @@ -120,6 +120,7 @@ impl TestServer { HttpServer::new(factory) .disable_signals() .listen(tcp) + .keep_alive(5) .start(); tx.send((System::current(), local_addr, TestServer::get_conn())) @@ -328,6 +329,7 @@ impl TestServerBuilder { config(&mut app); vec![app] }).workers(1) + .keep_alive(5) .disable_signals(); tx.send((System::current(), addr, TestServer::get_conn())) diff --git a/tests/test_client.rs b/tests/test_client.rs index 28d60faf..8c5d5819 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -407,24 +407,29 @@ fn test_client_cookie_handling() { let cookie2 = cookie2b.clone(); app.handler(move |req: &HttpRequest| { // Check cookies were sent correctly - req.cookie("cookie1").ok_or_else(err) - .and_then(|c1| if c1.value() == "value1" { + req.cookie("cookie1") + .ok_or_else(err) + .and_then(|c1| { + if c1.value() == "value1" { Ok(()) } else { Err(err()) - }) - .and_then(|()| req.cookie("cookie2").ok_or_else(err)) - .and_then(|c2| if c2.value() == "value2" { + } + }).and_then(|()| req.cookie("cookie2").ok_or_else(err)) + .and_then(|c2| { + if c2.value() == "value2" { Ok(()) } else { Err(err()) - }) - // Send some cookies back - .map(|_| HttpResponse::Ok() - .cookie(cookie1.clone()) - .cookie(cookie2.clone()) - .finish() - ) + } + }) + // Send some cookies back + .map(|_| { + HttpResponse::Ok() + .cookie(cookie1.clone()) + .cookie(cookie2.clone()) + .finish() + }) }) });