1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 09:41:49 +01:00

allow to send messages to framed transport via mpsc channel

This commit is contained in:
Nikolay Kim 2019-03-20 09:44:23 -07:00
parent ef1bdb2eb2
commit e0d3581239
2 changed files with 69 additions and 3 deletions

View File

@ -1,5 +1,10 @@
# Changes
### Added
* Allow to send messages to `FramedTransport` via mpsc channel.
## [0.3.4] - 2019-03-12
### Changed

View File

@ -7,6 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Sink, Stream};
use log::debug;
@ -178,6 +179,11 @@ impl<E, U: Encoder + Decoder> From<E> for FramedTransportError<E, U> {
}
}
pub enum FramedMessage<T> {
Message(T),
Close,
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
pub struct FramedTransport<S, T, U>
@ -193,6 +199,7 @@ where
service: S,
state: TransportState<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
}
@ -200,6 +207,7 @@ enum TransportState<S: Service, U: Encoder + Decoder> {
Processing,
Error(FramedTransportError<S::Error, U>),
FramedError(FramedTransportError<S::Error, U>),
FlushAndStop,
Stopping,
}
@ -257,10 +265,12 @@ where
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = self.inner.get_mut();
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if let Some(msg) = inner.buf.pop_front() {
match msg {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
@ -268,6 +278,7 @@ where
);
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.state =
@ -275,7 +286,33 @@ where
return true;
}
}
} else {
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
FramedTransportError::Encoder(err),
);
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.state = TransportState::FlushAndStop;
return true;
}
Ok(Async::Ready(None)) => {
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
@ -313,6 +350,7 @@ where
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
FramedTransport {
framed,
rx: None,
service: service.into_service(),
state: TransportState::Processing,
inner: Cell::new(FramedTransportInner {
@ -322,6 +360,15 @@ where
}
}
/// Get Sender
pub fn set_receiver(
mut self,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
) -> Self {
self.rx = Some(rx);
self
}
/// Get reference to a service wrapped by `FramedTransport` instance.
pub fn get_ref(&self) -> &S {
&self.service
@ -378,6 +425,20 @@ where
Ok(Async::NotReady)
}
}
TransportState::FlushAndStop => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
}
} else {
Ok(Async::Ready(()))
}
}
TransportState::FramedError(err) => Err(err),
TransportState::Stopping => Ok(Async::Ready(())),
}