1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

add client shutdown timeout

This commit is contained in:
Nikolay Kim 2018-10-01 20:04:16 -07:00
parent 91af3ca148
commit 16945a554a
9 changed files with 208 additions and 38 deletions

View File

@ -2,6 +2,13 @@
## [0.7.9] - 2018-09-x ## [0.7.9] - 2018-09-x
### Added
* Added client shutdown timeout setting
* Added slow request timeout setting
### Fixed ### Fixed
* HTTP1 decoding errors are reported to the client. #512 * HTTP1 decoding errors are reported to the client. #512

View File

@ -176,11 +176,11 @@ where
/// Applies timeout to request prcoessing. /// Applies timeout to request prcoessing.
pub(crate) struct AcceptorTimeout<T> { pub(crate) struct AcceptorTimeout<T> {
inner: T, inner: T,
timeout: usize, timeout: u64,
} }
impl<T: NewService> AcceptorTimeout<T> { impl<T: NewService> AcceptorTimeout<T> {
pub(crate) fn new(timeout: usize, inner: T) -> Self { pub(crate) fn new(timeout: u64, inner: T) -> Self {
Self { inner, timeout } Self { inner, timeout }
} }
} }
@ -204,7 +204,7 @@ impl<T: NewService> NewService for AcceptorTimeout<T> {
#[doc(hidden)] #[doc(hidden)]
pub(crate) struct AcceptorTimeoutFut<T: NewService> { pub(crate) struct AcceptorTimeoutFut<T: NewService> {
fut: T::Future, fut: T::Future,
timeout: usize, timeout: u64,
} }
impl<T: NewService> Future for AcceptorTimeoutFut<T> { impl<T: NewService> Future for AcceptorTimeoutFut<T> {
@ -215,7 +215,7 @@ impl<T: NewService> Future for AcceptorTimeoutFut<T> {
let inner = try_ready!(self.fut.poll()); let inner = try_ready!(self.fut.poll());
Ok(Async::Ready(AcceptorTimeoutService { Ok(Async::Ready(AcceptorTimeoutService {
inner, inner,
timeout: self.timeout as u64, timeout: self.timeout,
})) }))
} }
} }

View File

@ -16,12 +16,13 @@ use super::KeepAlive;
pub(crate) trait ServiceProvider { pub(crate) trait ServiceProvider {
fn register( fn register(
&self, server: Server, lst: net::TcpListener, host: String, &self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64,
client_shutdown: u64,
) -> Server; ) -> Server;
} }
/// Utility type that builds complete http pipeline /// Utility type that builds complete http pipeline
pub struct HttpServiceBuilder<F, H, A> pub(crate) struct HttpServiceBuilder<F, H, A>
where where
F: Fn() -> H + Send + Clone, F: Fn() -> H + Send + Clone,
{ {
@ -51,22 +52,9 @@ where
self self
} }
/// Use different acceptor factory
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1>
where
A1: AcceptorServiceFactory,
<A1::NewService as NewService>::InitError: fmt::Debug,
{
HttpServiceBuilder {
acceptor,
factory: self.factory.clone(),
no_client_timer: self.no_client_timer,
}
}
fn finish( fn finish(
&self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, &self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive,
client_timeout: usize, client_timeout: u64, client_shutdown: u64,
) -> impl ServiceFactory { ) -> impl ServiceFactory {
let timeout = if self.no_client_timer { let timeout = if self.no_client_timer {
0 0
@ -81,6 +69,7 @@ where
app, app,
keep_alive, keep_alive,
timeout as u64, timeout as u64,
client_shutdown,
ServerSettings::new(addr, &host, false), ServerSettings::new(addr, &host, false),
); );
@ -137,12 +126,13 @@ where
{ {
fn register( fn register(
&self, server: Server, lst: net::TcpListener, host: String, &self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: u64,
client_shutdown: u64,
) -> Server { ) -> Server {
server.listen2( server.listen2(
"actix-web", "actix-web",
lst, lst,
self.finish(host, addr, keep_alive, client_timeout), self.finish(host, addr, keep_alive, client_timeout, client_shutdown),
) )
} }
} }

View File

@ -243,8 +243,15 @@ where
} else { } else {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
self.flags.insert(Flags::SHUTDOWN); self.flags.insert(Flags::SHUTDOWN);
// TODO: start shutdown timer
return Ok(()); // start shutdown timer
if let Some(deadline) =
self.settings.client_shutdown_timer()
{
timer.reset(deadline)
} else {
return Ok(());
}
} }
} else if let Some(deadline) = self.settings.keep_alive_expire() } else if let Some(deadline) = self.settings.keep_alive_expire()
{ {
@ -548,6 +555,7 @@ mod tests {
App::new().into_handler(), App::new().into_handler(),
KeepAlive::Os, KeepAlive::Os,
5000, 5000,
2000,
ServerSettings::default(), ServerSettings::default(),
) )
} }

