1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-12 14:27:05 +02:00

Compare commits

..

15 Commits

Author SHA1 Message Date
Nikolay Kim
515bfad830 prep release 2019-01-24 20:42:30 -08:00
Nikolay Kim
0340d82314 better ergonomics for .apply combinator 2019-01-24 20:41:42 -08:00
Nikolay Kim
88548199d7 change apply combinator error semantic 2019-01-24 19:19:44 -08:00
Nikolay Kim
278176fca5 use FnMut instead of Fn 2019-01-16 15:33:33 -08:00
Nikolay Kim
2c8e7c4ae4 add helper constructors to Either service 2019-01-16 15:33:10 -08:00
Nikolay Kim
b6414d6197 use FnMut instead of Fn 2019-01-16 15:00:23 -08:00
Nikolay Kim
f94ef5248e add Clone impl for Either service 2019-01-16 15:00:08 -08:00
Nikolay Kim
cbc378b67f allow deserialize from the path 2019-01-15 19:25:49 -08:00
Nikolay Kim
db2367b26e properly check readiness inclosed service 2019-01-14 09:41:10 -08:00
Nikolay Kim
0bee4db270 use new service converter 2019-01-13 23:30:42 -08:00
Nikolay Kim
615a0d52ed add service and new service for stream dispatcher 2019-01-13 23:12:46 -08:00
Nikolay Kim
cfb62ccc40 Make Out::Error convertable from T::Error for apply combinator 2019-01-13 22:58:23 -08:00
Nikolay Kim
605a947c3e update version 2019-01-13 10:06:54 -08:00
Nikolay Kim
31f0a96c20 Upgrade trust-dns-proto 2019-01-13 10:03:33 -08:00
Nikolay Kim
66a7c59aaf fix changelog 2019-01-11 21:42:13 -08:00
12 changed files with 581 additions and 79 deletions

View File

@@ -1,5 +1,10 @@
# Changes
## [0.1.1] - 2019-01-13
* Upgrade trust-dns-proto
## [0.1.0] - 2018-12-09
* Move server to separate crate

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-connector"
version = "0.1.0"
version = "0.1.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"]
@@ -32,8 +32,8 @@ actix-codec = "0.1.0"
actix-rt = "0.1.0"
futures = "0.1"
tokio-tcp = "0.1"
trust-dns-proto = "^0.5.0"
trust-dns-resolver = "^0.10.0"
trust-dns-proto = "0.6.2"
trust-dns-resolver = "0.10.2"
# openssl
openssl = { version="0.10", optional = true }

View File

@@ -1,15 +1,31 @@
# Changes
## [0.1.6] - 2019-01-24
### Changed
* Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type
* Change `.apply()` error semantic, new service's error is `From<Self::Error>`
## [0.1.5] - 2019-01-13
### Changed
* Make `Out::Error` convertable from `T::Error` for apply combinator
## [0.1.4] - 2019-01-11
## Changed
### Changed
* Use `FnMut` instead of `Fn` for `FnService`
## [0.1.3] - 2018-12-12
## Changed
### Changed
* Split service combinators to separate trait

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-service"
version = "0.1.4"
version = "0.1.6"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service"
keywords = ["network", "framework", "async", "futures"]

View File

