use std::time::Duration; use std::{fmt, net}; use actix_net::server::ServerMessage; use actix_net::service::{NewService, Service}; use futures::future::{err, ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use tokio_timer::{sleep, Delay}; use super::channel::HttpProtocol; use super::error::AcceptorError; use super::handler::HttpHandler; use super::settings::ServiceConfig; use super::IoStream; /// This trait indicates types that can create acceptor service for http server. pub trait AcceptorServiceFactory: Send + Clone + 'static { type Io: IoStream + Send; type NewService: NewService; fn create(&self) -> Self::NewService; } impl AcceptorServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T::Response: IoStream + Send, T: NewService, T::InitError: fmt::Debug, { type Io = T::Response; type NewService = T; fn create(&self) -> T { (self)() } } #[derive(Clone)] /// Default acceptor service convert `TcpStream` to a `tokio_tcp::TcpStream` pub(crate) struct DefaultAcceptor; impl AcceptorServiceFactory for DefaultAcceptor { type Io = TcpStream; type NewService = DefaultAcceptor; fn create(&self) -> Self::NewService { DefaultAcceptor } } impl NewService for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type InitError = (); type Service = DefaultAcceptor; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(DefaultAcceptor) } } impl Service for DefaultAcceptor { type Request = TcpStream; type Response = TcpStream; type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { ok(req) } } pub(crate) struct TcpAcceptor { inner: T, } impl TcpAcceptor where T: NewService>, T::InitError: fmt::Debug, { pub(crate) fn new(inner: T) -> Self { TcpAcceptor { inner } } } impl NewService for TcpAcceptor where T: NewService>, T::InitError: fmt::Debug, { type Request = net::TcpStream; type Response = T::Response; type Error = AcceptorError; type InitError = T::InitError; type Service = TcpAcceptorService; type Future = TcpAcceptorResponse; fn new_service(&self) -> Self::Future { TcpAcceptorResponse { fut: self.inner.new_service(), } } } pub(crate) struct TcpAcceptorResponse where T: NewService, T::InitError: fmt::Debug, { fut: T::Future, } impl Future for TcpAcceptorResponse where T: NewService, T::InitError: fmt::Debug, { type Item = TcpAcceptorService; type Error = T::InitError; fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(service)) => { Ok(Async::Ready(TcpAcceptorService { inner: service })) } Err(e) => { error!("Can not create accetor service: {:?}", e); Err(e) } } } } pub(crate) struct TcpAcceptorService { inner: T, } impl Service for TcpAcceptorService where T: Service>, { type Request = net::TcpStream; type Response = T::Response; type Error = AcceptorError; type Future = Either>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { let stream = TcpStream::from_std(req, &Handle::default()).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); AcceptorError::Io(e) }); match stream { Ok(stream) => Either::A(self.inner.call(stream)), Err(e) => Either::B(err(e)), } } } #[doc(hidden)] /// Acceptor timeout middleware /// /// Applies timeout to request prcoessing. pub struct AcceptorTimeout { inner: T, timeout: Duration, } impl AcceptorTimeout { /// Create new `AcceptorTimeout` instance. timeout is in milliseconds. pub fn new(timeout: u64, inner: T) -> Self { Self { inner, timeout: Duration::from_millis(timeout), } } } impl NewService for AcceptorTimeout { type Request = T::Request; type Response = T::Response; type Error = AcceptorError; type InitError = T::InitError; type Service = AcceptorTimeoutService; type Future = AcceptorTimeoutFut; fn new_service(&self) -> Self::Future { AcceptorTimeoutFut { fut: self.inner.new_service(), timeout: self.timeout, } } } #[doc(hidden)] pub struct AcceptorTimeoutFut { fut: T::Future, timeout: Duration, } impl Future for AcceptorTimeoutFut { type Item = AcceptorTimeoutService; type Error = T::InitError; fn poll(&mut self) -> Poll { let inner = try_ready!(self.fut.poll()); Ok(Async::Ready(AcceptorTimeoutService { inner, timeout: self.timeout, })) } } #[doc(hidden)] /// Acceptor timeout service /// /// Applies timeout to request prcoessing. pub struct AcceptorTimeoutService { inner: T, timeout: Duration, } impl Service for AcceptorTimeoutService { type Request = T::Request; type Response = T::Response; type Error = AcceptorError; type Future = AcceptorTimeoutResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(AcceptorError::Service) } fn call(&mut self, req: Self::Request) -> Self::Future { AcceptorTimeoutResponse { fut: self.inner.call(req), sleep: sleep(self.timeout), } } } #[doc(hidden)] pub struct AcceptorTimeoutResponse { fut: T::Future, sleep: Delay, } impl Future for AcceptorTimeoutResponse { type Item = T::Response; type Error = AcceptorError; fn poll(&mut self) -> Poll { match self.fut.poll().map_err(AcceptorError::Service)? { Async::NotReady => match self.sleep.poll() { Err(_) => Err(AcceptorError::Timeout), Ok(Async::Ready(_)) => Err(AcceptorError::Timeout), Ok(Async::NotReady) => Ok(Async::NotReady), }, Async::Ready(resp) => Ok(Async::Ready(resp)), } } } pub(crate) struct ServerMessageAcceptor { inner: T, settings: ServiceConfig, } impl ServerMessageAcceptor where H: HttpHandler, T: NewService, { pub(crate) fn new(settings: ServiceConfig, inner: T) -> Self { ServerMessageAcceptor { inner, settings } } } impl NewService for ServerMessageAcceptor where H: HttpHandler, T: NewService, { type Request = ServerMessage; type Response = (); type Error = T::Error; type InitError = T::InitError; type Service = ServerMessageAcceptorService; type Future = ServerMessageAcceptorResponse; fn new_service(&self) -> Self::Future { ServerMessageAcceptorResponse { fut: self.inner.new_service(), settings: self.settings.clone(), } } } pub(crate) struct ServerMessageAcceptorResponse where H: HttpHandler, T: NewService, { fut: T::Future, settings: ServiceConfig, } impl Future for ServerMessageAcceptorResponse where H: HttpHandler, T: NewService, { type Item = ServerMessageAcceptorService; type Error = T::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => Ok(Async::Ready(ServerMessageAcceptorService { inner: service, settings: self.settings.clone(), })), } } } pub(crate) struct ServerMessageAcceptorService { inner: T, settings: ServiceConfig, } impl Service for ServerMessageAcceptorService where H: HttpHandler, T: Service, { type Request = ServerMessage; type Response = (); type Error = T::Error; type Future = Either, FutureResult<(), Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { match req { ServerMessage::Connect(stream) => { Either::A(ServerMessageAcceptorServiceFut { fut: self.inner.call(stream), }) } ServerMessage::Shutdown(_) => Either::B(ok(())), ServerMessage::ForceShutdown => { self.settings .head() .traverse(|proto: &mut HttpProtocol| proto.shutdown()); Either::B(ok(())) } } } } pub(crate) struct ServerMessageAcceptorServiceFut { fut: T::Future, } impl Future for ServerMessageAcceptorServiceFut where T: Service, { type Item = (); type Error = T::Error; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(_) => Ok(Async::Ready(())), } } }