1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-19 04:12:38 +01:00
actix-net/actix-ioframe/src/dispatcher.rs

303 lines
10 KiB
Rust
Raw Normal View History

2019-06-26 11:19:40 +02:00
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::mem;
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures::task::AtomicTask;
use futures::unsync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Sink as FutureSink, Stream};
use log::debug;
use crate::cell::Cell;
use crate::error::ServiceError;
use crate::item::Item;
use crate::sink::Sink;
type Request<S, U> = Item<S, U>;
type Response<U> = <U as Encoder>::Item;
pub(crate) enum FramedMessage<T> {
Message(T),
Close,
WaitClose(oneshot::Sender<()>),
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
pub(crate) struct FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
service: S,
sink: Sink<<U as Encoder>::Item>,
state: Cell<St>,
dispatch_state: State<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedDispatcherInner<<U as Encoder>::Item, S::Error>>,
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
pub(crate) fn new<F: IntoService<S>>(
framed: Framed<T, U>,
state: Cell<St>,
service: F,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
sink: Sink<<U as Encoder>::Item>,
) -> Self {
FramedDispatcher {
framed,
state,
sink,
rx: Some(rx),
service: service.into_service(),
dispatch_state: State::Processing,
inner: Cell::new(FramedDispatcherInner {
buf: VecDeque::new(),
task: AtomicTask::new(),
}),
}
}
}
enum State<S: Service, U: Encoder + Decoder> {
Processing,
Error(ServiceError<S::Error, U>),
FramedError(ServiceError<S::Error, U>),
FlushAndStop(Vec<oneshot::Sender<()>>),
Stopping,
}
impl<S: Service, U: Encoder + Decoder> State<S, U> {
fn stop(&mut self, tx: Option<oneshot::Sender<()>>) {
match self {
State::FlushAndStop(ref mut vec) => {
if let Some(tx) = tx {
vec.push(tx)
}
}
State::Processing => {
*self = State::FlushAndStop(if let Some(tx) = tx {
vec![tx]
} else {
Vec::new()
})
}
State::Error(_) | State::FramedError(_) | State::Stopping => {
if let Some(tx) = tx {
let _ = tx.send(());
}
}
}
}
}
struct FramedDispatcherInner<I, E> {
buf: VecDeque<Result<I, E>>,
task: AtomicTask,
}
impl<St, S, T, U> FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
fn poll_read(&mut self) -> bool {
loop {
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
let item = match self.framed.poll() {
Ok(Async::Ready(Some(el))) => el,
Err(err) => {
self.dispatch_state =
State::FramedError(ServiceError::Decoder(err));
return true;
}
Ok(Async::NotReady) => return false,
Ok(Async::Ready(None)) => {
self.dispatch_state = State::Stopping;
return true;
}
};
let mut cell = self.inner.clone();
cell.get_mut().task.register();
tokio_current_thread::spawn(
self.service
.call(Item::new(self.state.clone(), self.sink.clone(), item))
.then(move |item| {
let item = match item {
Ok(Some(item)) => Ok(item),
Ok(None) => return Ok(()),
Err(err) => Err(err),
};
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.notify();
Ok(())
}),
);
}
Ok(Async::NotReady) => return false,
Err(err) => {
self.dispatch_state = State::Error(ServiceError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = self.inner.get_mut();
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
State::FramedError(ServiceError::Encoder(err));
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.dispatch_state = State::Error(ServiceError::Service(err));
return true;
}
}
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.dispatch_state =
State::FramedError(ServiceError::Encoder(err));
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.dispatch_state.stop(None);
return true;
}
Ok(Async::Ready(Some(FramedMessage::WaitClose(tx)))) => {
self.dispatch_state.stop(Some(tx));
return true;
}
Ok(Async::Ready(None)) => {
rx_done = true;
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
rx_done = true;
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error sending data: {:?}", err);
self.dispatch_state = State::FramedError(ServiceError::Encoder(err));
return true;
}
Ok(Async::Ready(_)) => (),
}
} else {
break;
}
}
false
}
}
impl<St, S, T, U> Future for FramedDispatcher<St, S, T, U>
where
S: Service<Request = Request<St, U>, Response = Option<Response<U>>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = ServiceError<S::Error, U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match mem::replace(&mut self.dispatch_state, State::Processing) {
State::Processing => {
if self.poll_read() || self.poll_write() {
self.poll()
} else {
Ok(Async::NotReady)
}
}
State::Error(err) => {
if self.framed.is_write_buf_empty()
|| (self.poll_write() || self.framed.is_write_buf_empty())
{
Err(err)
} else {
self.dispatch_state = State::Error(err);
Ok(Async::NotReady)
}
}
State::FlushAndStop(mut vec) => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
}
Ok(Async::NotReady) => {
self.dispatch_state = State::FlushAndStop(vec);
return Ok(Async::NotReady);
}
Ok(Async::Ready(_)) => (),
}
};
for tx in vec.drain(..) {
let _ = tx.send(());
}
Ok(Async::Ready(()))
}
State::FramedError(err) => Err(err),
State::Stopping => Ok(Async::Ready(())),
}
}
}