@@ -0,0 +1,310 @@
use std::marker::PhantomData;
use futures::{try_ready, Async, Future, IntoFuture, Poll};
use super::{IntoNewService, IntoService, NewService, Service};
use crate::cell::Cell;
/// `Apply` service combinator
pub struct AndThenApply<A, B, F, Out, Req1, Req2>
where
A: Service<Req1>,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
a: A,
b: Cell<B>,
f: Cell<F>,
r: PhantomData<(Out, Req1, Req2)>,
}
impl<A, B, F, Out, Req1, Req2> AndThenApply<A, B, F, Out, Req1, Req2>
where
A: Service<Req1>,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
/// Create new `Apply` combinator
pub fn new<A1: IntoService<A, Req1>, B1: IntoService<B, Req2>>(a: A1, b: B1, f: F) -> Self {
Self {
f: Cell::new(f),
a: a.into_service(),
b: Cell::new(b.into_service()),
r: PhantomData,
}
}
}
impl<A, B, F, Out, Req1, Req2> Clone for AndThenApply<A, B, F, Out, Req1, Req2>
where
A: Service<Req1> + Clone,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
fn clone(&self) -> Self {
AndThenApply {
a: self.a.clone(),
b: self.b.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<A, B, F, Out, Req1, Req2> Service<Req1> for AndThenApply<A, B, F, Out, Req1, Req2>
where
A: Service<Req1>,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Response = Out::Item;
type Error = A::Error;
type Future = AndThenApplyFuture<A, B, F, Out, Req1, Req2>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
try_ready!(self.a.poll_ready());
self.b.get_mut().poll_ready().map_err(|e| e.into())
}
fn call(&mut self, req: Req1) -> Self::Future {
AndThenApplyFuture {
b: self.b.clone(),
f: self.f.clone(),
fut_b: None,
fut_a: Some(self.a.call(req)),
_t: PhantomData,
}
}
}
pub struct AndThenApplyFuture<A, B, F, Out, Req1, Req2>
where
A: Service<Req1>,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
b: Cell<B>,
f: Cell<F>,
fut_a: Option<A::Future>,
fut_b: Option<Out::Future>,
_t: PhantomData<Req2>,
}
impl<A, B, F, Out, Req1, Req2> Future for AndThenApplyFuture<A, B, F, Out, Req1, Req2>
where
A: Service<Req1>,
B: Service<Req2, Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Item = Out::Item;
type Error = A::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll().map_err(|e| e.into());
}
match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b =
Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future());
self.poll()
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
}
/// `ApplyNewService` new service combinator
pub struct AndThenApplyNewService<A, B, F, Out, Req1, Req2> {
a: A,
b: B,
f: Cell<F>,
r: PhantomData<(Out, Req1, Req2)>,
}
impl<A, B, F, Out, Req1, Req2> AndThenApplyNewService<A, B, F, Out, Req1, Req2>
where
A: NewService<Req1>,
B: NewService<Req2, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
/// Create new `ApplyNewService` new service instance
pub fn new<A1: IntoNewService<A, Req1>, B1: IntoNewService<B, Req2>>(
a: A1,
b: B1,
f: F,
) -> Self {
Self {
f: Cell::new(f),
a: a.into_new_service(),
b: b.into_new_service(),
r: PhantomData,
}
}
}
impl<A, B, F, Out, Req1, Req2> Clone for AndThenApplyNewService<A, B, F, Out, Req1, Req2>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<A, B, F, Out, Req1, Req2> NewService<Req1>
for AndThenApplyNewService<A, B, F, Out, Req1, Req2>
where
A: NewService<Req1>,
B: NewService<Req2, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Response = Out::Item;
type Error = A::Error;
type InitError = A::InitError;
type Service = AndThenApply<A::Service, B::Service, F, Out, Req1, Req2>;
type Future = AndThenApplyNewServiceFuture<A, B, F, Out, Req1, Req2>;
fn new_service(&self) -> Self::Future {
AndThenApplyNewServiceFuture {
a: None,
b: None,
f: self.f.clone(),
fut_a: self.a.new_service(),
fut_b: self.b.new_service(),
}
}
}
pub struct AndThenApplyNewServiceFuture<A, B, F, Out, Req1, Req2>
where
A: NewService<Req1>,
B: NewService<Req2, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
fut_b: B::Future,
fut_a: A::Future,
f: Cell<F>,
a: Option<A::Service>,
b: Option<B::Service>,
}
impl<A, B, F, Out, Req1, Req2> Future for AndThenApplyNewServiceFuture<A, B, F, Out, Req1, Req2>
where
A: NewService<Req1>,
B: NewService<Req2, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Item = AndThenApply<A::Service, B::Service, F, Out, Req1, Req2>;
type Error = A::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(AndThenApply {
f: self.f.clone(),
a: self.a.take().unwrap(),
b: Cell::new(self.b.take().unwrap()),
r: PhantomData,
}))
} else {
Ok(Async::NotReady)
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
#[derive(Clone)]
struct Srv;
impl Service<()> for Srv {
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(())
}
}
#[test]
fn test_call() {
let blank = |req| Ok(req);
let mut srv = blank.into_service().apply(Srv, |req: &'static str, srv| {
srv.call(()).map(move |res| (req, res))
});
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>((|req| Ok(req)).into_service());
let new_srv = blank.into_new_service().apply(
|| Ok(Srv),
|req: &'static str, srv| srv.call(()).map(move |res| (req, res)),
);
if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() {
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
} else {
panic!()
}
}
}

