use std::marker::PhantomData; use std::net::SocketAddr; use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::spawn; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_utils::counter::CounterGuard; use futures_util::future::{err, ok, LocalBoxFuture, Ready}; use futures_util::{FutureExt, TryFutureExt}; use log::error; use super::Token; use crate::socket::{FromStream, StdStream}; /// Server message pub(crate) enum ServerMessage { /// New stream Connect(StdStream), /// Gracefully shutdown Shutdown(Duration), /// Force shutdown ForceShutdown, } pub trait ServiceFactory: Send + Clone + 'static { type Factory: actix::ServiceFactory; fn create(&self) -> Self::Factory; } pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: Token) -> &str; fn clone_factory(&self) -> Box; fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; } pub(crate) type BoxedServerService = Box< dyn Service< Request = (Option, ServerMessage), Response = (), Error = (), Future = Ready>, >, >; pub(crate) struct StreamService { service: T, } impl StreamService { pub(crate) fn new(service: T) -> Self { StreamService { service } } } impl Service for StreamService where T: Service, T::Future: 'static, T::Error: 'static, I: FromStream, { type Request = (Option, ServerMessage); type Response = (); type Error = (); type Future = Ready>; fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { self.service.poll_ready(ctx).map_err(|_| ()) } fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { let stream = FromStream::from_stdstream(stream).map_err(|e| { error!("Can not convert to an async tcp stream: {}", e); }); if let Ok(stream) = stream { let f = self.service.call(stream); spawn(async move { let _ = f.await; drop(guard); }); ok(()) } else { err(()) } } _ => ok(()), } } } pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, token: Token, addr: SocketAddr, _t: PhantomData, } impl StreamNewService where F: ServiceFactory, Io: FromStream + Send + 'static, { pub(crate) fn create( name: String, token: Token, inner: F, addr: SocketAddr, ) -> Box { Box::new(Self { name, token, inner, addr, _t: PhantomData, }) } } impl InternalServiceFactory for StreamNewService where F: ServiceFactory, Io: FromStream + Send + 'static, { fn name(&self, _: Token) -> &str { &self.name } fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), token: self.token, addr: self.addr, _t: PhantomData, }) } fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { let token = self.token; self.inner .create() .new_service(()) .map_err(|_| ()) .map_ok(move |inner| { let service: BoxedServerService = Box::new(StreamService::new(inner)); vec![(token, service)] }) .boxed_local() } } impl InternalServiceFactory for Box { fn name(&self, token: Token) -> &str { self.as_ref().name(token) } fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { self.as_ref().create() } } impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T: actix::ServiceFactory, I: FromStream, { type Factory = T; fn create(&self) -> T { (self)() } }