1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

refactor acceptor pipeline add client timeout

This commit is contained in:
Nikolay Kim 2018-09-27 17:15:38 -07:00
parent b6a1cfa6ad
commit d57579d700
7 changed files with 474 additions and 220 deletions

315
src/server/acceptor.rs Normal file
View File

@ -0,0 +1,315 @@
use std::time::Duration;
use actix_net::server::ServerMessage;
use actix_net::service::{NewService, Service};
use futures::future::{err, ok, Either, FutureResult};
use futures::{Async, Future, Poll};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use tokio_timer::{sleep, Delay};
use super::handler::HttpHandler;
use super::settings::WorkerSettings;
use super::IoStream;
/// This trait indicates types that can create acceptor service for http server.
pub trait AcceptorServiceFactory: Send + Clone + 'static {
type Io: IoStream + Send;
type NewService: NewService<
Request = TcpStream,
Response = Self::Io,
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> AcceptorServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T::Response: IoStream + Send,
T: NewService<Request = TcpStream, Error = (), InitError = ()>,
{
type Io = T::Response;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
#[derive(Clone)]
/// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream`
pub(crate) struct DefaultAcceptor;
impl AcceptorServiceFactory for DefaultAcceptor {
type Io = TcpStream;
type NewService = DefaultAcceptor;
fn create(&self) -> Self::NewService {
DefaultAcceptor
}
}
impl NewService for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type InitError = ();
type Service = DefaultAcceptor;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(DefaultAcceptor)
}
}
impl Service for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
ok(req)
}
}
pub(crate) struct TcpAcceptor<T, H: HttpHandler> {
inner: T,
settings: WorkerSettings<H>,
}
impl<T, H> TcpAcceptor<T, H>
where
H: HttpHandler,
T: NewService<Request = TcpStream>,
{
pub(crate) fn new(settings: WorkerSettings<H>, inner: T) -> Self {
TcpAcceptor { inner, settings }
}
}
impl<T, H> NewService for TcpAcceptor<T, H>
where
H: HttpHandler,
T: NewService<Request = TcpStream>,
{
type Request = ServerMessage;
type Response = ();
type Error = ();
type InitError = ();
type Service = TcpAcceptorService<T::Service, H>;
type Future = TcpAcceptorResponse<T, H>;
fn new_service(&self) -> Self::Future {
TcpAcceptorResponse {
fut: self.inner.new_service(),
settings: self.settings.clone(),
}
}
}
pub(crate) struct TcpAcceptorResponse<T, H>
where
H: HttpHandler,
T: NewService<Request = TcpStream>,
{
fut: T::Future,
settings: WorkerSettings<H>,
}
impl<T, H> Future for TcpAcceptorResponse<T, H>
where
H: HttpHandler,
T: NewService<Request = TcpStream>,
{
type Item = TcpAcceptorService<T::Service, H>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Err(_) => Err(()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(service)) => Ok(Async::Ready(TcpAcceptorService {
inner: service,
settings: self.settings.clone(),
})),
}
}
}
pub(crate) struct TcpAcceptorService<T, H: HttpHandler> {
inner: T,
settings: WorkerSettings<H>,
}
impl<T, H> Service for TcpAcceptorService<T, H>
where
H: HttpHandler,
T: Service<Request = TcpStream>,
{
type Request = ServerMessage;
type Response = ();
type Error = ();
type Future = Either<TcpAcceptorServiceFut<T::Future>, FutureResult<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(|_| ())
}
fn call(&mut self, req: Self::Request) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream =
TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
Either::A(TcpAcceptorServiceFut {
fut: self.inner.call(stream),
})
} else {
Either::B(err(()))
}
}
ServerMessage::Shutdown(timeout) => Either::B(ok(())),
ServerMessage::ForceShutdown => {
// self.settings.head().traverse::<TcpStream, H>();
Either::B(ok(()))
}
}
}
}
pub(crate) struct TcpAcceptorServiceFut<T: Future> {
fut: T,
}
impl<T> Future for TcpAcceptorServiceFut<T>
where
T: Future,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Err(_) => Err(()),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
}
}
}
/// Errors produced by `AcceptorTimeout` service.
#[derive(Debug)]
pub enum TimeoutError<T> {
/// The inner service error
Service(T),
/// The request did not complete within the specified timeout.
Timeout,
}
/// Acceptor timeout middleware
///
/// Applies timeout to request prcoessing.
pub(crate) struct AcceptorTimeout<T> {
inner: T,
timeout: usize,
}
impl<T: NewService> AcceptorTimeout<T> {
pub(crate) fn new(timeout: usize, inner: T) -> Self {
Self { inner, timeout }
}
}
impl<T: NewService> NewService for AcceptorTimeout<T> {
type Request = T::Request;
type Response = T::Response;
type Error = TimeoutError<T::Error>;
type InitError = T::InitError;
type Service = AcceptorTimeoutService<T::Service>;
type Future = AcceptorTimeoutFut<T>;
fn new_service(&self) -> Self::Future {
AcceptorTimeoutFut {
fut: self.inner.new_service(),
timeout: self.timeout,
}
}
}
#[doc(hidden)]
pub(crate) struct AcceptorTimeoutFut<T: NewService> {
fut: T::Future,
timeout: usize,
}
impl<T: NewService> Future for AcceptorTimeoutFut<T> {
type Item = AcceptorTimeoutService<T::Service>;
type Error = T::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = try_ready!(self.fut.poll());
Ok(Async::Ready(AcceptorTimeoutService {
inner,
timeout: self.timeout as u64,
}))
}
}
/// Acceptor timeout service
///
/// Applies timeout to request prcoessing.
pub(crate) struct AcceptorTimeoutService<T> {
inner: T,
timeout: u64,
}
impl<T: Service> Service for AcceptorTimeoutService<T> {
type Request = T::Request;
type Response = T::Response;
type Error = TimeoutError<T::Error>;
type Future = AcceptorTimeoutResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(TimeoutError::Service)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
AcceptorTimeoutResponse {
fut: self.inner.call(req),
sleep: sleep(Duration::from_millis(self.timeout)),
}
}
}
pub(crate) struct AcceptorTimeoutResponse<T: Service> {
fut: T::Future,
sleep: Delay,
}
impl<T: Service> Future for AcceptorTimeoutResponse<T> {
type Item = T::Response;
type Error = TimeoutError<T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Ok(Async::NotReady) => match self.sleep.poll() {
Err(_) => Err(TimeoutError::Timeout),
Ok(Async::Ready(_)) => Err(TimeoutError::Timeout),
Ok(Async::NotReady) => Ok(Async::NotReady),
},
Ok(Async::Ready(resp)) => Ok(Async::Ready(resp)),
Err(err) => Err(TimeoutError::Service(err)),
}
}
}

