From 0a4fe2200333dfa47da7dcffd95c17dab371a8cc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 3 Dec 2019 19:59:28 +0600 Subject: [PATCH] Restore Service/Factory::apply_fn() in form of Pipeline/Factory::and_then_apply_fn() --- actix-service/CHANGES.md | 1 + actix-service/src/and_then_apply_fn.rs | 333 +++++++++++++++++++++++++ actix-service/src/lib.rs | 2 + actix-service/src/pipeline.rs | 47 ++++ 4 files changed, 383 insertions(+) create mode 100644 actix-service/src/and_then_apply_fn.rs diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 0c85bd13..04d7b792 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -6,6 +6,7 @@ ### Restore `Transform::map_init_err()` combinator +### Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()` ## [1.0.0-alpha.2] - 2019-12-02 diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs new file mode 100644 index 00000000..a372a2f3 --- /dev/null +++ b/actix-service/src/and_then_apply_fn.rs @@ -0,0 +1,333 @@ +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::ready; + +use crate::cell::Cell; +use crate::{Service, ServiceFactory}; + +/// `Apply` service combinator +pub struct AndThenApplyFn +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From + From, +{ + a: A, + b: Cell, + f: Cell, + r: PhantomData<(Fut, Res, Err)>, +} + +impl AndThenApplyFn +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From + From, +{ + /// Create new `Apply` combinator + pub(crate) fn new(a: A, b: B, f: F) -> Self { + Self { + a, + f: Cell::new(f), + b: Cell::new(b), + r: PhantomData, + } + } +} + +impl Clone for AndThenApplyFn +where + A: Service + Clone, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From + From, +{ + fn clone(&self) -> Self { + AndThenApplyFn { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl Service for AndThenApplyFn +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From + From, +{ + type Request = A::Request; + type Response = Res; + type Error = Err; + type Future = AndThenApplyFnFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let not_ready = self.a.poll_ready(cx)?.is_pending(); + if self.b.get_mut().poll_ready(cx)?.is_pending() || not_ready { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn call(&mut self, req: A::Request) -> Self::Future { + AndThenApplyFnFuture { + b: self.b.clone(), + f: self.f.clone(), + fut_a: Some(self.a.call(req)), + fut_b: None, + } + } +} + +pin_project! { + pub struct AndThenApplyFnFuture + where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From, + Err: From, + { + b: Cell, + f: Cell, + #[pin] + fut_a: Option, + #[pin] + fut_b: Option, + } +} + +impl Future for AndThenApplyFnFuture +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Fut, + Fut: Future>, + Err: From + From, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); + + if let Some(fut) = this.fut_b.as_pin_mut() { + return Poll::Ready(ready!(fut.poll(cx)).map_err(|e| e.into())); + } + + match this + .fut_a + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { + Poll::Ready(Ok(resp)) => { + this = self.as_mut().project(); + this.fut_b + .set(Some((&mut *this.f.get_mut())(resp, this.b.get_mut()))); + this.fut_a.set(None); + self.poll(cx) + } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + } + } +} + +/// `AndThenApplyFn` service factory +pub struct AndThenApplyFnFactory { + a: A, + b: B, + f: Cell, + r: PhantomData<(Fut, Res, Err)>, +} + +impl AndThenApplyFnFactory +where + A: ServiceFactory, + B: ServiceFactory, + F: FnMut(A::Response, &mut B::Service) -> Fut, + Fut: Future>, + Err: From + From, +{ + /// Create new `ApplyNewService` new service instance + pub(crate) fn new(a: A, b: B, f: F) -> Self { + Self { + a: a, + b: b, + f: Cell::new(f), + r: PhantomData, + } + } +} + +impl Clone for AndThenApplyFnFactory +where + A: Clone, + B: Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl ServiceFactory for AndThenApplyFnFactory +where + A: ServiceFactory, + A::Config: Clone, + B: ServiceFactory, + F: FnMut(A::Response, &mut B::Service) -> Fut, + Fut: Future>, + Err: From + From, +{ + type Request = A::Request; + type Response = Res; + type Error = Err; + type Service = AndThenApplyFn; + type Config = A::Config; + type InitError = A::InitError; + type Future = AndThenApplyFnFactoryResponse; + + fn new_service(&self, cfg: A::Config) -> Self::Future { + AndThenApplyFnFactoryResponse { + a: None, + b: None, + f: self.f.clone(), + fut_a: self.a.new_service(cfg.clone()), + fut_b: self.b.new_service(cfg), + } + } +} + +pin_project! { + pub struct AndThenApplyFnFactoryResponse + where + A: ServiceFactory, + B: ServiceFactory, + F: FnMut(A::Response, &mut B::Service) -> Fut, + Fut: Future>, + Err: From, + Err: From, + { + #[pin] + fut_b: B::Future, + #[pin] + fut_a: A::Future, + f: Cell, + a: Option, + b: Option, + } +} + +impl Future for AndThenApplyFnFactoryResponse +where + A: ServiceFactory, + B: ServiceFactory, + F: FnMut(A::Response, &mut B::Service) -> Fut, + Fut: Future>, + Err: From + From, +{ + type Output = + Result, A::InitError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); + } + } + + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); + } + } + + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(AndThenApplyFn { + f: this.f.clone(), + a: this.a.take().unwrap(), + b: Cell::new(this.b.take().unwrap()), + r: PhantomData, + })) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::future::{lazy, ok, Ready, TryFutureExt}; + + use crate::{pipeline, pipeline_factory, service_fn2, Service, ServiceFactory}; + + #[derive(Clone)] + struct Srv; + impl Service for Srv { + type Request = (); + type Response = (); + type Error = (); + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ok(req) + } + } + + #[actix_rt::test] + async fn test_service() { + let mut srv = pipeline(|r: &'static str| ok(r)) + .and_then_apply_fn(Srv, |req: &'static str, s| { + s.call(()).map_ok(move |res| (req, res)) + }); + let res = lazy(|cx| srv.poll_ready(cx)).await; + assert_eq!(res, Poll::Ready(Ok(()))); + + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("srv", ())); + } + + #[actix_rt::test] + async fn test_service_factory() { + let new_srv = pipeline_factory(|| ok::<_, ()>(service_fn2(|r: &'static str| ok(r)))) + .and_then_apply_fn( + || ok(Srv), + |req: &'static str, s| s.call(()).map_ok(move |res| (req, res)), + ); + let mut srv = new_srv.new_service(()).await.unwrap(); + let res = lazy(|cx| srv.poll_ready(cx)).await; + assert_eq!(res, Poll::Ready(Ok(()))); + + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("srv", ())); + } +} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index f0cd60e3..5bf29d4b 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::task::{self, Context, Poll}; mod and_then; +mod and_then_apply_fn; mod apply; mod apply_cfg; pub mod boxed; @@ -313,6 +314,7 @@ where pub mod dev { pub use crate::and_then::{AndThenService, AndThenServiceFactory}; + pub use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory}; pub use crate::apply::{Apply, ApplyServiceFactory}; pub use crate::apply_cfg::{ApplyConfigService, ApplyConfigServiceFactory}; pub use crate::fn_service::{ diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 6e7e945b..b49a8fee 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -1,6 +1,8 @@ +use std::future::Future; use std::task::{Context, Poll}; use crate::and_then::{AndThenService, AndThenServiceFactory}; +use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory}; use crate::map::{Map, MapServiceFactory}; use crate::map_err::{MapErr, MapErrServiceFactory}; use crate::map_init_err::MapInitErr; @@ -53,6 +55,28 @@ impl Pipeline { } } + /// Apply function to specified service and use it as a next service in + /// chain. + /// + /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))` + pub fn and_then_apply_fn( + self, + service: I, + f: F, + ) -> Pipeline> + where + Self: Sized, + I: IntoService, + U: Service, + F: FnMut(T::Response, &mut U) -> Fut, + Fut: Future>, + Err: From + From, + { + Pipeline { + service: AndThenApplyFn::new(self.service, service.into_service(), f), + } + } + /// Chain on a computation for when a call to the service finished, /// passing the result of the call to the next service `U`. /// @@ -159,6 +183,29 @@ impl PipelineFactory { } } + /// Apply function to specified service and use it as a next service in + /// chain. + /// + /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))` + pub fn and_then_apply_fn( + self, + factory: I, + f: F, + ) -> PipelineFactory> + where + Self: Sized, + T::Config: Clone, + I: IntoServiceFactory, + U: ServiceFactory, + F: FnMut(T::Response, &mut U::Service) -> Fut, + Fut: Future>, + Err: From + From, + { + PipelineFactory { + factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), f), + } + } + /// Create `NewService` to chain on a computation for when a call to the /// service finished, passing the result of the call to the next /// service `U`.