diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs index 46519109..b17258f0 100644 --- a/actix-utils/src/stream.rs +++ b/actix-utils/src/stream.rs @@ -5,18 +5,22 @@ use actix_service::{IntoService, NewService, Service}; use futures::unsync::mpsc; use futures::{future, Async, Future, Poll, Stream}; -pub struct StreamDispatcher { +pub struct StreamDispatcher +where + S: Stream, + T: Service>, +{ stream: S, service: T, item: Option>, - stop_rx: mpsc::UnboundedReceiver<()>, - stop_tx: mpsc::UnboundedSender<()>, + stop_rx: mpsc::UnboundedReceiver, + stop_tx: mpsc::UnboundedSender, } impl StreamDispatcher where S: Stream, - T: Service, Response = (), Error = ()>, + T: Service, Response = ()>, T::Future: 'static, { pub fn new(stream: S, service: F) -> Self @@ -37,15 +41,15 @@ where impl Future for StreamDispatcher where S: Stream, - T: Service, Response = (), Error = ()>, + T: Service, Response = ()>, T::Future: 'static, { type Item = (); - type Error = (); + type Error = T::Error; fn poll(&mut self) -> Poll { - if let Ok(Async::Ready(Some(_))) = self.stop_rx.poll() { - return Ok(Async::Ready(())); + if let Ok(Async::Ready(Some(e))) = self.stop_rx.poll() { + return Err(e); } let mut item = self.item.take(); @@ -74,7 +78,7 @@ where struct StreamDispatcherService { fut: F, - stop: mpsc::UnboundedSender<()>, + stop: mpsc::UnboundedSender, } impl Future for StreamDispatcherService { @@ -85,8 +89,8 @@ impl Future for StreamDispatcherService { match self.fut.poll() { Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => { - let _ = self.stop.unbounded_send(()); + Err(e) => { + let _ = self.stop.unbounded_send(e); Ok(Async::Ready(())) } }