View File

@ -1,21 +1,24 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net; use std::net;
use actix_net::either::Either;
use actix_net::server; use actix_net::server;
use actix_net::service::{NewService, NewServiceExt, Service}; use actix_net::service::{NewService, NewServiceExt};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use tokio_tcp::TcpStream;
use super::handler::IntoHttpHandler; use super::acceptor::{AcceptorServiceFactory, AcceptorTimeout, TcpAcceptor};
use super::handler::{HttpHandler, IntoHttpHandler};
use super::service::HttpService; use super::service::HttpService;
use super::settings::{ServerSettings, WorkerSettings};
use super::{IoStream, KeepAlive}; use super::{IoStream, KeepAlive};
pub(crate) trait ServiceFactory<H> pub(crate) trait ServiceFactory<H>
where where
H: IntoHttpHandler, H: IntoHttpHandler,
{ {
fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server; fn register(
&self, server: server::Server, lst: net::TcpListener, host: Option<String>,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize,
) -> server::Server;
} }
pub struct HttpServiceBuilder<F, H, A, P> pub struct HttpServiceBuilder<F, H, A, P>
@ -29,11 +32,12 @@ where
impl<F, H, A, P> HttpServiceBuilder<F, H, A, P> impl<F, H, A, P> HttpServiceBuilder<F, H, A, P>
where where
F: Fn() -> H + Send + Clone, F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler, H: IntoHttpHandler,
A: AcceptorServiceFactory, A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>, P: HttpPipelineFactory<H::Handler, Io = A::Io>,
{ {
/// Create http service builder
pub fn new(factory: F, acceptor: A, pipeline: P) -> Self { pub fn new(factory: F, acceptor: A, pipeline: P) -> Self {
Self { Self {
factory, factory,
@ -42,6 +46,7 @@ where
} }
} }
/// Use different acceptor factory
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1, P> pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1, P>
where where
A1: AcceptorServiceFactory, A1: AcceptorServiceFactory,
@ -53,9 +58,10 @@ where
} }
} }
/// Use different pipeline factory
pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<F, H, A, P1> pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<F, H, A, P1>
where where
P1: HttpPipelineFactory, P1: HttpPipelineFactory<H::Handler>,
{ {
HttpServiceBuilder { HttpServiceBuilder {
pipeline, pipeline,
@ -64,18 +70,45 @@ where
} }
} }
fn finish(&self) -> impl server::StreamServiceFactory { fn finish(
&self, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive,
client_timeout: usize,
) -> impl server::ServiceFactory {
let factory = self.factory.clone();
let pipeline = self.pipeline.clone(); let pipeline = self.pipeline.clone();
let acceptor = self.acceptor.clone(); let acceptor = self.acceptor.clone();
move || acceptor.create().and_then(pipeline.create()) move || {
let app = (factory)().into_handler();
let settings = WorkerSettings::new(
app,
keep_alive,
client_timeout as u64,
ServerSettings::new(Some(addr), &host, false),
);
if client_timeout == 0 {
Either::A(TcpAcceptor::new(
settings.clone(),
acceptor.create().and_then(pipeline.create(settings)),
))
} else {
Either::B(TcpAcceptor::new(
settings.clone(),
AcceptorTimeout::new(client_timeout, acceptor.create())
.map_err(|_| ())
.and_then(pipeline.create(settings)),
))
}
}
} }
} }
impl<F, H, A, P> Clone for HttpServiceBuilder<F, H, A, P> impl<F, H, A, P> Clone for HttpServiceBuilder<F, H, A, P>
where where
F: Fn() -> H + Send + Clone, F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
A: AcceptorServiceFactory, A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>, P: HttpPipelineFactory<H::Handler, Io = A::Io>,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
HttpServiceBuilder { HttpServiceBuilder {
@ -88,44 +121,24 @@ where
impl<F, H, A, P> ServiceFactory<H> for HttpServiceBuilder<F, H, A, P> impl<F, H, A, P> ServiceFactory<H> for HttpServiceBuilder<F, H, A, P>
where where
F: Fn() -> H + Send + Clone, F: Fn() -> H + Send + Clone + 'static,
A: AcceptorServiceFactory, A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>, P: HttpPipelineFactory<H::Handler, Io = A::Io>,
H: IntoHttpHandler, H: IntoHttpHandler,
{ {
fn register(&self, server: server::Server, lst: net::TcpListener) -> server::Server { fn register(
server.listen("actix-web", lst, self.finish()) &self, server: server::Server, lst: net::TcpListener, host: Option<String>,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize,
) -> server::Server {
server.listen2(
"actix-web",
lst,
self.finish(host, addr, keep_alive, client_timeout),
)
} }
} }
/// This trait indicates types that can create acceptor service for http server. pub trait HttpPipelineFactory<H: HttpHandler>: Send + Clone + 'static {
pub trait AcceptorServiceFactory: Send + Clone + 'static {
type Io: IoStream + Send;
type NewService: NewService<
Request = TcpStream,
Response = Self::Io,
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> AcceptorServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T::Response: IoStream + Send,
T: NewService<Request = TcpStream, Error = (), InitError = ()>,
{
type Io = T::Response;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
pub trait HttpPipelineFactory: Send + Clone + 'static {
type Io: IoStream; type Io: IoStream;
type NewService: NewService< type NewService: NewService<
Request = Self::Io, Request = Self::Io,
@ -134,126 +147,59 @@ pub trait HttpPipelineFactory: Send + Clone + 'static {
InitError = (), InitError = (),
>; >;
fn create(&self) -> Self::NewService; fn create(&self, settings: WorkerSettings<H>) -> Self::NewService;
} }
impl<F, T> HttpPipelineFactory for F impl<F, T, H> HttpPipelineFactory<H> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn(WorkerSettings<H>) -> T + Send + Clone + 'static,
T: NewService<Response = (), Error = (), InitError = ()>, T: NewService<Response = (), Error = (), InitError = ()>,
T::Request: IoStream, T::Request: IoStream,
H: HttpHandler,
{ {
type Io = T::Request; type Io = T::Request;
type NewService = T; type NewService = T;
fn create(&self) -> T { fn create(&self, settings: WorkerSettings<H>) -> T {
(self)() (self)(settings)
} }
} }
pub(crate) struct DefaultPipelineFactory<F, H, Io> pub(crate) struct DefaultPipelineFactory<H, Io> {
where _t: PhantomData<(H, Io)>,
F: Fn() -> H + Send + Clone,
{
factory: F,
host: Option<String>,
addr: net::SocketAddr,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
} }
impl<F, H, Io> DefaultPipelineFactory<F, H, Io> unsafe impl<H, Io> Send for DefaultPipelineFactory<H, Io> {}
impl<H, Io> DefaultPipelineFactory<H, Io>
where where
Io: IoStream + Send, Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static, H: HttpHandler + 'static,
H: IntoHttpHandler + 'static,
{ {
pub fn new( pub fn new() -> Self {
factory: F, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive, Self { _t: PhantomData }
) -> Self {
Self {
factory,
addr,
keep_alive,
host,
_t: PhantomData,
}
} }
} }
impl<F, H, Io> Clone for DefaultPipelineFactory<F, H, Io> impl<H, Io> Clone for DefaultPipelineFactory<H, Io>
where where
Io: IoStream, Io: IoStream,
F: Fn() -> H + Send + Clone, H: HttpHandler,
H: IntoHttpHandler,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self { _t: PhantomData }
factory: self.factory.clone(),
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
_t: PhantomData,
}
} }
} }
impl<F, H, Io> HttpPipelineFactory for DefaultPipelineFactory<F, H, Io> impl<H, Io> HttpPipelineFactory<H> for DefaultPipelineFactory<H, Io>
where where
Io: IoStream + Send, Io: IoStream,
F: Fn() -> H + Send + Clone + 'static, H: HttpHandler + 'static,
H: IntoHttpHandler + 'static,
{ {
type Io = Io; type Io = Io;
type NewService = HttpService<F, H, Io>; type NewService = HttpService<H, Io>;
fn create(&self) -> Self::NewService { fn create(&self, settings: WorkerSettings<H>) -> Self::NewService {
HttpService::new( HttpService::new(settings)
self.factory.clone(),
self.addr,
self.host.clone(),
self.keep_alive,
)
}
}
#[derive(Clone)]
/// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream`
pub(crate) struct DefaultAcceptor;
impl AcceptorServiceFactory for DefaultAcceptor {
type Io = TcpStream;
type NewService = DefaultAcceptor;
fn create(&self) -> Self::NewService {
DefaultAcceptor
}
}
impl NewService for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type InitError = ();
type Service = DefaultAcceptor;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(DefaultAcceptor)
}
}
impl Service for DefaultAcceptor {
type Request = TcpStream;
type Response = TcpStream;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
ok(req)
} }
} }

