use std::marker::PhantomData; use std::rc::Rc; use actix_service::{IntoNewService, IntoService, NewService, Service}; use futures::future::{ok, Future, FutureResult}; use futures::unsync::mpsc; use futures::{Async, Poll, Stream}; type Request = Result<::Item, ::Error>; pub trait IntoStream { type Item; type Error; type Stream: Stream; fn into_stream(self) -> Self::Stream; } impl IntoStream for T where T: Stream, { type Item = T::Item; type Error = T::Error; type Stream = T; fn into_stream(self) -> Self::Stream { self } } pub struct StreamNewService { factory: Rc, _t: PhantomData<(S, E, C)>, } impl StreamNewService where C: Clone, S: IntoStream, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, ::Future: 'static, { pub fn new>(factory: F) -> Self { Self { factory: Rc::new(factory.into_new_service()), _t: PhantomData, } } } impl Clone for StreamNewService { fn clone(&self) -> Self { Self { factory: self.factory.clone(), _t: PhantomData, } } } impl NewService for StreamNewService where C: Clone, S: IntoStream + 'static, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, ::Future: 'static, { type Request = S; type Response = (); type Error = E; type InitError = E; type Service = StreamService; type Future = FutureResult; fn new_service(&self, cfg: &C) -> Self::Future { ok(StreamService { factory: self.factory.clone(), config: cfg.clone(), _t: PhantomData, }) } } pub struct StreamService { factory: Rc, config: C, _t: PhantomData<(S, E)>, } impl Service for StreamService where S: IntoStream + 'static, T: NewService, Response = (), Error = E, InitError = E>, T::Future: 'static, T::Service: 'static, ::Future: 'static, C: Clone, { type Request = S; type Response = (); type Error = E; type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: S) -> Self::Future { Box::new( self.factory .new_service(&self.config) .and_then(move |srv| StreamDispatcher::new(req, srv)), ) } } pub struct StreamDispatcher where S: IntoStream + 'static, T: Service, Response = ()> + 'static, T::Future: 'static, { stream: S, service: T, err_rx: mpsc::UnboundedReceiver, err_tx: mpsc::UnboundedSender, } impl StreamDispatcher where S: Stream, T: Service, Response = ()>, T::Future: 'static, { pub fn new(stream: F1, service: F2) -> Self where F1: IntoStream, F2: IntoService, { let (err_tx, err_rx) = mpsc::unbounded(); StreamDispatcher { err_rx, err_tx, stream: stream.into_stream(), service: service.into_service(), } } } impl Future for StreamDispatcher where S: Stream, T: Service, Response = ()>, T::Future: 'static, { type Item = (); type Error = T::Error; fn poll(&mut self) -> Poll { if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() { return Err(e); } loop { match self.service.poll_ready()? { Async::Ready(_) => match self.stream.poll() { Ok(Async::Ready(Some(item))) => { tokio_current_thread::spawn(StreamDispatcherService { fut: self.service.call(Ok(item)), stop: self.err_tx.clone(), }) } Err(err) => tokio_current_thread::spawn(StreamDispatcherService { fut: self.service.call(Err(err)), stop: self.err_tx.clone(), }), Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::Ready(())), }, Async::NotReady => return Ok(Async::NotReady), } } } } struct StreamDispatcherService { fut: F, stop: mpsc::UnboundedSender, } impl Future for StreamDispatcherService { type Item = (); type Error = (); fn poll(&mut self) -> Poll { match self.fut.poll() { Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => { let _ = self.stop.unbounded_send(e); Ok(Async::Ready(())) } } } } /// `NewService` that implements, read one item from the stream. pub struct TakeItem { _t: PhantomData, } impl TakeItem { /// Create new `TakeRequest` instance. pub fn new() -> Self { TakeItem { _t: PhantomData } } } impl Default for TakeItem { fn default() -> Self { TakeItem { _t: PhantomData } } } impl Clone for TakeItem { fn clone(&self) -> TakeItem { TakeItem { _t: PhantomData } } } impl NewService<()> for TakeItem { type Request = T; type Response = (Option, T); type Error = T::Error; type InitError = (); type Service = TakeItemService; type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { ok(TakeItemService { _t: PhantomData }) } } /// `NewService` that implements, read one request from framed object feature. pub struct TakeItemService { _t: PhantomData, } impl Clone for TakeItemService { fn clone(&self) -> TakeItemService { TakeItemService { _t: PhantomData } } } impl Service for TakeItemService { type Request = T; type Response = (Option, T); type Error = T::Error; type Future = TakeItemServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: T) -> Self::Future { TakeItemServiceResponse { stream: Some(req) } } } #[doc(hidden)] pub struct TakeItemServiceResponse { stream: Option, } impl Future for TakeItemServiceResponse { type Item = (Option, T); type Error = T::Error; fn poll(&mut self) -> Poll { match self.stream.as_mut().expect("Use after finish").poll()? { Async::Ready(item) => Ok(Async::Ready((item, self.stream.take().unwrap()))), Async::NotReady => Ok(Async::NotReady), } } }