From 8c48bf4de7cfc9969dc02f8b78e6456e3db5f867 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 4 Mar 2019 19:38:11 -0800 Subject: [PATCH] simplify transform trait --- actix-service/CHANGES.md | 7 + actix-service/Cargo.toml | 2 +- actix-service/src/and_then_apply.rs | 194 +++----------- actix-service/src/apply.rs | 266 ++++++++------------ actix-service/src/boxed.rs | 6 +- actix-service/src/fn_transform.rs | 120 +++------ actix-service/src/lib.rs | 29 +-- actix-service/src/transform.rs | 143 ++--------- actix-service/src/transform_map_err.rs | 188 -------------- actix-service/src/transform_map_init_err.rs | 36 +-- actix-utils/CHANGES.md | 7 + actix-utils/Cargo.toml | 5 +- actix-utils/src/inflight.rs | 35 +-- actix-utils/src/order.rs | 45 ++-- actix-utils/src/timeout.rs | 30 +-- 15 files changed, 288 insertions(+), 825 deletions(-) delete mode 100644 actix-service/src/transform_map_err.rs diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 65b8aa48..60fd53d7 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.3.1] - 2019-03-04 + +### Changed + +* Simplify Transform trait + + ## [0.3.0] - 2019-03-02 ## Added diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index acac7f0e..ebdc1104 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-service" -version = "0.3.0" +version = "0.3.1" authors = ["Nikolay Kim "] description = "Actix Service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs index 3c4691eb..70858a9c 100644 --- a/actix-service/src/and_then_apply.rs +++ b/actix-service/src/and_then_apply.rs @@ -1,141 +1,24 @@ +use std::rc::Rc; + use futures::{Async, Future, Poll}; -use super::{NewService, NewTransform, Service, Transform}; -use crate::cell::Cell; - -/// `Apply` service combinator -pub struct AndThenTransform -where - A: Service, - B: Service, - T: Transform, - T::Error: From, -{ - a: A, - b: Cell, - t: Cell, -} - -impl AndThenTransform -where - A: Service, - B: Service, - T: Transform, - T::Error: From, -{ - /// Create new `Apply` combinator - pub fn new(t: T, a: A, b: B) -> Self { - Self { - a, - b: Cell::new(b), - t: Cell::new(t), - } - } -} - -impl Clone for AndThenTransform -where - A: Service + Clone, - B: Service, - T: Transform, - T::Error: From, -{ - fn clone(&self) -> Self { - AndThenTransform { - a: self.a.clone(), - b: self.b.clone(), - t: self.t.clone(), - } - } -} - -impl Service for AndThenTransform -where - A: Service, - B: Service, - T: Transform, - T::Error: From, -{ - type Request = A::Request; - type Response = T::Response; - type Error = T::Error; - type Future = AndThenTransformFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let notready = Async::NotReady == self.a.poll_ready()?; - let notready = Async::NotReady == self.b.get_mut().poll_ready()? || notready; - let notready = Async::NotReady == self.t.get_mut().poll_ready()? || notready; - - if notready { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } - - fn call(&mut self, req: A::Request) -> Self::Future { - AndThenTransformFuture { - b: self.b.clone(), - t: self.t.clone(), - fut_t: None, - fut_a: Some(self.a.call(req)), - } - } -} - -pub struct AndThenTransformFuture -where - A: Service, - B: Service, - T: Transform, - T::Error: From, -{ - b: Cell, - t: Cell, - fut_a: Option, - fut_t: Option, -} - -impl Future for AndThenTransformFuture -where - A: Service, - B: Service, - T: Transform, - T::Error: From, -{ - type Item = T::Response; - type Error = T::Error; - - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_t { - return fut.poll(); - } - - match self.fut_a.as_mut().expect("Bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_t = Some(self.t.get_mut().call(resp, self.b.get_mut())); - self.poll() - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()), - } - } -} +use crate::and_then::AndThen; +use crate::from_err::FromErr; +use crate::{NewService, Transform}; /// `Apply` new service combinator pub struct AndThenTransformNewService { a: A, b: B, - t: T, + t: Rc, _t: std::marker::PhantomData, } impl AndThenTransformNewService where A: NewService, - B: NewService, - T: NewTransform, + B: NewService, + T: Transform, T::Error: From, { /// Create new `ApplyNewService` new service instance @@ -143,7 +26,7 @@ where Self { a, b, - t, + t: Rc::new(t), _t: std::marker::PhantomData, } } @@ -153,7 +36,6 @@ impl Clone for AndThenTransformNewService where A: Clone, B: Clone, - T: Clone, { fn clone(&self) -> Self { Self { @@ -168,8 +50,8 @@ where impl NewService for AndThenTransformNewService where A: NewService, - B: NewService, - T: NewTransform, + B: NewService, + T: Transform, T::Error: From, { type Request = A::Request; @@ -177,17 +59,17 @@ where type Error = T::Error; type InitError = T::InitError; - type Service = AndThenTransform; + type Service = AndThen, T::Transform>; type Future = AndThenTransformNewServiceFuture; fn new_service(&self, cfg: &C) -> Self::Future { AndThenTransformNewServiceFuture { a: None, - b: None, t: None, + t_cell: self.t.clone(), fut_a: self.a.new_service(cfg), fut_b: self.b.new_service(cfg), - fut_t: self.t.new_transform(cfg), + fut_t: None, } } } @@ -195,32 +77,32 @@ where pub struct AndThenTransformNewServiceFuture where A: NewService, - B: NewService, - T: NewTransform, + B: NewService, + T: Transform, T::Error: From, { - fut_b: B::Future, fut_a: A::Future, - fut_t: T::Future, + fut_b: B::Future, + fut_t: Option, a: Option, - b: Option, t: Option, + t_cell: Rc, } impl Future for AndThenTransformNewServiceFuture where A: NewService, - B: NewService, - T: NewTransform, + B: NewService, + T: Transform, T::Error: From, { - type Item = AndThenTransform; + type Item = AndThen, T::Transform>; type Error = T::InitError; fn poll(&mut self) -> Poll { - if self.t.is_none() { - if let Async::Ready(transform) = self.fut_t.poll()? { - self.t = Some(transform); + if self.fut_t.is_none() { + if let Async::Ready(service) = self.fut_b.poll()? { + self.fut_t = Some(self.t_cell.new_transform(service)); } } @@ -230,18 +112,17 @@ where } } - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if let Some(ref mut fut) = self.fut_t { + if let Async::Ready(transform) = fut.poll()? { + self.t = Some(transform); } } - if self.a.is_some() && self.b.is_some() && self.t.is_some() { - Ok(Async::Ready(AndThenTransform { - a: self.a.take().unwrap(), - t: Cell::new(self.t.take().unwrap()), - b: Cell::new(self.b.take().unwrap()), - })) + if self.a.is_some() && self.t.is_some() { + Ok(Async::Ready(AndThen::new( + FromErr::new(self.a.take().unwrap()), + self.t.take().unwrap(), + ))) } else { Ok(Async::NotReady) } @@ -276,10 +157,11 @@ mod tests { fn test_apply() { let blank = |req| Ok(req); - let mut srv = blank.into_service().apply( - |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), - Srv, - ); + let mut srv = blank + .into_service() + .apply_fn(Srv, |req: &'static str, srv: &mut Srv| { + srv.call(()).map(move |res| (req, res)) + }); assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index 3efef1bf..d5dda2d4 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,218 +1,172 @@ -use futures::{try_ready, Async, Future, IntoFuture, Poll}; +use std::marker::PhantomData; -use super::{FnNewTransform, FnTransform}; -use super::{ - IntoNewService, IntoNewTransform, IntoService, IntoTransform, NewService, NewTransform, - Service, Transform, -}; +use futures::{Async, Future, IntoFuture, Poll}; + +use super::{IntoNewService, IntoService, NewService, Service}; /// `Apply` service combinator -pub struct Apply +pub struct Apply where - T: Transform, - T::Error: From, - S: Service, + T: Service, { - transform: T, - service: S, + service: T, + f: F, + r: PhantomData<(In, Out)>, } -impl Apply +impl Apply where - T: Transform, - T::Error: From, - S: Service, -{ - /// Create new `Apply` combinator - pub fn new, S1: IntoService>( - transform: T1, - service: S1, - ) -> Self { - Self { - transform: transform.into_transform(), - service: service.into_service(), - } - } -} - -impl Apply, S> -where - F: FnMut(Req, &mut S) -> Out, + T: Service, + F: FnMut(In, &mut T) -> Out, Out: IntoFuture, - Out::Error: From, - S: Service, + Out::Error: From, { /// Create new `Apply` combinator - pub fn new_fn>(service: S1, transform: F) -> Self { + pub fn new>(service: I, f: F) -> Self { Self { service: service.into_service(), - transform: transform.into_transform(), + f, + r: PhantomData, } } } -impl Clone for Apply +impl Clone for Apply where - S: Service + Clone, - T::Error: From, - T: Transform + Clone, + T: Service + Clone, + F: Clone, { fn clone(&self) -> Self { Apply { service: self.service.clone(), - transform: self.transform.clone(), + f: self.f.clone(), + r: PhantomData, } } } -impl Service for Apply +impl Service for Apply where - T: Transform, - T::Error: From, - S: Service, + T: Service, + F: FnMut(In, &mut T) -> Out, + Out: IntoFuture, + Out::Error: From, { - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; + type Request = In; + type Response = Out::Item; + type Error = Out::Error; + type Future = Out::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - try_ready!(self.service.poll_ready()); - self.transform.poll_ready() + self.service.poll_ready().map_err(|e| e.into()) } - fn call(&mut self, req: Self::Request) -> Self::Future { - self.transform.call(req, &mut self.service).into_future() + fn call(&mut self, req: In) -> Self::Future { + (self.f)(req, &mut self.service).into_future() } } /// `ApplyNewService` new service combinator -pub struct ApplyNewService +pub struct ApplyNewService where - T: NewTransform, - T::Error: From, - S: NewService, + T: NewService, { - transform: T, - service: S, - _t: std::marker::PhantomData, + service: T, + f: F, + r: PhantomData<(In, Out, Cfg)>, } -impl ApplyNewService +impl ApplyNewService where - T: NewTransform, - T::Error: From, - S: NewService, + T: NewService, + F: FnMut(In, &mut T::Service) -> Out + Clone, + Out: IntoFuture, + Out::Error: From, { /// Create new `ApplyNewService` new service instance - pub fn new, S1: IntoNewService>( - transform: T1, - service: S1, - ) -> Self { + pub fn new>(service: F1, f: F) -> Self { Self { - transform: transform.into_new_transform(), + f, service: service.into_new_service(), - _t: std::marker::PhantomData, + r: PhantomData, } } } -impl - ApplyNewService, S, Cfg> +impl Clone for ApplyNewService where - F: FnMut(In, &mut S::Service) -> Out + Clone, + T: NewService + Clone, + F: FnMut(In, &mut T::Service) -> Out + Clone, Out: IntoFuture, - Out::Error: From, - S: NewService, -{ - /// Create new `Apply` combinator factory - pub fn new_fn>(service: S1, transform: F) -> Self { - Self { - service: service.into_new_service(), - transform: FnNewTransform::new(transform), - _t: std::marker::PhantomData, - } - } -} - -impl Clone for ApplyNewService -where - T: NewTransform + Clone, - T::Error: From, - S: NewService + Clone, { fn clone(&self) -> Self { Self { service: self.service.clone(), - transform: self.transform.clone(), - _t: std::marker::PhantomData, + f: self.f.clone(), + r: PhantomData, } } } -impl NewService for ApplyNewService +impl NewService for ApplyNewService where - T: NewTransform, - T::Error: From, - S: NewService, + T: NewService, + F: FnMut(In, &mut T::Service) -> Out + Clone, + Out: IntoFuture, + Out::Error: From, { - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Service = Apply; + type Request = In; + type Response = Out::Item; + type Error = Out::Error; + type Service = Apply; type InitError = T::InitError; - type Future = ApplyNewServiceFuture; + type Future = ApplyNewServiceFuture; - fn new_service(&self, cfg: &C) -> Self::Future { + fn new_service(&self, cfg: &Cfg) -> Self::Future { + ApplyNewServiceFuture::new(self.service.new_service(cfg), self.f.clone()) + } +} + +pub struct ApplyNewServiceFuture +where + T: NewService, + F: FnMut(In, &mut T::Service) -> Out + Clone, + Out: IntoFuture, +{ + fut: T::Future, + f: Option, + r: PhantomData<(In, Out)>, +} + +impl ApplyNewServiceFuture +where + T: NewService, + F: FnMut(In, &mut T::Service) -> Out + Clone, + Out: IntoFuture, +{ + fn new(fut: T::Future, f: F) -> Self { ApplyNewServiceFuture { - fut_t: self.transform.new_transform(cfg), - fut_s: self.service.new_service(cfg), - service: None, - transform: None, + f: Some(f), + fut, + r: PhantomData, } } } -pub struct ApplyNewServiceFuture +impl Future for ApplyNewServiceFuture where - T: NewTransform, - T::Error: From, - S: NewService, + T: NewService, + F: FnMut(In, &mut T::Service) -> Out + Clone, + Out: IntoFuture, + Out::Error: From, { - fut_s: S::Future, - fut_t: T::Future, - service: Option, - transform: Option, -} - -impl Future for ApplyNewServiceFuture -where - T: NewTransform, - T::Error: From, - S: NewService, -{ - type Item = Apply; + type Item = Apply; type Error = T::InitError; fn poll(&mut self) -> Poll { - if self.transform.is_none() { - if let Async::Ready(transform) = self.fut_t.poll()? { - self.transform = Some(transform); - } - } - - if self.service.is_none() { - if let Async::Ready(service) = self.fut_s.poll()? { - self.service = Some(service); - } - } - - if self.transform.is_some() && self.service.is_some() { - Ok(Async::Ready(Apply { - service: self.service.take().unwrap(), - transform: self.transform.take().unwrap(), - })) + if let Async::Ready(service) = self.fut.poll()? { + Ok(Async::Ready(Apply::new(service, self.f.take().unwrap()))) } else { Ok(Async::NotReady) } @@ -225,7 +179,7 @@ mod tests { use futures::{Async, Future, Poll}; use super::*; - use crate::{NewService, Service}; + use crate::{IntoService, NewService, Service, ServiceExt}; #[derive(Clone)] struct Srv; @@ -245,10 +199,14 @@ mod tests { } #[test] - fn test_apply() { - let mut srv = Apply::new_fn(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) - }); + fn test_call() { + let blank = |req| Ok(req); + + let mut srv = blank + .into_service() + .apply_fn(Srv, |req: &'static str, srv| { + srv.call(()).map(move |res| (req, res)) + }); assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); @@ -258,22 +216,6 @@ mod tests { #[test] fn test_new_service() { let new_srv = ApplyNewService::new( - |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), - || Ok::<_, ()>(Srv), - ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } - } - - #[test] - fn test_new_service_fn() { - let new_srv = ApplyNewService::new_fn( || Ok::<_, ()>(Srv), |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), ); diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index 2083d588..e5257327 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -96,11 +96,7 @@ where type Future = Box>; fn new_service(&self, cfg: &C) -> Self::Future { - Box::new( - self.service - .new_service(cfg) - .map(|service| ServiceWrapper::boxed(service)), - ) + Box::new(self.service.new_service(cfg).map(ServiceWrapper::boxed)) } } diff --git a/actix-service/src/fn_transform.rs b/actix-service/src/fn_transform.rs index 383b70bc..7c3773ba 100644 --- a/actix-service/src/fn_transform.rs +++ b/actix-service/src/fn_transform.rs @@ -1,122 +1,64 @@ use std::marker::PhantomData; use futures::future::{ok, FutureResult}; -use futures::{Async, IntoFuture, Poll}; +use futures::IntoFuture; -use crate::{IntoNewTransform, IntoTransform, NewTransform, Transform}; +use crate::{Apply, IntoTransform, Service, Transform}; -pub struct FnTransform +pub struct FnTransform where - F: FnMut(Req, &mut S) -> Res, - Res: IntoFuture, + F: FnMut(In, &mut S) -> Out + Clone, + Out: IntoFuture, { f: F, - _t: PhantomData<(S, Req, Res)>, + _t: PhantomData<(S, In, Out, Err)>, } -impl FnTransform +impl FnTransform where - F: FnMut(Req, &mut S) -> Res, - Res: IntoFuture, + F: FnMut(In, &mut S) -> Out + Clone, + Out: IntoFuture, { pub fn new(f: F) -> Self { FnTransform { f, _t: PhantomData } } } -impl Clone for FnTransform +impl Transform for FnTransform where - F: FnMut(Req, &mut S) -> Res + Clone, - Res: IntoFuture, + S: Service, + F: FnMut(In, &mut S) -> Out + Clone, + Out: IntoFuture, + Out::Error: From, { - fn clone(&self) -> Self { - FnTransform { - f: self.f.clone(), - _t: PhantomData, - } + type Request = In; + type Response = Out::Item; + type Error = Out::Error; + type Transform = Apply; + type InitError = Err; + type Future = FutureResult; + + fn new_transform(&self, service: S) -> Self::Future { + ok(Apply::new(service, self.f.clone())) } } -impl Transform for FnTransform +impl IntoTransform, S> for F where - F: FnMut(Req, &mut S) -> Res, - Res: IntoFuture, + S: Service, + F: FnMut(In, &mut S) -> Out + Clone, + Out: IntoFuture, + Out::Error: From, { - type Request = Req; - type Response = Res::Item; - type Error = Res::Error; - type Future = Res::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, request: Req, service: &mut S) -> Self::Future { - (self.f)(request, service).into_future() - } -} - -impl IntoTransform, S> for F -where - F: FnMut(Req, &mut S) -> Res, - Res: IntoFuture, -{ - fn into_transform(self) -> FnTransform { + fn into_transform(self) -> FnTransform { FnTransform::new(self) } } -pub struct FnNewTransform +impl Clone for FnTransform where - F: FnMut(Req, &mut S) -> Out + Clone, + F: FnMut(In, &mut S) -> Out + Clone, Out: IntoFuture, -{ - f: F, - _t: PhantomData<(S, Req, Out, Err, Cfg)>, -} - -impl FnNewTransform -where - F: FnMut(Req, &mut S) -> Res + Clone, - Res: IntoFuture, -{ - pub fn new(f: F) -> Self { - FnNewTransform { f, _t: PhantomData } - } -} - -impl NewTransform for FnNewTransform -where - F: FnMut(Req, &mut S) -> Res + Clone, - Res: IntoFuture, -{ - type Request = Req; - type Response = Res::Item; - type Error = Res::Error; - type Transform = FnTransform; - type InitError = Err; - type Future = FutureResult; - - fn new_transform(&self, _: &Cfg) -> Self::Future { - ok(FnTransform::new(self.f.clone())) - } -} - -impl - IntoNewTransform, S, Cfg> for F -where - F: FnMut(Req, &mut S) -> Res + Clone, - Res: IntoFuture, -{ - fn into_new_transform(self) -> FnNewTransform { - FnNewTransform::new(self) - } -} - -impl Clone for FnNewTransform -where - F: FnMut(Req, &mut S) -> Res + Clone, - Res: IntoFuture, { fn clone(&self) -> Self { Self::new(self.f.clone()) diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 666d137d..03705081 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -20,21 +20,20 @@ mod map_err; mod map_init_err; mod then; mod transform; -mod transform_map_err; mod transform_map_init_err; pub use self::and_then::{AndThen, AndThenNewService}; -use self::and_then_apply::{AndThenTransform, AndThenTransformNewService}; +use self::and_then_apply::AndThenTransformNewService; use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; pub use self::apply::{Apply, ApplyNewService}; pub use self::fn_service::{fn_cfg_factory, fn_factory, fn_service, FnService}; -pub use self::fn_transform::{FnNewTransform, FnTransform}; +pub use self::fn_transform::FnTransform; pub use self::from_err::{FromErr, FromErrNewService}; pub use self::map::{Map, MapNewService}; pub use self::map_err::{MapErr, MapErrNewService}; pub use self::map_init_err::MapInitErr; pub use self::then::{Then, ThenNewService}; -pub use self::transform::{IntoNewTransform, IntoTransform, NewTransform, Transform}; +pub use self::transform::{IntoTransform, Transform}; /// An asynchronous function from `Request` to a `Response`. pub trait Service { @@ -76,20 +75,6 @@ pub trait Service { /// An extension trait for `Service`s that provides a variety of convenient /// adapters pub trait ServiceExt: Service { - /// Apply tranformation to specified service and use it as a next service in - /// chain. - fn apply(self, transform: T1, service: B1) -> AndThenTransform - where - Self: Sized, - T: Transform, - T::Error: From, - T1: IntoTransform, - B: Service, - B1: IntoService, - { - AndThenTransform::new(transform.into_transform(), self, service.into_service()) - } - /// Apply function to specified service and use it as a next service in /// chain. fn apply_fn(self, service: B1, f: F) -> AndThenApply @@ -228,14 +213,14 @@ pub trait NewService { ) -> AndThenTransformNewService where Self: Sized, - T: NewTransform, + T: Transform, T::Error: From, - T1: IntoNewTransform, - B: NewService, + T1: IntoTransform, + B: NewService, B1: IntoNewService, { AndThenTransformNewService::new( - transform.into_new_transform(), + transform.into_transform(), self, service.into_new_service(), ) diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index e7357c78..4621d09c 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,56 +1,16 @@ use std::rc::Rc; use std::sync::Arc; -use futures::{Future, Poll}; +use futures::Future; -use crate::transform_map_err::{TransformMapErr, TransformMapErrNewTransform}; use crate::transform_map_init_err::TransformMapInitErr; use crate::Service; -/// An asynchronous function for transforming service call result. -pub trait Transform { - /// Requests handled by the service. - type Request; - - /// Responses given by the service. - type Response; - - /// Errors produced by the service. - type Error; - - /// The future response value. - type Future: Future; - - /// Returns `Ready` when the service is able to process requests. - /// - /// This method is similar to `Service::poll_ready` method. - fn poll_ready(&mut self) -> Poll<(), Self::Error>; - - /// Process the request and apply it to provided service, - /// return the response asynchronously. - fn call(&mut self, request: Self::Request, service: &mut Service) -> Self::Future; - - /// Map this transform's error to a different error, returning a new transform. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying transform. This is useful for example to - /// ensure that services and transforms have the same error type. - /// - /// Note that this function consumes the receiving transform and returns a - /// wrapped version of it. - fn map_err(self, f: F) -> TransformMapErr - where - Self: Sized, - F: Fn(Self::Error) -> E, - { - TransformMapErr::new(self, f) - } -} - -/// `Transform` service factory +/// `Transform` service factory. /// +/// Transform factory creates service that wraps other services. /// `Config` is a service factory configuration type. -pub trait NewTransform { +pub trait Transform { /// Requests handled by the service. type Request; @@ -61,8 +21,7 @@ pub trait NewTransform { type Error; /// The `TransformService` value created by this factory - type Transform: Transform< - Service, + type Transform: Service< Request = Self::Request, Response = Self::Response, Error = Self::Error, @@ -75,21 +34,11 @@ pub trait NewTransform { type Future: Future; /// Create and return a new service value asynchronously. - fn new_transform(&self, cfg: &Config) -> Self::Future; - - /// Map this transforms's output to a different type, returning a new transform - /// of the resulting type. - fn map_err(self, f: F) -> TransformMapErrNewTransform - where - Self: Sized, - F: Fn(Self::Error) -> E, - { - TransformMapErrNewTransform::new(self, f) - } + fn new_transform(&self, service: S) -> Self::Future; /// Map this service's factory init error to a different error, /// returning a new transform service factory. - fn map_init_err(self, f: F) -> TransformMapInitErr + fn map_init_err(self, f: F) -> TransformMapInitErr where Self: Sized, F: Fn(Self::InitError) -> E, @@ -98,77 +47,39 @@ pub trait NewTransform { } } -impl<'a, T, S> Transform for &'a mut T +impl Transform for Rc where - T: Transform + 'a, - S: Service, + T: Transform, { type Request = T::Request; type Response = T::Response; type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), T::Error> { - (**self).poll_ready() - } - - fn call(&mut self, request: Self::Request, service: &mut S) -> T::Future { - (**self).call(request, service) - } -} - -impl Transform for Box -where - T: Transform + ?Sized, - S: Service, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self) -> Poll<(), S::Error> { - (**self).poll_ready() - } - - fn call(&mut self, request: Self::Request, service: &mut S) -> T::Future { - (**self).call(request, service) - } -} - -impl NewTransform for Rc -where - T: NewTransform, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Transform = T::Transform; type InitError = T::InitError; + type Transform = T::Transform; type Future = T::Future; - fn new_transform(&self, cfg: &C) -> T::Future { - self.as_ref().new_transform(cfg) + fn new_transform(&self, service: S) -> T::Future { + self.as_ref().new_transform(service) } } -impl NewTransform for Arc +impl Transform for Arc where - T: NewTransform, + T: Transform, { type Request = T::Request; type Response = T::Response; type Error = T::Error; - type Transform = T::Transform; type InitError = T::InitError; + type Transform = T::Transform; type Future = T::Future; - fn new_transform(&self, cfg: &C) -> T::Future { - self.as_ref().new_transform(cfg) + fn new_transform(&self, service: S) -> T::Future { + self.as_ref().new_transform(service) } } -/// Trait for types that can be converted to a `TransformService` +/// Trait for types that can be converted to a *transform service* pub trait IntoTransform where T: Transform, @@ -177,15 +88,6 @@ where fn into_transform(self) -> T; } -/// Trait for types that can be converted to a TransfromNewService -pub trait IntoNewTransform -where - T: NewTransform, -{ - /// Convert to an `TranformNewService` - fn into_new_transform(self) -> T; -} - impl IntoTransform for T where T: Transform, @@ -194,12 +96,3 @@ where self } } - -impl IntoNewTransform for T -where - T: NewTransform, -{ - fn into_new_transform(self) -> T { - self - } -} diff --git a/actix-service/src/transform_map_err.rs b/actix-service/src/transform_map_err.rs deleted file mode 100644 index 048e6403..00000000 --- a/actix-service/src/transform_map_err.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::marker::PhantomData; - -use futures::{Async, Future, Poll}; - -use super::{NewTransform, Transform}; - -/// Service for the `map_err` combinator, changing the type of a transform's -/// error. -/// -/// This is created by the `Transform::map_err` method. -pub struct TransformMapErr { - transform: T, - f: F, - _t: PhantomData<(S, E)>, -} - -impl TransformMapErr { - /// Create new `MapErr` combinator - pub fn new(transform: T, f: F) -> Self - where - T: Transform, - F: Fn(T::Error) -> E, - { - Self { - transform, - f, - _t: PhantomData, - } - } -} - -impl Clone for TransformMapErr -where - T: Clone, - F: Clone, -{ - fn clone(&self) -> Self { - TransformMapErr { - transform: self.transform.clone(), - f: self.f.clone(), - _t: PhantomData, - } - } -} - -impl Transform for TransformMapErr -where - T: Transform, - F: Fn(T::Error) -> E + Clone, -{ - type Request = T::Request; - type Response = T::Response; - type Error = E; - type Future = TransformMapErrFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.transform.poll_ready().map_err(&self.f) - } - - fn call(&mut self, req: T::Request, service: &mut S) -> Self::Future { - TransformMapErrFuture::new(self.transform.call(req, service), self.f.clone()) - } -} - -pub struct TransformMapErrFuture -where - T: Transform, - F: Fn(T::Error) -> E, -{ - f: F, - fut: T::Future, -} - -impl TransformMapErrFuture -where - T: Transform, - F: Fn(T::Error) -> E, -{ - fn new(fut: T::Future, f: F) -> Self { - TransformMapErrFuture { f, fut } - } -} - -impl Future for TransformMapErrFuture -where - T: Transform, - F: Fn(T::Error) -> E, -{ - type Item = T::Response; - type Error = E; - - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) - } -} - -/// NewTransform for the `map_err` combinator, changing the type of a new -/// transform's error. -/// -/// This is created by the `NewTransform::map_err` method. -pub struct TransformMapErrNewTransform { - t: T, - f: F, - e: PhantomData<(S, C, E)>, -} - -impl TransformMapErrNewTransform { - /// Create new `MapErr` new service instance - pub fn new(t: T, f: F) -> Self - where - T: NewTransform, - F: Fn(T::Error) -> E, - { - Self { - t, - f, - e: PhantomData, - } - } -} - -impl Clone for TransformMapErrNewTransform -where - T: Clone, - F: Clone, -{ - fn clone(&self) -> Self { - Self { - t: self.t.clone(), - f: self.f.clone(), - e: PhantomData, - } - } -} - -impl NewTransform for TransformMapErrNewTransform -where - T: NewTransform, - F: Fn(T::Error) -> E + Clone, -{ - type Request = T::Request; - type Response = T::Response; - type Error = E; - type Transform = TransformMapErr; - - type InitError = T::InitError; - type Future = TransformMapErrNewTransformFuture; - - fn new_transform(&self, cfg: &C) -> Self::Future { - TransformMapErrNewTransformFuture::new(self.t.new_transform(cfg), self.f.clone()) - } -} - -pub struct TransformMapErrNewTransformFuture -where - T: NewTransform, - F: Fn(T::Error) -> E, -{ - fut: T::Future, - f: F, -} - -impl TransformMapErrNewTransformFuture -where - T: NewTransform, - F: Fn(T::Error) -> E, -{ - fn new(fut: T::Future, f: F) -> Self { - TransformMapErrNewTransformFuture { f, fut } - } -} - -impl Future for TransformMapErrNewTransformFuture -where - T: NewTransform, - F: Fn(T::Error) -> E + Clone, -{ - type Item = TransformMapErr; - type Error = T::InitError; - - fn poll(&mut self) -> Poll { - if let Async::Ready(tr) = self.fut.poll()? { - Ok(Async::Ready(TransformMapErr::new(tr, self.f.clone()))) - } else { - Ok(Async::NotReady) - } - } -} diff --git a/actix-service/src/transform_map_init_err.rs b/actix-service/src/transform_map_init_err.rs index bddd68ff..0b0dd9b9 100644 --- a/actix-service/src/transform_map_init_err.rs +++ b/actix-service/src/transform_map_init_err.rs @@ -2,23 +2,23 @@ use std::marker::PhantomData; use futures::{Future, Poll}; -use super::NewTransform; +use super::Transform; /// NewTransform for the `map_init_err` combinator, changing the type of a new /// transform's error. /// /// This is created by the `NewTransform::map_init_err` method. -pub struct TransformMapInitErr { +pub struct TransformMapInitErr { t: T, f: F, - e: PhantomData<(S, C, E)>, + e: PhantomData<(S, E)>, } -impl TransformMapInitErr { +impl TransformMapInitErr { /// Create new `MapInitErr` new transform instance pub fn new(t: T, f: F) -> Self where - T: NewTransform, + T: Transform, F: Fn(T::InitError) -> E, { Self { @@ -29,7 +29,7 @@ impl TransformMapInitErr { } } -impl Clone for TransformMapInitErr +impl Clone for TransformMapInitErr where T: Clone, F: Clone, @@ -43,9 +43,9 @@ where } } -impl NewTransform for TransformMapInitErr +impl Transform for TransformMapInitErr where - T: NewTransform, + T: Transform, F: Fn(T::InitError) -> E + Clone, { type Request = T::Request; @@ -54,25 +54,25 @@ where type Transform = T::Transform; type InitError = E; - type Future = TransformMapInitErrFuture; + type Future = TransformMapInitErrFuture; - fn new_transform(&self, cfg: &C) -> Self::Future { - TransformMapInitErrFuture::new(self.t.new_transform(cfg), self.f.clone()) + fn new_transform(&self, service: S) -> Self::Future { + TransformMapInitErrFuture::new(self.t.new_transform(service), self.f.clone()) } } -pub struct TransformMapInitErrFuture +pub struct TransformMapInitErrFuture where - T: NewTransform, + T: Transform, F: Fn(T::InitError) -> E, { fut: T::Future, f: F, } -impl TransformMapInitErrFuture +impl TransformMapInitErrFuture where - T: NewTransform, + T: Transform, F: Fn(T::InitError) -> E, { fn new(fut: T::Future, f: F) -> Self { @@ -80,15 +80,15 @@ where } } -impl Future for TransformMapInitErrFuture +impl Future for TransformMapInitErrFuture where - T: NewTransform, + T: Transform, F: Fn(T::InitError) -> E + Clone, { type Item = T::Transform; type Error = E; fn poll(&mut self) -> Poll { - self.fut.poll().map_err(|e| (self.f)(e)) + self.fut.poll().map_err(&self.f) } } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 8d834be6..05a89916 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.3.1] - 2019-03-04 + +### Changed + +* Use new type of transform trait + + ## [0.3.0] - 2019-03-02 ### Changed diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index c104ad33..6eb09260 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.3.0" +version = "0.3.1" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -18,7 +18,8 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-service = "0.3.0" +#actix-service = "0.3.1" +actix-service = { path="../actix-service" } actix-codec = "0.1.0" bytes = "0.4" futures = "0.1.24" diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index 32ddd54f..41955f35 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -1,4 +1,4 @@ -use actix_service::{NewTransform, Service, Transform, Void}; +use actix_service::{Service, Transform, Void}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; @@ -24,32 +24,34 @@ impl Default for InFlight { } } -impl NewTransform for InFlight { - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; +impl Transform for InFlight { + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; type InitError = Void; - type Transform = InFlightService; + type Transform = InFlightService; type Future = FutureResult; - fn new_transform(&self, _: &C) -> Self::Future { - ok(InFlightService::new(self.max_inflight)) + fn new_transform(&self, service: S) -> Self::Future { + ok(InFlightService::new(self.max_inflight, service)) } } -pub struct InFlightService { +pub struct InFlightService { count: Counter, + service: S, } -impl InFlightService { - pub fn new(max: usize) -> Self { +impl InFlightService { + pub fn new(max: usize, service: S) -> Self { Self { + service, count: Counter::new(max), } } } -impl Transform for InFlightService +impl Service for InFlightService where T: Service, { @@ -59,6 +61,8 @@ where type Future = InFlightServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready()?; + if !self.count.available() { log::trace!("InFlight limit exceeded"); Ok(Async::NotReady) @@ -67,9 +71,9 @@ where } } - fn call(&mut self, req: T::Request, service: &mut T) -> Self::Future { + fn call(&mut self, req: T::Request) -> Self::Future { InFlightServiceResponse { - fut: service.call(req), + fut: self.service.call(req), _guard: self.count.get(), } } @@ -122,7 +126,8 @@ mod tests { fn test_transform() { let wait_time = Duration::from_millis(50); let _ = actix_rt::System::new("test").block_on(lazy(|| { - let mut srv = Blank::new().apply(InFlightService::new(1), SleepService(wait_time)); + let mut srv = + Blank::new().and_then(InFlightService::new(1, SleepService(wait_time))); assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); let mut res = srv.call(()); diff --git a/actix-utils/src/order.rs b/actix-utils/src/order.rs index 293a022b..b267a0b4 100644 --- a/actix-utils/src/order.rs +++ b/actix-utils/src/order.rs @@ -3,7 +3,7 @@ use std::fmt; use std::marker::PhantomData; use std::rc::Rc; -use actix_service::{NewTransform, Service, Transform, Void}; +use actix_service::{Service, Transform, Void}; use futures::future::{ok, FutureResult}; use futures::task::AtomicTask; use futures::unsync::oneshot; @@ -63,13 +63,8 @@ where Self { _t: PhantomData } } - pub fn service() -> impl Transform< - S, - Request = S::Request, - Response = S::Response, - Error = InOrderError, - > { - InOrderService::new() + pub fn service(service: S) -> InOrderService { + InOrderService::new(service) } } @@ -85,7 +80,7 @@ where } } -impl NewTransform for InOrder +impl Transform for InOrder where S: Service, S::Response: 'static, @@ -99,12 +94,13 @@ where type Transform = InOrderService; type Future = FutureResult; - fn new_transform(&self, _: &C) -> Self::Future { - ok(InOrderService::new()) + fn new_transform(&self, service: S) -> Self::Future { + ok(InOrderService::new(service)) } } pub struct InOrderService { + service: S, task: Rc, acks: VecDeque>, } @@ -116,27 +112,16 @@ where S::Future: 'static, S::Error: 'static, { - pub fn new() -> Self { + pub fn new(service: S) -> Self { Self { + service, acks: VecDeque::new(), task: Rc::new(AtomicTask::new()), } } } -impl Default for InOrderService -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - fn default() -> Self { - Self::new() - } -} - -impl Transform for InOrderService +impl Service for InOrderService where S: Service, S::Response: 'static, @@ -149,8 +134,12 @@ where type Future = InOrderServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // poll_ready could be called from different task self.task.register(); + // check nested service + self.service.poll_ready().map_err(InOrderError::Service)?; + // check acks while !self.acks.is_empty() { let rec = self.acks.front_mut().unwrap(); @@ -167,13 +156,13 @@ where Ok(Async::Ready(())) } - fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future { + fn call(&mut self, request: S::Request) -> Self::Future { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); self.acks.push_back(Record { rx: rx1, tx: tx2 }); let task = self.task.clone(); - tokio_current_thread::spawn(service.call(request).then(move |res| { + tokio_current_thread::spawn(self.service.call(request).then(move |res| { task.notify(); let _ = tx1.send(res); Ok(()) @@ -257,7 +246,7 @@ mod tests { let rx3 = rx3; let tx_stop = tx_stop; let _ = actix_rt::System::new("test").block_on(lazy(move || { - let mut srv = Blank::new().apply(InOrderService::new(), Srv); + let mut srv = Blank::new().and_then(InOrderService::new(Srv)); let res1 = srv.call(rx1); let res2 = srv.call(rx2); diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 5c452854..6200c02b 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -6,7 +6,7 @@ use std::fmt; use std::marker::PhantomData; use std::time::Duration; -use actix_service::{NewTransform, Service, Transform}; +use actix_service::{Service, Transform}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; @@ -80,7 +80,7 @@ impl Clone for Timeout { } } -impl NewTransform for Timeout +impl Transform for Timeout where S: Service, { @@ -88,11 +88,12 @@ where type Response = S::Response; type Error = TimeoutError; type InitError = E; - type Transform = TimeoutService; + type Transform = TimeoutService; type Future = FutureResult; - fn new_transform(&self, _: &C) -> Self::Future { + fn new_transform(&self, service: S) -> Self::Future { ok(TimeoutService { + service, timeout: self.timeout, }) } @@ -100,17 +101,18 @@ where /// Applies a timeout to requests. #[derive(Debug, Clone)] -pub struct TimeoutService { +pub struct TimeoutService { + service: S, timeout: Duration, } -impl TimeoutService { - pub fn new(timeout: Duration) -> Self { - TimeoutService { timeout } +impl TimeoutService { + pub fn new(timeout: Duration, service: S) -> Self { + TimeoutService { service, timeout } } } -impl Transform for TimeoutService +impl Service for TimeoutService where S: Service, { @@ -120,12 +122,12 @@ where type Future = TimeoutServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + self.service.poll_ready().map_err(TimeoutError::Service) } - fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future { + fn call(&mut self, request: S::Request) -> Self::Future { TimeoutServiceResponse { - fut: service.call(request), + fut: self.service.call(request), sleep: Delay::new(clock::now() + self.timeout), } } @@ -197,7 +199,7 @@ mod tests { let res = actix_rt::System::new("test").block_on(lazy(|| { let mut timeout = Blank::default() - .apply(TimeoutService::new(resolution), SleepService(wait_time)); + .and_then(TimeoutService::new(resolution, SleepService(wait_time))); timeout.call(()) })); assert_eq!(res, Ok(())); @@ -210,7 +212,7 @@ mod tests { let res = actix_rt::System::new("test").block_on(lazy(|| { let mut timeout = Blank::default() - .apply(TimeoutService::new(resolution), SleepService(wait_time)); + .and_then(TimeoutService::new(resolution, SleepService(wait_time))); timeout.call(()) })); assert_eq!(res, Err(TimeoutError::Timeout));