View File

@ -41,7 +41,8 @@ where
pub(super) factory: F, pub(super) factory: F,
pub(super) host: Option<String>, pub(super) host: Option<String>,
pub(super) keep_alive: KeepAlive, pub(super) keep_alive: KeepAlive,
pub(super) client_timeout: usize, pub(super) client_timeout: u64,
pub(super) client_shutdown: u64,
backlog: i32, backlog: i32,
threads: usize, threads: usize,
exit: bool, exit: bool,
@ -73,6 +74,7 @@ where
maxconn: 25_600, maxconn: 25_600,
maxconnrate: 256, maxconnrate: 256,
client_timeout: 5000, client_timeout: 5000,
client_shutdown: 5000,
sockets: Vec::new(), sockets: Vec::new(),
} }
} }
@ -140,11 +142,24 @@ where
/// To disable timeout set value to 0. /// To disable timeout set value to 0.
/// ///
/// By default client timeout is set to 5000 milliseconds. /// By default client timeout is set to 5000 milliseconds.
pub fn client_timeout(mut self, val: usize) -> Self { pub fn client_timeout(mut self, val: u64) -> Self {
self.client_timeout = val; self.client_timeout = val;
self self
} }
/// Set server connection shutdown timeout in milliseconds.
///
/// Defines a timeout for shutdown connection. If a shutdown procedure does not complete
/// within this time, the request is dropped.
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
pub fn client_shutdown(mut self, val: u64) -> Self {
self.client_shutdown = val;
self
}
/// Set server host name. /// Set server host name.
/// ///
/// Host name is used by application router aa a hostname for url /// Host name is used by application router aa a hostname for url
@ -480,6 +495,11 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
.as_ref() .as_ref()
.map(|h| h.to_owned()) .map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr)); .unwrap_or_else(|| format!("{}", socket.addr));
let client_shutdown = if socket.scheme == "https" {
self.client_shutdown
} else {
0
};
srv = socket.handler.register( srv = socket.handler.register(
srv, srv,
socket.lst, socket.lst,
@ -487,6 +507,7 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
socket.addr, socket.addr,
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_timeout,
client_shutdown,
); );
} }
srv.start() srv.start()
@ -526,6 +547,11 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
.as_ref() .as_ref()
.map(|h| h.to_owned()) .map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr)); .unwrap_or_else(|| format!("{}", socket.addr));
let client_shutdown = if socket.scheme == "https" {
self.client_shutdown
} else {
0
};
srv = socket.handler.register( srv = socket.handler.register(
srv, srv,
socket.lst, socket.lst,
@ -533,6 +559,7 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
socket.addr, socket.addr,
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_timeout,
client_shutdown,
); );
} }
srv srv

View File

@ -35,7 +35,8 @@ where
let settings = WorkerSettings::new( let settings = WorkerSettings::new(
apps, apps,
self.keep_alive, self.keep_alive,
self.client_timeout as u64, self.client_timeout,
self.client_shutdown,
ServerSettings::new(addr, "127.0.0.1:8080", secure), ServerSettings::new(addr, "127.0.0.1:8080", secure),
); );

