use std::cell::RefCell; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use crate::{Service, ServiceFactory}; /// `Apply` service combinator pub(crate) struct AndThenApplyFn where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { svc: Rc>, _phantom: PhantomData<(Fut, Req, In, Res, Err)>, } impl AndThenApplyFn where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { /// Create new `Apply` combinator pub(crate) fn new(a: S1, b: S2, wrap_fn: F) -> Self { Self { svc: Rc::new(RefCell::new((a, b, wrap_fn))), _phantom: PhantomData, } } } impl Clone for AndThenApplyFn where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { fn clone(&self) -> Self { AndThenApplyFn { svc: self.svc.clone(), _phantom: PhantomData, } } } impl Service for AndThenApplyFn where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { type Response = Res; type Error = Err; type Future = AndThenApplyFnFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut inner = self.svc.borrow_mut(); let not_ready = inner.0.poll_ready(cx)?.is_pending(); if inner.1.poll_ready(cx)?.is_pending() || not_ready { Poll::Pending } else { Poll::Ready(Ok(())) } } fn call(&mut self, req: Req) -> Self::Future { let fut = self.svc.borrow_mut().0.call(req); AndThenApplyFnFuture { state: State::A(fut, Some(self.svc.clone())), } } } #[pin_project::pin_project] pub(crate) struct AndThenApplyFnFuture where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { #[pin] state: State, } #[pin_project::pin_project(project = StateProj)] enum State where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { A(#[pin] S1::Future, Option>>), B(#[pin] Fut), Empty(PhantomData), } impl Future for AndThenApplyFnFuture where S1: Service, S2: Service, F: FnMut(S1::Response, &mut S2) -> Fut, Fut: Future>, Err: From + From, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); match this.state.as_mut().project() { StateProj::A(fut, b) => match fut.poll(cx)? { Poll::Ready(res) => { let b = Option::take(b).unwrap(); this.state.set(State::Empty(PhantomData)); let (_, b, f) = &mut *b.borrow_mut(); let fut = f(res, b); this.state.set(State::B(fut)); self.poll(cx) } Poll::Pending => Poll::Pending, }, StateProj::B(fut) => fut.poll(cx).map(|r| { this.state.set(State::Empty(PhantomData)); r }), StateProj::Empty(_) => { panic!("future must not be polled after it returned `Poll::Ready`") } } } } /// `AndThenApplyFn` service factory pub(crate) struct AndThenApplyFnFactory { srv: Rc<(SF1, SF2, F)>, _phantom: PhantomData<(Fut, Req, In, Res, Err)>, } impl AndThenApplyFnFactory where SF1: ServiceFactory, SF2: ServiceFactory, F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, Fut: Future>, Err: From + From, { /// Create new `ApplyNewService` new service instance pub(crate) fn new(a: SF1, b: SF2, wrap_fn: F) -> Self { Self { srv: Rc::new((a, b, wrap_fn)), _phantom: PhantomData, } } } impl Clone for AndThenApplyFnFactory { fn clone(&self) -> Self { Self { srv: self.srv.clone(), _phantom: PhantomData, } } } impl ServiceFactory for AndThenApplyFnFactory where SF1: ServiceFactory, SF1::Config: Clone, SF2: ServiceFactory, F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, Fut: Future>, Err: From + From, { type Response = Res; type Error = Err; type Service = AndThenApplyFn; type Config = SF1::Config; type InitError = SF1::InitError; type Future = AndThenApplyFnFactoryResponse; fn new_service(&self, cfg: SF1::Config) -> Self::Future { let srv = &*self.srv; AndThenApplyFnFactoryResponse { s1: None, s2: None, wrap_fn: srv.2.clone(), fut_s1: srv.0.new_service(cfg.clone()), fut_s2: srv.1.new_service(cfg), _phantom: PhantomData, } } } #[pin_project::pin_project] pub(crate) struct AndThenApplyFnFactoryResponse where SF1: ServiceFactory, SF2: ServiceFactory, F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, Fut: Future>, Err: From, Err: From, { #[pin] fut_s1: SF1::Future, #[pin] fut_s2: SF2::Future, wrap_fn: F, s1: Option, s2: Option, _phantom: PhantomData, } impl Future for AndThenApplyFnFactoryResponse where SF1: ServiceFactory, SF2: ServiceFactory, F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, Fut: Future>, Err: From + From, { type Output = Result< AndThenApplyFn, SF1::InitError, >; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if this.s1.is_none() { if let Poll::Ready(service) = this.fut_s1.poll(cx)? { *this.s1 = Some(service); } } if this.s2.is_none() { if let Poll::Ready(service) = this.fut_s2.poll(cx)? { *this.s2 = Some(service); } } if this.s1.is_some() && this.s2.is_some() { Poll::Ready(Ok(AndThenApplyFn { svc: Rc::new(RefCell::new(( Option::take(this.s1).unwrap(), Option::take(this.s2).unwrap(), this.wrap_fn.clone(), ))), _phantom: PhantomData, })) } else { Poll::Pending } } } #[cfg(test)] mod tests { use super::*; use futures_util::future::{lazy, ok, Ready, TryFutureExt}; use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory}; #[derive(Clone)] struct Srv; impl Service for Srv { type Response = (); type Error = (); type Future = Ready>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: u8) -> Self::Future { let _ = req; ok(()) } } #[actix_rt::test] async fn test_service() { let mut srv = pipeline(ok).and_then_apply_fn(Srv, |req: &'static str, s| { s.call(1).map_ok(move |res| (req, res)) }); let res = lazy(|cx| srv.poll_ready(cx)).await; assert!(res.is_ready()); let res = srv.call("srv").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", ())); } #[actix_rt::test] async fn test_service_factory() { let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(ok))).and_then_apply_fn( || ok(Srv), |req: &'static str, s| s.call(1).map_ok(move |res| (req, res)), ); let mut srv = new_srv.new_service(()).await.unwrap(); let res = lazy(|cx| srv.poll_ready(cx)).await; assert!(res.is_ready()); let res = srv.call("srv").await; assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv", ())); } }