From f3ce6574e4d7e6ec2308bbd2a0235a7b25b8caf4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 2 Oct 2018 00:19:28 -0700 Subject: [PATCH] fix client timer and add slow request tests --- CHANGES.md | 2 ++ Cargo.toml | 3 ++- src/server/builder.rs | 55 ++++++++++++++++++------------------------ src/server/channel.rs | 30 ++++++++++++++--------- src/server/h1.rs | 36 ++++++++++++++++++++++++++- src/server/http.rs | 22 +++++++++-------- src/server/settings.rs | 10 ++++++++ tests/test_server.rs | 40 ++++++++++++++++++++++++++++++ 8 files changed, 144 insertions(+), 54 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 32d2bea7b..145caec1d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ * Added slow request timeout setting +* Respond with 408 response on slow request timeout #523 + ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 205e178b9..8997fa5ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ language-tags = "0.2" lazy_static = "1.0" lazycell = "1.0.0" parking_lot = "0.6" +serde_urlencoded = "^0.5.3" url = { version="1.7", features=["query_encoding"] } cookie = { version="0.11", features=["percent-encode"] } brotli2 = { version="^0.3.2", optional = true } @@ -125,7 +126,7 @@ webpki-roots = { version = "0.15", optional = true } # unix sockets tokio-uds = { version="0.2", optional = true } -serde_urlencoded = "^0.5.3" +backtrace="*" [dev-dependencies] env_logger = "0.5" diff --git a/src/server/builder.rs b/src/server/builder.rs index 9e9323537..6bafb4607 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -16,7 +16,7 @@ use super::KeepAlive; pub(crate) trait ServiceProvider { fn register( &self, server: Server, lst: net::TcpListener, host: String, - addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64, + addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool, client_timeout: u64, client_shutdown: u64, ) -> Server; } @@ -28,7 +28,6 @@ where { factory: F, acceptor: A, - no_client_timer: bool, } impl HttpServiceBuilder @@ -40,27 +39,13 @@ where { /// Create http service builder pub fn new(factory: F, acceptor: A) -> Self { - Self { - factory, - acceptor, - no_client_timer: false, - } - } - - pub(crate) fn no_client_timer(mut self) -> Self { - self.no_client_timer = true; - self + Self { factory, acceptor } } fn finish( - &self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, + &self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool, client_timeout: u64, client_shutdown: u64, ) -> impl ServiceFactory { - let timeout = if self.no_client_timer { - 0 - } else { - client_timeout - }; let factory = self.factory.clone(); let acceptor = self.acceptor.clone(); move || { @@ -68,12 +53,12 @@ where let settings = WorkerSettings::new( app, keep_alive, - timeout as u64, + client_timeout, client_shutdown, ServerSettings::new(addr, &host, false), ); - if timeout == 0 { + if secure { Either::A(ServerMessageAcceptor::new( settings.clone(), TcpAcceptor::new(acceptor.create().map_err(AcceptorError::Service)) @@ -88,14 +73,16 @@ where } else { Either::B(ServerMessageAcceptor::new( settings.clone(), - TcpAcceptor::new(AcceptorTimeout::new(timeout, acceptor.create())) - .map_err(|_| ()) - .map_init_err(|_| ()) - .and_then( - HttpService::new(settings) - .map_init_err(|_| ()) - .map_err(|_| ()), - ), + TcpAcceptor::new(AcceptorTimeout::new( + client_timeout, + acceptor.create(), + )).map_err(|_| ()) + .map_init_err(|_| ()) + .and_then( + HttpService::new(settings) + .map_init_err(|_| ()) + .map_err(|_| ()), + ), )) } } @@ -112,7 +99,6 @@ where HttpServiceBuilder { factory: self.factory.clone(), acceptor: self.acceptor.clone(), - no_client_timer: self.no_client_timer, } } } @@ -126,13 +112,20 @@ where { fn register( &self, server: Server, lst: net::TcpListener, host: String, - addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64, + addr: net::SocketAddr, keep_alive: KeepAlive, secure: bool, client_timeout: u64, client_shutdown: u64, ) -> Server { server.listen2( "actix-web", lst, - self.finish(host, addr, keep_alive, client_timeout, client_shutdown), + self.finish( + host, + addr, + keep_alive, + secure, + client_timeout, + client_shutdown, + ), ) } } diff --git a/src/server/channel.rs b/src/server/channel.rs index d8cad9707..f57806209 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -9,6 +9,7 @@ use tokio_timer::Delay; use super::error::HttpDispatchError; use super::settings::WorkerSettings; use super::{h1, h2, HttpHandler, IoStream}; +use http::StatusCode; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; @@ -42,11 +43,9 @@ where pub(crate) fn new( settings: WorkerSettings, io: T, peer: Option, ) -> HttpChannel { - let ka_timeout = settings.client_timer(); - HttpChannel { - ka_timeout, node: None, + ka_timeout: settings.client_timer(), proto: Some(HttpProtocol::Unknown( settings, peer, @@ -91,10 +90,23 @@ where fn poll(&mut self) -> Poll { // keep-alive timer - if let Some(ref mut timer) = self.ka_timeout { - match timer.poll() { + if self.ka_timeout.is_some() { + match self.ka_timeout.as_mut().unwrap().poll() { Ok(Async::Ready(_)) => { trace!("Slow request timed out, close connection"); + if let Some(HttpProtocol::Unknown(settings, _, io, buf)) = + self.proto.take() + { + self.proto = + Some(HttpProtocol::H1(h1::Http1Dispatcher::for_error( + settings, + io, + StatusCode::REQUEST_TIMEOUT, + self.ka_timeout.take(), + buf, + ))); + return self.poll(); + } return Ok(Async::Ready(())); } Ok(Async::NotReady) => (), @@ -121,12 +133,8 @@ where let mut is_eof = false; let kind = match self.proto { - Some(HttpProtocol::H1(ref mut h1)) => { - return h1.poll(); - } - Some(HttpProtocol::H2(ref mut h2)) => { - return h2.poll(); - } + Some(HttpProtocol::H1(ref mut h1)) => return h1.poll(), + Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(), Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => { let mut err = None; let mut disconnect = false; diff --git a/src/server/h1.rs b/src/server/h1.rs index fe8f976b7..205be9494 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -121,6 +121,31 @@ where } } + pub(crate) fn for_error( + settings: WorkerSettings, stream: T, status: StatusCode, + mut keepalive_timer: Option, buf: BytesMut, + ) -> Self { + if let Some(deadline) = settings.client_timer_expire() { + let _ = keepalive_timer.as_mut().map(|delay| delay.reset(deadline)); + } + + let mut disp = Http1Dispatcher { + flags: Flags::STARTED | Flags::READ_DISCONNECTED, + stream: H1Writer::new(stream, settings.clone()), + decoder: H1Decoder::new(), + payload: None, + tasks: VecDeque::new(), + error: None, + addr: None, + ka_timer: keepalive_timer, + ka_expire: settings.now(), + buf, + settings, + }; + disp.push_response_entry(status); + disp + } + #[inline] pub fn settings(&self) -> &WorkerSettings { &self.settings @@ -133,7 +158,7 @@ where #[inline] fn can_read(&self) -> bool { - if self.flags.intersects(Flags::READ_DISCONNECTED) { + if self.flags.contains(Flags::READ_DISCONNECTED) { return false; } @@ -250,6 +275,15 @@ where ); let _ = IoStream::shutdown(io, Shutdown::Both); return Err(HttpDispatchError::ShutdownTimeout); + } else if !self.flags.contains(Flags::STARTED) { + // timeout on first request (slow request) return 408 + trace!("Slow request timeout"); + self.flags + .insert(Flags::STARTED | Flags::READ_DISCONNECTED); + self.tasks.push_back(Entry::Error(ServerError::err( + Version::HTTP_11, + StatusCode::REQUEST_TIMEOUT, + ))); } else { trace!("Keep-alive timeout, close connection"); self.flags.insert(Flags::SHUTDOWN); diff --git a/src/server/http.rs b/src/server/http.rs index 5a7200868..91f5d73e0 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -232,10 +232,10 @@ where lst, addr, scheme: "http", - handler: Box::new( - HttpServiceBuilder::new(self.factory.clone(), DefaultAcceptor) - .no_client_timer(), - ), + handler: Box::new(HttpServiceBuilder::new( + self.factory.clone(), + DefaultAcceptor, + )), }); self @@ -498,10 +498,10 @@ impl H + Send + Clone> HttpServer { .as_ref() .map(|h| h.to_owned()) .unwrap_or_else(|| format!("{}", socket.addr)); - let client_shutdown = if socket.scheme == "https" { - self.client_shutdown + let (secure, client_shutdown) = if socket.scheme == "https" { + (true, self.client_shutdown) } else { - 0 + (false, 0) }; srv = socket.handler.register( srv, @@ -509,6 +509,7 @@ impl H + Send + Clone> HttpServer { host, socket.addr, self.keep_alive, + secure, self.client_timeout, client_shutdown, ); @@ -550,10 +551,10 @@ impl H + Send + Clone> HttpServer { .as_ref() .map(|h| h.to_owned()) .unwrap_or_else(|| format!("{}", socket.addr)); - let client_shutdown = if socket.scheme == "https" { - self.client_shutdown + let (secure, client_shutdown) = if socket.scheme == "https" { + (true, self.client_shutdown) } else { - 0 + (false, 0) }; srv = socket.handler.register( srv, @@ -561,6 +562,7 @@ impl H + Send + Clone> HttpServer { host, socket.addr, self.keep_alive, + secure, self.client_timeout, client_shutdown, ); diff --git a/src/server/settings.rs b/src/server/settings.rs index a50a07069..2f306073c 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -232,6 +232,16 @@ impl WorkerSettings { } } + /// Client timeout for first request. + pub fn client_timer_expire(&self) -> Option { + let delay = self.0.client_timeout; + if delay != 0 { + Some(self.now() + Duration::from_millis(delay)) + } else { + None + } + } + /// Client shutdown timer pub fn client_shutdown_timer(&self) -> Option { let delay = self.0.client_shutdown; diff --git a/tests/test_server.rs b/tests/test_server.rs index a85c5c329..269a1cd7d 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1054,3 +1054,43 @@ fn test_custom_pipeline() { assert!(response.status().is_success()); } } + +#[test] +fn test_slow_request() { + use actix::System; + use std::net; + use std::sync::mpsc; + let (tx, rx) = mpsc::channel(); + + let addr = test::TestServer::unused_addr(); + + thread::spawn(move || { + System::run(move || { + let srv = server::new(|| { + vec![App::new().resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + })] + }); + + let srv = srv.bind(addr).unwrap(); + srv.client_timeout(200).start(); + let _ = tx.send(System::current()); + }); + }); + let sys = rx.recv().unwrap(); + + thread::sleep(time::Duration::from_millis(200)); + + let mut stream = net::TcpStream::connect(addr).unwrap(); + let mut data = String::new(); + let _ = stream.read_to_string(&mut data); + assert!(data.starts_with("HTTP/1.1 408 Request Timeou")); + + let mut stream = net::TcpStream::connect(addr).unwrap(); + let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); + let mut data = String::new(); + let _ = stream.read_to_string(&mut data); + assert!(data.starts_with("HTTP/1.1 408 Request Timeou")); + + sys.stop(); +}