diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 7bb703e9..00bed130 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,10 @@ # Changes +### Added + +* Allow to send messages to `FramedTransport` via mpsc channel. + + ## [0.3.4] - 2019-03-12 ### Changed diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 7389aefa..44fe9d85 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -7,6 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, FutureResult}; use futures::task::AtomicTask; +use futures::unsync::mpsc; use futures::{Async, Future, IntoFuture, Poll, Sink, Stream}; use log::debug; @@ -178,6 +179,11 @@ impl From for FramedTransportError { } } +pub enum FramedMessage { + Message(T), + Close, +} + /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. pub struct FramedTransport @@ -193,6 +199,7 @@ where service: S, state: TransportState, framed: Framed, + rx: Option::Item>>>, inner: Cell::Item, S::Error>>, } @@ -200,6 +207,7 @@ enum TransportState { Processing, Error(FramedTransportError), FramedError(FramedTransportError), + FlushAndStop, Stopping, } @@ -257,10 +265,12 @@ where /// write to framed object fn poll_write(&mut self) -> bool { let inner = self.inner.get_mut(); + let mut rx_done = self.rx.is_none(); + let mut buf_empty = inner.buf.is_empty(); loop { while !self.framed.is_write_buf_full() { - if let Some(msg) = inner.buf.pop_front() { - match msg { + if !buf_empty { + match inner.buf.pop_front().unwrap() { Ok(msg) => { if let Err(err) = self.framed.force_send(msg) { self.state = TransportState::FramedError( @@ -268,6 +278,7 @@ where ); return true; } + buf_empty = inner.buf.is_empty(); } Err(err) => { self.state = @@ -275,7 +286,33 @@ where return true; } } - } else { + } + + if !rx_done && self.rx.is_some() { + match self.rx.as_mut().unwrap().poll() { + Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => { + if let Err(err) = self.framed.force_send(msg) { + self.state = TransportState::FramedError( + FramedTransportError::Encoder(err), + ); + return true; + } + } + Ok(Async::Ready(Some(FramedMessage::Close))) => { + self.state = TransportState::FlushAndStop; + return true; + } + Ok(Async::Ready(None)) => { + let _ = self.rx.take(); + } + Ok(Async::NotReady) => rx_done = true, + Err(_e) => { + let _ = self.rx.take(); + } + } + } + + if rx_done && buf_empty { break; } } @@ -313,6 +350,7 @@ where pub fn new>(framed: Framed, service: F) -> Self { FramedTransport { framed, + rx: None, service: service.into_service(), state: TransportState::Processing, inner: Cell::new(FramedTransportInner { @@ -322,6 +360,15 @@ where } } + /// Get Sender + pub fn set_receiver( + mut self, + rx: mpsc::UnboundedReceiver::Item>>, + ) -> Self { + self.rx = Some(rx); + self + } + /// Get reference to a service wrapped by `FramedTransport` instance. pub fn get_ref(&self) -> &S { &self.service @@ -378,6 +425,20 @@ where Ok(Async::NotReady) } } + TransportState::FlushAndStop => { + if !self.framed.is_write_buf_empty() { + match self.framed.poll_complete() { + Err(err) => { + debug!("Error sending data: {:?}", err); + Ok(Async::Ready(())) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => Ok(Async::Ready(())), + } + } else { + Ok(Async::Ready(())) + } + } TransportState::FramedError(err) => Err(err), TransportState::Stopping => Ok(Async::Ready(())), }