//! Framed dispatcher service and related utilities use std::fmt; use std::marker::PhantomData; use actix; use futures::future::{ok, Either, FutureResult}; use futures::unsync::mpsc; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder, Framed}; use tokio_io::{AsyncRead, AsyncWrite}; use service::{IntoNewService, IntoService, NewService, Service}; type Item = ::Item; type StreamItem = Result<::Item, ::Error>; pub struct FramedNewService { factory: S, _t: PhantomData<(T, U)>, } impl FramedNewService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>> + Clone, <::Service as Service>::Future: 'static, <::Service as Service>::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { pub fn new>(factory: F1) -> Self { Self { factory: factory.into_new_service(), _t: PhantomData, } } } impl Clone for FramedNewService where S: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl NewService for FramedNewService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>> + Clone, <::Service as Service>::Future: 'static, <::Service as Service>::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Request = Framed; type Response = FramedDispatcher; type Error = S::InitError; type InitError = S::InitError; type Service = FramedService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(FramedService { factory: self.factory.clone(), _t: PhantomData, }) } } pub struct FramedService { factory: S, _t: PhantomData<(T, U)>, } impl Clone for FramedService where S: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl Service for FramedService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Request = Framed; type Response = FramedDispatcher; type Error = S::InitError; type Future = FramedServiceResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { FramedServiceResponseFuture { fut: self.factory.new_service(), framed: Some(req), } } } #[doc(hidden)] pub struct FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { fut: S::Future, framed: Option>, } impl Future for FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Item = FramedDispatcher; type Error = S::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => Ok(Async::Ready(FramedDispatcher::new( self.framed.take().unwrap(), service, ))), } } } /// FramedDispatcher - is a future that reads frames from Framed object /// and pass then to the service. pub struct FramedDispatcher where S: Service, T: AsyncRead + AsyncWrite, U: Encoder + Decoder, { service: S, framed: Framed, item: Option>, write_item: Option>, write_rx: mpsc::Receiver, S::Error>>, write_tx: mpsc::Sender, S::Error>>, flushed: bool, } impl FramedDispatcher where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Option>>, S::Future: 'static, S::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { pub fn new>(framed: Framed, service: F) -> Self { let (write_tx, write_rx) = mpsc::channel(16); FramedDispatcher { framed, item: None, service: service.into_service(), write_rx, write_tx, write_item: None, flushed: true, } } } impl Future for FramedDispatcher where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Option>>, S::Future: 'static, S::Error: From<::Error> + 'static, ::Item: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Item = (); type Error = S::Error; fn poll(&mut self) -> Poll { if let Async::Ready(_) = self.service.poll_ready()? { let mut item = self.item.take(); loop { if let Some(item) = item { match self.service.poll_ready()? { Async::Ready(_) => { let sender = self.write_tx.clone(); actix::Arbiter::spawn(self.service.call(item).then(|item| { let item = match item { Ok(item) => { if let Some(item) = item { Ok(item) } else { return Either::B(ok(())); } } Err(err) => Err(err), }; Either::A(sender.send(item).map(|_| ()).map_err(|_| ())) })); } Async::NotReady => { self.item = Some(item); break; } } } match self.framed.poll() { Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), Err(err) => item = Some(Err(err)), Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => return Ok(Async::Ready(())), } } } // write let mut item = self.write_item.take(); loop { item = if let Some(msg) = item { self.flushed = false; match self.framed.start_send(msg) { Ok(AsyncSink::Ready) => None, Ok(AsyncSink::NotReady(item)) => Some(item), Err(err) => { trace!("Connection error: {:?}", err); return Err(err.into()); } } } else { None }; // flush sink if !self.flushed { match self.framed.poll_complete() { Ok(Async::Ready(_)) => { self.flushed = true; } Ok(Async::NotReady) => break, Err(err) => { trace!("Connection flush error: {:?}", err); return Err(err.into()); } } } // check channel if self.flushed { if item.is_none() { match self.write_rx.poll() { Ok(Async::Ready(Some(msg))) => match msg { Ok(msg) => item = Some(msg), Err(err) => return Err(err), }, Ok(Async::NotReady) => break, Err(_) => panic!("Bug in gw code"), Ok(Async::Ready(None)) => panic!("Bug in gw code"), } } else { continue; } } else { self.write_item = item; break; } } Ok(Async::NotReady) } }