1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 01:11:07 +01:00

add service and new service for stream dispatcher

This commit is contained in:
Nikolay Kim 2019-01-13 23:12:46 -08:00
parent cfb62ccc40
commit 615a0d52ed

View File

@ -1,39 +1,152 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::rc::Rc;
use actix_rt::spawn; use actix_rt::spawn;
use actix_service::{IntoService, NewService, Service}; use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, Future, FutureResult};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{future, Async, Future, Poll, Stream}; use futures::{Async, Poll, Stream};
type Request<T> = Result<<T as IntoStream>::Item, <T as IntoStream>::Error>;
pub trait IntoStream {
type Item;
type Error;
type Stream: Stream<Item = Self::Item, Error = Self::Error>;
fn into_stream(self) -> Self::Stream;
}
impl<T> IntoStream for T
where
T: Stream,
{
type Item = T::Item;
type Error = T::Error;
type Stream = T;
fn into_stream(self) -> Self::Stream {
self
}
}
pub struct StreamNewService<S, T, E> {
factory: Rc<T>,
_t: PhantomData<(S, E)>,
}
impl<S, T, E> StreamNewService<S, T, E>
where
S: IntoStream,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
// pub fn new<F: IntoNewService<T, Request<S>>>(factory: F) -> Self {
// Self {
// factory: Rc::new(factory.into_new_service()),
// _t: PhantomData,
// }
// }
pub fn new(factory: T) -> Self {
Self {
factory: Rc::new(factory),
_t: PhantomData,
}
}
}
impl<S, T, E> Clone for StreamNewService<S, T, E> {
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<S, T, E> NewService<S> for StreamNewService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type InitError = E;
type Service = StreamService<S, T, E>;
type Future = FutureResult<Self::Service, E>;
fn new_service(&self) -> Self::Future {
ok(StreamService {
factory: self.factory.clone(),
_t: PhantomData,
})
}
}
pub struct StreamService<S, T, E> {
factory: Rc<T>,
_t: PhantomData<(S, E)>,
}
impl<S, T, E> Service<S> for StreamService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type Future = Box<Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: S) -> Self::Future {
Box::new(
self.factory
.new_service()
.and_then(move |srv| StreamDispatcher::new(req, srv)),
)
}
}
pub struct StreamDispatcher<S, T> pub struct StreamDispatcher<S, T>
where where
S: Stream, S: IntoStream + 'static,
T: Service<Result<S::Item, S::Error>>, T: Service<Request<S>, Response = ()> + 'static,
T::Future: 'static,
{ {
stream: S, stream: S,
service: T, service: T,
item: Option<Result<S::Item, S::Error>>, err_rx: mpsc::UnboundedReceiver<T::Error>,
stop_rx: mpsc::UnboundedReceiver<T::Error>, err_tx: mpsc::UnboundedSender<T::Error>,
stop_tx: mpsc::UnboundedSender<T::Error>,
} }
impl<S, T> StreamDispatcher<S, T> impl<S, T> StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Result<S::Item, S::Error>, Response = ()>, T: Service<Request<S>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
pub fn new<F>(stream: S, service: F) -> Self pub fn new<F1, F2>(stream: F1, service: F2) -> Self
where where
F: IntoService<T, Result<S::Item, S::Error>>, F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T, Request<S>>,
{ {
let (stop_tx, stop_rx) = mpsc::unbounded(); let (err_tx, err_rx) = mpsc::unbounded();
StreamDispatcher { StreamDispatcher {
stream, err_rx,
item: None, err_tx,
stream: stream.into_stream(),
service: service.into_service(), service: service.into_service(),
stop_rx,
stop_tx,
} }
} }
} }
@ -41,37 +154,32 @@ where
impl<S, T> Future for StreamDispatcher<S, T> impl<S, T> Future for StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Result<S::Item, S::Error>, Response = ()>, T: Service<Request<S>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
type Item = (); type Item = ();
type Error = T::Error; type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(Some(e))) = self.stop_rx.poll() { if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() {
return Err(e); return Err(e);
} }
let mut item = self.item.take();
loop { loop {
if item.is_some() { if let Async::Ready(_) = self.service.poll_ready()? {
match self.service.poll_ready()? { match self.stream.poll() {
Async::Ready(_) => spawn(StreamDispatcherService { Ok(Async::Ready(Some(item))) => spawn(StreamDispatcherService {
fut: self.service.call(item.take().unwrap()), fut: self.service.call(Ok(item)),
stop: self.stop_tx.clone(), stop: self.err_tx.clone(),
}), }),
Async::NotReady => { Err(err) => spawn(StreamDispatcherService {
self.item = item; fut: self.service.call(Err(err)),
return Ok(Async::NotReady); stop: self.err_tx.clone(),
} }),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
} }
} }
match self.stream.poll() {
Ok(Async::Ready(Some(el))) => item = Some(Ok(el)),
Err(err) => item = Some(Err(err)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
}
} }
} }
} }
@ -126,10 +234,10 @@ impl<T: Stream> NewService<T> for TakeItem<T> {
type Error = T::Error; type Error = T::Error;
type InitError = (); type InitError = ();
type Service = TakeItemService<T>; type Service = TakeItemService<T>;
type Future = future::FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future { fn new_service(&self) -> Self::Future {
future::ok(TakeItemService { _t: PhantomData }) ok(TakeItemService { _t: PhantomData })
} }
} }