1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-28 04:12:40 +01:00
actix-net/actix-utils/src/stream.rs

281 lines
7.1 KiB
Rust
Raw Normal View History

2018-10-06 01:00:44 +02:00
use std::marker::PhantomData;
use std::rc::Rc;
2018-10-06 01:00:44 +02:00
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, Future, FutureResult};
2018-09-01 19:29:56 +02:00
use futures::unsync::mpsc;
use futures::{Async, Poll, Stream};
/// Request type passed to the inner service for a stream `T`: `Ok(item)` for
/// each item the stream yields and `Err(error)` for each error it yields.
type Request<T> = Result<<T as IntoStream>::Item, <T as IntoStream>::Error>;
/// Conversion of a value into a `Stream`.
///
/// A blanket implementation in this file makes every `Stream` its own
/// `IntoStream` (the conversion is the identity).
pub trait IntoStream {
/// Item type yielded by the resulting stream.
type Item;
/// Error type yielded by the resulting stream.
type Error;
/// Concrete stream type produced by `into_stream`.
type Stream: Stream<Item = Self::Item, Error = Self::Error>;
/// Consumes `self` and returns the stream.
fn into_stream(self) -> Self::Stream;
}
/// Every `Stream` is trivially convertible into itself.
impl<T: Stream> IntoStream for T {
    type Stream = T;
    type Item = T::Item;
    type Error = T::Error;

