From 5e5ae2ddec0f67d395e9fbd1d593a18ad5b6084b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 29 Nov 2019 10:41:09 +0600 Subject: [PATCH] restore stream dispatcher --- actix-utils/src/lib.rs | 1 + actix-utils/src/stream.rs | 101 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 actix-utils/src/stream.rs diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 727362bf..4bc755d4 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -9,6 +9,7 @@ pub mod keepalive; pub mod mpsc; pub mod oneshot; pub mod order; +pub mod stream; pub mod task; pub mod time; pub mod timeout; diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs new file mode 100644 index 00000000..143b4bfc --- /dev/null +++ b/actix-utils/src/stream.rs @@ -0,0 +1,101 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use actix_service::{IntoService, Service}; +use futures::Stream; + +use crate::mpsc; + +#[pin_project::pin_project] +pub struct StreamDispatcher +where + S: Stream, + T: Service + 'static, +{ + #[pin] + stream: S, + service: T, + err_rx: mpsc::Receiver, + err_tx: mpsc::Sender, +} + +impl StreamDispatcher +where + S: Stream, + T: Service, + T::Future: 'static, +{ + pub fn new(stream: S, service: F) -> Self + where + F: IntoService, + { + let (err_tx, err_rx) = mpsc::channel(); + StreamDispatcher { + err_rx, + err_tx, + stream, + service: service.into_service(), + } + } +} + +impl Future for StreamDispatcher +where + S: Stream, + T: Service + 'static, +{ + type Output = Result<(), T::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = self.as_mut().project(); + + if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) { + return Poll::Ready(Err(e)); + } + + loop { + match this.service.poll_ready(cx)? { + Poll::Ready(_) => match this.stream.poll_next(cx) { + Poll::Ready(Some(item)) => { + actix_rt::spawn(StreamDispatcherService { + fut: this.service.call(item), + stop: self.err_tx.clone(), + }); + this = self.as_mut().project(); + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(Ok(())), + }, + Poll::Pending => return Poll::Pending, + } + } + } +} + +#[pin_project::pin_project] +struct StreamDispatcherService { + #[pin] + fut: F, + stop: mpsc::Sender, +} + +impl Future for StreamDispatcherService +where + F: Future>, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + + match this.fut.poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + let _ = this.stop.send(e); + Poll::Ready(()) + } + } + } +}