View File

@ -41,7 +41,7 @@ where
pub(crate) fn new( pub(crate) fn new(
settings: WorkerSettings<H>, io: T, peer: Option<SocketAddr>, settings: WorkerSettings<H>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let ka_timeout = settings.keep_alive_timer(); let ka_timeout = settings.client_timer();
HttpChannel { HttpChannel {
ka_timeout, ka_timeout,

View File

@ -18,8 +18,9 @@ use openssl::ssl::SslAcceptorBuilder;
//#[cfg(feature = "rust-tls")] //#[cfg(feature = "rust-tls")]
//use rustls::ServerConfig; //use rustls::ServerConfig;
use super::builder::{AcceptorServiceFactory, HttpServiceBuilder, ServiceFactory}; use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor};
use super::builder::{DefaultAcceptor, DefaultPipelineFactory}; use super::builder::DefaultPipelineFactory;
use super::builder::{HttpServiceBuilder, ServiceFactory};
use super::{IntoHttpHandler, IoStream, KeepAlive}; use super::{IntoHttpHandler, IoStream, KeepAlive};
struct Socket<H: IntoHttpHandler> { struct Socket<H: IntoHttpHandler> {
@ -50,6 +51,7 @@ where
no_signals: bool, no_signals: bool,
maxconn: usize, maxconn: usize,
maxconnrate: usize, maxconnrate: usize,
client_timeout: usize,
sockets: Vec<Socket<H>>, sockets: Vec<Socket<H>>,
} }
@ -72,6 +74,7 @@ where
no_signals: false, no_signals: false,
maxconn: 25_600, maxconn: 25_600,
maxconnrate: 256, maxconnrate: 256,
client_timeout: 5000,
sockets: Vec::new(), sockets: Vec::new(),
} }
} }
@ -130,6 +133,20 @@ where
self self
} }
/// Set server client timneout in milliseconds for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
/// the entire set headers within this time, the request is terminated with
/// the 408 (Request Time-out) error.
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
pub fn client_timeout(mut self, val: usize) -> Self {
self.client_timeout = val;
self
}
/// Set server host name. /// Set server host name.
/// ///
/// Host name is used by application router aa a hostname for url /// Host name is used by application router aa a hostname for url
@ -205,12 +222,7 @@ where
handler: Box::new(HttpServiceBuilder::new( handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(), self.factory.clone(),
DefaultAcceptor, DefaultAcceptor,
DefaultPipelineFactory::new( DefaultPipelineFactory::new(),
self.factory.clone(),
self.host.clone(),
addr,
self.keep_alive,
),
)), )),
}); });
@ -237,12 +249,7 @@ where
handler: Box::new(HttpServiceBuilder::new( handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(), self.factory.clone(),
acceptor, acceptor,
DefaultPipelineFactory::new( DefaultPipelineFactory::new(),
self.factory.clone(),
self.host.clone(),
addr,
self.keep_alive,
),
)), )),
}); });
@ -347,12 +354,7 @@ where
handler: Box::new(HttpServiceBuilder::new( handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(), self.factory.clone(),
acceptor.clone(), acceptor.clone(),
DefaultPipelineFactory::new( DefaultPipelineFactory::new(),
self.factory.clone(),
self.host.clone(),
addr,
self.keep_alive,
),
)), )),
}); });
} }
@ -513,7 +515,14 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
let sockets = mem::replace(&mut self.sockets, Vec::new()); let sockets = mem::replace(&mut self.sockets, Vec::new());
for socket in sockets { for socket in sockets {
srv = socket.handler.register(srv, socket.lst); srv = socket.handler.register(
srv,
socket.lst,
self.host.clone(),
socket.addr,
self.keep_alive.clone(),
self.client_timeout,
);
} }
srv.start() srv.start()
} }