View File

@ -144,7 +144,7 @@ pub use self::ssl::*;
pub use self::error::{AcceptorError, HttpDispatchError}; pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::service::HttpService; pub use self::service::HttpService;
pub use self::settings::{ServerSettings, WorkerSettings}; pub use self::settings::{ServerSettings, WorkerSettings, WorkerSettingsBuilder};
#[doc(hidden)] #[doc(hidden)]
pub use self::helpers::write_content_length; pub use self::helpers::write_content_length;

View File

@ -76,7 +76,9 @@ impl Default for ServerSettings {
impl ServerSettings { impl ServerSettings {
/// Crate server settings instance /// Crate server settings instance
pub fn new(addr: net::SocketAddr, host: &str, secure: bool) -> ServerSettings { pub(crate) fn new(
addr: net::SocketAddr, host: &str, secure: bool,
) -> ServerSettings {
let host = host.to_owned(); let host = host.to_owned();
let cpu_pool = LazyCell::new(); let cpu_pool = LazyCell::new();
let responses = HttpResponsePool::get_pool(); let responses = HttpResponsePool::get_pool();
@ -131,6 +133,7 @@ struct Inner<H> {
handler: H, handler: H,
keep_alive: Option<Duration>, keep_alive: Option<Duration>,
client_timeout: u64, client_timeout: u64,
client_shutdown: u64,
ka_enabled: bool, ka_enabled: bool,
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool, messages: &'static RequestPool,
@ -146,8 +149,9 @@ impl<H> Clone for WorkerSettings<H> {
impl<H> WorkerSettings<H> { impl<H> WorkerSettings<H> {
/// Create instance of `WorkerSettings` /// Create instance of `WorkerSettings`
pub fn new( pub(crate) fn new(
handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings, handler: H, keep_alive: KeepAlive, client_timeout: u64, client_shutdown: u64,
settings: ServerSettings,
) -> WorkerSettings<H> { ) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true), KeepAlive::Timeout(val) => (val as u64, true),
@ -165,6 +169,7 @@ impl<H> WorkerSettings<H> {
keep_alive, keep_alive,
ka_enabled, ka_enabled,
client_timeout, client_timeout,
client_shutdown,
bytes: Rc::new(SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(settings), messages: RequestPool::pool(settings),
node: RefCell::new(Node::head()), node: RefCell::new(Node::head()),
@ -172,6 +177,11 @@ impl<H> WorkerSettings<H> {
})) }))
} }
/// Create worker settings builder.
pub fn build(handler: H) -> WorkerSettingsBuilder<H> {
WorkerSettingsBuilder::new(handler)
}
pub(crate) fn head(&self) -> RefMut<Node<()>> { pub(crate) fn head(&self) -> RefMut<Node<()>> {
self.0.node.borrow_mut() self.0.node.borrow_mut()
} }
@ -222,6 +232,16 @@ impl<H: 'static> WorkerSettings<H> {
} }
} }
/// Client shutdown timer
pub fn client_shutdown_timer(&self) -> Option<Instant> {
let delay = self.0.client_shutdown;
if delay != 0 {
Some(self.now() + Duration::from_millis(delay))
} else {
None
}
}
#[inline] #[inline]
/// Return keep-alive timer delay is configured. /// Return keep-alive timer delay is configured.
pub fn keep_alive_timer(&self) -> Option<Delay> { pub fn keep_alive_timer(&self) -> Option<Delay> {
@ -289,6 +309,121 @@ impl<H: 'static> WorkerSettings<H> {
} }
} }
/// An worker settings builder
///
/// This type can be used to construct an instance of `WorkerSettings` through a
/// builder-like pattern.
pub struct WorkerSettingsBuilder<H> {
handler: H,
keep_alive: KeepAlive,
client_timeout: u64,
client_shutdown: u64,
host: String,
addr: net::SocketAddr,
secure: bool,
}
impl<H> WorkerSettingsBuilder<H> {
/// Create instance of `WorkerSettingsBuilder`
pub fn new(handler: H) -> WorkerSettingsBuilder<H> {
WorkerSettingsBuilder {
handler,
keep_alive: KeepAlive::Timeout(5),
client_timeout: 5000,
client_shutdown: 5000,
secure: false,
host: "localhost".to_owned(),
addr: "127.0.0.1:8080".parse().unwrap(),
}
}
/// Enable secure flag for current server.
///
/// By default this flag is set to false.
pub fn secure(mut self) -> Self {
self.secure = true;
self
}
/// Set server keep-alive setting.
///
/// By default keep alive is set to a 5 seconds.
pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
self.keep_alive = val.into();
self
}
/// Set server client timeout in milliseconds for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
/// the entire set headers within this time, the request is terminated with
/// the 408 (Request Time-out) error.
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
pub fn client_timeout(mut self, val: u64) -> Self {
self.client_timeout = val;
self
}
/// Set server connection shutdown timeout in milliseconds.
///
/// Defines a timeout for shutdown connection. If a shutdown procedure does not complete
/// within this time, the request is dropped. This timeout affects only secure connections.
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
pub fn client_shutdown(mut self, val: u64) -> Self {
self.client_shutdown = val;
self
}
/// Set server host name.
///
/// Host name is used by application router aa a hostname for url
/// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo.
/// html#method.host) documentation for more information.
///
/// By default host name is set to a "localhost" value.
pub fn server_hostname(mut self, val: &str) -> Self {
self.host = val.to_owned();
self
}
/// Set server ip address.
///
/// Host name is used by application router aa a hostname for url
/// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo.
/// html#method.host) documentation for more information.
///
/// By default server address is set to a "127.0.0.1:8080"
pub fn server_address<S: net::ToSocketAddrs>(mut self, addr: S) -> Self {
match addr.to_socket_addrs() {
Err(err) => error!("Can not convert to SocketAddr: {}", err),
Ok(mut addrs) => if let Some(addr) = addrs.next() {
self.addr = addr;
},
}
self
}
/// Finish worker settings configuration and create `WorkerSettings` object.
pub fn finish(self) -> WorkerSettings<H> {
let settings = ServerSettings::new(self.addr, &self.host, self.secure);
let client_shutdown = if self.secure { self.client_shutdown } else { 0 };
WorkerSettings::new(
self.handler,
self.keep_alive,
self.client_timeout,
client_shutdown,
settings,
)
}
}
struct Date { struct Date {
current: Instant, current: Instant,
bytes: [u8; DATE_VALUE_LENGTH], bytes: [u8; DATE_VALUE_LENGTH],
@ -366,6 +501,7 @@ mod tests {
(), (),
KeepAlive::Os, KeepAlive::Os,
0, 0,
0,
ServerSettings::default(), ServerSettings::default(),
); );
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);

View File

@ -1016,7 +1016,7 @@ fn test_server_cookies() {
#[test] #[test]
fn test_custom_pipeline() { fn test_custom_pipeline() {
use actix::System; use actix::System;
use actix_web::server::{HttpService, KeepAlive, ServerSettings, WorkerSettings}; use actix_web::server::{HttpService, KeepAlive, WorkerSettings};
let addr = test::TestServer::unused_addr(); let addr = test::TestServer::unused_addr();
@ -1026,12 +1026,13 @@ fn test_custom_pipeline() {
let app = App::new() let app = App::new()
.route("/", http::Method::GET, |_: HttpRequest| "OK") .route("/", http::Method::GET, |_: HttpRequest| "OK")
.finish(); .finish();
let settings = WorkerSettings::new( let settings = WorkerSettings::build(app)
app, .keep_alive(KeepAlive::Disabled)
KeepAlive::Disabled, .client_timeout(1000)
10, .client_shutdown(1000)
ServerSettings::new(addr, "localhost", false), .server_hostname("localhost")
); .server_address(addr)
.finish();
HttpService::new(settings) HttpService::new(settings)
}).unwrap() }).unwrap()