    /// Identity conversion: the stream is returned unchanged.
    fn into_stream(self) -> T {
        self
    }
}
/// Service factory that wraps an inner factory `T` and produces
/// `StreamService` instances sharing that factory.
///
/// `S` is the stream type accepted as a request and `E` the error type; both
/// are only carried as phantom type parameters.
pub struct StreamNewService<S, T, E> {
// Inner factory, reference-counted so clones and created services share it.
factory: Rc<T>,
// Marker for the otherwise unused `S` and `E` parameters.
_t: PhantomData<(S, E)>,
}
impl<S, T, E> StreamNewService<S, T, E>
where
S: IntoStream,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
2019-01-14 08:30:42 +01:00
pub fn new<F: IntoNewService<T, Request<S>>>(factory: F) -> Self {
Self {
2019-01-14 08:30:42 +01:00
factory: Rc::new(factory.into_new_service()),
_t: PhantomData,
}
}
}
impl<S, T, E> Clone for StreamNewService<S, T, E> {
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<S, T, E> NewService<S> for StreamNewService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type InitError = E;
type Service = StreamService<S, T, E>;
type Future = FutureResult<Self::Service, E>;
fn new_service(&self) -> Self::Future {
ok(StreamService {
factory: self.factory.clone(),
_t: PhantomData,
})
}
}
/// Service that accepts a stream as its request; its `Service` implementation
/// in this file creates a fresh inner service from the shared factory for
/// each request and runs the stream through a `StreamDispatcher`.
pub struct StreamService<S, T, E> {
// Inner service factory shared with the `StreamNewService` that created us.
factory: Rc<T>,
// Marker for the otherwise unused `S` and `E` parameters.
_t: PhantomData<(S, E)>,
}
impl<S, T, E> Service<S> for StreamService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type Future = Box<Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: S) -> Self::Future {
Box::new(
self.factory
.new_service()
.and_then(move |srv| StreamDispatcher::new(req, srv)),
)
}
}
2018-09-01 19:29:56 +02:00
pub struct StreamDispatcher<S, T>
where
S: IntoStream + 'static,
T: Service<Request<S>, Response = ()> + 'static,
T::Future: 'static,
{
2018-09-01 19:29:56 +02:00
stream: S,
service: T,
err_rx: mpsc::UnboundedReceiver<T::Error>,
err_tx: mpsc::UnboundedSender<T::Error>,
2018-09-01 19:29:56 +02:00
}
impl<S, T> StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request<S>, Response = ()>,
2018-09-01 19:29:56 +02:00
T::Future: 'static,
{
pub fn new<F1, F2>(stream: F1, service: F2) -> Self
2018-11-30 03:56:15 +01:00
where
F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T, Request<S>>,
2018-11-30 03:56:15 +01:00
{
let (err_tx, err_rx) = mpsc::unbounded();
2018-09-01 19:29:56 +02:00
StreamDispatcher {
err_rx,
err_tx,
stream: stream.into_stream(),
service: service.into_service(),
2018-09-01 19:29:56 +02:00
}
}
}
impl<S, T> Future for StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request<S>, Response = ()>,
2018-09-01 19:29:56 +02:00
T::Future: 'static,
{
type Item = ();
type Error = T::Error;
2018-09-01 19:29:56 +02:00
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() {
return Err(e);
2018-09-01 19:29:56 +02:00
}
loop {
match self.service.poll_ready()? {
Async::Ready(_) => match self.stream.poll() {
2019-01-26 22:15:17 +01:00
Ok(Async::Ready(Some(item))) => {
tokio_current_thread::spawn(StreamDispatcherService {
fut: self.service.call(Ok(item)),
stop: self.err_tx.clone(),
})
}
Err(err) => tokio_current_thread::spawn(StreamDispatcherService {
fut: self.service.call(Err(err)),
stop: self.err_tx.clone(),
}),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
},
Async::NotReady => return Ok(Async::NotReady),
2018-09-01 19:29:56 +02:00
}
}
}
}
/// Wrapper around a single service call future that turns its error into a
/// message on the dispatcher's error channel.
struct StreamDispatcherService<F: Future> {
    /// The in-flight service call.
    fut: F,
    /// Channel used to report the call's error, if any.
    stop: mpsc::UnboundedSender<F::Error>,
}
/// Infallible future: the wrapped call's result is discarded and its error,
/// if any, is forwarded over the `stop` channel instead of being returned.
impl<F: Future> Future for StreamDispatcherService<F> {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.fut.poll() {
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            // The call's successful value is not needed.
            Ok(Async::Ready(_)) => {}
            Err(e) => {
                // Best effort: a send failure is deliberately ignored.
                let _ = self.stop.unbounded_send(e);
            }
        }
        Ok(Async::Ready(()))
    }
}
2018-10-06 01:00:44 +02:00
/// `NewService` whose services read a single item from a stream.
///
/// The created service resolves to the item (if any) together with the stream
/// itself.
pub struct TakeItem<T> {
// Marker for the stream type `T`; no data is stored.
_t: PhantomData<T>,
}
impl<T> TakeItem<T> {
/// Create new `TakeRequest` instance.
pub fn new() -> Self {
TakeItem { _t: PhantomData }
}
}
2018-12-06 23:04:42 +01:00
impl<T> Default for TakeItem<T> {
fn default() -> Self {
TakeItem { _t: PhantomData }
}
}
impl<T> Clone for TakeItem<T> {
fn clone(&self) -> TakeItem<T> {
TakeItem { _t: PhantomData }
}
}
2018-11-30 03:56:15 +01:00
impl<T: Stream> NewService<T> for TakeItem<T> {
2018-10-06 01:00:44 +02:00
type Response = (Option<T::Item>, T);
type Error = T::Error;
type InitError = ();
type Service = TakeItemService<T>;
type Future = FutureResult<Self::Service, Self::InitError>;
2018-10-06 01:00:44 +02:00
fn new_service(&self) -> Self::Future {
ok(TakeItemService { _t: PhantomData })
2018-10-06 01:00:44 +02:00
}
}
/// `Service` that reads one item from the stream it is called with and
/// returns that item together with the stream.
pub struct TakeItemService<T> {
// Marker for the stream type `T`; no data is stored.
_t: PhantomData<T>,
}
impl<T> Clone for TakeItemService<T> {
fn clone(&self) -> TakeItemService<T> {
TakeItemService { _t: PhantomData }
}
}
2018-11-30 03:56:15 +01:00
impl<T: Stream> Service<T> for TakeItemService<T> {
2018-10-06 01:00:44 +02:00
type Response = (Option<T::Item>, T);
type Error = T::Error;
type Future = TakeItemServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
2018-11-30 03:56:15 +01:00
fn call(&mut self, req: T) -> Self::Future {
2018-10-06 01:00:44 +02:00
TakeItemServiceResponse { stream: Some(req) }
}
}
/// Future returned by `TakeItemService::call`; resolves to the next stream
/// item (or `None`) paired with the stream.
#[doc(hidden)]
pub struct TakeItemServiceResponse<T: Stream> {
// The stream being polled; taken out (left as `None`) once the future resolves.
stream: Option<T>,
}
/// Polls the wrapped stream once per `poll` until it yields an item, ends, or
/// fails.
impl<T: Stream> Future for TakeItemServiceResponse<T> {
    type Item = (Option<T::Item>, T);
    type Error = T::Error;

    /// # Panics
    ///
    /// Panics with "Use after finish" if polled again after it has resolved,
    /// because the stream has already been handed back to the caller.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let polled = self.stream.as_mut().expect("Use after finish").poll()?;
        match polled {
            Async::NotReady => Ok(Async::NotReady),
            Async::Ready(item) => {
                // Hand the stream back alongside the item it produced.
                let stream = self.stream.take().unwrap();
                Ok(Async::Ready((item, stream)))
            }
        }
    }
}