View File

@ -117,6 +117,7 @@ use tokio_tcp::TcpStream;
pub use actix_net::server::{PauseServer, ResumeServer, StopServer}; pub use actix_net::server::{PauseServer, ResumeServer, StopServer};
pub(crate) mod acceptor;
pub(crate) mod builder; pub(crate) mod builder;
mod channel; mod channel;
mod error; mod error;
@ -144,6 +145,9 @@ pub use self::ssl::*;
#[doc(hidden)] #[doc(hidden)]
pub use self::helpers::write_content_length; pub use self::helpers::write_content_length;
#[doc(hidden)]
pub use self::builder::HttpServiceBuilder;
use body::Binary; use body::Binary;
use extensions::Extensions; use extensions::Extensions;
use header::ContentEncoding; use header::ContentEncoding;

View File

@ -1,75 +1,50 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net;
use std::time::Duration;
use actix_net::service::{NewService, Service}; use actix_net::service::{NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
use futures::{Async, Poll}; use futures::{Async, Poll};
use super::channel::HttpChannel; use super::channel::HttpChannel;
use super::handler::{HttpHandler, IntoHttpHandler}; use super::handler::HttpHandler;
use super::settings::{ServerSettings, WorkerSettings}; use super::settings::WorkerSettings;
use super::{IoStream, KeepAlive}; use super::IoStream;
pub enum HttpServiceMessage<T> { pub(crate) struct HttpService<H, Io>
/// New stream
Connect(T),
/// Gracefull shutdown
Shutdown(Duration),
/// Force shutdown
ForceShutdown,
}
pub(crate) struct HttpService<F, H, Io>
where where
F: Fn() -> H, H: HttpHandler,
H: IntoHttpHandler,
Io: IoStream, Io: IoStream,
{ {
factory: F, settings: WorkerSettings<H>,
addr: net::SocketAddr,
host: Option<String>,
keep_alive: KeepAlive,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
impl<F, H, Io> HttpService<F, H, Io> impl<H, Io> HttpService<H, Io>
where where
F: Fn() -> H, H: HttpHandler,
H: IntoHttpHandler,
Io: IoStream, Io: IoStream,
{ {
pub fn new( pub fn new(settings: WorkerSettings<H>) -> Self {
factory: F, addr: net::SocketAddr, host: Option<String>, keep_alive: KeepAlive,
) -> Self {
HttpService { HttpService {
factory, settings,
addr,
host,
keep_alive,
_t: PhantomData, _t: PhantomData,
} }
} }
} }
impl<F, H, Io> NewService for HttpService<F, H, Io> impl<H, Io> NewService for HttpService<H, Io>
where where
F: Fn() -> H, H: HttpHandler,
H: IntoHttpHandler,
Io: IoStream, Io: IoStream,
{ {
type Request = Io; type Request = Io;
type Response = (); type Response = ();
type Error = (); type Error = ();
type InitError = (); type InitError = ();
type Service = HttpServiceHandler<H::Handler, Io>; type Service = HttpServiceHandler<H, Io>;
type Future = FutureResult<Self::Service, Self::Error>; type Future = FutureResult<Self::Service, Self::Error>;
fn new_service(&self) -> Self::Future { fn new_service(&self) -> Self::Future {
let s = ServerSettings::new(Some(self.addr), &self.host, false); ok(HttpServiceHandler::new(self.settings.clone()))
let app = (self.factory)().into_handler();
ok(HttpServiceHandler::new(app, self.keep_alive, s))
} }
} }
@ -79,7 +54,7 @@ where
Io: IoStream, Io: IoStream,
{ {
settings: WorkerSettings<H>, settings: WorkerSettings<H>,
tcp_ka: Option<Duration>, // tcp_ka: Option<Duration>,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -88,18 +63,14 @@ where
H: HttpHandler, H: HttpHandler,
Io: IoStream, Io: IoStream,
{ {
fn new( fn new(settings: WorkerSettings<H>) -> HttpServiceHandler<H, Io> {
app: H, keep_alive: KeepAlive, settings: ServerSettings, // let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
) -> HttpServiceHandler<H, Io> { // Some(Duration::new(val as u64, 0))
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { // } else {
Some(Duration::new(val as u64, 0)) // None
} else { // };
None
};
let settings = WorkerSettings::new(app, keep_alive, settings);
HttpServiceHandler { HttpServiceHandler {
tcp_ka,
settings, settings,
_t: PhantomData, _t: PhantomData,
} }
@ -124,10 +95,4 @@ where
let _ = req.set_nodelay(true); let _ = req.set_nodelay(true);
HttpChannel::new(self.settings.clone(), req, None) HttpChannel::new(self.settings.clone(), req, None)
} }
// fn shutdown(&self, force: bool) {
// if force {
// self.settings.head().traverse::<TcpStream, H>();
// }
// }
} }

View File

@ -133,11 +133,12 @@ impl ServerSettings {
// "Sun, 06 Nov 1994 08:49:37 GMT".len() // "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29; const DATE_VALUE_LENGTH: usize = 29;
pub(crate) struct WorkerSettings<H>(Rc<Inner<H>>); pub struct WorkerSettings<H>(Rc<Inner<H>>);
struct Inner<H> { struct Inner<H> {
handler: H, handler: H,
keep_alive: u64, keep_alive: u64,
client_timeout: u64,
ka_enabled: bool, ka_enabled: bool,
bytes: Rc<SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: &'static RequestPool, messages: &'static RequestPool,
@ -153,7 +154,7 @@ impl<H> Clone for WorkerSettings<H> {
impl<H> WorkerSettings<H> { impl<H> WorkerSettings<H> {
pub(crate) fn new( pub(crate) fn new(
handler: H, keep_alive: KeepAlive, settings: ServerSettings, handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings,
) -> WorkerSettings<H> { ) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true), KeepAlive::Timeout(val) => (val as u64, true),
@ -165,6 +166,7 @@ impl<H> WorkerSettings<H> {
handler, handler,
keep_alive, keep_alive,
ka_enabled, ka_enabled,
client_timeout,
bytes: Rc::new(SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: RequestPool::pool(settings), messages: RequestPool::pool(settings),
node: RefCell::new(Node::head()), node: RefCell::new(Node::head()),
@ -172,14 +174,15 @@ impl<H> WorkerSettings<H> {
})) }))
} }
pub fn head(&self) -> RefMut<Node<()>> { pub(crate) fn head(&self) -> RefMut<Node<()>> {
self.0.node.borrow_mut() self.0.node.borrow_mut()
} }
pub fn handler(&self) -> &H { pub(crate) fn handler(&self) -> &H {
&self.0.handler &self.0.handler
} }
#[inline]
pub fn keep_alive_timer(&self) -> Option<Delay> { pub fn keep_alive_timer(&self) -> Option<Delay> {
let ka = self.0.keep_alive; let ka = self.0.keep_alive;
if ka != 0 { if ka != 0 {
@ -189,23 +192,35 @@ impl<H> WorkerSettings<H> {
} }
} }
#[inline]
pub fn keep_alive(&self) -> u64 { pub fn keep_alive(&self) -> u64 {
self.0.keep_alive self.0.keep_alive
} }
#[inline]
pub fn keep_alive_enabled(&self) -> bool { pub fn keep_alive_enabled(&self) -> bool {
self.0.ka_enabled self.0.ka_enabled
} }
pub fn get_bytes(&self) -> BytesMut { #[inline]
pub fn client_timer(&self) -> Option<Delay> {
let delay = self.0.client_timeout;
if delay != 0 {
Some(Delay::new(Instant::now() + Duration::from_millis(delay)))
} else {
None
}
}
pub(crate) fn get_bytes(&self) -> BytesMut {
self.0.bytes.get_bytes() self.0.bytes.get_bytes()
} }
pub fn release_bytes(&self, bytes: BytesMut) { pub(crate) fn release_bytes(&self, bytes: BytesMut) {
self.0.bytes.release_bytes(bytes) self.0.bytes.release_bytes(bytes)
} }
pub fn get_request(&self) -> Request { pub(crate) fn get_request(&self) -> Request {
RequestPool::get(self.0.messages) RequestPool::get(self.0.messages)
} }
@ -216,7 +231,7 @@ impl<H> WorkerSettings<H> {
} }
impl<H: 'static> WorkerSettings<H> { impl<H: 'static> WorkerSettings<H> {
pub fn set_date(&self, dst: &mut BytesMut, full: bool) { pub(crate) fn set_date(&self, dst: &mut BytesMut, full: bool) {
// Unsafe: WorkerSetting is !Sync and !Send // Unsafe: WorkerSetting is !Sync and !Send
let date_bytes = unsafe { let date_bytes = unsafe {
let date = &mut (*self.0.date.get()); let date = &mut (*self.0.date.get());