use std::net; use std::time::Duration; use actix_service::{NewService, Service}; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; use log::error; use tokio_current_thread::spawn; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use super::Token; use crate::counter::CounterGuard; /// Server message pub enum ServerMessage { /// New stream Connect(net::TcpStream), /// Gracefull shutdown Shutdown(Duration), /// Force shutdown ForceShutdown, } pub trait StreamServiceFactory: Send + Clone + 'static { type NewService: NewService; fn create(&self) -> Self::NewService; } pub trait ServiceFactory: Send + Clone + 'static { type NewService: NewService; fn create(&self) -> Self::NewService; } pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: Token) -> &str; fn clone_factory(&self) -> Box; fn create(&self) -> Box, Error = ()>>; } pub(crate) type BoxedServerService = Box< Service< (Option, ServerMessage), Response = (), Error = (), Future = FutureResult<(), ()>, >, >; pub(crate) struct StreamService { service: T, } impl StreamService { pub(crate) fn new(service: T) -> Self { StreamService { service } } } impl Service<(Option, ServerMessage)> for StreamService where T: Service, T::Future: 'static, T::Error: 'static, { type Response = (); type Error = (); type Future = FutureResult<(), ()>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready().map_err(|_| ()) } fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); if let Ok(stream) = stream { spawn(self.service.call(stream).map_err(|_| ()).map(move |val| { drop(guard); val })); ok(()) } else { err(()) } } _ => ok(()), } } } pub(crate) struct ServerService { service: T, } impl ServerService { fn new(service: T) -> Self { ServerService { service } } } impl Service<(Option, ServerMessage)> for ServerService where T: Service, T::Future: 'static, T::Error: 'static, { type Response = (); type Error = (); type Future = FutureResult<(), ()>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready().map_err(|_| ()) } fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { spawn(self.service.call(req).map_err(|_| ()).map(move |val| { drop(guard); val })); ok(()) } } pub(crate) struct ServiceNewService { name: String, inner: F, token: Token, } impl ServiceNewService where F: ServiceFactory, { pub(crate) fn create(name: String, token: Token, inner: F) -> Box { Box::new(Self { name, inner, token }) } } impl InternalServiceFactory for ServiceNewService where F: ServiceFactory, { fn name(&self, _: Token) -> &str { &self.name } fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), token: self.token, }) } fn create(&self) -> Box, Error = ()>> { let token = self.token; Box::new( self.inner .create() .new_service() .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(ServerService::new(inner)); vec![(token, service)] }), ) } } pub(crate) struct StreamNewService { name: String, inner: F, token: Token, } impl StreamNewService where F: StreamServiceFactory, { pub(crate) fn create(name: String, token: Token, inner: F) -> Box { Box::new(Self { name, token, inner }) } } impl InternalServiceFactory for StreamNewService where F: StreamServiceFactory, { fn name(&self, _: Token) -> &str { &self.name } fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), token: self.token, }) } fn create(&self) -> Box, Error = ()>> { let token = self.token; Box::new( self.inner .create() .new_service() .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(StreamService::new(inner)); vec![(token, service)] }), ) } } impl InternalServiceFactory for Box { fn name(&self, token: Token) -> &str { self.as_ref().name(token) } fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } fn create(&self) -> Box, Error = ()>> { self.as_ref().create() } } impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T: NewService, { type NewService = T; fn create(&self) -> T { (self)() } } impl StreamServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T: NewService, { type NewService = T; fn create(&self) -> T { (self)() } }