use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use either::Either; use futures::future::{FutureExt, LocalBoxFuture}; use pin_project::{pin_project, project}; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::FramedDispatcher; use crate::error::ServiceError; use crate::item::Item; use crate::state::State; type RequestItem = Item; type ResponseItem = Option<::Item>; /// Service builder - structure that follows the builder pattern /// for building instances for framed services. pub struct Builder(PhantomData<(St, Codec)>); impl Builder { pub fn new() -> Builder { Builder(PhantomData) } /// Construct framed handler service with specified connect service pub fn service(self, connect: F) -> ServiceBuilder where F: IntoService, Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, Codec: Decoder + Encoder + Unpin, { ServiceBuilder { connect: connect.into_service(), disconnect: None, _t: PhantomData, } } /// Construct framed handler new service with specified connect service pub fn factory(self, connect: F) -> NewServiceBuilder where F: IntoServiceFactory, Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, C::Future: 'static, Codec: Decoder + Encoder + Unpin, { NewServiceBuilder { connect: connect.into_factory(), disconnect: None, _t: PhantomData, } } } pub struct ServiceBuilder { connect: C, disconnect: Option>, _t: PhantomData<(St, Io, Codec)>, } impl ServiceBuilder where St: 'static, Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { /// Callback to execute on disconnect /// /// Second parameter indicates error occured during disconnect. pub fn disconnect(mut self, disconnect: F) -> Self where F: Fn(&mut St, bool) + 'static, { self.disconnect = Some(Rc::new(disconnect)); self } /// Provide stream items handler service and construct service factory. pub fn finish( self, service: F, ) -> impl Service> where F: IntoServiceFactory, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, > + 'static, { FramedServiceImpl { connect: self.connect, handler: Rc::new(service.into_factory()), disconnect: self.disconnect.clone(), _t: PhantomData, } } } pub struct NewServiceBuilder { connect: C, disconnect: Option>, _t: PhantomData<(St, Io, Codec)>, } impl NewServiceBuilder where St: 'static, Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, C::Future: 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { /// Callback to execute on disconnect /// /// Second parameter indicates error occured during disconnect. pub fn disconnect(mut self, disconnect: F) -> Self where F: Fn(&mut St, bool) + 'static, { self.disconnect = Some(Rc::new(disconnect)); self } pub fn finish( self, service: F, ) -> impl ServiceFactory< Config = Cfg, Request = Io, Response = (), Error = ServiceError, > where F: IntoServiceFactory, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, > + 'static, { FramedService { connect: self.connect, handler: Rc::new(service.into_factory()), disconnect: self.disconnect, _t: PhantomData, } } } pub(crate) struct FramedService { connect: C, handler: Rc, disconnect: Option>, _t: PhantomData<(St, Io, Codec, Cfg)>, } impl ServiceFactory for FramedService where St: 'static, Io: AsyncRead + AsyncWrite + Unpin, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, C::Error: 'static, C::Future: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, > + 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { type Config = Cfg; type Request = Io; type Response = (); type Error = ServiceError; type InitError = C::InitError; type Service = FramedServiceImpl; type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: &Cfg) -> Self::Future { let handler = self.handler.clone(); let disconnect = self.disconnect.clone(); // create connect service and then create service impl self.connect .new_service(&()) .map(move |result| { result.map(move |connect| FramedServiceImpl { connect, handler, disconnect, _t: PhantomData, }) }) .boxed_local() } } pub struct FramedServiceImpl { connect: C, handler: Rc, disconnect: Option>, _t: PhantomData<(St, Io, Codec)>, } impl Service for FramedServiceImpl where Io: AsyncRead + AsyncWrite + Unpin, C: Service, Response = ConnectResult>, C::Error: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, <::Service as Service>::Future: 'static, Codec: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { type Request = Io; type Response = (); type Error = ServiceError; type Future = FramedServiceImplResponse; fn poll_ready(&mut self, cx: &mut Context) -> Poll> { self.connect.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, req: Io) -> Self::Future { FramedServiceImplResponse { inner: FramedServiceImplResponseInner::Connect( self.connect.call(Connect::new(req)), self.handler.clone(), self.disconnect.clone(), ), } } } #[pin_project] pub struct FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, <::Service as Service>::Future: 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { inner: FramedServiceImplResponseInner, } impl Future for FramedServiceImplResponse where C: Service, Response = ConnectResult>, C::Error: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, <::Service as Service>::Future: 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { type Output = Result<(), ServiceError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.get_mut(); loop { match unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx) { Either::Left(new) => this.inner = new, Either::Right(poll) => return poll, }; } } } #[pin_project] enum FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, C::Error: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, <::Service as Service>::Future: 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { Connect(#[pin] C::Future, Rc, Option>), Handler( #[pin] T::Future, Option>, Option>, ), Dispatcher(FramedDispatcher), } impl FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, C::Error: 'static, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, <::Service as Service>::Future: 'static, Io: AsyncRead + AsyncWrite + Unpin, Codec: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { #[project] fn poll( self: Pin<&mut Self>, cx: &mut Context, ) -> Either< FramedServiceImplResponseInner, Poll>>, > { #[project] match self.project() { FramedServiceImplResponseInner::Connect( ref mut fut, ref handler, ref mut disconnect, ) => match Pin::new(fut).poll(cx) { Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( handler.new_service(&res.state), Some(res), disconnect.take(), )), Poll::Pending => Either::Right(Poll::Pending), Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), }, FramedServiceImplResponseInner::Handler( ref mut fut, ref mut res, ref mut disconnect, ) => match Pin::new(fut).poll(cx) { Poll::Ready(Ok(handler)) => { let res = res.take().unwrap(); Either::Left(FramedServiceImplResponseInner::Dispatcher( FramedDispatcher::new( res.framed, State::new(res.state), handler, res.rx, res.sink, disconnect.take(), ), )) } Poll::Pending => Either::Right(Poll::Pending), Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), }, FramedServiceImplResponseInner::Dispatcher(ref mut fut) => { Either::Right(Pin::new(fut).poll(cx)) } } } }