use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use either::Either; use futures_core::{ready, stream::Stream}; use crate::connect::{Connect, ConnectResult}; use crate::dispatcher::Dispatcher; use crate::error::ServiceError; type RequestItem = ::Item; type ResponseItem = Option<::Item>; /// Service builder - structure that follows the builder pattern /// for building instances for framed services. pub struct Builder { connect: C, _t: PhantomData<(St, Io, Codec, Out)>, } impl Builder where C: Service, Response = ConnectResult>, Io: AsyncRead + AsyncWrite, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { /// Construct framed handler service with specified connect service pub fn new(connect: F) -> Builder where F: IntoService, Io: AsyncRead + AsyncWrite, C: Service, Response = ConnectResult>, Codec: Decoder + Encoder, Out: Stream::Item>, { Builder { connect: connect.into_service(), _t: PhantomData, } } /// Provide stream items handler service and construct service factory. pub fn build(self, service: F) -> FramedServiceImpl where F: IntoServiceFactory, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, { FramedServiceImpl { connect: self.connect, handler: Rc::new(service.into_factory()), _t: PhantomData, } } } /// Service builder - structure that follows the builder pattern /// for building instances for framed services. pub struct FactoryBuilder { connect: C, _t: PhantomData<(St, Io, Codec, Out)>, } impl FactoryBuilder where Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, Codec: Decoder + Encoder, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { /// Construct framed handler new service with specified connect service pub fn new(connect: F) -> FactoryBuilder where F: IntoServiceFactory, Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, Codec: Decoder + Encoder, Out: Stream::Item> + Unpin, { FactoryBuilder { connect: connect.into_factory(), _t: PhantomData, } } pub fn build(self, service: F) -> FramedService where F: IntoServiceFactory, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, { FramedService { connect: self.connect, handler: Rc::new(service.into_factory()), _t: PhantomData, } } } pub struct FramedService { connect: C, handler: Rc, _t: PhantomData<(St, Io, Codec, Out, Cfg)>, } impl ServiceFactory for FramedService where Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { type Config = Cfg; type Request = Io; type Response = (); type Error = ServiceError; type InitError = C::InitError; type Service = FramedServiceImpl; type Future = FramedServiceResponse; fn new_service(&self, _: Cfg) -> Self::Future { // create connect service and then create service impl FramedServiceResponse { fut: self.connect.new_service(()), handler: self.handler.clone(), } } } #[pin_project::pin_project] pub struct FramedServiceResponse where Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { #[pin] fut: C::Future, handler: Rc, } impl Future for FramedServiceResponse where Io: AsyncRead + AsyncWrite, C: ServiceFactory< Config = (), Request = Connect, Response = ConnectResult, >, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { type Output = Result, C::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let connect = ready!(this.fut.poll(cx))?; Poll::Ready(Ok(FramedServiceImpl { connect, handler: this.handler.clone(), _t: PhantomData, })) } } pub struct FramedServiceImpl { connect: C, handler: Rc, _t: PhantomData<(St, Io, Codec, Out)>, } impl Service for FramedServiceImpl where Io: AsyncRead + AsyncWrite, C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Codec: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { type Request = Io; type Response = (); type Error = ServiceError; type Future = FramedServiceImplResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.connect.poll_ready(cx).map_err(|e| e.into()) } fn call(&mut self, req: Io) -> Self::Future { FramedServiceImplResponse { inner: FramedServiceImplResponseInner::Connect( self.connect.call(Connect::new(req)), self.handler.clone(), ), } } } #[pin_project::pin_project] pub struct FramedServiceImplResponse where C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { #[pin] inner: FramedServiceImplResponseInner, } impl Future for FramedServiceImplResponse where C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { type Output = Result<(), ServiceError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); loop { match this.inner.poll(cx) { Either::Left(new) => { this = self.as_mut().project(); this.inner.set(new) } Either::Right(poll) => return poll, }; } } } #[pin_project::pin_project(project = FramedServiceImplResponseInnerProj)] enum FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { Connect(#[pin] C::Future, Rc), Handler(#[pin] T::Future, Option>, Option), Dispatcher(#[pin] Dispatcher), } impl FramedServiceImplResponseInner where C: Service, Response = ConnectResult>, T: ServiceFactory< Config = St, Request = RequestItem, Response = ResponseItem, Error = C::Error, InitError = C::Error, >, ::Error: 'static, ::Future: 'static, Io: AsyncRead + AsyncWrite, Codec: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, Out: Stream::Item> + Unpin, { fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Either< FramedServiceImplResponseInner, Poll>>, > { match self.project() { FramedServiceImplResponseInnerProj::Connect(fut, handler) => match fut.poll(cx) { Poll::Ready(Ok(res)) => Either::Left(FramedServiceImplResponseInner::Handler( handler.new_service(res.state), Some(res.framed), res.out, )), Poll::Pending => Either::Right(Poll::Pending), Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), }, FramedServiceImplResponseInnerProj::Handler(fut, framed, out) => { match fut.poll(cx) { Poll::Ready(Ok(handler)) => { Either::Left(FramedServiceImplResponseInner::Dispatcher( Dispatcher::new(framed.take().unwrap(), handler, out.take()), )) } Poll::Pending => Either::Right(Poll::Pending), Poll::Ready(Err(e)) => Either::Right(Poll::Ready(Err(e.into()))), } } FramedServiceImplResponseInnerProj::Dispatcher(fut) => { Either::Right(fut.poll(cx)) } } } }