1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 19:54:36 +01:00

use service error for stream dispatcher

This commit is contained in:
Nikolay Kim 2019-01-05 13:19:06 -08:00
parent 7017bad4bb
commit 58ba1d8269

View File

@ -5,18 +5,22 @@ use actix_service::{IntoService, NewService, Service};
use futures::unsync::mpsc; use futures::unsync::mpsc;
use futures::{future, Async, Future, Poll, Stream}; use futures::{future, Async, Future, Poll, Stream};
pub struct StreamDispatcher<S: Stream, T> { pub struct StreamDispatcher<S, T>
where
S: Stream,
T: Service<Result<S::Item, S::Error>>,
{
stream: S, stream: S,
service: T, service: T,
item: Option<Result<S::Item, S::Error>>, item: Option<Result<S::Item, S::Error>>,
stop_rx: mpsc::UnboundedReceiver<()>, stop_rx: mpsc::UnboundedReceiver<T::Error>,
stop_tx: mpsc::UnboundedSender<()>, stop_tx: mpsc::UnboundedSender<T::Error>,
} }
impl<S, T> StreamDispatcher<S, T> impl<S, T> StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Result<S::Item, S::Error>, Response = (), Error = ()>, T: Service<Result<S::Item, S::Error>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
pub fn new<F>(stream: S, service: F) -> Self pub fn new<F>(stream: S, service: F) -> Self
@ -37,15 +41,15 @@ where
impl<S, T> Future for StreamDispatcher<S, T> impl<S, T> Future for StreamDispatcher<S, T>
where where
S: Stream, S: Stream,
T: Service<Result<S::Item, S::Error>, Response = (), Error = ()>, T: Service<Result<S::Item, S::Error>, Response = ()>,
T::Future: 'static, T::Future: 'static,
{ {
type Item = (); type Item = ();
type Error = (); type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(Some(_))) = self.stop_rx.poll() { if let Ok(Async::Ready(Some(e))) = self.stop_rx.poll() {
return Ok(Async::Ready(())); return Err(e);
} }
let mut item = self.item.take(); let mut item = self.item.take();
@ -74,7 +78,7 @@ where
struct StreamDispatcherService<F: Future> { struct StreamDispatcherService<F: Future> {
fut: F, fut: F,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<F::Error>,
} }
impl<F: Future> Future for StreamDispatcherService<F> { impl<F: Future> Future for StreamDispatcherService<F> {
@ -85,8 +89,8 @@ impl<F: Future> Future for StreamDispatcherService<F> {
match self.fut.poll() { match self.fut.poll() {
Ok(Async::Ready(_)) => Ok(Async::Ready(())), Ok(Async::Ready(_)) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => { Err(e) => {
let _ = self.stop.unbounded_send(()); let _ = self.stop.unbounded_send(e);
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
} }