use std::net; use std::time::Duration; use actix_net::server::ServerMessage; use actix_net::service::{NewService, Service}; use futures::future::{err, ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use tokio_timer::{sleep, Delay}; use super::error::AcceptorError; use super::handler::HttpHandler; use super::settings::WorkerSettings; use super::IoStream; /// This trait indicates types that can create acceptor service for http server. pub trait AcceptorServiceFactory: Send + Clone + 'static { type Io: IoStream + Send; type NewService: NewService; fn create(&self) -> Self::NewService; } impl AcceptorServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T::Response: IoStream + Send, T: NewService, { type Io = T::Response; type NewService = T; fn create(&self) -> T { (self)() } } #[derive(Clone)] /// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream` pub(crate) struct DefaultAcceptor; impl AcceptorServiceFactory for DefaultAcceptor { type Io = TcpStream; type NewService = DefaultAcceptor; fn create(&self) -> Self::NewService { DefaultAcceptor } } impl NewService for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type InitError = (); type Service = DefaultAcceptor; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(DefaultAcceptor) } } impl Service for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { ok(req) } } pub(crate) struct TcpAcceptor { inner: T, } impl TcpAcceptor where T: NewService>, { pub(crate) fn new(inner: T) -> Self { TcpAcceptor { inner } } } impl NewService for TcpAcceptor where T: NewService>, { type Request = net::TcpStream; type Response = T::Response; type Error = AcceptorError; type InitError = T::InitError; type Service = TcpAcceptorService; type Future = TcpAcceptorResponse; fn new_service(&self) -> Self::Future { TcpAcceptorResponse { fut: self.inner.new_service(), } } } pub(crate) struct TcpAcceptorResponse where T: NewService, { fut: T::Future, } impl Future for TcpAcceptorResponse where T: NewService, { type Item = TcpAcceptorService; type Error = T::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => { Ok(Async::Ready(TcpAcceptorService { inner: service })) } } } } pub(crate) struct TcpAcceptorService { inner: T, } impl Service for TcpAcceptorService where T: Service>, { type Request = net::TcpStream; type Response = T::Response; type Error = AcceptorError; type Future = Either>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { let stream = TcpStream::from_std(req, &Handle::default()).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); AcceptorError::Io(e) }); match stream { Ok(stream) => Either::A(self.inner.call(stream)), Err(e) => Either::B(err(e)), } } } /// Acceptor timeout middleware /// /// Applies timeout to request prcoessing. pub(crate) struct AcceptorTimeout { inner: T, timeout: usize, } impl AcceptorTimeout { pub(crate) fn new(timeout: usize, inner: T) -> Self { Self { inner, timeout } } } impl NewService for AcceptorTimeout { type Request = T::Request; type Response = T::Response; type Error = AcceptorError; type InitError = T::InitError; type Service = AcceptorTimeoutService; type Future = AcceptorTimeoutFut; fn new_service(&self) -> Self::Future { AcceptorTimeoutFut { fut: self.inner.new_service(), timeout: self.timeout, } } } #[doc(hidden)] pub(crate) struct AcceptorTimeoutFut { fut: T::Future, timeout: usize, } impl Future for AcceptorTimeoutFut { type Item = AcceptorTimeoutService; type Error = T::InitError; fn poll(&mut self) -> Poll { let inner = try_ready!(self.fut.poll()); Ok(Async::Ready(AcceptorTimeoutService { inner, timeout: self.timeout as u64, })) } } /// Acceptor timeout service /// /// Applies timeout to request prcoessing. pub(crate) struct AcceptorTimeoutService { inner: T, timeout: u64, } impl Service for AcceptorTimeoutService { type Request = T::Request; type Response = T::Response; type Error = AcceptorError; type Future = AcceptorTimeoutResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(AcceptorError::Service) } fn call(&mut self, req: Self::Request) -> Self::Future { AcceptorTimeoutResponse { fut: self.inner.call(req), sleep: sleep(Duration::from_millis(self.timeout)), } } } pub(crate) struct AcceptorTimeoutResponse { fut: T::Future, sleep: Delay, } impl Future for AcceptorTimeoutResponse { type Item = T::Response; type Error = AcceptorError; fn poll(&mut self) -> Poll { match self.fut.poll().map_err(AcceptorError::Service)? { Async::NotReady => match self.sleep.poll() { Err(_) => Err(AcceptorError::Timeout), Ok(Async::Ready(_)) => Err(AcceptorError::Timeout), Ok(Async::NotReady) => Ok(Async::NotReady), }, Async::Ready(resp) => Ok(Async::Ready(resp)), } } } pub(crate) struct ServerMessageAcceptor { inner: T, settings: WorkerSettings, } impl ServerMessageAcceptor where H: HttpHandler, T: NewService, { pub(crate) fn new(settings: WorkerSettings, inner: T) -> Self { ServerMessageAcceptor { inner, settings } } } impl NewService for ServerMessageAcceptor where H: HttpHandler, T: NewService, { type Request = ServerMessage; type Response = (); type Error = T::Error; type InitError = T::InitError; type Service = ServerMessageAcceptorService; type Future = ServerMessageAcceptorResponse; fn new_service(&self) -> Self::Future { ServerMessageAcceptorResponse { fut: self.inner.new_service(), settings: self.settings.clone(), } } } pub(crate) struct ServerMessageAcceptorResponse where H: HttpHandler, T: NewService, { fut: T::Future, settings: WorkerSettings, } impl Future for ServerMessageAcceptorResponse where H: HttpHandler, T: NewService, { type Item = ServerMessageAcceptorService; type Error = T::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => Ok(Async::Ready(ServerMessageAcceptorService { inner: service, settings: self.settings.clone(), })), } } } pub(crate) struct ServerMessageAcceptorService { inner: T, settings: WorkerSettings, } impl Service for ServerMessageAcceptorService where H: HttpHandler, T: Service, { type Request = ServerMessage; type Response = (); type Error = T::Error; type Future = Either, FutureResult<(), Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { match req { ServerMessage::Connect(stream) => { Either::A(ServerMessageAcceptorServiceFut { fut: self.inner.call(stream), }) } ServerMessage::Shutdown(timeout) => Either::B(ok(())), ServerMessage::ForceShutdown => { // self.settings.head().traverse::(); Either::B(ok(())) } } } } pub(crate) struct ServerMessageAcceptorServiceFut { fut: T::Future, } impl Future for ServerMessageAcceptorServiceFut where T: Service, { type Item = (); type Error = T::Error; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(_) => Ok(Async::Ready(())), } } }