use std::marker::PhantomData; use actix_rt::spawn; use actix_service::{IntoService, NewService, Service}; use futures::unsync::mpsc; use futures::{future, Async, Future, Poll, Stream}; pub struct StreamDispatcher { stream: S, service: T, item: Option>, stop_rx: mpsc::UnboundedReceiver<()>, stop_tx: mpsc::UnboundedSender<()>, } impl StreamDispatcher where S: Stream, T: Service, Response = (), Error = ()>, T::Future: 'static, { pub fn new(stream: S, service: F) -> Self where F: IntoService>, { let (stop_tx, stop_rx) = mpsc::unbounded(); StreamDispatcher { stream, item: None, service: service.into_service(), stop_rx, stop_tx, } } } impl Future for StreamDispatcher where S: Stream, T: Service, Response = (), Error = ()>, T::Future: 'static, { type Item = (); type Error = (); fn poll(&mut self) -> Poll { if let Ok(Async::Ready(Some(_))) = self.stop_rx.poll() { return Ok(Async::Ready(())); } let mut item = self.item.take(); loop { if item.is_some() { match self.service.poll_ready()? { Async::Ready(_) => spawn(StreamDispatcherService { fut: self.service.call(item.take().unwrap()), stop: self.stop_tx.clone(), }), Async::NotReady => { self.item = item; return Ok(Async::NotReady); } } } match self.stream.poll() { Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), Err(err) => item = Some(Err(err)), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(())), } } } } struct StreamDispatcherService { fut: F, stop: mpsc::UnboundedSender<()>, } impl Future for StreamDispatcherService { type Item = (); type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => { let _ = self.stop.unbounded_send(()); Ok(Async::Ready(())) } } } } /// `NewService` that implements, read one item from the stream. pub struct TakeItem { _t: PhantomData, } impl TakeItem { /// Create new `TakeRequest` instance. pub fn new() -> Self { TakeItem { _t: PhantomData } } } impl Default for TakeItem { fn default() -> Self { TakeItem { _t: PhantomData } } } impl Clone for TakeItem { fn clone(&self) -> TakeItem { TakeItem { _t: PhantomData } } } impl NewService for TakeItem { type Response = (Option, T); type Error = T::Error; type InitError = (); type Service = TakeItemService; type Future = future::FutureResult; fn new_service(&self) -> Self::Future { future::ok(TakeItemService { _t: PhantomData }) } } /// `NewService` that implements, read one request from framed object feature. pub struct TakeItemService { _t: PhantomData, } impl Clone for TakeItemService { fn clone(&self) -> TakeItemService { TakeItemService { _t: PhantomData } } } impl Service for TakeItemService { type Response = (Option, T); type Error = T::Error; type Future = TakeItemServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: T) -> Self::Future { TakeItemServiceResponse { stream: Some(req) } } } #[doc(hidden)] pub struct TakeItemServiceResponse { stream: Option, } impl Future for TakeItemServiceResponse { type Item = (Option, T); type Error = T::Error; fn poll(&mut self) -> Poll { match self.stream.as_mut().expect("Use after finish").poll()? { Async::Ready(item) => Ok(Async::Ready((item, self.stream.take().unwrap()))), Async::NotReady => Ok(Async::NotReady), } } }