//! Framed dispatcher service and related utilities use std::marker::PhantomData; use std::mem; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_rt::Arbiter; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; use futures::unsync::mpsc; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; type Request = ::Item; type Response = ::Item; pub struct FramedNewService { factory: S, _t: PhantomData<(T, U)>, } impl FramedNewService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Response>, <>>::Service as Service>>::Future: 'static, <>>::Service as Service>>::Error: 'static, ::Item: 'static, ::Error: 'static, { pub fn new>>(factory: F1) -> Self { Self { factory: factory.into_new_service(), _t: PhantomData, } } } impl Clone for FramedNewService where S: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl NewService> for FramedNewService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Response> + Clone, <>>::Service as Service>>::Future: 'static, <>>::Service as Service>>::Error: 'static, ::Item: 'static, ::Error: 'static, { type Response = FramedTransport; type Error = S::InitError; type InitError = S::InitError; type Service = FramedService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(FramedService { factory: self.factory.clone(), _t: PhantomData, }) } } pub struct FramedService { factory: S, _t: PhantomData<(T, U)>, } impl Clone for FramedService where S: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl Service> for FramedService where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Response>, <>>::Service as Service>>::Future: 'static, <>>::Service as Service>>::Error: 'static, ::Item: 'static, ::Error: 'static, { type Response = FramedTransport; type Error = S::InitError; type Future = FramedServiceResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Framed) -> Self::Future { FramedServiceResponseFuture { fut: self.factory.new_service(), framed: Some(req), } } } #[doc(hidden)] pub struct FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Response>, <>>::Service as Service>>::Future: 'static, <>>::Service as Service>>::Error: 'static, ::Item: 'static, ::Error: 'static, { fut: S::Future, framed: Option>, } impl Future for FramedServiceResponseFuture where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: NewService, Response = Response>, <>>::Service as Service>>::Future: 'static, <>>::Service as Service>>::Error: 'static, ::Item: 'static, ::Error: 'static, { type Item = FramedTransport; type Error = S::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => Ok(Async::Ready(FramedTransport::new( self.framed.take().unwrap(), service, ))), } } } /// Framed transport errors pub enum FramedTransportError { Service(E), Encoder(::Error), Decoder(::Error), } impl From for FramedTransportError { fn from(err: E) -> Self { FramedTransportError::Service(err) } } /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. pub struct FramedTransport where S: Service, Response = Response>, T: AsyncRead + AsyncWrite, U: Encoder + Decoder, { service: S, state: TransportState, framed: Framed, request: Option>, response: Option>, write_rx: mpsc::Receiver, S::Error>>, write_tx: mpsc::Sender, S::Error>>, flushed: bool, } enum TransportState>, U: Encoder + Decoder> { Processing, Error(FramedTransportError), EncoderError(FramedTransportError), Stopping, } impl FramedTransport where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Response>, S::Future: 'static, S::Error: 'static, ::Error: 'static, { pub fn new>>(framed: Framed, service: F) -> Self { let (write_tx, write_rx) = mpsc::channel(16); FramedTransport { framed, write_rx, write_tx, service: service.into_service(), state: TransportState::Processing, request: None, response: None, flushed: true, } } /// Get reference to a service wrapped by `FramedTransport` instance. pub fn get_ref(&self) -> &S { &self.service } /// Get mutable reference to a service wrapped by `FramedTransport` /// instance. pub fn get_mut(&mut self) -> &mut S { &mut self.service } /// Get reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed(&self) -> &Framed { &self.framed } /// Get mutable reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed_mut(&mut self) -> &mut Framed { &mut self.framed } } impl FramedTransport where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Response>, S::Future: 'static, S::Error: 'static, ::Item: 'static, ::Error: 'static, { fn poll_service(&mut self) -> bool { match self.service.poll_ready() { Ok(Async::Ready(_)) => { if let Some(item) = self.request.take() { let sender = self.write_tx.clone(); Arbiter::spawn( self.service .call(item) .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), ); } loop { let item = match self.framed.poll() { Ok(Async::Ready(Some(el))) => el, Err(err) => { self.state = TransportState::Error(FramedTransportError::Decoder(err)); return true; } Ok(Async::NotReady) => return false, Ok(Async::Ready(None)) => { self.state = TransportState::Stopping; return true; } }; match self.service.poll_ready() { Ok(Async::Ready(_)) => { let sender = self.write_tx.clone(); Arbiter::spawn( self.service .call(item) .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), ); } Ok(Async::NotReady) => { self.request = Some(item); return false; } Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); return true; } } } } Ok(Async::NotReady) => false, Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); true } } } /// write to sink fn poll_response(&mut self) -> bool { let mut item = self.response.take(); loop { item = if let Some(msg) = item { self.flushed = false; match self.framed.start_send(msg) { Ok(AsyncSink::Ready) => None, Ok(AsyncSink::NotReady(item)) => Some(item), Err(err) => { self.state = TransportState::EncoderError(FramedTransportError::Encoder(err)); return true; } } } else { None }; // flush sink if !self.flushed { match self.framed.poll_complete() { Ok(Async::Ready(_)) => { self.flushed = true; } Ok(Async::NotReady) => break, Err(err) => { self.state = TransportState::EncoderError(FramedTransportError::Encoder(err)); return true; } } } // check channel if self.flushed { if item.is_none() { match self.write_rx.poll() { Ok(Async::Ready(Some(msg))) => match msg { Ok(msg) => item = Some(msg), Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); return true; } }, Ok(Async::NotReady) => break, Err(_) => panic!("Bug in gw code"), Ok(Async::Ready(None)) => panic!("Bug in gw code"), } } else { continue; } } else { self.response = item; break; } } false } } impl Future for FramedTransport where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, S: Service, Response = Response>, S::Future: 'static, S::Error: 'static, ::Item: 'static, ::Error: 'static, { type Item = (); type Error = FramedTransportError; fn poll(&mut self) -> Poll { match mem::replace(&mut self.state, TransportState::Processing) { TransportState::Processing => { if self.poll_service() || self.poll_response() { self.poll() } else { Ok(Async::NotReady) } } TransportState::Error(err) => { if self.poll_response() || self.flushed { Err(err) } else { self.state = TransportState::Error(err); Ok(Async::NotReady) } } TransportState::EncoderError(err) => Err(err), TransportState::Stopping => Ok(Async::Ready(())), } } } pub struct IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { factory: F, _t: PhantomData<(T,)>, } impl IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { pub fn new(factory: F) -> Self { IntoFramed { factory, _t: PhantomData, } } } impl NewService for IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { type Response = Framed; type Error = (); type InitError = (); type Service = IntoFramedService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(IntoFramedService { factory: self.factory.clone(), _t: PhantomData, }) } } pub struct IntoFramedService where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { factory: F, _t: PhantomData<(T,)>, } impl Service for IntoFramedService where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { type Response = Framed; type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: T) -> Self::Future { ok(Framed::new(req, (self.factory)())) } }