From bd977373bc97d3915cff942a78e2e4aee18b3c98 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 3 Feb 2019 10:42:27 -0800 Subject: [PATCH] generalize apply combinator with transform trait --- actix-service/CHANGES.md | 7 + actix-service/Cargo.toml | 2 +- actix-service/src/and_then_apply.rs | 197 +++++++------- actix-service/src/and_then_apply_fn.rs | 302 ++++++++++++++++++++++ actix-service/src/apply.rs | 270 ++++++++++++-------- actix-service/src/blank.rs | 85 ++++++ actix-service/src/fn_service.rs | 100 ++++---- actix-service/src/fn_transform.rs | 123 +++++++++ actix-service/src/lib.rs | 62 ++++- actix-service/src/transform.rs | 341 +++++++++++++++++++++++++ actix-utils/CHANGES.md | 7 + actix-utils/Cargo.toml | 5 +- actix-utils/src/inflight.rs | 175 +++++++------ actix-utils/src/timeout.rs | 200 +++++++++------ 14 files changed, 1443 insertions(+), 433 deletions(-) create mode 100644 actix-service/src/and_then_apply_fn.rs create mode 100644 actix-service/src/blank.rs create mode 100644 actix-service/src/fn_transform.rs create mode 100644 actix-service/src/transform.rs diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 720d1485..4682d910 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.2.1] - 2019-02-xx + +### Changed + +* Generalize `.apply` combinator with Transform trait + + ## [0.2.0] - 2019-02-01 ### Changed diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index b5a80edd..7ea60982 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-service" -version = "0.2.0" +version = "0.2.1" authors = ["Nikolay Kim "] description = "Actix Service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs index 48ba8d52..96c68619 100644 --- a/actix-service/src/and_then_apply.rs +++ b/actix-service/src/and_then_apply.rs @@ -1,125 +1,120 @@ -use std::marker::PhantomData; +use futures::{Async, Future, Poll}; -use futures::{try_ready, Async, Future, IntoFuture, Poll}; - -use super::{IntoNewService, IntoService, NewService, Service}; +use super::{NewService, NewTransform, Service, Transform}; use crate::cell::Cell; /// `Apply` service combinator -pub struct AndThenApply +pub struct AndThenTransform where A: Service, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { a: A, b: Cell, - f: Cell, - r: PhantomData<(Out,)>, + t: Cell, } -impl AndThenApply +impl AndThenTransform where A: Service, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { /// Create new `Apply` combinator - pub fn new, B1: IntoService>(a: A1, b: B1, f: F) -> Self { + pub fn new(t: T, a: A, b: B) -> Self { Self { - f: Cell::new(f), - a: a.into_service(), - b: Cell::new(b.into_service()), - r: PhantomData, + a, + b: Cell::new(b), + t: Cell::new(t), } } } -impl Clone for AndThenApply +impl Clone for AndThenTransform where A: Service + Clone, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { fn clone(&self) -> Self { - AndThenApply { + AndThenTransform { a: self.a.clone(), b: self.b.clone(), - f: self.f.clone(), - r: PhantomData, + t: self.t.clone(), } } } -impl Service for AndThenApply +impl Service for AndThenTransform where A: Service, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { type Request = A::Request; - type Response = Out::Item; - type Error = A::Error; - type Future = AndThenApplyFuture; + type Response = T::Response; + type Error = T::Error; + type Future = AndThenTransformFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - try_ready!(self.a.poll_ready()); - self.b.get_mut().poll_ready().map_err(|e| e.into()) + let notready = Async::NotReady == self.a.poll_ready()?; + let notready = Async::NotReady == self.b.get_mut().poll_ready()? || notready; + let notready = Async::NotReady == self.t.get_mut().poll_ready()? || notready; + + if notready { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } } fn call(&mut self, req: A::Request) -> Self::Future { - AndThenApplyFuture { + AndThenTransformFuture { b: self.b.clone(), - f: self.f.clone(), - fut_b: None, + t: self.t.clone(), + fut_t: None, fut_a: Some(self.a.call(req)), } } } -pub struct AndThenApplyFuture +pub struct AndThenTransformFuture where A: Service, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { b: Cell, - f: Cell, + t: Cell, fut_a: Option, - fut_b: Option, + fut_t: Option, } -impl Future for AndThenApplyFuture +impl Future for AndThenTransformFuture where A: Service, B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: Transform, + T::Error: From, { - type Item = Out::Item; - type Error = A::Error; + type Item = T::Response; + type Error = T::Error; fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { + if let Some(ref mut fut) = self.fut_t { return fut.poll().map_err(|e| e.into()); } match self.fut_a.as_mut().expect("Bug in actix-service").poll() { Ok(Async::Ready(resp)) => { let _ = self.fut_a.take(); - self.fut_b = - Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); + self.fut_t = Some(self.t.get_mut().call(resp, self.b.get_mut())); self.poll() } Ok(Async::NotReady) => Ok(Async::NotReady), @@ -128,102 +123,100 @@ where } } -/// `ApplyNewService` new service combinator -pub struct AndThenApplyNewService { +/// `Apply` new service combinator +pub struct AndThenTransformNewService { a: A, b: B, - f: Cell, - r: PhantomData<(Out)>, + t: T, } -impl AndThenApplyNewService +impl AndThenTransformNewService where A: NewService, B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: NewTransform, + T::Error: From, { /// Create new `ApplyNewService` new service instance - pub fn new, B1: IntoNewService>(a: A1, b: B1, f: F) -> Self { - Self { - f: Cell::new(f), - a: a.into_new_service(), - b: b.into_new_service(), - r: PhantomData, - } + pub fn new(t: T, a: A, b: B) -> Self { + Self { a, b, t } } } -impl Clone for AndThenApplyNewService +impl Clone for AndThenTransformNewService where A: Clone, B: Clone, + T: Clone, { fn clone(&self) -> Self { Self { a: self.a.clone(), b: self.b.clone(), - f: self.f.clone(), - r: PhantomData, + t: self.t.clone(), } } } -impl NewService for AndThenApplyNewService +impl NewService for AndThenTransformNewService where A: NewService, B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: NewTransform, + T::Error: From, { type Request = A::Request; - type Response = Out::Item; - type Error = A::Error; + type Response = T::Response; + type Error = T::Error; - type InitError = A::InitError; - type Service = AndThenApply; - type Future = AndThenApplyNewServiceFuture; + type InitError = T::InitError; + type Service = AndThenTransform; + type Future = AndThenTransformNewServiceFuture; fn new_service(&self) -> Self::Future { - AndThenApplyNewServiceFuture { + AndThenTransformNewServiceFuture { a: None, b: None, - f: self.f.clone(), + t: None, fut_a: self.a.new_service(), fut_b: self.b.new_service(), + fut_t: self.t.new_transform(), } } } -pub struct AndThenApplyNewServiceFuture +pub struct AndThenTransformNewServiceFuture where A: NewService, B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: NewTransform, + T::Error: From, { fut_b: B::Future, fut_a: A::Future, - f: Cell, + fut_t: T::Future, a: Option, b: Option, + t: Option, } -impl Future for AndThenApplyNewServiceFuture +impl Future for AndThenTransformNewServiceFuture where A: NewService, B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, + T: NewTransform, + T::Error: From, { - type Item = AndThenApply; - type Error = A::InitError; + type Item = AndThenTransform; + type Error = T::InitError; fn poll(&mut self) -> Poll { + if self.t.is_none() { + if let Async::Ready(transform) = self.fut_t.poll()? { + self.t = Some(transform); + } + } + if self.a.is_none() { if let Async::Ready(service) = self.fut_a.poll()? { self.a = Some(service); @@ -236,12 +229,11 @@ where } } - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThenApply { - f: self.f.clone(), + if self.a.is_some() && self.b.is_some() && self.t.is_some() { + Ok(Async::Ready(AndThenTransform { a: self.a.take().unwrap(), + t: Cell::new(self.t.take().unwrap()), b: Cell::new(self.b.take().unwrap()), - r: PhantomData, })) } else { Ok(Async::NotReady) @@ -274,12 +266,13 @@ mod tests { } #[test] - fn test_call() { + fn test_apply() { let blank = |req| Ok(req); - let mut srv = blank.into_service().apply(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) - }); + let mut srv = blank.into_service().apply( + |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), + Srv, + ); assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); @@ -291,8 +284,8 @@ mod tests { let blank = || Ok::<_, ()>((|req| Ok(req)).into_service()); let new_srv = blank.into_new_service().apply( + |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), || Ok(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), ); if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { assert!(srv.poll_ready().is_ok()); diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs new file mode 100644 index 00000000..f6f7f266 --- /dev/null +++ b/actix-service/src/and_then_apply_fn.rs @@ -0,0 +1,302 @@ +use std::marker::PhantomData; + +use futures::{try_ready, Async, Future, IntoFuture, Poll}; + +use super::{IntoNewService, IntoService, NewService, Service}; +use crate::cell::Cell; + +/// `Apply` service combinator +pub struct AndThenApply +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + a: A, + b: Cell, + f: Cell, + r: PhantomData<(Out,)>, +} + +impl AndThenApply +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + /// Create new `Apply` combinator + pub fn new, B1: IntoService>(a: A1, b: B1, f: F) -> Self { + Self { + f: Cell::new(f), + a: a.into_service(), + b: Cell::new(b.into_service()), + r: PhantomData, + } + } +} + +impl Clone for AndThenApply +where + A: Service + Clone, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + fn clone(&self) -> Self { + AndThenApply { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl Service for AndThenApply +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + type Request = A::Request; + type Response = Out::Item; + type Error = A::Error; + type Future = AndThenApplyFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + try_ready!(self.a.poll_ready()); + self.b.get_mut().poll_ready().map_err(|e| e.into()) + } + + fn call(&mut self, req: A::Request) -> Self::Future { + AndThenApplyFuture { + b: self.b.clone(), + f: self.f.clone(), + fut_b: None, + fut_a: Some(self.a.call(req)), + } + } +} + +pub struct AndThenApplyFuture +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + b: Cell, + f: Cell, + fut_a: Option, + fut_b: Option, +} + +impl Future for AndThenApplyFuture +where + A: Service, + B: Service, + F: FnMut(A::Response, &mut B) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + type Item = Out::Item; + type Error = A::Error; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut_b { + return fut.poll().map_err(|e| e.into()); + } + + match self.fut_a.as_mut().expect("Bug in actix-service").poll() { + Ok(Async::Ready(resp)) => { + let _ = self.fut_a.take(); + self.fut_b = + Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); + self.poll() + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } +} + +/// `ApplyNewService` new service combinator +pub struct AndThenApplyNewService { + a: A, + b: B, + f: Cell, + r: PhantomData<(Out)>, +} + +impl AndThenApplyNewService +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + /// Create new `ApplyNewService` new service instance + pub fn new, B1: IntoNewService>(a: A1, b: B1, f: F) -> Self { + Self { + f: Cell::new(f), + a: a.into_new_service(), + b: b.into_new_service(), + r: PhantomData, + } + } +} + +impl Clone for AndThenApplyNewService +where + A: Clone, + B: Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + f: self.f.clone(), + r: PhantomData, + } + } +} + +impl NewService for AndThenApplyNewService +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + type Request = A::Request; + type Response = Out::Item; + type Error = A::Error; + + type InitError = A::InitError; + type Service = AndThenApply; + type Future = AndThenApplyNewServiceFuture; + + fn new_service(&self) -> Self::Future { + AndThenApplyNewServiceFuture { + a: None, + b: None, + f: self.f.clone(), + fut_a: self.a.new_service(), + fut_b: self.b.new_service(), + } + } +} + +pub struct AndThenApplyNewServiceFuture +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + fut_b: B::Future, + fut_a: A::Future, + f: Cell, + a: Option, + b: Option, +} + +impl Future for AndThenApplyNewServiceFuture +where + A: NewService, + B: NewService, + F: FnMut(A::Response, &mut B::Service) -> Out, + Out: IntoFuture, + Out::Error: Into, +{ + type Item = AndThenApply; + type Error = A::InitError; + + fn poll(&mut self) -> Poll { + if self.a.is_none() { + if let Async::Ready(service) = self.fut_a.poll()? { + self.a = Some(service); + } + } + + if self.b.is_none() { + if let Async::Ready(service) = self.fut_b.poll()? { + self.b = Some(service); + } + } + + if self.a.is_some() && self.b.is_some() { + Ok(Async::Ready(AndThenApply { + f: self.f.clone(), + a: self.a.take().unwrap(), + b: Cell::new(self.b.take().unwrap()), + r: PhantomData, + })) + } else { + Ok(Async::NotReady) + } + } +} + +#[cfg(test)] +mod tests { + use futures::future::{ok, FutureResult}; + use futures::{Async, Future, Poll}; + + use crate::{Blank, BlankNewService, NewService, Service, ServiceExt}; + + #[derive(Clone)] + struct Srv; + impl Service for Srv { + type Request = (); + type Response = (); + type Error = (); + type Future = FutureResult<(), ()>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + ok(()) + } + } + + #[test] + fn test_call() { + let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| { + srv.call(()).map(move |res| (req, res)) + }); + assert!(srv.poll_ready().is_ok()); + let res = srv.call("srv").poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + } + + #[test] + fn test_new_service() { + let new_srv = BlankNewService::new_unit().apply_fn( + || Ok(Srv), + |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + ); + if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { + assert!(srv.poll_ready().is_ok()); + let res = srv.call("srv").poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + } else { + panic!() + } + } +} diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index eb31acec..aa7a8cc1 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,172 +1,214 @@ -use std::marker::PhantomData; +use futures::{try_ready, Async, Future, IntoFuture, Poll}; -use futures::{Async, Future, IntoFuture, Poll}; - -use super::{IntoNewService, IntoService, NewService, Service}; +use super::{FnNewTransform, FnTransform}; +use super::{ + IntoNewService, IntoNewTransform, IntoService, IntoTransform, NewService, NewTransform, + Service, Transform, +}; /// `Apply` service combinator -pub struct Apply +pub struct Apply where - T: Service, + T: Transform, + T::Error: From, + S: Service, { - service: T, - f: F, - r: PhantomData<(In, Out)>, + transform: T, + service: S, } -impl Apply +impl Apply where - T: Service, - F: FnMut(In, &mut T) -> Out, - Out: IntoFuture, - Out::Error: From, + T: Transform, + T::Error: From, + S: Service, { /// Create new `Apply` combinator - pub fn new>(service: I, f: F) -> Self { + pub fn new, S1: IntoService>( + transform: T1, + service: S1, + ) -> Self { Self { + transform: transform.into_transform(), service: service.into_service(), - f, - r: PhantomData, } } } -impl Clone for Apply +impl Apply, S> where - T: Service + Clone, - F: Clone, + F: FnMut(Req, &mut S) -> Out, + Out: IntoFuture, + Out::Error: From, + S: Service, +{ + /// Create new `Apply` combinator + pub fn new_fn>(transform: F, service: S1) -> Self { + Self { + transform: transform.into_transform(), + service: service.into_service(), + } + } +} + +impl Clone for Apply +where + S: Service + Clone, + T::Error: From, + T: Transform + Clone, { fn clone(&self) -> Self { Apply { service: self.service.clone(), - f: self.f.clone(), - r: PhantomData, + transform: self.transform.clone(), } } } -impl Service for Apply +impl Service for Apply where - T: Service, - F: FnMut(In, &mut T) -> Out, - Out: IntoFuture, - Out::Error: From, + T: Transform, + T::Error: From, + S: Service, { - type Request = In; - type Response = Out::Item; - type Error = Out::Error; - type Future = Out::Future; + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = T::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + try_ready!(self.service.poll_ready()); + self.transform.poll_ready().map_err(|e| e.into()) } - fn call(&mut self, req: In) -> Self::Future { - (self.f)(req, &mut self.service).into_future() + fn call(&mut self, req: Self::Request) -> Self::Future { + self.transform.call(req, &mut self.service).into_future() } } /// `ApplyNewService` new service combinator -pub struct ApplyNewService +pub struct ApplyNewService where - T: NewService, + T::Error: From, + // T::InitError: From, + T: NewTransform, + S: NewService, { - service: T, - f: F, - r: PhantomData<(In, Out)>, + transform: T, + service: S, } -impl ApplyNewService +impl ApplyNewService where - T: NewService, - F: FnMut(In, &mut T::Service) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, + T: NewTransform, + T::Error: From, + S: NewService, { /// Create new `ApplyNewService` new service instance - pub fn new>(service: F1, f: F) -> Self { + pub fn new, S1: IntoNewService>( + transform: T1, + service: S1, + ) -> Self { Self { - f, + transform: transform.into_new_transform(), service: service.into_new_service(), - r: PhantomData, } } } -impl Clone for ApplyNewService +impl ApplyNewService, S> where - T: NewService + Clone, - F: FnMut(In, &mut T::Service) -> Out + Clone, + F: FnMut(Req, &mut S::Service) -> Out + Clone, Out: IntoFuture, + Out::Error: From, + S: NewService, +{ + /// Create new `Apply` combinator factory + pub fn new_fn>(service: S1, transform: F) -> Self { + Self { + service: service.into_new_service(), + transform: FnNewTransform::new(transform), + } + } +} + +impl Clone for ApplyNewService +where + T: NewTransform + Clone, + T::Error: From, + S: NewService + Clone, { fn clone(&self) -> Self { Self { service: self.service.clone(), - f: self.f.clone(), - r: PhantomData, + transform: self.transform.clone(), } } } -impl NewService for ApplyNewService +impl NewService for ApplyNewService where - T: NewService, - F: FnMut(In, &mut T::Service) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, + T: NewTransform, + T::Error: From, + S: NewService, { - type Request = In; - type Response = Out::Item; - type Error = Out::Error; - type Service = Apply; + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Service = Apply; type InitError = T::InitError; - type Future = ApplyNewServiceFuture; + type Future = ApplyNewServiceFuture; fn new_service(&self) -> Self::Future { - ApplyNewServiceFuture::new(self.service.new_service(), self.f.clone()) - } -} - -pub struct ApplyNewServiceFuture -where - T: NewService, - F: FnMut(In, &mut T::Service) -> Out + Clone, - Out: IntoFuture, -{ - fut: T::Future, - f: Option, - r: PhantomData<(In, Out)>, -} - -impl ApplyNewServiceFuture -where - T: NewService, - F: FnMut(In, &mut T::Service) -> Out + Clone, - Out: IntoFuture, -{ - fn new(fut: T::Future, f: F) -> Self { ApplyNewServiceFuture { - f: Some(f), - fut, - r: PhantomData, + fut_t: self.transform.new_transform(), + fut_s: self.service.new_service(), + service: None, + transform: None, } } } -impl Future for ApplyNewServiceFuture +pub struct ApplyNewServiceFuture where - T: NewService, - F: FnMut(In, &mut T::Service) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, + T: NewTransform, + T::Error: From, + S: NewService, { - type Item = Apply; + fut_s: S::Future, + fut_t: T::Future, + service: Option, + transform: Option, +} + +impl Future for ApplyNewServiceFuture +where + T: NewTransform, + T::Error: From, + S: NewService, +{ + type Item = Apply; type Error = T::InitError; fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(Apply::new(service, self.f.take().unwrap()))) + if self.transform.is_none() { + if let Async::Ready(transform) = self.fut_t.poll()? { + self.transform = Some(transform); + } + } + + if self.service.is_none() { + if let Async::Ready(service) = self.fut_s.poll()? { + self.service = Some(service); + } + } + + if self.transform.is_some() && self.service.is_some() { + Ok(Async::Ready(Apply { + service: self.service.take().unwrap(), + transform: self.transform.take().unwrap(), + })) } else { Ok(Async::NotReady) } @@ -178,7 +220,8 @@ mod tests { use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; - use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt}; + use super::*; + use crate::{NewService, Service}; #[derive(Clone)] struct Srv; @@ -198,12 +241,11 @@ mod tests { } #[test] - fn test_call() { - let blank = |req| Ok(req); - - let mut srv = blank.into_service().apply(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) - }); + fn test_apply() { + let mut srv = Apply::new_fn( + |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + Srv, + ); assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); @@ -212,11 +254,9 @@ mod tests { #[test] fn test_new_service() { - let blank = || Ok::<_, ()>((|req| Ok(req)).into_service()); - - let new_srv = blank.into_new_service().apply( - || Ok(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + let new_srv = ApplyNewService::new( + |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), + || Ok::<_, ()>(Srv), ); if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { assert!(srv.poll_ready().is_ok()); @@ -227,4 +267,20 @@ mod tests { panic!() } } + + // #[test] + // fn test_new_service_fn() { + // let new_srv = ApplyNewService::new_fn( + // || Ok(Srv), + // |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + // ); + // if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { + // assert!(srv.poll_ready().is_ok()); + // let res = srv.call("srv").poll(); + // assert!(res.is_ok()); + // assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + // } else { + // panic!() + // } + // } } diff --git a/actix-service/src/blank.rs b/actix-service/src/blank.rs new file mode 100644 index 00000000..ff736123 --- /dev/null +++ b/actix-service/src/blank.rs @@ -0,0 +1,85 @@ +use std::marker::PhantomData; + +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll}; + +use super::{NewService, Service}; + +/// Empty service +#[derive(Clone)] +pub struct Blank { + _t: PhantomData<(R, E)>, +} + +impl Blank { + //pub fn new() -> Blank { + // Blank { _t: PhantomData } + //} + + pub fn err(self) -> Blank { + Blank { _t: PhantomData } + } +} + +impl Blank { + pub fn new() -> Blank { + Blank { _t: PhantomData } + } +} + +impl Default for Blank { + fn default() -> Blank { + Blank { _t: PhantomData } + } +} + +impl Service for Blank { + type Request = R; + type Response = R; + type Error = E; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: R) -> Self::Future { + ok(req) + } +} + +/// Empty service factory +pub struct BlankNewService { + _t: PhantomData<(R, E1, E2)>, +} + +impl BlankNewService { + pub fn new() -> BlankNewService { + BlankNewService { _t: PhantomData } + } +} + +impl BlankNewService { + pub fn new_unit() -> BlankNewService { + BlankNewService { _t: PhantomData } + } +} + +impl Default for BlankNewService { + fn default() -> BlankNewService { + Self::new() + } +} + +impl NewService for BlankNewService { + type Request = R; + type Response = R; + type Error = E1; + type InitError = E2; + type Service = Blank; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(Blank::default()) + } +} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 88ed8917..9ede9826 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,56 +1,51 @@ -use std::marker; +use std::marker::PhantomData; -use futures::{ - future::{ok, FutureResult}, - Async, IntoFuture, Poll, -}; +use futures::future::{ok, FutureResult}; +use futures::{Async, IntoFuture, Poll}; use super::{IntoNewService, IntoService, NewService, Service}; -pub struct FnService +pub struct FnService where - F: FnMut(Req) -> Fut, - Fut: IntoFuture, + F: FnMut(Req) -> Out, + Out: IntoFuture, { f: F, - _t: marker::PhantomData<(Req, Resp, E)>, + _t: PhantomData<(Req,)>, } -impl FnService +impl FnService where - F: FnMut(Req) -> Fut, - Fut: IntoFuture, + F: FnMut(Req) -> Out, + Out: IntoFuture, { pub fn new(f: F) -> Self { - FnService { - f, - _t: marker::PhantomData, - } + FnService { f, _t: PhantomData } } } -impl Clone for FnService +impl Clone for FnService where - F: FnMut(Req) -> Fut + Clone, - Fut: IntoFuture, + F: FnMut(Req) -> Out + Clone, + Out: IntoFuture, { fn clone(&self) -> Self { FnService { f: self.f.clone(), - _t: marker::PhantomData, + _t: PhantomData, } } } -impl Service for FnService +impl Service for FnService where - F: FnMut(Req) -> Fut, - Fut: IntoFuture, + F: FnMut(Req) -> Out, + Out: IntoFuture, { type Request = Req; - type Response = Resp; - type Error = E; - type Future = Fut::Future; + type Response = Out::Item; + type Error = Out::Error; + type Future = Out::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) @@ -61,47 +56,44 @@ where } } -impl IntoService> for F +impl IntoService> for F where - F: FnMut(Req) -> Fut + 'static, - Fut: IntoFuture, + F: FnMut(Req) -> Out + 'static, + Out: IntoFuture, { - fn into_service(self) -> FnService { + fn into_service(self) -> FnService { FnService::new(self) } } -pub struct FnNewService +pub struct FnNewService where - F: FnMut(Req) -> Fut, - Fut: IntoFuture, + F: FnMut(Req) -> Out, + Out: IntoFuture, { f: F, - _t: marker::PhantomData<(Req, Resp, Err)>, + _t: PhantomData<(Req,)>, } -impl FnNewService +impl FnNewService where - F: FnMut(Req) -> Fut + Clone, - Fut: IntoFuture, + F: FnMut(Req) -> Out + Clone, + Out: IntoFuture, { pub fn new(f: F) -> Self { - FnNewService { - f, - _t: marker::PhantomData, - } + FnNewService { f, _t: PhantomData } } } -impl NewService for FnNewService +impl NewService for FnNewService where - F: FnMut(Req) -> Fut + Clone, - Fut: IntoFuture, + F: FnMut(Req) -> Out + Clone, + Out: IntoFuture, { type Request = Req; - type Response = Resp; - type Error = Err; - type Service = FnService; + type Response = Out::Item; + type Error = Out::Error; + type Service = FnService; type InitError = (); type Future = FutureResult; @@ -110,20 +102,20 @@ where } } -impl IntoNewService> for F +impl IntoNewService> for F where - F: FnMut(Req) -> Fut + Clone + 'static, - Fut: IntoFuture, + F: FnMut(Req) -> Out + Clone + 'static, + Out: IntoFuture, { - fn into_new_service(self) -> FnNewService { + fn into_new_service(self) -> FnNewService { FnNewService::new(self) } } -impl Clone for FnNewService +impl Clone for FnNewService where - F: FnMut(Req) -> Fut + Clone, - Fut: IntoFuture, + F: FnMut(Req) -> Out + Clone, + Out: IntoFuture, { fn clone(&self) -> Self { Self::new(self.f.clone()) diff --git a/actix-service/src/fn_transform.rs b/actix-service/src/fn_transform.rs new file mode 100644 index 00000000..fc9de01d --- /dev/null +++ b/actix-service/src/fn_transform.rs @@ -0,0 +1,123 @@ +use std::marker::PhantomData; + +use futures::future::{ok, FutureResult}; +use futures::{Async, IntoFuture, Poll}; + +use super::{IntoNewTransform, IntoTransform, NewTransform, Transform}; + +pub struct FnTransform +where + F: FnMut(Req, &mut S) -> Res, + Res: IntoFuture, +{ + f: F, + _t: PhantomData<(S, Req, Res)>, +} + +impl FnTransform +where + F: FnMut(Req, &mut S) -> Res, + Res: IntoFuture, +{ + pub fn new(f: F) -> Self { + FnTransform { f, _t: PhantomData } + } +} + +impl Clone for FnTransform +where + F: FnMut(Req, &mut S) -> Res + Clone, + Res: IntoFuture, +{ + fn clone(&self) -> Self { + FnTransform { + f: self.f.clone(), + _t: PhantomData, + } + } +} + +impl Transform for FnTransform +where + F: FnMut(Req, &mut S) -> Res, + Res: IntoFuture, +{ + type Request = Req; + type Response = Res::Item; + type Error = Res::Error; + type Future = Res::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, request: Req, service: &mut S) -> Self::Future { + (self.f)(request, service).into_future() + } +} + +impl IntoTransform, S> for F +where + F: FnMut(Req, &mut S) -> Res, + Res: IntoFuture, +{ + fn into_transform(self) -> FnTransform { + FnTransform::new(self) + } +} + +pub struct FnNewTransform +where + F: FnMut(Req, &mut S) -> Out + Clone, + Out: IntoFuture, +{ + f: F, + _t: PhantomData<(S, Req, Out, Err)>, +} + +impl FnNewTransform +where + F: FnMut(Req, &mut S) -> Res + Clone, + Res: IntoFuture, +{ + pub fn new(f: F) -> Self { + FnNewTransform { f, _t: PhantomData } + } +} + +impl NewTransform for FnNewTransform +where + F: FnMut(Req, &mut S) -> Res + Clone, + Res: IntoFuture, +{ + type Request = Req; + type Response = Res::Item; + type Error = Res::Error; + type Transform = FnTransform; + type InitError = Err; + type Future = FutureResult; + + fn new_transform(&self) -> Self::Future { + ok(FnTransform::new(self.f.clone())) + } +} + +impl IntoNewTransform, S> for F +where + F: FnMut(Req, &mut S) -> Res + Clone, + Res: IntoFuture, +{ + fn into_new_transform(self) -> FnNewTransform { + FnNewTransform::new(self) + } +} + +impl Clone for FnNewTransform +where + F: FnMut(Req, &mut S) -> Res + Clone, + Res: IntoFuture, +{ + fn clone(&self) -> Self { + Self::new(self.f.clone()) + } +} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 017d5fdd..0672c324 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -2,24 +2,32 @@ use futures::{Future, IntoFuture, Poll}; mod and_then; mod and_then_apply; +mod and_then_apply_fn; mod apply; +mod blank; mod cell; mod fn_service; +mod fn_transform; mod from_err; mod map; mod map_err; mod map_init_err; mod then; +mod transform; pub use self::and_then::{AndThen, AndThenNewService}; -pub use self::and_then_apply::{AndThenApply, AndThenApplyNewService}; +use self::and_then_apply::{AndThenTransform, AndThenTransformNewService}; +use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; pub use self::apply::{Apply, ApplyNewService}; +pub use self::blank::{Blank, BlankNewService}; pub use self::fn_service::{FnNewService, FnService}; +pub use self::fn_transform::{FnNewTransform, FnTransform}; pub use self::from_err::{FromErr, FromErrNewService}; pub use self::map::{Map, MapNewService}; pub use self::map_err::{MapErr, MapErrNewService}; pub use self::map_init_err::MapInitErr; pub use self::then::{Then, ThenNewService}; +pub use self::transform::{IntoNewTransform, IntoTransform, NewTransform, Transform}; /// An asynchronous function from `Request` to a `Response`. pub trait Service { @@ -61,16 +69,30 @@ pub trait Service { /// An extension trait for `Service`s that provides a variety of convenient /// adapters pub trait ServiceExt: Service { - /// Apply function to specified service and use it as a next service in + /// Apply tranformation to specified service and use it as a next service in /// chain. - fn apply(self, service: I, f: F) -> AndThenApply + fn apply(self, transform: T1, service: B1) -> AndThenTransform + where + Self: Sized, + T: Transform, + T::Error: From, + T1: IntoTransform, + B: Service, + B1: IntoService, + { + AndThenTransform::new(transform.into_transform(), self, service.into_service()) + } + + /// Apply function to specified service and use it as a next service in + /// chain. + fn apply_fn(self, service: B1, f: F) -> AndThenApply where Self: Sized, - B: Service, - I: IntoService, F: FnMut(Self::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, + B: Service, + B1: IntoService, { AndThenApply::new(self, service, f) } @@ -190,14 +212,32 @@ pub trait NewService { /// Apply function to specified service and use it as a next service in /// chain. - fn apply( + fn apply( self, - service: I, - f: F, - ) -> AndThenApplyNewService + transform: T1, + service: B1, + ) -> AndThenTransformNewService where Self: Sized, - B: NewService, + T: NewTransform, + T::Error: From, + T1: IntoNewTransform, + B: NewService, + B1: IntoNewService, + { + AndThenTransformNewService::new( + transform.into_new_transform(), + self, + service.into_new_service(), + ) + } + + /// Apply function to specified service and use it as a next service in + /// chain. + fn apply_fn(self, service: I, f: F) -> AndThenApplyNewService + where + Self: Sized, + B: NewService, I: IntoNewService, F: FnMut(Self::Response, &mut B::Service) -> Out, Out: IntoFuture, @@ -345,7 +385,7 @@ where fn into_service(self) -> T; } -/// Trait for types that can be converted to a Service +/// Trait for types that can be converted to a `NewService` pub trait IntoNewService where T: NewService, diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs new file mode 100644 index 00000000..96c553ba --- /dev/null +++ b/actix-service/src/transform.rs @@ -0,0 +1,341 @@ +use std::marker::PhantomData; + +use futures::{Async, Future, Poll}; + +use super::Service; + +/// An asynchronous function for transforming service call result. +pub trait Transform { + /// Requests handled by the service. + type Request; + + /// Responses given by the service. + type Response; + + /// Errors produced by the service. + type Error; + + /// The future response value. + type Future: Future; + + /// Returns `Ready` when the service is able to process requests. + /// + /// This method is similar to `Service::poll_ready` method. + fn poll_ready(&mut self) -> Poll<(), Self::Error>; + + /// Process the request and apply it to provided service, + /// return the response asynchronously. + fn call(&mut self, request: Self::Request, service: &mut Service) -> Self::Future; + + /// Map this transform's error to a different error, returning a new transform. + /// + /// This function is similar to the `Result::map_err` where it will change + /// the error type of the underlying transform. This is useful for example to + /// ensure that services and transforms have the same error type. + /// + /// Note that this function consumes the receiving transform and returns a + /// wrapped version of it. + fn map_err(self, f: F) -> TransformMapErr + where + Self: Sized, + F: Fn(Self::Error) -> E, + { + TransformMapErr::new(self, f) + } +} + +/// `Transform` service factory +pub trait NewTransform { + /// Requests handled by the service. + type Request; + + /// Responses given by the service. + type Response; + + /// Errors produced by the service. + type Error; + + /// The `TransformService` value created by this factory + type Transform: Transform< + Service, + Request = Self::Request, + Response = Self::Response, + Error = Self::Error, + >; + + /// Errors produced while building a service. + type InitError; + + /// The future response value. + type Future: Future; + + /// Create and return a new service value asynchronously. + fn new_transform(&self) -> Self::Future; + + /// Map this transforms's output to a different type, returning a new transform + /// of the resulting type. + fn map_err(self, f: F) -> TransformMapErrNewTransform + where + Self: Sized, + F: Fn(Self::Error) -> E, + { + TransformMapErrNewTransform::new(self, f) + } +} + +impl<'a, T, S> Transform for &'a mut T +where + T: Transform + 'a, + S: Service, +{ + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = T::Future; + + fn poll_ready(&mut self) -> Poll<(), T::Error> { + (**self).poll_ready() + } + + fn call(&mut self, request: Self::Request, service: &mut S) -> T::Future { + (**self).call(request, service) + } +} + +impl Transform for Box +where + T: Transform + ?Sized, + S: Service, +{ + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = T::Future; + + fn poll_ready(&mut self) -> Poll<(), S::Error> { + (**self).poll_ready() + } + + fn call(&mut self, request: Self::Request, service: &mut S) -> T::Future { + (**self).call(request, service) + } +} + +/// Trait for types that can be converted to a `TransformService` +pub trait IntoTransform +where + T: Transform, +{ + /// Convert to a `TransformService` + fn into_transform(self) -> T; +} + +/// Trait for types that can be converted to a TransfromNewService +pub trait IntoNewTransform +where + T: NewTransform, +{ + /// Convert to an `TranformNewService` + fn into_new_transform(self) -> T; +} + +impl IntoTransform for T +where + T: Transform, +{ + fn into_transform(self) -> T { + self + } +} + +impl IntoNewTransform for T +where + T: NewTransform, +{ + fn into_new_transform(self) -> T { + self + } +} + +/// Service for the `map_err` combinator, changing the type of a transform's +/// error. +/// +/// This is created by the `Transform::map_err` method. +pub struct TransformMapErr { + transform: T, + f: F, + _t: PhantomData<(S, E)>, +} + +impl TransformMapErr { + /// Create new `MapErr` combinator + pub fn new(transform: T, f: F) -> Self + where + T: Transform, + F: Fn(T::Error) -> E, + { + Self { + transform, + f, + _t: PhantomData, + } + } +} + +impl Clone for TransformMapErr +where + T: Clone, + F: Clone, +{ + fn clone(&self) -> Self { + TransformMapErr { + transform: self.transform.clone(), + f: self.f.clone(), + _t: PhantomData, + } + } +} + +impl Transform for TransformMapErr +where + T: Transform, + F: Fn(T::Error) -> E + Clone, +{ + type Request = T::Request; + type Response = T::Response; + type Error = E; + type Future = TransformMapErrFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.transform.poll_ready().map_err(&self.f) + } + + fn call(&mut self, req: T::Request, service: &mut S) -> Self::Future { + TransformMapErrFuture::new(self.transform.call(req, service), self.f.clone()) + } +} + +pub struct TransformMapErrFuture +where + T: Transform, + F: Fn(T::Error) -> E, +{ + f: F, + fut: T::Future, +} + +impl TransformMapErrFuture +where + T: Transform, + F: Fn(T::Error) -> E, +{ + fn new(fut: T::Future, f: F) -> Self { + TransformMapErrFuture { f, fut } + } +} + +impl Future for TransformMapErrFuture +where + T: Transform, + F: Fn(T::Error) -> E, +{ + type Item = T::Response; + type Error = E; + + fn poll(&mut self) -> Poll { + self.fut.poll().map_err(&self.f) + } +} + +/// NewTransform for the `map_err` combinator, changing the type of a new +/// transform's error. +/// +/// This is created by the `NewTransform::map_err` method. +pub struct TransformMapErrNewTransform { + t: T, + f: F, + e: PhantomData<(S, E)>, +} + +impl TransformMapErrNewTransform { + /// Create new `MapErr` new service instance + pub fn new(t: T, f: F) -> Self + where + T: NewTransform, + F: Fn(T::Error) -> E, + { + Self { + t, + f, + e: PhantomData, + } + } +} + +impl Clone for TransformMapErrNewTransform +where + T: Clone, + F: Clone, +{ + fn clone(&self) -> Self { + Self { + t: self.t.clone(), + f: self.f.clone(), + e: PhantomData, + } + } +} + +impl NewTransform for TransformMapErrNewTransform +where + T: NewTransform, + F: Fn(T::Error) -> E + Clone, +{ + type Request = T::Request; + type Response = T::Response; + type Error = E; + type Transform = TransformMapErr; + + type InitError = T::InitError; + type Future = TransformMapErrNewTransformFuture; + + fn new_transform(&self) -> Self::Future { + TransformMapErrNewTransformFuture::new(self.t.new_transform(), self.f.clone()) + } +} + +pub struct TransformMapErrNewTransformFuture +where + T: NewTransform, + F: Fn(T::Error) -> E, +{ + fut: T::Future, + f: F, +} + +impl TransformMapErrNewTransformFuture +where + T: NewTransform, + F: Fn(T::Error) -> E, +{ + fn new(fut: T::Future, f: F) -> Self { + TransformMapErrNewTransformFuture { f, fut } + } +} + +impl Future for TransformMapErrNewTransformFuture +where + T: NewTransform, + F: Fn(T::Error) -> E + Clone, +{ + type Item = TransformMapErr; + type Error = T::InitError; + + fn poll(&mut self) -> Poll { + if let Async::Ready(tr) = self.fut.poll()? { + Ok(Async::Ready(TransformMapErr::new(tr, self.f.clone()))) + } else { + Ok(Async::NotReady) + } + } +} diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index cde45be1..eef4f712 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.2.1] - 2019-02-xx + +### Changed + +* Convert `Timeout` and `InFlight` services to a transforms + + ## [0.2.0] - 2019-02-01 * Fix framed transport error handling diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index a7ef8ff1..ca652119 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.2.0" +version = "0.2.1" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -18,7 +18,8 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-service = "0.2.0" +#actix-service = "0.2.0" +actix-service = { path="../actix-service" } actix-codec = "0.1.0" bytes = "0.4" futures = "0.1" diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index ca4cce00..41a5c43a 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -1,5 +1,6 @@ -use actix_service::{IntoNewService, IntoService, NewService, Service}; -use futures::{try_ready, Async, Future, Poll}; +use actix_service::{NewTransform, Service, Transform}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Future, Poll}; use super::counter::{Counter, CounterGuard}; @@ -7,98 +8,48 @@ use super::counter::{Counter, CounterGuard}; /// async requests. /// /// Default number of in-flight requests is 15 -pub struct InFlight { - factory: T, +pub struct InFlight { max_inflight: usize, } -impl InFlight { - pub fn new(factory: F) -> Self - where - T: NewService, - F: IntoNewService, - { - Self { - factory: factory.into_new_service(), - max_inflight: 15, - } - } - - /// Set max number of in-flight requests. - /// - /// By default max in-flight requests is 15. - pub fn max_inflight(mut self, max: usize) -> Self { - self.max_inflight = max; - self +impl InFlight { + pub fn new(max: usize) -> Self { + Self { max_inflight: max } } } -impl NewService for InFlight -where - T: NewService, -{ +impl Default for InFlight { + fn default() -> Self { + Self::new(15) + } +} + +impl NewTransform for InFlight { type Request = T::Request; type Response = T::Response; type Error = T::Error; - type InitError = T::InitError; - type Service = InFlightService; - type Future = InFlightResponseFuture; + type InitError = (); + type Transform = InFlightService; + type Future = FutureResult; - fn new_service(&self) -> Self::Future { - InFlightResponseFuture { - fut: self.factory.new_service(), - max_inflight: self.max_inflight, - } + fn new_transform(&self) -> Self::Future { + ok(InFlightService::new(self.max_inflight)) } } -pub struct InFlightResponseFuture { - fut: T::Future, - max_inflight: usize, -} - -impl Future for InFlightResponseFuture { - type Item = InFlightService; - type Error = T::InitError; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(InFlightService::with_max_inflight( - self.max_inflight, - try_ready!(self.fut.poll()), - ))) - } -} - -pub struct InFlightService { - service: T, +pub struct InFlightService { count: Counter, } -impl InFlightService { - pub fn new(service: F) -> Self - where - T: Service, - F: IntoService, - { +impl InFlightService { + pub fn new(max: usize) -> Self { Self { - service: service.into_service(), - count: Counter::new(15), - } - } - - pub fn with_max_inflight(max: usize, service: F) -> Self - where - T: Service, - F: IntoService, - { - Self { - service: service.into_service(), count: Counter::new(max), } } } -impl Service for InFlightService +impl Transform for InFlightService where T: Service, { @@ -108,17 +59,17 @@ where type Future = InFlightServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let res = self.service.poll_ready()?; - if res.is_ready() && !self.count.available() { + if !self.count.available() { log::trace!("InFlight limit exceeded"); return Ok(Async::NotReady); + } else { + return Ok(Async::Ready(())); } - Ok(res) } - fn call(&mut self, req: T::Request) -> Self::Future { + fn call(&mut self, req: T::Request, service: &mut T) -> Self::Future { InFlightServiceResponse { - fut: self.service.call(req), + fut: service.call(req), _guard: self.count.get(), } } @@ -138,3 +89,73 @@ impl Future for InFlightServiceResponse { self.fut.poll() } } + +#[cfg(test)] +mod tests { + use futures::future::lazy; + use futures::{Async, Poll}; + + use std::time::Duration; + + use super::*; + use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt}; + + struct SleepService(Duration); + + impl Service for SleepService { + type Request = (); + type Response = (); + type Error = (); + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + Box::new(tokio_timer::sleep(self.0).map_err(|_| ())) + } + } + + #[test] + fn test_transform() { + let wait_time = Duration::from_millis(50); + let _ = actix_rt::System::new("test").block_on(lazy(|| { + let mut srv = Blank::new().apply(InFlightService::new(1), SleepService(wait_time)); + assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + + let mut res = srv.call(()); + let _ = res.poll(); + assert_eq!(srv.poll_ready(), Ok(Async::NotReady)); + + drop(res); + assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + + Ok::<_, ()>(()) + })); + } + + #[test] + fn test_newtransform() { + let wait_time = Duration::from_millis(50); + let _ = actix_rt::System::new("test").block_on(lazy(|| { + let srv = + BlankNewService::new().apply(InFlight::new(1), || Ok(SleepService(wait_time))); + + if let Async::Ready(mut srv) = srv.new_service().poll().unwrap() { + assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + + let mut res = srv.call(()); + let _ = res.poll(); + assert_eq!(srv.poll_ready(), Ok(Async::NotReady)); + + drop(res); + assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + } else { + panic!() + } + + Ok::<_, ()>(()) + })); + } +} diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 6846e7a8..c5d2ee65 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -5,15 +5,14 @@ use std::fmt; use std::time::Duration; -use actix_service::{NewService, Service}; -use futures::try_ready; +use actix_service::{NewTransform, Service, Transform}; +use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; use tokio_timer::{clock, Delay}; /// Applies a timeout to requests. -#[derive(Debug)] -pub struct Timeout { - inner: T, +#[derive(Debug, Clone)] +pub struct Timeout { timeout: Duration, } @@ -25,6 +24,12 @@ pub enum TimeoutError { Timeout, } +impl From for TimeoutError { + fn from(err: E) -> Self { + TimeoutError::Service(err) + } +} + impl fmt::Debug for TimeoutError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -34,107 +39,73 @@ impl fmt::Debug for TimeoutError { } } -impl Timeout { - pub fn new(timeout: Duration, inner: T) -> Self - where - T: NewService + Clone, - { - Timeout { inner, timeout } - } -} - -impl Clone for Timeout -where - T: Clone, -{ - fn clone(&self) -> Self { - Timeout { - inner: self.inner.clone(), - timeout: self.timeout, +impl PartialEq for TimeoutError { + fn eq(&self, other: &TimeoutError) -> bool { + match self { + TimeoutError::Service(e1) => match other { + TimeoutError::Service(e2) => e1 == e2, + TimeoutError::Timeout => false, + }, + TimeoutError::Timeout => match other { + TimeoutError::Service(_) => false, + TimeoutError::Timeout => true, + }, } } } -impl NewService for Timeout -where - T: NewService + Clone, -{ - type Request = T::Request; - type Response = T::Response; - type Error = TimeoutError; - type InitError = T::InitError; - type Service = TimeoutService; - type Future = TimeoutFut; - - fn new_service(&self) -> Self::Future { - TimeoutFut { - fut: self.inner.new_service(), - timeout: self.timeout, - } +impl Timeout { + pub fn new(timeout: Duration) -> Self { + Timeout { timeout } } } -/// `Timeout` response future -#[derive(Debug)] -pub struct TimeoutFut { - fut: T::Future, - timeout: Duration, -} - -impl Future for TimeoutFut +impl NewTransform for Timeout where - T: NewService, + S: Service, { - type Item = TimeoutService; - type Error = T::InitError; + type Request = S::Request; + type Response = S::Response; + type Error = TimeoutError; + type InitError = (); + type Transform = TimeoutService; + type Future = FutureResult; - fn poll(&mut self) -> Poll { - let service = try_ready!(self.fut.poll()); - Ok(Async::Ready(TimeoutService::new(self.timeout, service))) + fn new_transform(&self) -> Self::Future { + ok(TimeoutService { + timeout: self.timeout, + }) } } /// Applies a timeout to requests. -#[derive(Debug)] -pub struct TimeoutService { - inner: T, +#[derive(Debug, Clone)] +pub struct TimeoutService { timeout: Duration, } -impl TimeoutService { - pub fn new(timeout: Duration, inner: T) -> Self - where - T: Service, - { - TimeoutService { inner, timeout } +impl TimeoutService { + pub fn new(timeout: Duration) -> Self { + TimeoutService { timeout } } } -impl Clone for TimeoutService { - fn clone(&self) -> Self { - TimeoutService { - inner: self.inner.clone(), - timeout: self.timeout, - } - } -} - -impl Service for TimeoutService +impl Transform for TimeoutService where - T: Service, + S: Service, { - type Request = T::Request; - type Response = T::Response; - type Error = TimeoutError; - type Future = TimeoutServiceResponse; + type Request = S::Request; + type Response = S::Response; + type Error = TimeoutError; + type Future = TimeoutServiceResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(TimeoutError::Service) + Ok(Async::Ready(())) } - fn call(&mut self, request: T::Request) -> Self::Future { + fn call(&mut self, request: S::Request, service: &mut S) -> Self::Future { TimeoutServiceResponse { - fut: self.inner.call(request), + fut: service.call(request), sleep: Delay::new(clock::now() + self.timeout), } } @@ -170,3 +141,74 @@ where } } } + +#[cfg(test)] +mod tests { + use futures::future::lazy; + use futures::{Async, Poll}; + + use std::time::Duration; + + use super::*; + use actix_service::{Blank, BlankNewService, NewService, Service, ServiceExt}; + + struct SleepService(Duration); + + impl Service for SleepService { + type Request = (); + type Response = (); + type Error = (); + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + Box::new(tokio_timer::sleep(self.0).map_err(|_| ())) + } + } + + #[test] + fn test_success() { + let resolution = Duration::from_millis(100); + let wait_time = Duration::from_millis(50); + + let res = actix_rt::System::new("test").block_on(lazy(|| { + let mut timeout = Blank::default() + .apply(TimeoutService::new(resolution), SleepService(wait_time)); + timeout.call(()) + })); + assert_eq!(res, Ok(())); + } + + #[test] + fn test_timeout() { + let resolution = Duration::from_millis(100); + let wait_time = Duration::from_millis(150); + + let res = actix_rt::System::new("test").block_on(lazy(|| { + let mut timeout = Blank::default() + .apply(TimeoutService::new(resolution), SleepService(wait_time)); + timeout.call(()) + })); + assert_eq!(res, Err(TimeoutError::Timeout)); + } + + #[test] + fn test_timeout_newservice() { + let resolution = Duration::from_millis(100); + let wait_time = Duration::from_millis(150); + + let res = actix_rt::System::new("test").block_on(lazy(|| { + let timeout = BlankNewService::default() + .apply(Timeout::new(resolution), || Ok(SleepService(wait_time))); + if let Async::Ready(mut to) = timeout.new_service().poll().unwrap() { + to.call(()) + } else { + panic!() + } + })); + assert_eq!(res, Err(TimeoutError::Timeout)); + } +}