use std::marker::PhantomData; use futures::{try_ready, Async, Future, IntoFuture, Poll}; use super::{IntoNewService, IntoService, NewService, Service}; use crate::cell::Cell; /// `Apply` service combinator pub struct AndThenApply where A: Service, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { a: A, b: Cell, f: Cell, r: PhantomData<(Out,)>, } impl AndThenApply where A: Service, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { /// Create new `Apply` combinator pub fn new, B1: IntoService>(a: A1, b: B1, f: F) -> Self { Self { f: Cell::new(f), a: a.into_service(), b: Cell::new(b.into_service()), r: PhantomData, } } } impl Clone for AndThenApply where A: Service + Clone, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { fn clone(&self) -> Self { AndThenApply { a: self.a.clone(), b: self.b.clone(), f: self.f.clone(), r: PhantomData, } } } impl Service for AndThenApply where A: Service, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { type Request = A::Request; type Response = Out::Item; type Error = A::Error; type Future = AndThenApplyFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { try_ready!(self.a.poll_ready()); self.b.get_mut().poll_ready() } fn call(&mut self, req: A::Request) -> Self::Future { AndThenApplyFuture { b: self.b.clone(), f: self.f.clone(), fut_b: None, fut_a: Some(self.a.call(req)), } } } pub struct AndThenApplyFuture where A: Service, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { b: Cell, f: Cell, fut_a: Option, fut_b: Option, } impl Future for AndThenApplyFuture where A: Service, B: Service, F: FnMut(A::Response, &mut B) -> Out, Out: IntoFuture, Out::Error: Into, { type Item = Out::Item; type Error = A::Error; fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut_b { return fut.poll().map_err(|e| e.into()); } match self.fut_a.as_mut().expect("Bug in actix-service").poll() { Ok(Async::Ready(resp)) => { let _ = self.fut_a.take(); self.fut_b = Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); self.poll() } Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => Err(err), } } } /// `ApplyNewService` new service combinator pub struct AndThenApplyNewService { a: A, b: B, f: Cell, r: PhantomData<(Out)>, } impl AndThenApplyNewService where A: NewService, B: NewService, F: FnMut(A::Response, &mut B::Service) -> Out, Out: IntoFuture, Out::Error: Into, { /// Create new `ApplyNewService` new service instance pub fn new, B1: IntoNewService>(a: A1, b: B1, f: F) -> Self { Self { f: Cell::new(f), a: a.into_new_service(), b: b.into_new_service(), r: PhantomData, } } } impl Clone for AndThenApplyNewService where A: Clone, B: Clone, { fn clone(&self) -> Self { Self { a: self.a.clone(), b: self.b.clone(), f: self.f.clone(), r: PhantomData, } } } impl NewService for AndThenApplyNewService where A: NewService, B: NewService, F: FnMut(A::Response, &mut B::Service) -> Out, Out: IntoFuture, Out::Error: Into, { type Request = A::Request; type Response = Out::Item; type Error = A::Error; type InitError = A::InitError; type Service = AndThenApply; type Future = AndThenApplyNewServiceFuture; fn new_service(&self) -> Self::Future { AndThenApplyNewServiceFuture { a: None, b: None, f: self.f.clone(), fut_a: self.a.new_service(), fut_b: self.b.new_service(), } } } pub struct AndThenApplyNewServiceFuture where A: NewService, B: NewService, F: FnMut(A::Response, &mut B::Service) -> Out, Out: IntoFuture, Out::Error: Into, { fut_b: B::Future, fut_a: A::Future, f: Cell, a: Option, b: Option, } impl Future for AndThenApplyNewServiceFuture where A: NewService, B: NewService, F: FnMut(A::Response, &mut B::Service) -> Out, Out: IntoFuture, Out::Error: Into, { type Item = AndThenApply; type Error = A::InitError; fn poll(&mut self) -> Poll { if self.a.is_none() { if let Async::Ready(service) = self.fut_a.poll()? { self.a = Some(service); } } if self.b.is_none() { if let Async::Ready(service) = self.fut_b.poll()? { self.b = Some(service); } } if self.a.is_some() && self.b.is_some() { Ok(Async::Ready(AndThenApply { f: self.f.clone(), a: self.a.take().unwrap(), b: Cell::new(self.b.take().unwrap()), r: PhantomData, })) } else { Ok(Async::NotReady) } } } #[cfg(test)] mod tests { use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; use crate::{Blank, BlankNewService, NewService, Service, ServiceExt}; #[derive(Clone)] struct Srv; impl Service for Srv { type Request = (); type Response = (); type Error = (); type Future = FutureResult<(), ()>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, _: ()) -> Self::Future { ok(()) } } #[test] fn test_call() { let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| { srv.call(()).map(move |res| (req, res)) }); assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); } #[test] fn test_new_service() { let new_srv = BlankNewService::new_unit().apply_fn( || Ok(Srv), |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), ); if let Async::Ready(mut srv) = new_srv.new_service().poll().unwrap() { assert!(srv.poll_ready().is_ok()); let res = srv.call("srv").poll(); assert!(res.is_ok()); assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); } else { panic!() } } }