From d57579d70067e675ba47c09d52ac3bab4aa18edf Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 27 Sep 2018 17:15:38 -0700 Subject: [PATCH] refactor acceptor pipeline add client timeout --- src/server/acceptor.rs | 315 +++++++++++++++++++++++++++++++++++++++++ src/server/builder.rs | 214 +++++++++++----------------- src/server/channel.rs | 2 +- src/server/http.rs | 51 ++++--- src/server/mod.rs | 4 + src/server/service.rs | 77 +++------- src/server/settings.rs | 31 ++-- 7 files changed, 474 insertions(+), 220 deletions(-) create mode 100644 src/server/acceptor.rs diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs new file mode 100644 index 000000000..d78474160 --- /dev/null +++ b/src/server/acceptor.rs @@ -0,0 +1,315 @@ +use std::time::Duration; + +use actix_net::server::ServerMessage; +use actix_net::service::{NewService, Service}; +use futures::future::{err, ok, Either, FutureResult}; +use futures::{Async, Future, Poll}; +use tokio_reactor::Handle; +use tokio_tcp::TcpStream; +use tokio_timer::{sleep, Delay}; + +use super::handler::HttpHandler; +use super::settings::WorkerSettings; +use super::IoStream; + +/// This trait indicates types that can create acceptor service for http server. +pub trait AcceptorServiceFactory: Send + Clone + 'static { + type Io: IoStream + Send; + type NewService: NewService< + Request = TcpStream, + Response = Self::Io, + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +impl AcceptorServiceFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T::Response: IoStream + Send, + T: NewService, +{ + type Io = T::Response; + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} + +#[derive(Clone)] +/// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream` +pub(crate) struct DefaultAcceptor; + +impl AcceptorServiceFactory for DefaultAcceptor { + type Io = TcpStream; + type NewService = DefaultAcceptor; + + fn create(&self) -> Self::NewService { + DefaultAcceptor + } +} + +impl NewService for DefaultAcceptor { + type Request = TcpStream; + type Response = TcpStream; + type Error = (); + type InitError = (); + type Service = DefaultAcceptor; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(DefaultAcceptor) + } +} + +impl Service for DefaultAcceptor { + type Request = TcpStream; + type Response = TcpStream; + type Error = (); + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ok(req) + } +} + +pub(crate) struct TcpAcceptor { + inner: T, + settings: WorkerSettings, +} + +impl TcpAcceptor +where + H: HttpHandler, + T: NewService, +{ + pub(crate) fn new(settings: WorkerSettings, inner: T) -> Self { + TcpAcceptor { inner, settings } + } +} + +impl NewService for TcpAcceptor +where + H: HttpHandler, + T: NewService, +{ + type Request = ServerMessage; + type Response = (); + type Error = (); + type InitError = (); + type Service = TcpAcceptorService; + type Future = TcpAcceptorResponse; + + fn new_service(&self) -> Self::Future { + TcpAcceptorResponse { + fut: self.inner.new_service(), + settings: self.settings.clone(), + } + } +} + +pub(crate) struct TcpAcceptorResponse +where + H: HttpHandler, + T: NewService, +{ + fut: T::Future, + settings: WorkerSettings, +} + +impl Future for TcpAcceptorResponse +where + H: HttpHandler, + T: NewService, +{ + type Item = TcpAcceptorService; + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Err(_) => Err(()), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(service)) => Ok(Async::Ready(TcpAcceptorService { + inner: service, + settings: self.settings.clone(), + })), + } + } +} + +pub(crate) struct TcpAcceptorService { + inner: T, + settings: WorkerSettings, +} + +impl Service for TcpAcceptorService +where + H: HttpHandler, + T: Service, +{ + type Request = ServerMessage; + type Response = (); + type Error = (); + type Future = Either, FutureResult<(), ()>>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready().map_err(|_| ()) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match req { + ServerMessage::Connect(stream) => { + let stream = + TcpStream::from_std(stream, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + }); + + if let Ok(stream) = stream { + Either::A(TcpAcceptorServiceFut { + fut: self.inner.call(stream), + }) + } else { + Either::B(err(())) + } + } + ServerMessage::Shutdown(timeout) => Either::B(ok(())), + ServerMessage::ForceShutdown => { + // self.settings.head().traverse::(); + Either::B(ok(())) + } + } + } +} + +pub(crate) struct TcpAcceptorServiceFut { + fut: T, +} + +impl Future for TcpAcceptorServiceFut +where + T: Future, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Err(_) => Err(()), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => Ok(Async::Ready(())), + } + } +} + +/// Errors produced by `AcceptorTimeout` service. +#[derive(Debug)] +pub enum TimeoutError { + /// The inner service error + Service(T), + + /// The request did not complete within the specified timeout. + Timeout, +} + +/// Acceptor timeout middleware +/// +/// Applies timeout to request prcoessing. +pub(crate) struct AcceptorTimeout { + inner: T, + timeout: usize, +} + +impl AcceptorTimeout { + pub(crate) fn new(timeout: usize, inner: T) -> Self { + Self { inner, timeout } + } +} + +impl NewService for AcceptorTimeout { + type Request = T::Request; + type Response = T::Response; + type Error = TimeoutError; + type InitError = T::InitError; + type Service = AcceptorTimeoutService; + type Future = AcceptorTimeoutFut; + + fn new_service(&self) -> Self::Future { + AcceptorTimeoutFut { + fut: self.inner.new_service(), + timeout: self.timeout, + } + } +} + +#[doc(hidden)] +pub(crate) struct AcceptorTimeoutFut { + fut: T::Future, + timeout: usize, +} + +impl Future for AcceptorTimeoutFut { + type Item = AcceptorTimeoutService; + type Error = T::InitError; + + fn poll(&mut self) -> Poll { + let inner = try_ready!(self.fut.poll()); + Ok(Async::Ready(AcceptorTimeoutService { + inner, + timeout: self.timeout as u64, + })) + } +} + +/// Acceptor timeout service +/// +/// Applies timeout to request prcoessing. +pub(crate) struct AcceptorTimeoutService { + inner: T, + timeout: u64, +} + +impl Service for AcceptorTimeoutService { + type Request = T::Request; + type Response = T::Response; + type Error = TimeoutError; + type Future = AcceptorTimeoutResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready().map_err(TimeoutError::Service) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + AcceptorTimeoutResponse { + fut: self.inner.call(req), + sleep: sleep(Duration::from_millis(self.timeout)), + } + } +} + +pub(crate) struct AcceptorTimeoutResponse { + fut: T::Future, + sleep: Delay, +} +impl Future for AcceptorTimeoutResponse { + type Item = T::Response; + type Error = TimeoutError; + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Ok(Async::NotReady) => match self.sleep.poll() { + Err(_) => Err(TimeoutError::Timeout), + Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), + Ok(Async::NotReady) => Ok(Async::NotReady), + }, + Ok(Async::Ready(resp)) => Ok(Async::Ready(resp)), + Err(err) => Err(TimeoutError::Service(err)), + } + } +} diff --git a/src/server/builder.rs b/src/server/builder.rs index ad4124445..98a2d5023 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -1,21 +1,24 @@ use std::marker::PhantomData; use std::net; +use actix_net::either::Either; use actix_net::server; -use actix_net::service::{NewService, NewServiceExt, Service}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Poll}; -use tokio_tcp::TcpStream; +use actix_net::service::{NewService, NewServiceExt}; -use super::handler::IntoHttpHandler; +use super::acceptor::{AcceptorServiceFactory, AcceptorTimeout, TcpAcceptor}; +use super::handler::{HttpHandler, IntoHttpHandler}; use super::service::HttpService; +use super::settings::{ServerSettings, WorkerSettings}; use super::{IoStream, KeepAlive}; pub(crate) trait ServiceFactory where H: IntoHttpHandler, { - fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server; + fn register( + &self, server: server::Server, lst: net::TcpListener, host: Option, + addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, + ) -> server::Server; } pub struct HttpServiceBuilder @@ -29,11 +32,12 @@ where impl HttpServiceBuilder where - F: Fn() -> H + Send + Clone, + F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler, A: AcceptorServiceFactory, - P: HttpPipelineFactory, + P: HttpPipelineFactory, { + /// Create http service builder pub fn new(factory: F, acceptor: A, pipeline: P) -> Self { Self { factory, @@ -42,6 +46,7 @@ where } } + /// Use different acceptor factory pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder where A1: AcceptorServiceFactory, @@ -53,9 +58,10 @@ where } } + /// Use different pipeline factory pub fn pipeline(self, pipeline: P1) -> HttpServiceBuilder where - P1: HttpPipelineFactory, + P1: HttpPipelineFactory, { HttpServiceBuilder { pipeline, @@ -64,18 +70,45 @@ where } } - fn finish(&self) -> impl server::StreamServiceFactory { + fn finish( + &self, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, + client_timeout: usize, + ) -> impl server::ServiceFactory { + let factory = self.factory.clone(); let pipeline = self.pipeline.clone(); let acceptor = self.acceptor.clone(); - move || acceptor.create().and_then(pipeline.create()) + move || { + let app = (factory)().into_handler(); + let settings = WorkerSettings::new( + app, + keep_alive, + client_timeout as u64, + ServerSettings::new(Some(addr), &host, false), + ); + + if client_timeout == 0 { + Either::A(TcpAcceptor::new( + settings.clone(), + acceptor.create().and_then(pipeline.create(settings)), + )) + } else { + Either::B(TcpAcceptor::new( + settings.clone(), + AcceptorTimeout::new(client_timeout, acceptor.create()) + .map_err(|_| ()) + .and_then(pipeline.create(settings)), + )) + } + } } } impl Clone for HttpServiceBuilder where F: Fn() -> H + Send + Clone, + H: IntoHttpHandler, A: AcceptorServiceFactory, - P: HttpPipelineFactory, + P: HttpPipelineFactory, { fn clone(&self) -> Self { HttpServiceBuilder { @@ -88,44 +121,24 @@ where impl ServiceFactory for HttpServiceBuilder where - F: Fn() -> H + Send + Clone, + F: Fn() -> H + Send + Clone + 'static, A: AcceptorServiceFactory, - P: HttpPipelineFactory, + P: HttpPipelineFactory, H: IntoHttpHandler, { - fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server { - server.listen("actix-web", lst, self.finish()) + fn register( + &self, server: server::Server, lst: net::TcpListener, host: Option, + addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, + ) -> server::Server { + server.listen2( + "actix-web", + lst, + self.finish(host, addr, keep_alive, client_timeout), + ) } } -/// This trait indicates types that can create acceptor service for http server. -pub trait AcceptorServiceFactory: Send + Clone + 'static { - type Io: IoStream + Send; - type NewService: NewService< - Request = TcpStream, - Response = Self::Io, - Error = (), - InitError = (), - >; - - fn create(&self) -> Self::NewService; -} - -impl AcceptorServiceFactory for F -where - F: Fn() -> T + Send + Clone + 'static, - T::Response: IoStream + Send, - T: NewService, -{ - type Io = T::Response; - type NewService = T; - - fn create(&self) -> T { - (self)() - } -} - -pub trait HttpPipelineFactory: Send + Clone + 'static { +pub trait HttpPipelineFactory: Send + Clone + 'static { type Io: IoStream; type NewService: NewService< Request = Self::Io, @@ -134,126 +147,59 @@ pub trait HttpPipelineFactory: Send + Clone + 'static { InitError = (), >; - fn create(&self) -> Self::NewService; + fn create(&self, settings: WorkerSettings) -> Self::NewService; } -impl HttpPipelineFactory for F +impl HttpPipelineFactory for F where - F: Fn() -> T + Send + Clone + 'static, + F: Fn(WorkerSettings) -> T + Send + Clone + 'static, T: NewService, T::Request: IoStream, + H: HttpHandler, { type Io = T::Request; type NewService = T; - fn create(&self) -> T { - (self)() + fn create(&self, settings: WorkerSettings) -> T { + (self)(settings) } } -pub(crate) struct DefaultPipelineFactory -where - F: Fn() -> H + Send + Clone, -{ - factory: F, - host: Option, - addr: net::SocketAddr, - keep_alive: KeepAlive, - _t: PhantomData, +pub(crate) struct DefaultPipelineFactory { + _t: PhantomData<(H, Io)>, } -impl DefaultPipelineFactory +unsafe impl Send for DefaultPipelineFactory {} + +impl DefaultPipelineFactory where Io: IoStream + Send, - F: Fn() -> H + Send + Clone + 'static, - H: IntoHttpHandler + 'static, + H: HttpHandler + 'static, { - pub fn new( - factory: F, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, - ) -> Self { - Self { - factory, - addr, - keep_alive, - host, - _t: PhantomData, - } + pub fn new() -> Self { + Self { _t: PhantomData } } } -impl Clone for DefaultPipelineFactory +impl Clone for DefaultPipelineFactory where Io: IoStream, - F: Fn() -> H + Send + Clone, - H: IntoHttpHandler, + H: HttpHandler, { fn clone(&self) -> Self { - Self { - factory: self.factory.clone(), - addr: self.addr, - keep_alive: self.keep_alive, - host: self.host.clone(), - _t: PhantomData, - } + Self { _t: PhantomData } } } -impl HttpPipelineFactory for DefaultPipelineFactory +impl HttpPipelineFactory for DefaultPipelineFactory where - Io: IoStream + Send, - F: Fn() -> H + Send + Clone + 'static, - H: IntoHttpHandler + 'static, + Io: IoStream, + H: HttpHandler + 'static, { type Io = Io; - type NewService = HttpService; + type NewService = HttpService; - fn create(&self) -> Self::NewService { - HttpService::new( - self.factory.clone(), - self.addr, - self.host.clone(), - self.keep_alive, - ) - } -} - -#[derive(Clone)] -/// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream` -pub(crate) struct DefaultAcceptor; - -impl AcceptorServiceFactory for DefaultAcceptor { - type Io = TcpStream; - type NewService = DefaultAcceptor; - - fn create(&self) -> Self::NewService { - DefaultAcceptor - } -} - -impl NewService for DefaultAcceptor { - type Request = TcpStream; - type Response = TcpStream; - type Error = (); - type InitError = (); - type Service = DefaultAcceptor; - type Future = FutureResult; - - fn new_service(&self) -> Self::Future { - ok(DefaultAcceptor) - } -} - -impl Service for DefaultAcceptor { - type Request = TcpStream; - type Response = TcpStream; - type Error = (); - type Future = FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - ok(req) + fn create(&self, settings: WorkerSettings) -> Self::NewService { + HttpService::new(settings) } } diff --git a/src/server/channel.rs b/src/server/channel.rs index 6d0992bc9..c1e6b6b24 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -41,7 +41,7 @@ where pub(crate) fn new( settings: WorkerSettings, io: T, peer: Option, ) -> HttpChannel { - let ka_timeout = settings.keep_alive_timer(); + let ka_timeout = settings.client_timer(); HttpChannel { ka_timeout, diff --git a/src/server/http.rs b/src/server/http.rs index 3baf8a237..0fe14221e 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -18,8 +18,9 @@ use openssl::ssl::SslAcceptorBuilder; //#[cfg(feature = "rust-tls")] //use rustls::ServerConfig; -use super::builder::{AcceptorServiceFactory, HttpServiceBuilder, ServiceFactory}; -use super::builder::{DefaultAcceptor, DefaultPipelineFactory}; +use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor}; +use super::builder::DefaultPipelineFactory; +use super::builder::{HttpServiceBuilder, ServiceFactory}; use super::{IntoHttpHandler, IoStream, KeepAlive}; struct Socket { @@ -50,6 +51,7 @@ where no_signals: bool, maxconn: usize, maxconnrate: usize, + client_timeout: usize, sockets: Vec>, } @@ -72,6 +74,7 @@ where no_signals: false, maxconn: 25_600, maxconnrate: 256, + client_timeout: 5000, sockets: Vec::new(), } } @@ -130,6 +133,20 @@ where self } + /// Set server client timneout in milliseconds for first request. + /// + /// Defines a timeout for reading client request header. If a client does not transmit + /// the entire set headers within this time, the request is terminated with + /// the 408 (Request Time-out) error. + /// + /// To disable timeout set value to 0. + /// + /// By default client timeout is set to 5000 milliseconds. + pub fn client_timeout(mut self, val: usize) -> Self { + self.client_timeout = val; + self + } + /// Set server host name. /// /// Host name is used by application router aa a hostname for url @@ -205,12 +222,7 @@ where handler: Box::new(HttpServiceBuilder::new( self.factory.clone(), DefaultAcceptor, - DefaultPipelineFactory::new( - self.factory.clone(), - self.host.clone(), - addr, - self.keep_alive, - ), + DefaultPipelineFactory::new(), )), }); @@ -237,12 +249,7 @@ where handler: Box::new(HttpServiceBuilder::new( self.factory.clone(), acceptor, - DefaultPipelineFactory::new( - self.factory.clone(), - self.host.clone(), - addr, - self.keep_alive, - ), + DefaultPipelineFactory::new(), )), }); @@ -347,12 +354,7 @@ where handler: Box::new(HttpServiceBuilder::new( self.factory.clone(), acceptor.clone(), - DefaultPipelineFactory::new( - self.factory.clone(), - self.host.clone(), - addr, - self.keep_alive, - ), + DefaultPipelineFactory::new(), )), }); } @@ -513,7 +515,14 @@ impl H + Send + Clone> HttpServer { let sockets = mem::replace(&mut self.sockets, Vec::new()); for socket in sockets { - srv = socket.handler.register(srv, socket.lst); + srv = socket.handler.register( + srv, + socket.lst, + self.host.clone(), + socket.addr, + self.keep_alive.clone(), + self.client_timeout, + ); } srv.start() } diff --git a/src/server/mod.rs b/src/server/mod.rs index ac4ffc9af..9e91eda08 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -117,6 +117,7 @@ use tokio_tcp::TcpStream; pub use actix_net::server::{PauseServer, ResumeServer, StopServer}; +pub(crate) mod acceptor; pub(crate) mod builder; mod channel; mod error; @@ -144,6 +145,9 @@ pub use self::ssl::*; #[doc(hidden)] pub use self::helpers::write_content_length; +#[doc(hidden)] +pub use self::builder::HttpServiceBuilder; + use body::Binary; use extensions::Extensions; use header::ContentEncoding; diff --git a/src/server/service.rs b/src/server/service.rs index 6f80cd6df..042c86ed4 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -1,75 +1,50 @@ use std::marker::PhantomData; -use std::net; -use std::time::Duration; use actix_net::service::{NewService, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Poll}; use super::channel::HttpChannel; -use super::handler::{HttpHandler, IntoHttpHandler}; -use super::settings::{ServerSettings, WorkerSettings}; -use super::{IoStream, KeepAlive}; +use super::handler::HttpHandler; +use super::settings::WorkerSettings; +use super::IoStream; -pub enum HttpServiceMessage { - /// New stream - Connect(T), - /// Gracefull shutdown - Shutdown(Duration), - /// Force shutdown - ForceShutdown, -} - -pub(crate) struct HttpService +pub(crate) struct HttpService where - F: Fn() -> H, - H: IntoHttpHandler, + H: HttpHandler, Io: IoStream, { - factory: F, - addr: net::SocketAddr, - host: Option, - keep_alive: KeepAlive, + settings: WorkerSettings, _t: PhantomData, } -impl HttpService +impl HttpService where - F: Fn() -> H, - H: IntoHttpHandler, + H: HttpHandler, Io: IoStream, { - pub fn new( - factory: F, addr: net::SocketAddr, host: Option, keep_alive: KeepAlive, - ) -> Self { + pub fn new(settings: WorkerSettings) -> Self { HttpService { - factory, - addr, - host, - keep_alive, + settings, _t: PhantomData, } } } -impl NewService for HttpService +impl NewService for HttpService where - F: Fn() -> H, - H: IntoHttpHandler, + H: HttpHandler, Io: IoStream, { type Request = Io; type Response = (); type Error = (); type InitError = (); - type Service = HttpServiceHandler; + type Service = HttpServiceHandler; type Future = FutureResult; fn new_service(&self) -> Self::Future { - let s = ServerSettings::new(Some(self.addr), &self.host, false); - let app = (self.factory)().into_handler(); - - ok(HttpServiceHandler::new(app, self.keep_alive, s)) + ok(HttpServiceHandler::new(self.settings.clone())) } } @@ -79,7 +54,7 @@ where Io: IoStream, { settings: WorkerSettings, - tcp_ka: Option, + // tcp_ka: Option, _t: PhantomData, } @@ -88,18 +63,14 @@ where H: HttpHandler, Io: IoStream, { - fn new( - app: H, keep_alive: KeepAlive, settings: ServerSettings, - ) -> HttpServiceHandler { - let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { - Some(Duration::new(val as u64, 0)) - } else { - None - }; - let settings = WorkerSettings::new(app, keep_alive, settings); + fn new(settings: WorkerSettings) -> HttpServiceHandler { + // let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { + // Some(Duration::new(val as u64, 0)) + // } else { + // None + // }; HttpServiceHandler { - tcp_ka, settings, _t: PhantomData, } @@ -124,10 +95,4 @@ where let _ = req.set_nodelay(true); HttpChannel::new(self.settings.clone(), req, None) } - - // fn shutdown(&self, force: bool) { - // if force { - // self.settings.head().traverse::(); - // } - // } } diff --git a/src/server/settings.rs b/src/server/settings.rs index 21ce27195..fe564c5b9 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -133,11 +133,12 @@ impl ServerSettings { // "Sun, 06 Nov 1994 08:49:37 GMT".len() const DATE_VALUE_LENGTH: usize = 29; -pub(crate) struct WorkerSettings(Rc>); +pub struct WorkerSettings(Rc>); struct Inner { handler: H, keep_alive: u64, + client_timeout: u64, ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, @@ -153,7 +154,7 @@ impl Clone for WorkerSettings { impl WorkerSettings { pub(crate) fn new( - handler: H, keep_alive: KeepAlive, settings: ServerSettings, + handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -165,6 +166,7 @@ impl WorkerSettings { handler, keep_alive, ka_enabled, + client_timeout, bytes: Rc::new(SharedBytesPool::new()), messages: RequestPool::pool(settings), node: RefCell::new(Node::head()), @@ -172,14 +174,15 @@ impl WorkerSettings { })) } - pub fn head(&self) -> RefMut> { + pub(crate) fn head(&self) -> RefMut> { self.0.node.borrow_mut() } - pub fn handler(&self) -> &H { + pub(crate) fn handler(&self) -> &H { &self.0.handler } + #[inline] pub fn keep_alive_timer(&self) -> Option { let ka = self.0.keep_alive; if ka != 0 { @@ -189,23 +192,35 @@ impl WorkerSettings { } } + #[inline] pub fn keep_alive(&self) -> u64 { self.0.keep_alive } + #[inline] pub fn keep_alive_enabled(&self) -> bool { self.0.ka_enabled } - pub fn get_bytes(&self) -> BytesMut { + #[inline] + pub fn client_timer(&self) -> Option { + let delay = self.0.client_timeout; + if delay != 0 { + Some(Delay::new(Instant::now() + Duration::from_millis(delay))) + } else { + None + } + } + + pub(crate) fn get_bytes(&self) -> BytesMut { self.0.bytes.get_bytes() } - pub fn release_bytes(&self, bytes: BytesMut) { + pub(crate) fn release_bytes(&self, bytes: BytesMut) { self.0.bytes.release_bytes(bytes) } - pub fn get_request(&self) -> Request { + pub(crate) fn get_request(&self) -> Request { RequestPool::get(self.0.messages) } @@ -216,7 +231,7 @@ impl WorkerSettings { } impl WorkerSettings { - pub fn set_date(&self, dst: &mut BytesMut, full: bool) { + pub(crate) fn set_date(&self, dst: &mut BytesMut, full: bool) { // Unsafe: WorkerSetting is !Sync and !Send let date_bytes = unsafe { let date = &mut (*self.0.date.get());