//! Framed dispatcher service and related utilities use std::collections::VecDeque; use std::marker::PhantomData; use std::mem; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; use futures::task::AtomicTask; use futures::{Async, Future, Poll, Sink, Stream}; use log::debug; use crate::cell::Cell; type Request = ::Item; type Response = ::Item; pub struct FramedNewService { factory: S, _t: PhantomData<(T, U, C)>, } impl FramedNewService where C: Clone, S: NewService, Response = Response>, S::Error: 'static, ::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { pub fn new>(factory: F1) -> Self { Self { factory: factory.into_new_service(), _t: PhantomData, } } } impl Clone for FramedNewService where S: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl NewService for FramedNewService where C: Clone, S: NewService, Response = Response> + Clone, S::Error: 'static, ::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { type Request = Framed; type Response = FramedTransport; type Error = S::InitError; type InitError = S::InitError; type Service = FramedService; type Future = FutureResult; fn new_service(&self, cfg: &C) -> Self::Future { ok(FramedService { factory: self.factory.clone(), config: cfg.clone(), _t: PhantomData, }) } } pub struct FramedService { factory: S, config: C, _t: PhantomData<(T, U)>, } impl Clone for FramedService where S: Clone, C: Clone, { fn clone(&self) -> Self { Self { factory: self.factory.clone(), config: self.config.clone(), _t: PhantomData, } } } impl Service for FramedService where S: NewService, Response = Response>, S::Error: 'static, ::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, C: Clone, { type Request = Framed; type Response = FramedTransport; type Error = S::InitError; type Future = FramedServiceResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Framed) -> Self::Future { FramedServiceResponseFuture { fut: self.factory.new_service(&self.config), framed: Some(req), } } } #[doc(hidden)] pub struct FramedServiceResponseFuture where S: NewService, Response = Response>, S::Error: 'static, ::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { fut: S::Future, framed: Option>, } impl Future for FramedServiceResponseFuture where S: NewService, Response = Response>, S::Error: 'static, ::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { type Item = FramedTransport; type Error = S::InitError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(service) => Ok(Async::Ready(FramedTransport::new( self.framed.take().unwrap(), service, ))), } } } /// Framed transport errors pub enum FramedTransportError { Service(E), Encoder(::Error), Decoder(::Error), } impl From for FramedTransportError { fn from(err: E) -> Self { FramedTransportError::Service(err) } } /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. pub struct FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Encoder + Decoder, ::Item: 'static, ::Error: std::fmt::Debug, { service: S, state: TransportState, framed: Framed, inner: Cell::Item, S::Error>>, } enum TransportState { Processing, Error(FramedTransportError), FramedError(FramedTransportError), Stopping, } struct FramedTransportInner { buf: VecDeque>, task: AtomicTask, } impl FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { fn poll_read(&mut self) -> bool { loop { match self.service.poll_ready() { Ok(Async::Ready(_)) => loop { let item = match self.framed.poll() { Ok(Async::Ready(Some(el))) => el, Err(err) => { self.state = TransportState::FramedError(FramedTransportError::Decoder(err)); return true; } Ok(Async::NotReady) => return false, Ok(Async::Ready(None)) => { self.state = TransportState::Stopping; return true; } }; let mut cell = self.inner.clone(); cell.get_mut().task.register(); tokio_current_thread::spawn(self.service.call(item).then(move |item| { let inner = cell.get_mut(); inner.buf.push_back(item); inner.task.notify(); Ok(()) })); }, Ok(Async::NotReady) => return false, Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); return true; } } } } /// write to framed object fn poll_write(&mut self) -> bool { let inner = self.inner.get_mut(); loop { while !self.framed.is_write_buf_full() { if let Some(msg) = inner.buf.pop_front() { match msg { Ok(msg) => { if let Err(err) = self.framed.force_send(msg) { self.state = TransportState::FramedError( FramedTransportError::Encoder(err), ); return true; } } Err(err) => { self.state = TransportState::Error(FramedTransportError::Service(err)); return true; } } } else { break; } } if !self.framed.is_write_buf_empty() { match self.framed.poll_complete() { Ok(Async::NotReady) => break, Err(err) => { debug!("Error sending data: {:?}", err); self.state = TransportState::FramedError(FramedTransportError::Encoder(err)); return true; } Ok(Async::Ready(_)) => (), } } else { break; } } false } } impl FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { pub fn new>(framed: Framed, service: F) -> Self { FramedTransport { framed, service: service.into_service(), state: TransportState::Processing, inner: Cell::new(FramedTransportInner { buf: VecDeque::new(), task: AtomicTask::new(), }), } } /// Get reference to a service wrapped by `FramedTransport` instance. pub fn get_ref(&self) -> &S { &self.service } /// Get mutable reference to a service wrapped by `FramedTransport` /// instance. pub fn get_mut(&mut self) -> &mut S { &mut self.service } /// Get reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed(&self) -> &Framed { &self.framed } /// Get mutable reference to a framed instance wrapped by `FramedTransport` /// instance. pub fn get_framed_mut(&mut self) -> &mut Framed { &mut self.framed } } impl Future for FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, U: Decoder + Encoder, ::Item: 'static, ::Error: std::fmt::Debug, { type Item = (); type Error = FramedTransportError; fn poll(&mut self) -> Poll { match mem::replace(&mut self.state, TransportState::Processing) { TransportState::Processing => { if self.poll_read() || self.poll_write() { self.poll() } else { Ok(Async::NotReady) } } TransportState::Error(err) => { if self.framed.is_write_buf_empty() || (self.poll_write() || self.framed.is_write_buf_empty()) { Err(err) } else { self.state = TransportState::Error(err); Ok(Async::NotReady) } } TransportState::FramedError(err) => Err(err), TransportState::Stopping => Ok(Async::Ready(())), } } } pub struct IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { factory: F, _t: PhantomData<(T,)>, } impl IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { pub fn new(factory: F) -> Self { IntoFramed { factory, _t: PhantomData, } } } impl NewService<()> for IntoFramed where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { type Request = T; type Response = Framed; type Error = (); type InitError = (); type Service = IntoFramedService; type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { ok(IntoFramedService { factory: self.factory.clone(), _t: PhantomData, }) } } pub struct IntoFramedService where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { factory: F, _t: PhantomData<(T,)>, } impl Service for IntoFramedService where T: AsyncRead + AsyncWrite, F: Fn() -> U + Send + Clone + 'static, U: Encoder + Decoder, { type Request = T; type Response = Framed; type Error = (); type Future = FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: T) -> Self::Future { ok(Framed::new(req, (self.factory)())) } }