From 615a0d52ed19dc241515a42c97984e40992f4a02 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 13 Jan 2019 23:12:46 -0800 Subject: [PATCH] add service and new service for stream dispatcher --- actix-utils/src/stream.rs | 178 ++++++++++++++++++++++++++++++-------- 1 file changed, 143 insertions(+), 35 deletions(-) diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs index b17258f0..271aae06 100644 --- a/actix-utils/src/stream.rs +++ b/actix-utils/src/stream.rs @@ -1,39 +1,152 @@ use std::marker::PhantomData; +use std::rc::Rc; use actix_rt::spawn; -use actix_service::{IntoService, NewService, Service}; +use actix_service::{IntoNewService, IntoService, NewService, Service}; +use futures::future::{ok, Future, FutureResult}; use futures::unsync::mpsc; -use futures::{future, Async, Future, Poll, Stream}; +use futures::{Async, Poll, Stream}; + +type Request = Result<::Item, ::Error>; + +pub trait IntoStream { + type Item; + type Error; + type Stream: Stream; + + fn into_stream(self) -> Self::Stream; +} + +impl IntoStream for T +where + T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + type Stream = T; + + fn into_stream(self) -> Self::Stream { + self + } +} + +pub struct StreamNewService { + factory: Rc, + _t: PhantomData<(S, E)>, +} + +impl StreamNewService +where + S: IntoStream, + T: NewService, Response = (), Error = E, InitError = E>, + T::Future: 'static, + T::Service: 'static, + >>::Future: 'static, +{ + // pub fn new>>(factory: F) -> Self { + // Self { + // factory: Rc::new(factory.into_new_service()), + // _t: PhantomData, + // } + // } + pub fn new(factory: T) -> Self { + Self { + factory: Rc::new(factory), + _t: PhantomData, + } + } +} + +impl Clone for StreamNewService { + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + _t: PhantomData, + } + } +} + +impl NewService for StreamNewService +where + S: IntoStream + 'static, + T: NewService, Response = (), Error = E, InitError = E>, + T::Future: 'static, + T::Service: 'static, + >>::Future: 'static, +{ + type Response = (); + type Error = E; + type InitError = E; + type Service = StreamService; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(StreamService { + factory: self.factory.clone(), + _t: PhantomData, + }) + } +} + +pub struct StreamService { + factory: Rc, + _t: PhantomData<(S, E)>, +} + +impl Service for StreamService +where + S: IntoStream + 'static, + T: NewService, Response = (), Error = E, InitError = E>, + T::Future: 'static, + T::Service: 'static, + >>::Future: 'static, +{ + type Response = (); + type Error = E; + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: S) -> Self::Future { + Box::new( + self.factory + .new_service() + .and_then(move |srv| StreamDispatcher::new(req, srv)), + ) + } +} pub struct StreamDispatcher where - S: Stream, - T: Service>, + S: IntoStream + 'static, + T: Service, Response = ()> + 'static, + T::Future: 'static, { stream: S, service: T, - item: Option>, - stop_rx: mpsc::UnboundedReceiver, - stop_tx: mpsc::UnboundedSender, + err_rx: mpsc::UnboundedReceiver, + err_tx: mpsc::UnboundedSender, } impl StreamDispatcher where S: Stream, - T: Service, Response = ()>, + T: Service, Response = ()>, T::Future: 'static, { - pub fn new(stream: S, service: F) -> Self + pub fn new(stream: F1, service: F2) -> Self where - F: IntoService>, + F1: IntoStream, + F2: IntoService>, { - let (stop_tx, stop_rx) = mpsc::unbounded(); + let (err_tx, err_rx) = mpsc::unbounded(); StreamDispatcher { - stream, - item: None, + err_rx, + err_tx, + stream: stream.into_stream(), service: service.into_service(), - stop_rx, - stop_tx, } } } @@ -41,37 +154,32 @@ where impl Future for StreamDispatcher where S: Stream, - T: Service, Response = ()>, + T: Service, Response = ()>, T::Future: 'static, { type Item = (); type Error = T::Error; fn poll(&mut self) -> Poll { - if let Ok(Async::Ready(Some(e))) = self.stop_rx.poll() { + if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() { return Err(e); } - let mut item = self.item.take(); loop { - if item.is_some() { - match self.service.poll_ready()? { - Async::Ready(_) => spawn(StreamDispatcherService { - fut: self.service.call(item.take().unwrap()), - stop: self.stop_tx.clone(), + if let Async::Ready(_) = self.service.poll_ready()? { + match self.stream.poll() { + Ok(Async::Ready(Some(item))) => spawn(StreamDispatcherService { + fut: self.service.call(Ok(item)), + stop: self.err_tx.clone(), }), - Async::NotReady => { - self.item = item; - return Ok(Async::NotReady); - } + Err(err) => spawn(StreamDispatcherService { + fut: self.service.call(Err(err)), + stop: self.err_tx.clone(), + }), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), } } - match self.stream.poll() { - Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), - Err(err) => item = Some(Err(err)), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - } } } } @@ -126,10 +234,10 @@ impl NewService for TakeItem { type Error = T::Error; type InitError = (); type Service = TakeItemService; - type Future = future::FutureResult; + type Future = FutureResult; fn new_service(&self) -> Self::Future { - future::ok(TakeItemService { _t: PhantomData }) + ok(TakeItemService { _t: PhantomData }) } }