From 16945a554abd5ddc9b3aaec4f102f9eeaae5e1a8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 1 Oct 2018 20:04:16 -0700 Subject: [PATCH] add client shutdown timeout --- CHANGES.md | 7 ++ src/server/acceptor.rs | 8 +-- src/server/builder.rs | 26 +++----- src/server/h1.rs | 12 +++- src/server/http.rs | 31 ++++++++- src/server/incoming.rs | 3 +- src/server/mod.rs | 2 +- src/server/settings.rs | 142 ++++++++++++++++++++++++++++++++++++++++- tests/test_server.rs | 15 +++-- 9 files changed, 208 insertions(+), 38 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 517f8cbe5..32d2bea7b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,13 @@ ## [0.7.9] - 2018-09-x +### Added + +* Added client shutdown timeout setting + +* Added slow request timeout setting + + ### Fixed * HTTP1 decoding errors are reported to the client. #512 diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs index bad8847dc..15d66112a 100644 --- a/src/server/acceptor.rs +++ b/src/server/acceptor.rs @@ -176,11 +176,11 @@ where /// Applies timeout to request prcoessing. pub(crate) struct AcceptorTimeout { inner: T, - timeout: usize, + timeout: u64, } impl AcceptorTimeout { - pub(crate) fn new(timeout: usize, inner: T) -> Self { + pub(crate) fn new(timeout: u64, inner: T) -> Self { Self { inner, timeout } } } @@ -204,7 +204,7 @@ impl NewService for AcceptorTimeout { #[doc(hidden)] pub(crate) struct AcceptorTimeoutFut { fut: T::Future, - timeout: usize, + timeout: u64, } impl Future for AcceptorTimeoutFut { @@ -215,7 +215,7 @@ impl Future for AcceptorTimeoutFut { let inner = try_ready!(self.fut.poll()); Ok(Async::Ready(AcceptorTimeoutService { inner, - timeout: self.timeout as u64, + timeout: self.timeout, })) } } diff --git a/src/server/builder.rs b/src/server/builder.rs index 8e7f82f80..9e9323537 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -16,12 +16,13 @@ use super::KeepAlive; pub(crate) trait ServiceProvider { fn register( &self, server: Server, lst: net::TcpListener, host: String, - addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, + addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64, + client_shutdown: u64, ) -> Server; } /// Utility type that builds complete http pipeline -pub struct HttpServiceBuilder +pub(crate) struct HttpServiceBuilder where F: Fn() -> H + Send + Clone, { @@ -51,22 +52,9 @@ where self } - /// Use different acceptor factory - pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder - where - A1: AcceptorServiceFactory, - ::InitError: fmt::Debug, - { - HttpServiceBuilder { - acceptor, - factory: self.factory.clone(), - no_client_timer: self.no_client_timer, - } - } - fn finish( &self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, - client_timeout: usize, + client_timeout: u64, client_shutdown: u64, ) -> impl ServiceFactory { let timeout = if self.no_client_timer { 0 @@ -81,6 +69,7 @@ where app, keep_alive, timeout as u64, + client_shutdown, ServerSettings::new(addr, &host, false), ); @@ -137,12 +126,13 @@ where { fn register( &self, server: Server, lst: net::TcpListener, host: String, - addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, + addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64, + client_shutdown: u64, ) -> Server { server.listen2( "actix-web", lst, - self.finish(host, addr, keep_alive, client_timeout), + self.finish(host, addr, keep_alive, client_timeout, client_shutdown), ) } } diff --git a/src/server/h1.rs b/src/server/h1.rs index f3c71e3c2..f5e2bf2f5 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -243,8 +243,15 @@ where } else { trace!("Keep-alive timeout, close connection"); self.flags.insert(Flags::SHUTDOWN); - // TODO: start shutdown timer - return Ok(()); + + // start shutdown timer + if let Some(deadline) = + self.settings.client_shutdown_timer() + { + timer.reset(deadline) + } else { + return Ok(()); + } } } else if let Some(deadline) = self.settings.keep_alive_expire() { @@ -548,6 +555,7 @@ mod tests { App::new().into_handler(), KeepAlive::Os, 5000, + 2000, ServerSettings::default(), ) } diff --git a/src/server/http.rs b/src/server/http.rs index 511b1832e..5e1d33512 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -41,7 +41,8 @@ where pub(super) factory: F, pub(super) host: Option, pub(super) keep_alive: KeepAlive, - pub(super) client_timeout: usize, + pub(super) client_timeout: u64, + pub(super) client_shutdown: u64, backlog: i32, threads: usize, exit: bool, @@ -73,6 +74,7 @@ where maxconn: 25_600, maxconnrate: 256, client_timeout: 5000, + client_shutdown: 5000, sockets: Vec::new(), } } @@ -140,11 +142,24 @@ where /// To disable timeout set value to 0. /// /// By default client timeout is set to 5000 milliseconds. - pub fn client_timeout(mut self, val: usize) -> Self { + pub fn client_timeout(mut self, val: u64) -> Self { self.client_timeout = val; self } + /// Set server connection shutdown timeout in milliseconds. + /// + /// Defines a timeout for shutdown connection. If a shutdown procedure does not complete + /// within this time, the request is dropped. + /// + /// To disable timeout set value to 0. + /// + /// By default client timeout is set to 5000 milliseconds. + pub fn client_shutdown(mut self, val: u64) -> Self { + self.client_shutdown = val; + self + } + /// Set server host name. /// /// Host name is used by application router aa a hostname for url @@ -480,6 +495,11 @@ impl H + Send + Clone> HttpServer { .as_ref() .map(|h| h.to_owned()) .unwrap_or_else(|| format!("{}", socket.addr)); + let client_shutdown = if socket.scheme == "https" { + self.client_shutdown + } else { + 0 + }; srv = socket.handler.register( srv, socket.lst, @@ -487,6 +507,7 @@ impl H + Send + Clone> HttpServer { socket.addr, self.keep_alive, self.client_timeout, + client_shutdown, ); } srv.start() @@ -526,6 +547,11 @@ impl H + Send + Clone> HttpServer { .as_ref() .map(|h| h.to_owned()) .unwrap_or_else(|| format!("{}", socket.addr)); + let client_shutdown = if socket.scheme == "https" { + self.client_shutdown + } else { + 0 + }; srv = socket.handler.register( srv, socket.lst, @@ -533,6 +559,7 @@ impl H + Send + Clone> HttpServer { socket.addr, self.keep_alive, self.client_timeout, + client_shutdown, ); } srv diff --git a/src/server/incoming.rs b/src/server/incoming.rs index a56ccb80f..c4e984b9d 100644 --- a/src/server/incoming.rs +++ b/src/server/incoming.rs @@ -35,7 +35,8 @@ where let settings = WorkerSettings::new( apps, self.keep_alive, - self.client_timeout as u64, + self.client_timeout, + self.client_shutdown, ServerSettings::new(addr, "127.0.0.1:8080", secure), ); diff --git a/src/server/mod.rs b/src/server/mod.rs index f9d2b585e..b72410516 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -144,7 +144,7 @@ pub use self::ssl::*; pub use self::error::{AcceptorError, HttpDispatchError}; pub use self::service::HttpService; -pub use self::settings::{ServerSettings, WorkerSettings}; +pub use self::settings::{ServerSettings, WorkerSettings, WorkerSettingsBuilder}; #[doc(hidden)] pub use self::helpers::write_content_length; diff --git a/src/server/settings.rs b/src/server/settings.rs index fe9cd82a3..ac79e4a46 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -76,7 +76,9 @@ impl Default for ServerSettings { impl ServerSettings { /// Crate server settings instance - pub fn new(addr: net::SocketAddr, host: &str, secure: bool) -> ServerSettings { + pub(crate) fn new( + addr: net::SocketAddr, host: &str, secure: bool, + ) -> ServerSettings { let host = host.to_owned(); let cpu_pool = LazyCell::new(); let responses = HttpResponsePool::get_pool(); @@ -131,6 +133,7 @@ struct Inner { handler: H, keep_alive: Option, client_timeout: u64, + client_shutdown: u64, ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, @@ -146,8 +149,9 @@ impl Clone for WorkerSettings { impl WorkerSettings { /// Create instance of `WorkerSettings` - pub fn new( - handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings, + pub(crate) fn new( + handler: H, keep_alive: KeepAlive, client_timeout: u64, client_shutdown: u64, + settings: ServerSettings, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -165,6 +169,7 @@ impl WorkerSettings { keep_alive, ka_enabled, client_timeout, + client_shutdown, bytes: Rc::new(SharedBytesPool::new()), messages: RequestPool::pool(settings), node: RefCell::new(Node::head()), @@ -172,6 +177,11 @@ impl WorkerSettings { })) } + /// Create worker settings builder. + pub fn build(handler: H) -> WorkerSettingsBuilder { + WorkerSettingsBuilder::new(handler) + } + pub(crate) fn head(&self) -> RefMut> { self.0.node.borrow_mut() } @@ -222,6 +232,16 @@ impl WorkerSettings { } } + /// Client shutdown timer + pub fn client_shutdown_timer(&self) -> Option { + let delay = self.0.client_shutdown; + if delay != 0 { + Some(self.now() + Duration::from_millis(delay)) + } else { + None + } + } + #[inline] /// Return keep-alive timer delay is configured. pub fn keep_alive_timer(&self) -> Option { @@ -289,6 +309,121 @@ impl WorkerSettings { } } +/// An worker settings builder +/// +/// This type can be used to construct an instance of `WorkerSettings` through a +/// builder-like pattern. +pub struct WorkerSettingsBuilder { + handler: H, + keep_alive: KeepAlive, + client_timeout: u64, + client_shutdown: u64, + host: String, + addr: net::SocketAddr, + secure: bool, +} + +impl WorkerSettingsBuilder { + /// Create instance of `WorkerSettingsBuilder` + pub fn new(handler: H) -> WorkerSettingsBuilder { + WorkerSettingsBuilder { + handler, + keep_alive: KeepAlive::Timeout(5), + client_timeout: 5000, + client_shutdown: 5000, + secure: false, + host: "localhost".to_owned(), + addr: "127.0.0.1:8080".parse().unwrap(), + } + } + + /// Enable secure flag for current server. + /// + /// By default this flag is set to false. + pub fn secure(mut self) -> Self { + self.secure = true; + self + } + + /// Set server keep-alive setting. + /// + /// By default keep alive is set to a 5 seconds. + pub fn keep_alive>(mut self, val: T) -> Self { + self.keep_alive = val.into(); + self + } + + /// Set server client timeout in milliseconds for first request. + /// + /// Defines a timeout for reading client request header. If a client does not transmit + /// the entire set headers within this time, the request is terminated with + /// the 408 (Request Time-out) error. + /// + /// To disable timeout set value to 0. + /// + /// By default client timeout is set to 5000 milliseconds. + pub fn client_timeout(mut self, val: u64) -> Self { + self.client_timeout = val; + self + } + + /// Set server connection shutdown timeout in milliseconds. + /// + /// Defines a timeout for shutdown connection. If a shutdown procedure does not complete + /// within this time, the request is dropped. This timeout affects only secure connections. + /// + /// To disable timeout set value to 0. + /// + /// By default client timeout is set to 5000 milliseconds. + pub fn client_shutdown(mut self, val: u64) -> Self { + self.client_shutdown = val; + self + } + + /// Set server host name. + /// + /// Host name is used by application router aa a hostname for url + /// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo. + /// html#method.host) documentation for more information. + /// + /// By default host name is set to a "localhost" value. + pub fn server_hostname(mut self, val: &str) -> Self { + self.host = val.to_owned(); + self + } + + /// Set server ip address. + /// + /// Host name is used by application router aa a hostname for url + /// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo. + /// html#method.host) documentation for more information. + /// + /// By default server address is set to a "127.0.0.1:8080" + pub fn server_address(mut self, addr: S) -> Self { + match addr.to_socket_addrs() { + Err(err) => error!("Can not convert to SocketAddr: {}", err), + Ok(mut addrs) => if let Some(addr) = addrs.next() { + self.addr = addr; + }, + } + self + } + + /// Finish worker settings configuration and create `WorkerSettings` object. + pub fn finish(self) -> WorkerSettings { + let settings = ServerSettings::new(self.addr, &self.host, self.secure); + let client_shutdown = if self.secure { self.client_shutdown } else { 0 }; + + WorkerSettings::new( + self.handler, + self.keep_alive, + self.client_timeout, + client_shutdown, + settings, + ) + } +} + struct Date { current: Instant, bytes: [u8; DATE_VALUE_LENGTH], @@ -366,6 +501,7 @@ mod tests { (), KeepAlive::Os, 0, + 0, ServerSettings::default(), ); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); diff --git a/tests/test_server.rs b/tests/test_server.rs index 66b96ecce..f8fabef6d 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1016,7 +1016,7 @@ fn test_server_cookies() { #[test] fn test_custom_pipeline() { use actix::System; - use actix_web::server::{HttpService, KeepAlive, ServerSettings, WorkerSettings}; + use actix_web::server::{HttpService, KeepAlive, WorkerSettings}; let addr = test::TestServer::unused_addr(); @@ -1026,12 +1026,13 @@ fn test_custom_pipeline() { let app = App::new() .route("/", http::Method::GET, |_: HttpRequest| "OK") .finish(); - let settings = WorkerSettings::new( - app, - KeepAlive::Disabled, - 10, - ServerSettings::new(addr, "localhost", false), - ); + let settings = WorkerSettings::build(app) + .keep_alive(KeepAlive::Disabled) + .client_timeout(1000) + .client_shutdown(1000) + .server_hostname("localhost") + .server_address(addr) + .finish(); HttpService::new(settings) }).unwrap()