use std::time::Duration; use actix_net::server::ServerMessage; use actix_net::service::{NewService, Service}; use futures::future::{err, ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use tokio_timer::{sleep, Delay}; use super::handler::HttpHandler; use super::settings::WorkerSettings; use super::IoStream; /// This trait indicates types that can create acceptor service for http server. pub trait AcceptorServiceFactory: Send + Clone + 'static { type Io: IoStream + Send; type NewService: NewService< Request = TcpStream, Response = Self::Io, Error = (), InitError = (), >; fn create(&self) -> Self::NewService; } impl AcceptorServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T::Response: IoStream + Send, T: NewService, { type Io = T::Response; type NewService = T; fn create(&self) -> T { (self)() } } #[derive(Clone)] /// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream` pub(crate) struct DefaultAcceptor; impl AcceptorServiceFactory for DefaultAcceptor { type Io = TcpStream; type NewService = DefaultAcceptor; fn create(&self) -> Self::NewService { DefaultAcceptor } } impl NewService for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type InitError = (); type Service = DefaultAcceptor; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(DefaultAcceptor) } } impl Service for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { ok(req) } } pub(crate) struct TcpAcceptor { inner: T, settings: WorkerSettings, } impl TcpAcceptor where H: HttpHandler, T: NewService, { pub(crate) fn new(settings: WorkerSettings, inner: T) -> Self { TcpAcceptor { inner, settings } } } impl NewService for TcpAcceptor where H: HttpHandler, T: NewService, { type Request = ServerMessage; type Response = (); type Error = (); type InitError = (); type Service = TcpAcceptorService; type Future = TcpAcceptorResponse; fn new_service(&self) -> Self::Future { TcpAcceptorResponse { fut: self.inner.new_service(), settings: self.settings.clone(), } } } pub(crate) struct TcpAcceptorResponse where H: HttpHandler, T: NewService, { fut: T::Future, settings: WorkerSettings, } impl Future for TcpAcceptorResponse where H: HttpHandler, T: NewService, { type Item = TcpAcceptorService; type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Err(_) => Err(()), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(service)) => Ok(Async::Ready(TcpAcceptorService { inner: service, settings: self.settings.clone(), })), } } } pub(crate) struct TcpAcceptorService { inner: T, settings: WorkerSettings, } impl Service for TcpAcceptorService where H: HttpHandler, T: Service, { type Request = ServerMessage; type Response = (); type Error = (); type Future = Either, FutureResult<(), ()>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(|_| ()) } fn call(&mut self, req: Self::Request) -> Self::Future { match req { ServerMessage::Connect(stream) => { let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); if let Ok(stream) = stream { Either::A(TcpAcceptorServiceFut { fut: self.inner.call(stream), }) } else { Either::B(err(())) } } ServerMessage::Shutdown(timeout) => Either::B(ok(())), ServerMessage::ForceShutdown => { // self.settings.head().traverse::(); Either::B(ok(())) } } } } pub(crate) struct TcpAcceptorServiceFut { fut: T, } impl Future for TcpAcceptorServiceFut where T: Future, { type Item = (); type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Err(_) => Err(()), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Ok(Async::Ready(())), } } } /// Errors produced by `AcceptorTimeout` service. #[derive(Debug)] pub enum TimeoutError { /// The inner service error Service(T), /// The request did not complete within the specified timeout. Timeout, } /// Acceptor timeout middleware /// /// Applies timeout to request prcoessing. pub(crate) struct AcceptorTimeout { inner: T, timeout: usize, } impl AcceptorTimeout { pub(crate) fn new(timeout: usize, inner: T) -> Self { Self { inner, timeout } } } impl NewService for AcceptorTimeout { type Request = T::Request; type Response = T::Response; type Error = TimeoutError; type InitError = T::InitError; type Service = AcceptorTimeoutService; type Future = AcceptorTimeoutFut; fn new_service(&self) -> Self::Future { AcceptorTimeoutFut { fut: self.inner.new_service(), timeout: self.timeout, } } } #[doc(hidden)] pub(crate) struct AcceptorTimeoutFut { fut: T::Future, timeout: usize, } impl Future for AcceptorTimeoutFut { type Item = AcceptorTimeoutService; type Error = T::InitError; fn poll(&mut self) -> Poll { let inner = try_ready!(self.fut.poll()); Ok(Async::Ready(AcceptorTimeoutService { inner, timeout: self.timeout as u64, })) } } /// Acceptor timeout service /// /// Applies timeout to request prcoessing. pub(crate) struct AcceptorTimeoutService { inner: T, timeout: u64, } impl Service for AcceptorTimeoutService { type Request = T::Request; type Response = T::Response; type Error = TimeoutError; type Future = AcceptorTimeoutResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(TimeoutError::Service) } fn call(&mut self, req: Self::Request) -> Self::Future { AcceptorTimeoutResponse { fut: self.inner.call(req), sleep: sleep(Duration::from_millis(self.timeout)), } } } pub(crate) struct AcceptorTimeoutResponse { fut: T::Future, sleep: Delay, } impl Future for AcceptorTimeoutResponse { type Item = T::Response; type Error = TimeoutError; fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::NotReady) => match self.sleep.poll() { Err(_) => Err(TimeoutError::Timeout), Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), Ok(Async::NotReady) => Ok(Async::NotReady), }, Ok(Async::Ready(resp)) => Ok(Async::Ready(resp)), Err(err) => Err(TimeoutError::Service(err)), } } }