From 88548199d7d9953ea68020902fe466da88f469ec Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 24 Jan 2019 19:19:44 -0800 Subject: [PATCH] change apply combinator error semantic --- actix-service/CHANGES.md | 3 + actix-service/src/and_then_apply.rs | 299 ++++++++++++++++++++++++++++ actix-service/src/lib.rs | 33 +-- 3 files changed, 321 insertions(+), 14 deletions(-) create mode 100644 actix-service/src/and_then_apply.rs diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index afd194f5..8d63f57e 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -6,6 +6,9 @@ * Use `FnMut` instead of `Fn` for .apply() and .map() combinators and `FnService` type +* Change `.apply()` error semantic, new service's error is `From` + + ## [0.1.5] - 2019-01-13 ### Changed diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs new file mode 100644 index 00000000..125cb896 --- /dev/null +++ b/actix-service/src/and_then_apply.rs @@ -0,0 +1,299 @@ +use std::marker::PhantomData; + +use futures::{try_ready, Async, Future, IntoFuture, Poll}; + +use super::{IntoNewService, IntoService, NewService, Service}; +use crate::cell::Cell; + +/// `Apply` service combinator +pub struct AndThenApply { + a: A, + b: Cell, + f: Cell, + r: PhantomData<(Out, Req1, Req2)>, +} + +impl AndThenApply +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + /// Create new `Apply` combinator + pub fn new, B1: IntoService>(a: A1, b: B1, f: F) -> Self { + Self { + f: Cell::new(f), + a: a.into_service(), + b: Cell::new(b.into_service()), + r: PhantomData, + } + } +} + +impl Clone for AndThenApply +where + A: Clone, +{ + fn clone(&self) -> Self { + AndThenApply { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl Service for AndThenApply +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + type Response = Out::Item; + type Error = Out::Error; + type Future = AndThenApplyFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + try_ready!(self.a.poll_ready().map_err(|e| e.into())); + self.b.get_mut().poll_ready().map_err(|e| e.into()) + } + + fn call(&mut self, req: Req1) -> Self::Future { + AndThenApplyFuture { + b: self.b.clone(), + f: self.f.clone(), + fut_b: None, + fut_a: Some(self.a.call(req)), + _t: PhantomData, + } + } +} + +pub struct AndThenApplyFuture +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + b: Cell, + f: Cell, + fut_a: Option, + fut_b: Option, + _t: PhantomData, +} + +impl Future for AndThenApplyFuture +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + type Item = Out::Item; + type Error = Out::Error; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut_b { + return fut.poll(); + } + + match self.fut_a.as_mut().expect("Bug in actix-service").poll() { + Ok(Async::Ready(resp)) => { + let _ = self.fut_a.take(); + self.fut_b = + Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); + self.poll() + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } +} + +/// `ApplyNewService` new service combinator +pub struct AndThenApplyNewService { + a: A, + b: B, + f: Cell, + r: PhantomData<(Out, Req1, Req2)>, +} + +impl AndThenApplyNewService +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + /// Create new `ApplyNewService` new service instance + pub fn new, B1: IntoNewService>( + a: A1, + b: B1, + f: F, + ) -> Self { + Self { + f: Cell::new(f), + a: a.into_new_service(), + b: b.into_new_service(), + r: PhantomData, + } + } +} + +impl Clone for AndThenApplyNewService +where + A: Clone, + B: Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl NewService + for AndThenApplyNewService +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + type Response = Out::Item; + type Error = Out::Error; + + type InitError = A::InitError; + type Service = AndThenApply; + type Future = AndThenApplyNewServiceFuture; + + fn new_service(&self) -> Self::Future { + AndThenApplyNewServiceFuture { + a: None, + b: None, + f: self.f.clone(), + fut_a: self.a.new_service(), + fut_b: self.b.new_service(), + } + } +} + +pub struct AndThenApplyNewServiceFuture +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + fut_b: B::Future, + fut_a: A::Future, + f: Cell, + a: Option, + b: Option, +} + +impl Future for AndThenApplyNewServiceFuture +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: From, +{ + type Item = AndThenApply; + type Error = A::InitError; + + fn poll(&mut self) -> Poll { + if self.a.is_none() { + if let Async::Ready(service) = self.fut_a.poll()? { + self.a = Some(service); + } + } + + if self.b.is_none() { + if let Async::Ready(service) = self.fut_b.poll()? { + self.b = Some(service); + } + } + + if self.a.is_some() && self.b.is_some() { + Ok(Async::Ready(AndThenApply { + f: self.f.clone(), + a: self.a.take().unwrap(), + b: Cell::new(self.b.take().unwrap()), + r: PhantomData, + })) + } else { + Ok(Async::NotReady) + } + } +} + +#[cfg(test)] +mod tests { + use futures::future::{ok, FutureResult}; + use futures::{Async, Future, Poll}; + + use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt}; + + #[derive(Clone)] + struct Srv; + impl Service<()> for Srv { + type Response = (); + type Error = (); + type Future = FutureResult<(), ()>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + ok(()) + } + } + + #[test] + fn test_call() { + let blank = |req| Ok(req); + + let mut srv = blank.into_service().apply(Srv, |req: &'static str, srv| { + srv.call(()).map(move |res| (req, res)) + }); + assert!(srv.poll_ready().is_ok()); + let res = srv.call("srv").poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + } + + #[test] + fn test_new_service() { + let blank = || Ok::<_, ()>((|req| Ok(req)).into_service()); + + let new_srv = blank.into_new_service().apply( + || Ok(Srv), + |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + ); + if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { + assert!(srv.poll_ready().is_ok()); + let res = srv.call("srv").poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + } else { + panic!() + } + } +} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 0d2e05d5..38fae4d7 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -1,5 +1,7 @@ use futures::{Future, IntoFuture, Poll}; + mod and_then; +mod and_then_apply; mod apply; mod cell; mod fn_service; @@ -10,6 +12,7 @@ mod map_init_err; mod then; pub use self::and_then::{AndThen, AndThenNewService}; +pub use self::and_then_apply::{AndThenApply, AndThenApplyNewService}; pub use self::apply::{Apply, ApplyNewService}; pub use self::fn_service::{FnNewService, FnService}; pub use self::from_err::{FromErr, FromErrNewService}; @@ -57,19 +60,20 @@ pub trait Service { pub trait ServiceExt: Service { /// Apply function to specified service and use it as a next service in /// chain. - fn apply( + fn apply( self, service: I, f: F, - ) -> AndThen> + ) -> AndThenApply where Self: Sized, - T: Service, - I: IntoService, - F: FnMut(Self::Response, &mut T) -> Out, - Out: IntoFuture, + B: Service, + I: IntoService, + F: FnMut(Self::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: From, { - self.and_then(Apply::new(service.into_service(), f)) + AndThenApply::new(self, service, f) } /// Call another service after call to this one has resolved successfully. @@ -182,19 +186,20 @@ pub trait NewService { /// Apply function to specified service and use it as a next service in /// chain. - fn apply( + fn apply( self, service: I, f: F, - ) -> AndThenNewService> + ) -> AndThenApplyNewService where Self: Sized, - T: NewService, - I: IntoNewService, - F: FnMut(Self::Response, &mut T::Service) -> Out + Clone, - Out: IntoFuture, + B: NewService, + I: IntoNewService, + F: FnMut(Self::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: From, { - self.and_then(ApplyNewService::new(service, f)) + AndThenApplyNewService::new(self, service, f) } /// Call another service after call to this one has resolved successfully.