diff --git a/src/service/and_then.rs b/src/service/and_then.rs index f2665637..187ab167 100644 --- a/src/service/and_then.rs +++ b/src/service/and_then.rs @@ -52,11 +52,8 @@ where type Future = AndThenFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - match self.a.poll_ready() { - Ok(Async::Ready(_)) => self.b.borrow_mut().poll_ready(), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), - } + let _ = try_ready!(self.a.poll_ready()); + self.b.borrow_mut().poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { diff --git a/src/service/mod.rs b/src/service/mod.rs index 852d87b8..0b486f39 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -10,6 +10,7 @@ mod from_err; mod map; mod map_err; mod map_init_err; +mod then; pub use self::and_then::{AndThen, AndThenNewService}; pub use self::apply::{Apply, ApplyNewService}; @@ -18,6 +19,7 @@ pub use self::from_err::{FromErr, FromErrNewService}; pub use self::map::{Map, MapNewService}; pub use self::map_err::{MapErr, MapErrNewService}; pub use self::map_init_err::MapInitErr; +pub use self::then::{Then, ThenNewService}; /// An extension trait for `Service`s that provides a variety of convenient /// adapters @@ -69,6 +71,19 @@ pub trait ServiceExt: Service { FromErr::new(self) } + /// Chain on a computation for when a call to the service finished, + /// passing the result of the call to the next service `B`. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + fn then(self, service: B) -> Then + where + Self: Sized, + B: Service, Error = Self::Error>, + { + Then::new(self, service) + } + /// Map this service's output to a different type, returning a new service /// of the resulting type. /// @@ -131,7 +146,8 @@ pub trait NewServiceExt: NewService { AndThenNewService::new(self, new_service) } - /// Map this service's error and new service's init error to any error + /// `NewService` that create service to map this service's error + /// and new service's init error to any error /// implementing `From` for this service`s `Error`. /// /// Note that this function consumes the receiving new service and returns a @@ -144,6 +160,25 @@ pub trait NewServiceExt: NewService { FromErrNewService::new(self) } + /// Create `NewService` to chain on a computation for when a call to the + /// service finished, passing the result of the call to the next + /// service `B`. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + fn then(self, new_service: F) -> ThenNewService + where + Self: Sized, + F: IntoNewService, + B: NewService< + Request = Result, + Error = Self::Error, + InitError = Self::InitError, + >, + { + ThenNewService::new(self, new_service) + } + fn map(self, f: F) -> MapNewService where Self: Sized, diff --git a/src/service/then.rs b/src/service/then.rs new file mode 100644 index 00000000..d31e807d --- /dev/null +++ b/src/service/then.rs @@ -0,0 +1,330 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use futures::{Async, Future, Poll}; + +use super::{IntoNewService, NewService, Service}; + +/// Service for the `then` combinator, chaining a computation onto the end of +/// another service. +/// +/// This is created by the `ServiceExt::then` method. +pub struct Then { + a: A, + b: Rc>, +} + +impl Then +where + A: Service, + B: Service, Error = A::Error>, +{ + /// Create new `Then` combinator + pub fn new(a: A, b: B) -> Then { + Then { + a, + b: Rc::new(RefCell::new(b)), + } + } +} + +impl Clone for Then +where + A: Service + Clone, + B: Service, Error = A::Error>, +{ + fn clone(&self) -> Self { + Then { + a: self.a.clone(), + b: self.b.clone(), + } + } +} + +impl Service for Then +where + A: Service, + B: Service, Error = A::Error>, +{ + type Request = A::Request; + type Response = B::Response; + type Error = B::Error; + type Future = ThenFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + let _ = try_ready!(self.a.poll_ready()); + self.b.borrow_mut().poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ThenFuture::new(self.a.call(req), self.b.clone()) + } +} + +pub struct ThenFuture +where + A: Service, + B: Service>, +{ + b: Rc>, + fut_b: Option, + fut_a: A::Future, +} + +impl ThenFuture +where + A: Service, + B: Service>, +{ + fn new(fut_a: A::Future, b: Rc>) -> Self { + ThenFuture { + b, + fut_a, + fut_b: None, + } + } +} + +impl Future for ThenFuture +where + A: Service, + B: Service>, +{ + type Item = B::Response; + type Error = B::Error; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut_b { + return fut.poll(); + } + + match self.fut_a.poll() { + Ok(Async::Ready(resp)) => { + self.fut_b = Some(self.b.borrow_mut().call(Ok(resp))); + self.poll() + } + Err(err) => { + self.fut_b = Some(self.b.borrow_mut().call(Err(err))); + self.poll() + } + Ok(Async::NotReady) => Ok(Async::NotReady), + } + } +} + +/// `ThenNewService` new service combinator +pub struct ThenNewService { + a: A, + b: B, +} + +impl ThenNewService +where + A: NewService, + B: NewService, +{ + /// Create new `AndThen` combinator + pub fn new>(a: A, f: F) -> Self { + Self { + a, + b: f.into_new_service(), + } + } +} + +impl NewService for ThenNewService +where + A: NewService, + B: NewService< + Request = Result, + Error = A::Error, + InitError = A::InitError, + >, +{ + type Request = A::Request; + type Response = B::Response; + type Error = A::Error; + type Service = Then; + + type InitError = A::InitError; + type Future = ThenNewServiceFuture; + + fn new_service(&self) -> Self::Future { + ThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) + } +} + +impl Clone for ThenNewService +where + A: NewService + Clone, + B: NewService< + Request = Result, + Error = A::Error, + InitError = A::InitError, + > + Clone, +{ + fn clone(&self) -> Self { + Self { + a: self.a.clone(), + b: self.b.clone(), + } + } +} + +pub struct ThenNewServiceFuture +where + A: NewService, + B: NewService, +{ + fut_b: B::Future, + fut_a: A::Future, + a: Option, + b: Option, +} + +impl ThenNewServiceFuture +where + A: NewService, + B: NewService, +{ + fn new(fut_a: A::Future, fut_b: B::Future) -> Self { + ThenNewServiceFuture { + fut_a, + fut_b, + a: None, + b: None, + } + } +} + +impl Future for ThenNewServiceFuture +where + A: NewService, + B: NewService< + Request = Result, + Error = A::Error, + InitError = A::InitError, + >, +{ + type Item = Then; + type Error = A::InitError; + + fn poll(&mut self) -> Poll { + if self.a.is_none() { + if let Async::Ready(service) = self.fut_a.poll()? { + self.a = Some(service); + } + } + + if self.b.is_none() { + if let Async::Ready(service) = self.fut_b.poll()? { + self.b = Some(service); + } + } + + if self.a.is_some() && self.b.is_some() { + Ok(Async::Ready(Then::new( + self.a.take().unwrap(), + self.b.take().unwrap(), + ))) + } else { + Ok(Async::NotReady) + } + } +} + +#[cfg(test)] +mod tests { + use futures::future::{err, ok, FutureResult}; + use futures::{Async, Poll}; + use std::cell::Cell; + use std::rc::Rc; + + use super::*; + use service::{NewServiceExt, ServiceExt}; + + struct Srv1(Rc>); + impl Service for Srv1 { + type Request = Result<&'static str, &'static str>; + type Response = &'static str; + type Error = (); + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.set(self.0.get() + 1); + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match req { + Ok(msg) => ok(msg), + Err(_) => err(()), + } + } + } + + #[derive(Clone)] + struct Srv2(Rc>); + + impl Service for Srv2 { + type Request = Result<&'static str, ()>; + type Response = (&'static str, &'static str); + type Error = (); + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.set(self.0.get() + 1); + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match req { + Ok(msg) => ok((msg, "ok")), + Err(()) => ok(("srv2", "err")), + } + } + } + + #[test] + fn test_poll_ready() { + let cnt = Rc::new(Cell::new(0)); + let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone())); + let res = srv.poll_ready(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(())); + assert_eq!(cnt.get(), 2); + } + + #[test] + fn test_call() { + let cnt = Rc::new(Cell::new(0)); + let mut srv = Srv1(cnt.clone()).then(Srv2(cnt)); + + let res = srv.call(Ok("srv1")).poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + + let res = srv.call(Err("srv")).poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); + } + + #[test] + fn test_new_service() { + let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); + let blank = move || Ok::<_, ()>(Srv1(cnt2.clone())); + let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone()))); + if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { + let res = srv.call(Ok("srv1")).poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + + let res = srv.call(Err("srv")).poll(); + assert!(res.is_ok()); + assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); + } else { + panic!() + } + } +}