View File

@@ -17,8 +17,9 @@ where
impl<T, F, In, Out, Request> Apply<T, F, In, Out, Request>
where
T: Service<Request>,
F: Fn(In, &mut T) -> Out,
F: FnMut(In, &mut T) -> Out,
Out: IntoFuture,
Out::Error: From<T::Error>,
{
/// Create new `Apply` combinator
pub fn new<I: IntoService<T, Request>>(service: I, f: F) -> Self {
@@ -46,16 +47,17 @@ where
impl<T, F, In, Out, Request> Service<In> for Apply<T, F, In, Out, Request>
where
T: Service<Request, Error = Out::Error>,
F: Fn(In, &mut T) -> Out,
T: Service<Request>,
F: FnMut(In, &mut T) -> Out,
Out: IntoFuture,
Out::Error: From<T::Error>,
{
type Response = Out::Item;
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
self.service.poll_ready().map_err(|e| e.into())
}
fn call(&mut self, req: In) -> Self::Future {
@@ -76,8 +78,9 @@ where
impl<T, F, In, Out, Request> ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
F: FnMut(In, &mut T::Service) -> Out,
Out: IntoFuture,
Out::Error: From<T::Error>,
{
/// Create new `ApplyNewService` new service instance
pub fn new<F1: IntoNewService<T, Request>>(service: F1, f: F) -> Self {
@@ -92,7 +95,7 @@ where
impl<T, F, In, Out, Request> Clone for ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request> + Clone,
F: Fn(Out, &mut T::Service) -> Out + Clone,
F: FnMut(In, &mut T::Service) -> Out + Clone,
Out: IntoFuture,
{
fn clone(&self) -> Self {
@@ -106,9 +109,10 @@ where
impl<T, F, In, Out, Request> NewService<In> for ApplyNewService<T, F, In, Out, Request>
where
T: NewService<Request, Error = Out::Error>,
F: Fn(In, &mut T::Service) -> Out + Clone,
T: NewService<Request>,
F: FnMut(In, &mut T::Service) -> Out + Clone,
Out: IntoFuture,
Out::Error: From<T::Error>,
{
type Response = Out::Item;
type Error = Out::Error;
@@ -125,7 +129,7 @@ where
pub struct ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
F: FnMut(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
fut: T::Future,
@@ -136,7 +140,7 @@ where
impl<T, F, In, Out, Request> ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
F: FnMut(In, &mut T::Service) -> Out,
Out: IntoFuture,
{
fn new(fut: T::Future, f: F) -> Self {
@@ -151,8 +155,9 @@ where
impl<T, F, In, Out, Request> Future for ApplyNewServiceFuture<T, F, In, Out, Request>
where
T: NewService<Request>,
F: Fn(In, &mut T::Service) -> Out,
F: FnMut(In, &mut T::Service) -> Out,
Out: IntoFuture,
Out::Error: From<T::Error>,
{
type Item = Apply<T::Service, F, In, Out, Request>;
type Error = T::InitError;

View File

@@ -1,5 +1,7 @@
use futures::{Future, IntoFuture, Poll};
mod and_then;
mod and_then_apply;
mod apply;
mod cell;
mod fn_service;
@@ -10,6 +12,7 @@ mod map_init_err;
mod then;
pub use self::and_then::{AndThen, AndThenNewService};
pub use self::and_then_apply::{AndThenApply, AndThenApplyNewService};
pub use self::apply::{Apply, ApplyNewService};
pub use self::fn_service::{FnNewService, FnService};
pub use self::from_err::{FromErr, FromErrNewService};
@@ -57,19 +60,20 @@ pub trait Service<Request> {
pub trait ServiceExt<Request>: Service<Request> {
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply<T, I, F, Out, Req>(
fn apply<B, I, F, Out, Req>(
self,
service: I,
f: F,
) -> AndThen<Self, Apply<T, F, Self::Response, Out, Req>>
) -> AndThenApply<Self, B, F, Out, Request, Req>
where
Self: Sized,
T: Service<Req, Error = Self::Error>,
I: IntoService<T, Req>,
F: Fn(Self::Response, &mut T) -> Out,
Out: IntoFuture<Error = Self::Error>,
B: Service<Req, Error = Self::Error>,
I: IntoService<B, Req>,
F: FnMut(Self::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<Self::Error>,
{
self.and_then(Apply::new(service.into_service(), f))
AndThenApply::new(self, service, f)
}
/// Call another service after call to this one has resolved successfully.
@@ -128,7 +132,7 @@ pub trait ServiceExt<Request>: Service<Request> {
fn map<F, R>(self, f: F) -> Map<Self, F, R>
where
Self: Sized,
F: Fn(Self::Response) -> R,
F: FnMut(Self::Response) -> R,
{
Map::new(self, f)
}
@@ -182,19 +186,20 @@ pub trait NewService<Request> {
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply<T, I, F, Out, Req>(
fn apply<B, I, F, Out, Req>(
self,
service: I,
f: F,
) -> AndThenNewService<Self, ApplyNewService<T, F, Self::Response, Out, Req>>
) -> AndThenApplyNewService<Self, B, F, Out, Request, Req>
where
Self: Sized,
T: NewService<Req, InitError = Self::InitError, Error = Self::Error>,
I: IntoNewService<T, Req>,
F: Fn(Self::Response, &mut T::Service) -> Out + Clone,
Out: IntoFuture<Error = Self::Error>,
B: NewService<Req, Error = Self::Error, InitError = Self::InitError>,
I: IntoNewService<B, Req>,
F: FnMut(Self::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<Self::Error>,
{
self.and_then(ApplyNewService::new(service, f))
AndThenApplyNewService::new(self, service, f)
}
/// Call another service after call to this one has resolved successfully.
@@ -245,7 +250,7 @@ pub trait NewService<Request> {
fn map<F, R>(self, f: F) -> MapNewService<Self, F, R>
where
Self: Sized,
F: Fn(Self::Response) -> R,
F: FnMut(Self::Response) -> R,
{
MapNewService::new(self, f)
}

View File

@@ -18,7 +18,7 @@ impl<A, F, Response> Map<A, F, Response> {
pub fn new<Request>(service: A, f: F) -> Self
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
Self {
service,
@@ -45,7 +45,7 @@ where
impl<A, F, Request, Response> Service<Request> for Map<A, F, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response + Clone,
F: FnMut(A::Response) -> Response + Clone,
{
type Response = Response;
type Error = A::Error;
@@ -63,7 +63,7 @@ where
pub struct MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
f: F,
fut: A::Future,
@@ -72,7 +72,7 @@ where
impl<A, F, Request, Response> MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
fn new(fut: A::Future, f: F) -> Self {
MapFuture { f, fut }
@@ -82,7 +82,7 @@ where
impl<A, F, Request, Response> Future for MapFuture<A, F, Request, Response>
where
A: Service<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
type Item = Response;
type Error = A::Error;
@@ -107,7 +107,7 @@ impl<A, F, Response> MapNewService<A, F, Response> {
pub fn new<Request>(a: A, f: F) -> Self
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
Self {
a,
@@ -134,7 +134,7 @@ where
impl<A, F, Request, Response> NewService<Request> for MapNewService<A, F, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response + Clone,
F: FnMut(A::Response) -> Response + Clone,
{
type Response = Response;
type Error = A::Error;
@@ -151,7 +151,7 @@ where
pub struct MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
fut: A::Future,
f: Option<F>,
@@ -160,7 +160,7 @@ where
impl<A, F, Request, Response> MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
fn new(fut: A::Future, f: F) -> Self {
MapNewServiceFuture { f: Some(f), fut }
@@ -170,7 +170,7 @@ where
impl<A, F, Request, Response> Future for MapNewServiceFuture<A, F, Request, Response>
where
A: NewService<Request>,
F: Fn(A::Response) -> Response,
F: FnMut(A::Response) -> Response,
{
type Item = Map<A::Service, F, Response>;
type Error = A::InitError;

View File

@@ -4,6 +4,10 @@
* Fix framed transport error handling
* Added Clone impl for Either service
* Added Service and NewService for Stream dispatcher
## [0.1.0] - 2018-12-09

View File

@@ -12,6 +12,15 @@ pub enum EitherService<A, B> {
B(B),
}
impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
fn clone(&self) -> Self {
match self {
EitherService::A(srv) => EitherService::A(srv.clone()),
EitherService::B(srv) => EitherService::B(srv.clone()),
}
}
}
impl<A, B, Request> Service<Request> for EitherService<A, B>
where
A: Service<Request>,
@@ -42,6 +51,34 @@ pub enum Either<A, B> {
B(B),
}
impl<A, B> Either<A, B> {
pub fn new_a<Request>(srv: A) -> Self
where
A: NewService<Request>,
B: NewService<
Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
Either::A(srv)
}
pub fn new_b<Request>(srv: B) -> Self
where
A: NewService<Request>,
B: NewService<
Request,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
Either::B(srv)
}
}
impl<A, B, Request> NewService<Request> for Either<A, B>
where
A: NewService<Request>,
@@ -61,6 +98,15 @@ where
}
}
impl<A: Clone, B: Clone> Clone for Either<A, B> {
fn clone(&self) -> Self {
match self {
Either::A(srv) => Either::A(srv.clone()),
Either::B(srv) => Either::B(srv.clone()),
}
}
}
#[doc(hidden)]
pub enum EitherNewService<A: NewService<R>, B: NewService<R>, R> {
A(A::Future),

View File

@@ -1,39 +1,146 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_rt::spawn;
use actix_service::{IntoService, NewService, Service};
use actix_service::{IntoNewService, IntoService, NewService, Service};
use futures::future::{ok, Future, FutureResult};
use futures::unsync::mpsc;
use futures::{future, Async, Future, Poll, Stream};
use futures::{Async, Poll, Stream};
type Request<T> = Result<<T as IntoStream>::Item, <T as IntoStream>::Error>;
pub trait IntoStream {
type Item;
type Error;
type Stream: Stream<Item = Self::Item, Error = Self::Error>;
fn into_stream(self) -> Self::Stream;
}
impl<T> IntoStream for T
where
T: Stream,
{
type Item = T::Item;
type Error = T::Error;
type Stream = T;
fn into_stream(self) -> Self::Stream {
self
}
}
pub struct StreamNewService<S, T, E> {
factory: Rc<T>,
_t: PhantomData<(S, E)>,
}
impl<S, T, E> StreamNewService<S, T, E>
where
S: IntoStream,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
pub fn new<F: IntoNewService<T, Request<S>>>(factory: F) -> Self {
Self {
factory: Rc::new(factory.into_new_service()),
_t: PhantomData,
}
}
}
impl<S, T, E> Clone for StreamNewService<S, T, E> {
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
_t: PhantomData,
}
}
}
impl<S, T, E> NewService<S> for StreamNewService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type InitError = E;
type Service = StreamService<S, T, E>;
type Future = FutureResult<Self::Service, E>;
fn new_service(&self) -> Self::Future {
ok(StreamService {
factory: self.factory.clone(),
_t: PhantomData,
})
}
}
pub struct StreamService<S, T, E> {
factory: Rc<T>,
_t: PhantomData<(S, E)>,
}
impl<S, T, E> Service<S> for StreamService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service<Request<S>>>::Future: 'static,
{
type Response = ();
type Error = E;
type Future = Box<Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: S) -> Self::Future {
Box::new(
self.factory
.new_service()
.and_then(move |srv| StreamDispatcher::new(req, srv)),
)
}
}
pub struct StreamDispatcher<S, T>
where
S: Stream,
T: Service<Result<S::Item, S::Error>>,
S: IntoStream + 'static,
T: Service<Request<S>, Response = ()> + 'static,
T::Future: 'static,
{
stream: S,
service: T,
item: Option<Result<S::Item, S::Error>>,
stop_rx: mpsc::UnboundedReceiver<T::Error>,
stop_tx: mpsc::UnboundedSender<T::Error>,
err_rx: mpsc::UnboundedReceiver<T::Error>,
err_tx: mpsc::UnboundedSender<T::Error>,
}
impl<S, T> StreamDispatcher<S, T>
where
S: Stream,
T: Service<Result<S::Item, S::Error>, Response = ()>,
T: Service<Request<S>, Response = ()>,
T::Future: 'static,
{
pub fn new<F>(stream: S, service: F) -> Self
pub fn new<F1, F2>(stream: F1, service: F2) -> Self
where
F: IntoService<T, Result<S::Item, S::Error>>,
F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T, Request<S>>,
{
let (stop_tx, stop_rx) = mpsc::unbounded();
let (err_tx, err_rx) = mpsc::unbounded();
StreamDispatcher {
stream,
item: None,
err_rx,
err_tx,
stream: stream.into_stream(),
service: service.into_service(),
stop_rx,
stop_tx,
}
}
}
@@ -41,36 +148,32 @@ where
impl<S, T> Future for StreamDispatcher<S, T>
where
S: Stream,
T: Service<Result<S::Item, S::Error>, Response = ()>,
T: Service<Request<S>, Response = ()>,
T::Future: 'static,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(Some(e))) = self.stop_rx.poll() {
if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() {
return Err(e);
}
let mut item = self.item.take();
loop {
if item.is_some() {
match self.service.poll_ready()? {
Async::Ready(_) => spawn(StreamDispatcherService {
fut: self.service.call(item.take().unwrap()),
stop: self.stop_tx.clone(),
match self.service.poll_ready()? {
Async::Ready(_) => match self.stream.poll() {
Ok(Async::Ready(Some(item))) => spawn(StreamDispatcherService {
fut: self.service.call(Ok(item)),
stop: self.err_tx.clone(),
}),
Async::NotReady => {
self.item = item;
return Ok(Async::NotReady);
}
}
}
match self.stream.poll() {
Ok(Async::Ready(Some(el))) => item = Some(Ok(el)),
Err(err) => item = Some(Err(err)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Err(err) => spawn(StreamDispatcherService {
fut: self.service.call(Err(err)),
stop: self.err_tx.clone(),
}),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
},
Async::NotReady => return Ok(Async::NotReady),
}
}
}
@@ -126,10 +229,10 @@ impl<T: Stream> NewService<T> for TakeItem<T> {
type Error = T::Error;
type InitError = ();
type Service = TakeItemService<T>;
type Future = future::FutureResult<Self::Service, Self::InitError>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
future::ok(TakeItemService { _t: PhantomData })
ok(TakeItemService { _t: PhantomData })
}
}

View File

@@ -1,6 +1,9 @@
use std::ops::Index;
use std::rc::Rc;
use serde::de;
use crate::de::PathDeserializer;
use crate::RequestPath;
#[derive(Debug, Clone, Copy)]
@@ -149,6 +152,11 @@ impl<T: RequestPath> Path<T> {
params: self,
}
}
/// Try to deserialize matching parameters to a specified type `U`
pub fn load<'de, U: serde::Deserialize<'de>>(&'de self) -> Result<U, de::value::Error> {
de::Deserialize::deserialize(PathDeserializer::new(self))
}
}
#[derive(Debug)]