use futures::unsync::mpsc; use futures::{Async, Future, Poll, Stream}; use tokio::executor::current_thread::spawn; use super::{IntoService, Service}; pub struct StreamDispatcher { stream: S, service: T, item: Option>, stop_rx: mpsc::UnboundedReceiver<()>, stop_tx: mpsc::UnboundedSender<()>, } impl StreamDispatcher where S: Stream, T: Service, Response = (), Error = ()>, T::Future: 'static, { pub fn new>(stream: S, service: F) -> Self { let (stop_tx, stop_rx) = mpsc::unbounded(); StreamDispatcher { stream, item: None, service: service.into_service(), stop_rx, stop_tx, } } } impl Future for StreamDispatcher where S: Stream, T: Service, Response = (), Error = ()>, T::Future: 'static, { type Item = (); type Error = (); fn poll(&mut self) -> Poll { if let Ok(Async::Ready(Some(_))) = self.stop_rx.poll() { return Ok(Async::Ready(())); } let mut item = self.item.take(); loop { if item.is_some() { match self.service.poll_ready()? { Async::Ready(_) => spawn(StreamDispatcherService { fut: self.service.call(item.take().unwrap()), stop: self.stop_tx.clone(), }), Async::NotReady => { self.item = item; return Ok(Async::NotReady); } } } match self.stream.poll() { Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), Err(err) => item = Some(Err(err)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(())), } } } } struct StreamDispatcherService { fut: F, stop: mpsc::UnboundedSender<()>, } impl Future for StreamDispatcherService { type Item = (); type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => { let _ = self.stop.unbounded_send(()); Ok(Async::Ready(())) } } } }