use std::marker::PhantomData; use std::rc::Rc; use actix_rt::spawn; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, Future, FutureResult}; use futures::unsync::mpsc; use futures::{Async, Poll, Stream}; type Request = Result<::Item, ::Error>; pub trait IntoStream { type Item; type Error; type Stream: Stream; fn into_stream(self) -> Self::Stream; } impl IntoStream for T where T: Stream, { type Item = T::Item; type Error = T::Error; type Stream = T; fn into_stream(self) -> Self::Stream { self } } pub struct StreamNewService { factory: Rc, _t: PhantomData<(S, E)>, } impl StreamNewService where S: IntoStream, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, >>::Future: 'static, { pub fn new>>(factory: F) -> Self { Self { factory: Rc::new(factory.into_new_service()), _t: PhantomData, } } } impl Clone for StreamNewService { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl NewService for StreamNewService where S: IntoStream + 'static, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, >>::Future: 'static, { type Response = (); type Error = E; type InitError = E; type Service = StreamService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(StreamService { factory: self.factory.clone(), _t: PhantomData, }) } } pub struct StreamService { factory: Rc, _t: PhantomData<(S, E)>, } impl Service for StreamService where S: IntoStream + 'static, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, >>::Future: 'static, { type Response = (); type Error = E; type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: S) -> Self::Future { Box::new( self.factory .new_service() .and_then(move |srv| StreamDispatcher::new(req, srv)), ) } } pub struct StreamDispatcher where S: IntoStream + 'static, T: Service, Response = ()> + 'static, T::Future: 'static, { stream: S, service: T, err_rx: mpsc::UnboundedReceiver, err_tx: mpsc::UnboundedSender, } impl StreamDispatcher where S: Stream, T: Service, Response = ()>, T::Future: 'static, { pub fn new(stream: F1, service: F2) -> Self where F1: IntoStream, F2: IntoService>, { let (err_tx, err_rx) = mpsc::unbounded(); StreamDispatcher { err_rx, err_tx, stream: stream.into_stream(), service: service.into_service(), } } } impl Future for StreamDispatcher where S: Stream, T: Service, Response = ()>, T::Future: 'static, { type Item = (); type Error = T::Error; fn poll(&mut self) -> Poll { if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() { return Err(e); } loop { if let Async::Ready(_) = self.service.poll_ready()? { match self.stream.poll() { Ok(Async::Ready(Some(item))) => spawn(StreamDispatcherService { fut: self.service.call(Ok(item)), stop: self.err_tx.clone(), }), Err(err) => spawn(StreamDispatcherService { fut: self.service.call(Err(err)), stop: self.err_tx.clone(), }), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(())), } } } } } struct StreamDispatcherService { fut: F, stop: mpsc::UnboundedSender, } impl Future for StreamDispatcherService { type Item = (); type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => { let _ = self.stop.unbounded_send(e); Ok(Async::Ready(())) } } } } /// `NewService` that implements, read one item from the stream. pub struct TakeItem { _t: PhantomData, } impl TakeItem { /// Create new `TakeRequest` instance. pub fn new() -> Self { TakeItem { _t: PhantomData } } } impl Default for TakeItem { fn default() -> Self { TakeItem { _t: PhantomData } } } impl Clone for TakeItem { fn clone(&self) -> TakeItem { TakeItem { _t: PhantomData } } } impl NewService for TakeItem { type Response = (Option, T); type Error = T::Error; type InitError = (); type Service = TakeItemService; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(TakeItemService { _t: PhantomData }) } } /// `NewService` that implements, read one request from framed object feature. pub struct TakeItemService { _t: PhantomData, } impl Clone for TakeItemService { fn clone(&self) -> TakeItemService { TakeItemService { _t: PhantomData } } } impl Service for TakeItemService { type Response = (Option, T); type Error = T::Error; type Future = TakeItemServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: T) -> Self::Future { TakeItemServiceResponse { stream: Some(req) } } } #[doc(hidden)] pub struct TakeItemServiceResponse { stream: Option, } impl Future for TakeItemServiceResponse { type Item = (Option, T); type Error = T::Error; fn poll(&mut self) -> Poll { match self.stream.as_mut().expect("Use after finish").poll()? { Async::Ready(item) => Ok(Async::Ready((item, self.stream.take().unwrap()))), Async::NotReady => Ok(Async::NotReady), } } }