//! Framed dispatcher service and related utilities use std::fmt; use std::marker::PhantomData; use actix; use futures::future::{ok, Either, FutureResult, Join}; use futures::unsync::mpsc; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder, Framed}; use tokio_io::{AsyncRead, AsyncWrite}; use service::{IntoNewService, IntoService, NewService, Service}; type Request = ::Item; type Response = ::Item; pub struct FramedNewService { factory: S, error_handler: E, _t: PhantomData<(T, U)>, } impl FramedNewService> where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>> + Clone, <::Service as Service>::Future: 'static, <::Service as Service>::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { pub fn new>(factory: F1) -> Self { Self { factory: factory.into_new_service(), error_handler: DefaultErrorHandler(PhantomData), _t: PhantomData, } } } impl Clone for FramedNewService where S: Clone, E: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), error_handler: self.error_handler.clone(), _t: PhantomData, } } } impl NewService for FramedNewService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>> + Clone, E: NewService, InitError = S::InitError> + Clone, <::Service as Service>::Future: 'static, <::Service as Service>::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Request = Framed; type Response = FramedTransport; type Error = S::InitError; type InitError = S::InitError; type Service = FramedService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(FramedService { factory: self.factory.clone(), error_service: self.error_handler.clone(), _t: PhantomData, }) } } pub struct FramedService { factory: S, error_service: E, _t: PhantomData<(T, U)>, } impl Clone for FramedService where S: Clone, E: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), error_service: self.error_service.clone(), _t: PhantomData, } } } impl Service for FramedService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, E: NewService, InitError = S::InitError>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Request = Framed; type Response = FramedTransport; type Error = S::InitError; type Future = FramedServiceResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { FramedServiceResponseFuture { fut: self .factory .new_service() .join(self.error_service.new_service()), framed: Some(req), } } } #[doc(hidden)] pub struct FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, E: NewService, InitError = S::InitError>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { fut: Join, framed: Option>, } impl Future for FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Option>>, E: NewService, InitError = S::InitError>, <::Service as Service>::Future: 'static, <::Service as Service>::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Item = FramedTransport; type Error = S::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready((service, error_service)) => { Ok(Async::Ready(FramedTransport::with_error_service( self.framed.take().unwrap(), service, error_service, ))) } } } } pub enum TransportError { Decoder(::Error), Encoder(::Error), Service(S::Error), } /// Default error handling service pub struct DefaultErrorHandler(PhantomData<(S, U, E)>); impl Service for DefaultErrorHandler where S: Service, U: Encoder + Decoder, S::Error: fmt::Debug, ::Error: fmt::Debug, ::Error: fmt::Debug, { type Request = TransportError; type Response = (); type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { match req { TransportError::Service(err) => debug!("Service error: {:?}", err), TransportError::Decoder(err) => trace!("Service decoder error: {:?}", err), TransportError::Encoder(err) => trace!("Service encoder error: {:?}", err), } ok(()) } } impl NewService for DefaultErrorHandler where S: Service, U: Encoder + Decoder, S::Error: fmt::Debug, ::Error: fmt::Debug, ::Error: fmt::Debug, { type Request = TransportError; type Response = (); type Error = (); type InitError = E; type Service = DefaultErrorHandler; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(DefaultErrorHandler(PhantomData)) } } /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. pub struct FramedTransport where S: Service, T: AsyncRead + AsyncWrite, U: Encoder + Decoder, E: Service, { service: S, error_service: E, state: TransportState, framed: Framed, request: Option>, response: Option>, write_rx: mpsc::Receiver, S::Error>>, write_tx: mpsc::Sender, S::Error>>, flushed: bool, } enum TransportState { Processing, Error(E::Future), EncoderError(E::Future), SinkFlushing, Stopping, } impl FramedTransport> where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Option>>, S::Future: 'static, S::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { pub fn new>(framed: Framed, service: F) -> Self { let (write_tx, write_rx) = mpsc::channel(16); FramedTransport { framed, write_rx, write_tx, service: service.into_service(), error_service: DefaultErrorHandler(PhantomData), state: TransportState::Processing, request: None, response: None, flushed: true, } } /// Set error handler service pub fn error_handler(self, handler: E) -> FramedTransport where E: Service>, { FramedTransport { framed: self.framed, request: self.request, service: self.service, write_rx: self.write_rx, write_tx: self.write_tx, response: self.response, flushed: self.flushed, state: TransportState::Processing, error_service: handler, } } } impl FramedTransport where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Option>>, E: Service>, S::Future: 'static, S::Error: fmt::Debug + 'static, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { pub fn with_error_service>( framed: Framed, service: F, error_service: E, ) -> Self { let (write_tx, write_rx) = mpsc::channel(16); FramedTransport { framed, write_rx, write_tx, error_service, service: service.into_service(), state: TransportState::Processing, request: None, response: None, flushed: true, } } fn poll_service(&mut self) -> bool { match self.service.poll_ready() { Ok(Async::Ready(_)) => { let mut item = self.request.take(); loop { if let Some(item) = item { match self.service.poll_ready() { Ok(Async::Ready(_)) => { let sender = self.write_tx.clone(); actix::Arbiter::spawn(self.service.call(item).then(|item| { let item = match item { Ok(item) => { if let Some(item) = item { Ok(item) } else { return Either::B(ok(())); } } Err(err) => Err(err), }; Either::A(sender.send(item).map(|_| ()).map_err(|_| ())) })); } Ok(Async::NotReady) => { self.request = Some(item); return false; } Err(err) => { self.state = TransportState::Error( self.error_service.call(TransportError::Service(err)), ); return true; } } } match self.framed.poll() { Ok(Async::Ready(Some(el))) => item = Some(el), Err(err) => { self.state = TransportState::Error( self.error_service.call(TransportError::Decoder(err)), ); return true; } Ok(Async::NotReady) => return false, Ok(Async::Ready(None)) => { self.state = TransportState::Stopping; return true; } } } } Ok(Async::NotReady) => return false, Err(err) => { self.state = TransportState::Error( self.error_service.call(TransportError::Service(err)), ); return true; } } } /// write to sink fn poll_response(&mut self) -> bool { let mut item = self.response.take(); loop { item = if let Some(msg) = item { self.flushed = false; match self.framed.start_send(msg) { Ok(AsyncSink::Ready) => None, Ok(AsyncSink::NotReady(item)) => Some(item), Err(err) => { trace!("Connection error: {:?}", err); self.state = TransportState::EncoderError( self.error_service.call(TransportError::Encoder(err)), ); return true; } } } else { None }; // flush sink if !self.flushed { match self.framed.poll_complete() { Ok(Async::Ready(_)) => { self.flushed = true; } Ok(Async::NotReady) => break, Err(err) => { trace!("Connection flush error: {:?}", err); self.state = TransportState::EncoderError( self.error_service.call(TransportError::Encoder(err)), ); return true; } } } // check channel if self.flushed { if item.is_none() { match self.write_rx.poll() { Ok(Async::Ready(Some(msg))) => match msg { Ok(msg) => item = Some(msg), Err(err) => { self.state = TransportState::Error( self.error_service.call(TransportError::Service(err)), ); return true; } }, Ok(Async::NotReady) => break, Err(_) => panic!("Bug in gw code"), Ok(Async::Ready(None)) => panic!("Bug in gw code"), } } else { continue; } } else { self.response = item; break; } } false } } impl Future for FramedTransport where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Option>>, S::Future: 'static, S::Error: fmt::Debug + 'static, E: Service>, ::Item: 'static, ::Error: fmt::Debug + 'static, ::Error: fmt::Debug + 'static, { type Item = (); type Error = S::Error; fn poll(&mut self) -> Poll { let state = match self.state { TransportState::Processing => { if self.poll_service() { return self.poll(); } if self.poll_response() { return self.poll(); } return Ok(Async::NotReady); } TransportState::Error(ref mut fut) => match fut.poll() { Err(_) | Ok(Async::Ready(_)) => TransportState::SinkFlushing, _ => return Ok(Async::NotReady), }, TransportState::EncoderError(ref mut fut) => match fut.poll() { Err(_) | Ok(Async::Ready(_)) => return Ok(Async::Ready(())), _ => return Ok(Async::NotReady), }, TransportState::SinkFlushing => { if self.poll_response() { return self.poll(); } if self.flushed { return Ok(Async::Ready(())); } return Ok(Async::NotReady); } TransportState::Stopping => return Ok(Async::Ready(())), }; self.state = state; self.poll() } }