1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 10:51:48 +01:00

service improvements (#233)

This commit is contained in:
Rob Ede 2020-12-27 14:15:42 +00:00 committed by GitHub
parent 33c9aa6988
commit 8a58a341a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 387 additions and 1179 deletions

View File

@ -1,29 +0,0 @@
name: Benchmark (Linux)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
check_benchmark:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
profile: minimal
override: true
- name: Check benchmark
uses: actions-rs/cargo@v1
with:
command: bench
args: --package=actix-service

View File

@ -75,9 +75,7 @@ impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
#[allow(clippy::type_complexity)]
type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, req: Connect<T>) -> Self::Future {
let port = req.port();

View File

@ -110,9 +110,7 @@ impl<T: Address> Service<Connect<T>> for Resolver<T> {
Ready<Result<Connect<T>, Self::Error>>,
>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, mut req: Connect<T>) -> Self::Future {
if req.addr.is_some() {

View File

@ -94,9 +94,7 @@ impl<T: Address> Service<Connect<T>> for ConnectService<T> {
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse {
@ -163,9 +161,7 @@ impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService<T> {
type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse {

View File

@ -100,9 +100,7 @@ where
#[allow(clippy::type_complexity)]
type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
@ -220,9 +218,7 @@ impl<T: Address + 'static> Service for OpensslConnectService<T> {
type Error = ConnectError;
type Future = OpensslConnectServiceResponse<T>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, req: Connect<T>) -> Self::Future {
OpensslConnectServiceResponse {

View File

@ -96,9 +96,7 @@ where
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());

View File

@ -3,10 +3,14 @@
## Unreleased - 2020-xx-xx
* `Service`, other traits, and many type signatures now take the the request type as a type
parameter instead of an associated type. [#232]
* Upgrade `pin-project` to `1.0`.
* Add `always_ready!` and `forward_ready!` macros. [#233]
* Crate is now `no_std`. [#233]
* Migrate pin projections to `pin-project-lite`. [#233]
* Remove `AndThenApplyFn` and Pipeline `and_then_apply_fn`. Use the
`.and_then(apply_fn(...))` construction. [#233]
[#232]: https://github.com/actix/actix-net/pull/232
[#233]: https://github.com/actix/actix-net/pull/233
## 1.0.6 - 2020-08-09

View File

@ -17,17 +17,10 @@ name = "actix_service"
path = "src/lib.rs"
[dependencies]
futures-util = "0.3.1"
pin-project = "1.0.0"
pin-project-lite = "0.2"
futures-util = { version = "0.3.7", default-features = false }
futures-core = { version = "0.3.7", default-features = false }
[dev-dependencies]
actix-rt = "1.0.0"
criterion = "0.3"
[[bench]]
name = "unsafecell_vs_refcell"
harness = false
[[bench]]
name = "and_then"
harness = false

View File

@ -1,334 +0,0 @@
use actix_service::boxed::BoxFuture;
use actix_service::IntoService;
use actix_service::Service;
/// Benchmark various implementations of and_then
use criterion::{criterion_main, Criterion};
use futures_util::future::join_all;
use futures_util::future::TryFutureExt;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{
cell::{RefCell, UnsafeCell},
marker::PhantomData,
};
/*
* Test services A,B for AndThen service implementations
*/
async fn svc1(_: ()) -> Result<usize, ()> {
Ok(1)
}
async fn svc2(req: usize) -> Result<usize, ()> {
Ok(req + 1)
}
/*
* AndThenUC - original AndThen service based on UnsafeCell
* Cut down version of actix_service::AndThenService based on actix-service::Cell
*/
struct AndThenUC<A, Req, B>(Rc<UnsafeCell<(A, B)>>, PhantomData<Req>);
impl<A, Req, B> AndThenUC<A, Req, B> {
fn new(a: A, b: B) -> Self
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
Self(Rc::new(UnsafeCell::new((a, b))), PhantomData)
}
}
impl<A, Req, B> Clone for AndThenUC<A, Req, B> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}
impl<A, Req, B> Service<Req> for AndThenUC<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponse<A, Req, B>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> Self::Future {
let fut = unsafe { &mut *(*self.0).get() }.0.call(req);
AndThenServiceResponse {
state: State::A(fut, Some(self.0.clone())),
_phantom: PhantomData,
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceResponse<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
#[pin]
state: State<A, Req, B>,
_phantom: PhantomData<Req>,
}
#[pin_project::pin_project(project = StateProj)]
enum State<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Rc<UnsafeCell<(A, B)>>>),
B(#[pin] B::Future),
Empty(PhantomData<Req>),
}
impl<A, Req, B> Future for AndThenServiceResponse<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
type Output = Result<B::Response, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(State::Empty(PhantomData)); // drop fut A
let fut = unsafe { &mut (*b.get()).1 }.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty(PhantomData));
r
}),
StateProj::Empty(_) => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
/*
* AndThenRC - AndThen service based on RefCell
*/
struct AndThenRC<A, Req, B>(Rc<RefCell<(A, B)>>, PhantomData<Req>);
impl<A, Req, B> AndThenRC<A, Req, B> {
fn new(a: A, b: B) -> Self
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a, b))), PhantomData)
}
}
impl<A, Req, B> Clone for AndThenRC<A, Req, B> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}
impl<A, Req, B> Service<Req> for AndThenRC<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
type Response = B::Response;
type Error = A::Error;
type Future = AndThenServiceResponseRC<A, Req, B>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
AndThenServiceResponseRC {
state: StateRC::A(fut, Some(self.0.clone())),
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceResponseRC<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
#[pin]
state: StateRC<A, Req, B>,
}
#[pin_project::pin_project(project = StateRCProj)]
enum StateRC<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty(PhantomData<Req>),
}
impl<A, Req, B> Future for AndThenServiceResponseRC<A, Req, B>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
type Output = Result<B::Response, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateRCProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(StateRC::Empty(PhantomData)); // drop fut A
let fut = b.borrow_mut().1.call(res);
this.state.set(StateRC::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateRCProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(StateRC::Empty(PhantomData));
r
}),
StateRCProj::Empty(_) => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
/*
* AndThenRCFuture - AndThen service based on RefCell
* and standard futures::future::and_then combinator in a Box
*/
struct AndThenRCFuture<A, Req, B>(Rc<RefCell<(A, B)>>, PhantomData<Req>);
impl<A, Req, B> AndThenRCFuture<A, Req, B> {
fn new(a: A, b: B) -> Self
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
Self(Rc::new(RefCell::new((a, b))), PhantomData)
}
}
impl<A, Req, B> Clone for AndThenRCFuture<A, Req, B> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}
impl<A, Req, B> Service<Req> for AndThenRCFuture<A, Req, B>
where
A: Service<Req> + 'static,
A::Future: 'static,
B: Service<A::Response, Error = A::Error> + 'static,
B::Future: 'static,
{
type Response = B::Response;
type Error = A::Error;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> Self::Future {
let fut = self.0.borrow_mut().0.call(req);
let core = self.0.clone();
let fut2 = move |res| (*core).borrow_mut().1.call(res);
Box::pin(fut.and_then(fut2))
}
}
/// Criterion Benchmark for async Service
/// Should be used from within criterion group:
/// ```rust,ignore
/// let mut criterion: ::criterion::Criterion<_> =
/// ::criterion::Criterion::default().configure_from_args();
/// bench_async_service(&mut criterion, ok_service(), "async_service_direct");
/// ```
///
/// Usable for benching Service wrappers:
/// Using minimum service code implementation we first measure
/// time to run minimum service, then measure time with wrapper.
///
/// Sample output
/// async_service_direct time: [1.0908 us 1.1656 us 1.2613 us]
pub fn bench_async_service<S>(c: &mut Criterion, srv: S, name: &str)
where
S: Service<(), Response = usize, Error = ()> + Clone + 'static,
{
let mut rt = actix_rt::System::new("test");
// start benchmark loops
c.bench_function(name, move |b| {
b.iter_custom(|iters| {
let mut srvs: Vec<_> = (1..iters).map(|_| srv.clone()).collect();
// exclude request generation, it appears it takes significant time vs call (3us vs 1us)
let start = std::time::Instant::now();
// benchmark body
rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
// check that at least first request succeeded
start.elapsed()
})
});
}
pub fn service_benches() {
let mut criterion: ::criterion::Criterion<_> =
::criterion::Criterion::default().configure_from_args();
bench_async_service(
&mut criterion,
AndThenUC::new(svc1.into_service(), svc2.into_service()),
"AndThen with UnsafeCell",
);
bench_async_service(
&mut criterion,
AndThenRC::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell",
);
bench_async_service(
&mut criterion,
AndThenUC::new(svc1.into_service(), svc2.into_service()),
"AndThen with UnsafeCell",
);
bench_async_service(
&mut criterion,
AndThenRC::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell",
);
bench_async_service(
&mut criterion,
AndThenRCFuture::new(svc1.into_service(), svc2.into_service()),
"AndThen with RefCell via future::and_then",
);
}
criterion_main!(service_benches);

View File

@ -1,110 +0,0 @@
use actix_service::Service;
use criterion::{criterion_main, Criterion};
use futures_util::future::join_all;
use futures_util::future::{ok, Ready};
use std::cell::{RefCell, UnsafeCell};
use std::rc::Rc;
use std::task::{Context, Poll};
struct SrvUC(Rc<UnsafeCell<usize>>);
impl Default for SrvUC {
fn default() -> Self {
Self(Rc::new(UnsafeCell::new(0)))
}
}
impl Clone for SrvUC {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Service<()> for SrvUC {
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
unsafe { *(*self.0).get() = *(*self.0).get() + 1 };
ok(unsafe { *self.0.get() })
}
}
struct SrvRC(Rc<RefCell<usize>>);
impl Default for SrvRC {
fn default() -> Self {
Self(Rc::new(RefCell::new(0)))
}
}
impl Clone for SrvRC {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Service<()> for SrvRC {
type Response = usize;
type Error = ();
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
let prev = *self.0.borrow();
*(*self.0).borrow_mut() = prev + 1;
ok(*self.0.borrow())
}
}
/// Criterion Benchmark for async Service
/// Should be used from within criterion group:
/// ```rust,ignore
/// let mut criterion: ::criterion::Criterion<_> =
/// ::criterion::Criterion::default().configure_from_args();
/// bench_async_service(&mut criterion, ok_service(), "async_service_direct");
/// ```
///
/// Usable for benching Service wrappers:
/// Using minimum service code implementation we first measure
/// time to run minimum service, then measure time with wrapper.
///
/// Sample output
/// async_service_direct time: [1.0908 us 1.1656 us 1.2613 us]
pub fn bench_async_service<S>(c: &mut Criterion, srv: S, name: &str)
where
S: Service<(), Response = usize, Error = ()> + Clone + 'static,
{
let mut rt = actix_rt::System::new("test");
// start benchmark loops
c.bench_function(name, move |b| {
b.iter_custom(|iters| {
let mut srvs: Vec<_> = (1..iters).map(|_| srv.clone()).collect();
// exclude request generation, it appears it takes significant time vs call (3us vs 1us)
let start = std::time::Instant::now();
// benchmark body
rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
// check that at least first request succeeded
start.elapsed()
})
});
}
pub fn service_benches() {
let mut criterion: ::criterion::Criterion<_> =
::criterion::Criterion::default().configure_from_args();
bench_async_service(&mut criterion, SrvUC::default(), "Service with UnsafeCell");
bench_async_service(&mut criterion, SrvRC::default(), "Service with RefCell");
bench_async_service(&mut criterion, SrvUC::default(), "Service with UnsafeCell");
bench_async_service(&mut criterion, SrvRC::default(), "Service with RefCell");
}
criterion_main!(service_benches);

View File

@ -1,8 +1,13 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{cell::RefCell, marker::PhantomData};
use alloc::rc::Rc;
use core::{
cell::RefCell,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
@ -50,30 +55,43 @@ where
fn call(&mut self, req: Req) -> Self::Future {
AndThenServiceResponse {
state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())),
state: State::A {
fut: self.0.borrow_mut().0.call(req),
b: Some(self.0.clone()),
},
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceResponse<A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
#[pin]
state: State<A, B, Req>,
pin_project! {
pub(crate) struct AndThenServiceResponse<A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
#[pin]
state: State<A, B, Req>,
}
}
#[pin_project::pin_project(project = StateProj)]
enum State<A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
pin_project! {
#[project = StateProj]
enum State<A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
{
A {
#[pin]
fut: A::Future,
b: Option<Rc<RefCell<(A, B)>>>,
},
B {
#[pin]
fut: B::Future,
},
Empty,
}
}
impl<A, B, Req> Future for AndThenServiceResponse<A, B, Req>
@ -87,17 +105,17 @@ where
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProj::A(fut, b) => match fut.poll(cx)? {
StateProj::A { fut, b } => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.borrow_mut().1.call(res);
this.state.set(State::B(fut));
this.state.set(State::B { fut });
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProj::B(fut) => fut.poll(cx).map(|r| {
StateProj::B { fut } => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
@ -191,19 +209,20 @@ where
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenServiceFactoryResponse<A, B, Req>
where
A: ServiceFactory<Req>,
B: ServiceFactory<A::Response>,
{
#[pin]
fut_a: A::Future,
#[pin]
fut_b: B::Future,
pin_project! {
pub(crate) struct AndThenServiceFactoryResponse<A, B, Req>
where
A: ServiceFactory<Req>,
B: ServiceFactory<A::Response>,
{
#[pin]
fut_a: A::Future,
#[pin]
fut_b: B::Future,
a: Option<A::Service>,
b: Option<B::Service>,
a: Option<A::Service>,
b: Option<B::Service>,
}
}
impl<A, B, Req> AndThenServiceFactoryResponse<A, B, Req>
@ -254,9 +273,11 @@ where
#[cfg(test)]
mod tests {
use std::cell::Cell;
use std::rc::Rc;
use std::task::{Context, Poll};
use alloc::rc::Rc;
use core::{
cell::Cell,
task::{Context, Poll},
};
use futures_util::future::{lazy, ok, ready, Ready};

View File

@ -1,334 +0,0 @@
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use crate::{Service, ServiceFactory};
/// `Apply` service combinator
pub(crate) struct AndThenApplyFn<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
svc: Rc<RefCell<(S1, S2, F)>>,
_phantom: PhantomData<(Fut, Req, In, Res, Err)>,
}
impl<S1, S2, F, Fut, Req, In, Res, Err> AndThenApplyFn<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
/// Create new `Apply` combinator
pub(crate) fn new(a: S1, b: S2, wrap_fn: F) -> Self {
Self {
svc: Rc::new(RefCell::new((a, b, wrap_fn))),
_phantom: PhantomData,
}
}
}
impl<S1, S2, F, Fut, Req, In, Res, Err> Clone
for AndThenApplyFn<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
fn clone(&self) -> Self {
AndThenApplyFn {
svc: self.svc.clone(),
_phantom: PhantomData,
}
}
}
impl<S1, S2, F, Fut, Req, In, Res, Err> Service<Req>
for AndThenApplyFn<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
type Response = Res;
type Error = Err;
type Future = AndThenApplyFnFuture<S1, S2, F, Fut, Req, In, Res, Err>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut inner = self.svc.borrow_mut();
let not_ready = inner.0.poll_ready(cx)?.is_pending();
if inner.1.poll_ready(cx)?.is_pending() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, req: Req) -> Self::Future {
let fut = self.svc.borrow_mut().0.call(req);
AndThenApplyFnFuture {
state: State::A(fut, Some(self.svc.clone())),
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenApplyFnFuture<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
#[pin]
state: State<S1, S2, F, Fut, Req, In, Res, Err>,
}
#[pin_project::pin_project(project = StateProj)]
enum State<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
A(#[pin] S1::Future, Option<Rc<RefCell<(S1, S2, F)>>>),
B(#[pin] Fut),
Empty(PhantomData<In>),
}
impl<S1, S2, F, Fut, Req, In, Res, Err> Future
for AndThenApplyFnFuture<S1, S2, F, Fut, Req, In, Res, Err>
where
S1: Service<Req>,
S2: Service<In>,
F: FnMut(S1::Response, &mut S2) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S1::Error> + From<S2::Error>,
{
type Output = Result<Res, Err>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProj::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
let b = Option::take(b).unwrap();
this.state.set(State::Empty(PhantomData));
let (_, b, f) = &mut *b.borrow_mut();
let fut = f(res, b);
this.state.set(State::B(fut));
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProj::B(fut) => fut.poll(cx).map(|r| {
this.state.set(State::Empty(PhantomData));
r
}),
StateProj::Empty(_) => {
panic!("future must not be polled after it returned `Poll::Ready`")
}
}
}
}
/// `AndThenApplyFn` service factory
pub(crate) struct AndThenApplyFnFactory<SF1, SF2, F, Fut, Req, In, Res, Err> {
srv: Rc<(SF1, SF2, F)>,
_phantom: PhantomData<(Fut, Req, In, Res, Err)>,
}
impl<SF1, SF2, F, Fut, Req, In, Res, Err>
AndThenApplyFnFactory<SF1, SF2, F, Fut, Req, In, Res, Err>
where
SF1: ServiceFactory<Req>,
SF2: ServiceFactory<In, Config = SF1::Config, InitError = SF1::InitError>,
F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<SF1::Error> + From<SF2::Error>,
{
/// Create new `ApplyNewService` new service instance
pub(crate) fn new(a: SF1, b: SF2, wrap_fn: F) -> Self {
Self {
srv: Rc::new((a, b, wrap_fn)),
_phantom: PhantomData,
}
}
}
impl<SF1, SF2, F, Fut, Req, In, Res, Err> Clone
for AndThenApplyFnFactory<SF1, SF2, F, Fut, Req, In, Res, Err>
{
fn clone(&self) -> Self {
Self {
srv: self.srv.clone(),
_phantom: PhantomData,
}
}
}
impl<SF1, SF2, F, Fut, Req, In, Res, Err> ServiceFactory<Req>
for AndThenApplyFnFactory<SF1, SF2, F, Fut, Req, In, Res, Err>
where
SF1: ServiceFactory<Req>,
SF1::Config: Clone,
SF2: ServiceFactory<In, Config = SF1::Config, InitError = SF1::InitError>,
F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<SF1::Error> + From<SF2::Error>,
{
type Response = Res;
type Error = Err;
type Service = AndThenApplyFn<SF1::Service, SF2::Service, F, Fut, Req, In, Res, Err>;
type Config = SF1::Config;
type InitError = SF1::InitError;
type Future = AndThenApplyFnFactoryResponse<SF1, SF2, F, Fut, Req, In, Res, Err>;
fn new_service(&self, cfg: SF1::Config) -> Self::Future {
let srv = &*self.srv;
AndThenApplyFnFactoryResponse {
s1: None,
s2: None,
wrap_fn: srv.2.clone(),
fut_s1: srv.0.new_service(cfg.clone()),
fut_s2: srv.1.new_service(cfg),
_phantom: PhantomData,
}
}
}
#[pin_project::pin_project]
pub(crate) struct AndThenApplyFnFactoryResponse<SF1, SF2, F, Fut, Req, In, Res, Err>
where
SF1: ServiceFactory<Req>,
SF2: ServiceFactory<In, Config = SF1::Config, InitError = SF1::InitError>,
F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<SF1::Error>,
Err: From<SF2::Error>,
{
#[pin]
fut_s1: SF1::Future,
#[pin]
fut_s2: SF2::Future,
wrap_fn: F,
s1: Option<SF1::Service>,
s2: Option<SF2::Service>,
_phantom: PhantomData<In>,
}
impl<SF1, SF2, F, Fut, Req, In, Res, Err> Future
for AndThenApplyFnFactoryResponse<SF1, SF2, F, Fut, Req, In, Res, Err>
where
SF1: ServiceFactory<Req>,
SF2: ServiceFactory<In, Config = SF1::Config, InitError = SF1::InitError>,
F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<SF1::Error> + From<SF2::Error>,
{
type Output = Result<
AndThenApplyFn<SF1::Service, SF2::Service, F, Fut, Req, In, Res, Err>,
SF1::InitError,
>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.s1.is_none() {
if let Poll::Ready(service) = this.fut_s1.poll(cx)? {
*this.s1 = Some(service);
}
}
if this.s2.is_none() {
if let Poll::Ready(service) = this.fut_s2.poll(cx)? {
*this.s2 = Some(service);
}
}
if this.s1.is_some() && this.s2.is_some() {
Poll::Ready(Ok(AndThenApplyFn {
svc: Rc::new(RefCell::new((
Option::take(this.s1).unwrap(),
Option::take(this.s2).unwrap(),
this.wrap_fn.clone(),
))),
_phantom: PhantomData,
}))
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::{lazy, ok, Ready, TryFutureExt};
use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory};
#[derive(Clone)]
struct Srv;
impl Service<u8> for Srv {
type Response = ();
type Error = ();
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: u8) -> Self::Future {
let _ = req;
ok(())
}
}
#[actix_rt::test]
async fn test_service() {
let mut srv = pipeline(ok).and_then_apply_fn(Srv, |req: &'static str, s| {
s.call(1).map_ok(move |res| (req, res))
});
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert!(res.is_ready());
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", ()));
}
#[actix_rt::test]
async fn test_service_factory() {
let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(ok))).and_then_apply_fn(
|| ok(Srv),
|req: &'static str, s| s.call(1).map_ok(move |res| (req, res)),
);
let mut srv = new_srv.new_service(()).await.unwrap();
let res = lazy(|cx| srv.poll_ready(cx)).await;
assert!(res.is_ready());
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", ()));
}
}

View File

@ -1,11 +1,12 @@
use std::{
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures_util::ready;
use futures_core::ready;
use pin_project_lite::pin_project;
use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
@ -94,9 +95,7 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(ready!(self.service.poll_ready(cx)))
}
crate::forward_ready!(service);
fn call(&mut self, req: Req) -> Self::Future {
(self.wrap_fn)(req, &mut self.service)
@ -162,17 +161,18 @@ where
}
}
#[pin_project::pin_project]
pub struct ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
where
SF: ServiceFactory<In, Error = Err>,
F: FnMut(Req, &mut SF::Service) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
#[pin]
fut: SF::Future,
wrap_fn: Option<F>,
_phantom: PhantomData<(Req, Res)>,
pin_project! {
pub struct ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
where
SF: ServiceFactory<In, Error = Err>,
F: FnMut(Req, &mut SF::Service) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
{
#[pin]
fut: SF::Future,
wrap_fn: Option<F>,
_phantom: PhantomData<(Req, Res)>,
}
}
impl<SF, F, Fut, Req, In, Res, Err> ApplyServiceFactoryResponse<SF, F, Fut, Req, In, Res, Err>
@ -203,13 +203,13 @@ where
let this = self.project();
let svc = ready!(this.fut.poll(cx))?;
Poll::Ready(Ok(Apply::new(svc, Option::take(this.wrap_fn).unwrap())))
Poll::Ready(Ok(Apply::new(svc, this.wrap_fn.take().unwrap())))
}
}
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use core::task::Poll;
use futures_util::future::{lazy, ok, Ready};
@ -224,9 +224,7 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
crate::always_ready!();
fn call(&mut self, _: ()) -> Self::Future {
ok(())

View File

@ -1,9 +1,13 @@
use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use alloc::rc::Rc;
use core::{
cell::RefCell,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use crate::{Service, ServiceFactory};
@ -156,37 +160,42 @@ where
ApplyConfigServiceFactoryResponse {
cfg: Some(cfg),
store: self.srv.clone(),
state: State::A(self.srv.borrow().0.new_service(())),
state: State::A {
fut: self.srv.borrow().0.new_service(()),
},
}
}
}
#[pin_project::pin_project]
struct ApplyConfigServiceFactoryResponse<SF, Req, F, Cfg, Fut, S>
where
SF: ServiceFactory<Req, Config = ()>,
SF::InitError: From<SF::Error>,
F: FnMut(Cfg, &mut SF::Service) -> Fut,
Fut: Future<Output = Result<S, SF::InitError>>,
S: Service<Req>,
{
cfg: Option<Cfg>,
store: Rc<RefCell<(SF, F)>>,
#[pin]
state: State<SF, Fut, S, Req>,
pin_project! {
struct ApplyConfigServiceFactoryResponse<SF, Req, F, Cfg, Fut, S>
where
SF: ServiceFactory<Req, Config = ()>,
SF::InitError: From<SF::Error>,
F: FnMut(Cfg, &mut SF::Service) -> Fut,
Fut: Future<Output = Result<S, SF::InitError>>,
S: Service<Req>,
{
cfg: Option<Cfg>,
store: Rc<RefCell<(SF, F)>>,
#[pin]
state: State<SF, Fut, S, Req>,
}
}
#[pin_project::pin_project(project = StateProj)]
enum State<SF, Fut, S, Req>
where
SF: ServiceFactory<Req, Config = ()>,
SF::InitError: From<SF::Error>,
Fut: Future<Output = Result<S, SF::InitError>>,
S: Service<Req>,
{
A(#[pin] SF::Future),
B(SF::Service),
C(#[pin] Fut),
pin_project! {
#[project = StateProj]
enum State<SF, Fut, S, Req>
where
SF: ServiceFactory<Req, Config = ()>,
SF::InitError: From<SF::Error>,
Fut: Future<Output = Result<S, SF::InitError>>,
S: Service<Req>,
{
A { #[pin] fut: SF::Future },
B { svc: SF::Service },
C { #[pin] fut: Fut },
}
}
impl<SF, Req, F, Cfg, Fut, S> Future
@ -204,25 +213,25 @@ where
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProj::A(fut) => match fut.poll(cx)? {
StateProj::A { fut } => match fut.poll(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(srv) => {
this.state.set(State::B(srv));
Poll::Ready(svc) => {
this.state.set(State::B { svc });
self.poll(cx)
}
},
StateProj::B(srv) => match srv.poll_ready(cx)? {
StateProj::B { svc } => match svc.poll_ready(cx)? {
Poll::Ready(_) => {
{
let (_, f) = &mut *this.store.borrow_mut();
let fut = f(this.cfg.take().unwrap(), srv);
this.state.set(State::C(fut));
let fut = f(this.cfg.take().unwrap(), svc);
this.state.set(State::C { fut });
}
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProj::C(fut) => fut.poll(cx),
StateProj::C { fut } => fut.poll(cx),
}
}
}

View File

@ -1,6 +1,10 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, marker::PhantomData};
use alloc::boxed::Box;
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures_util::future::FutureExt;
@ -28,7 +32,7 @@ where
{
BoxServiceFactory(Box::new(FactoryWrapper {
factory,
_t: std::marker::PhantomData,
_t: PhantomData,
}))
}
@ -75,12 +79,9 @@ where
}
}
struct FactoryWrapper<SF, Req, C>
where
SF: ServiceFactory<Req>,
{
struct FactoryWrapper<SF, Req, Cfg> {
factory: SF,
_t: PhantomData<(C, Req)>,
_t: PhantomData<(Req, Cfg)>,
}
impl<SF, Req, Cfg, Res, Err, InitErr> ServiceFactory<Req> for FactoryWrapper<SF, Req, Cfg>

View File

@ -1,6 +1,4 @@
use std::future::Future;
use std::marker::PhantomData;
use std::task::{Context, Poll};
use core::{future::Future, marker::PhantomData, task::Poll};
use futures_util::future::{ok, Ready};
@ -143,9 +141,7 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
crate::always_ready!();
fn call(&mut self, req: Req) -> Self::Future {
(self.f)(req)
@ -200,9 +196,7 @@ where
type Error = Err;
type Future = Fut;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
crate::always_ready!();
fn call(&mut self, req: Req) -> Self::Future {
(self.f)(req)
@ -361,7 +355,7 @@ where
#[cfg(test)]
mod tests {
use std::task::Poll;
use core::task::Poll;
use futures_util::future::{lazy, ok};

View File

@ -1,18 +1,21 @@
//! See [`Service`] docs for information on this crate's foundational trait.
#![no_std]
#![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{self, Context, Poll};
extern crate alloc;
use alloc::{boxed::Box, rc::Rc, sync::Arc};
use core::{
cell::RefCell,
future::Future,
task::{self, Context, Poll},
};
mod and_then;
mod and_then_apply_fn;
mod apply;
mod apply_cfg;
pub mod boxed;
@ -359,3 +362,27 @@ pub mod dev {
pub use crate::transform::ApplyTransform;
pub use crate::transform_err::TransformMapInitErr;
}
#[macro_export]
macro_rules! always_ready {
() => {
fn poll_ready(
&mut self,
_: &mut ::core::task::Context<'_>,
) -> ::core::task::Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
};
}
#[macro_export]
macro_rules! forward_ready {
($field:ident) => {
fn poll_ready(
&mut self,
cx: &mut ::core::task::Context<'_>,
) -> ::core::task::Poll<Result<(), Self::Error>> {
self.$field.poll_ready(cx)
}
};
}

View File

@ -1,7 +1,11 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
@ -52,24 +56,23 @@ where
type Error = A::Error;
type Future = MapFuture<A, F, Req, Res>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx)
}
crate::forward_ready!(service);
fn call(&mut self, req: Req) -> Self::Future {
MapFuture::new(self.service.call(req), self.f.clone())
}
}
#[pin_project::pin_project]
pub struct MapFuture<A, F, Req, Res>
where
A: Service<Req>,
F: FnMut(A::Response) -> Res,
{
f: F,
#[pin]
fut: A::Future,
pin_project! {
pub struct MapFuture<A, F, Req, Res>
where
A: Service<Req>,
F: FnMut(A::Response) -> Res,
{
f: F,
#[pin]
fut: A::Future,
}
}
impl<A, F, Req, Res> MapFuture<A, F, Req, Res>
@ -154,15 +157,16 @@ where
}
}
#[pin_project::pin_project]
pub struct MapServiceFuture<A, F, Req, Res>
where
A: ServiceFactory<Req>,
F: FnMut(A::Response) -> Res,
{
#[pin]
fut: A::Future,
f: Option<F>,
pin_project! {
pub struct MapServiceFuture<A, F, Req, Res>
where
A: ServiceFactory<Req>,
F: FnMut(A::Response) -> Res,
{
#[pin]
fut: A::Future,
f: Option<F>,
}
}
impl<A, F, Req, Res> MapServiceFuture<A, F, Req, Res>
@ -207,9 +211,7 @@ mod tests {
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
crate::always_ready!();
fn call(&mut self, _: ()) -> Self::Future {
ok(())

View File

@ -1,4 +1,4 @@
use std::marker::PhantomData;
use core::marker::PhantomData;
use super::{IntoServiceFactory, ServiceFactory};

View File

@ -1,7 +1,11 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
@ -62,15 +66,16 @@ where
}
}
#[pin_project::pin_project]
pub struct MapErrFuture<A, Req, F, E>
where
A: Service<Req>,
F: Fn(A::Error) -> E,
{
f: F,
#[pin]
fut: A::Future,
pin_project! {
pub struct MapErrFuture<A, Req, F, E>
where
A: Service<Req>,
F: Fn(A::Error) -> E,
{
f: F,
#[pin]
fut: A::Future,
}
}
impl<A, Req, F, E> MapErrFuture<A, Req, F, E>
@ -157,15 +162,16 @@ where
}
}
#[pin_project::pin_project]
pub struct MapErrServiceFuture<A, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E,
{
#[pin]
fut: A::Future,
f: F,
pin_project! {
pub struct MapErrServiceFuture<A, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E,
{
#[pin]
fut: A::Future,
f: F,
}
}
impl<A, Req, F, E> MapErrServiceFuture<A, Req, F, E>

View File

@ -1,7 +1,11 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::ServiceFactory;
@ -59,15 +63,16 @@ where
}
}
#[pin_project::pin_project]
pub struct MapInitErrFuture<A, F, Req, E>
where
A: ServiceFactory<Req>,
F: Fn(A::InitError) -> E,
{
f: F,
#[pin]
fut: A::Future,
pin_project! {
pub struct MapInitErrFuture<A, F, Req, E>
where
A: ServiceFactory<Req>,
F: Fn(A::InitError) -> E,
{
f: F,
#[pin]
fut: A::Future,
}
}
impl<A, F, Req, E> MapInitErrFuture<A, F, Req, E>

View File

@ -1,8 +1,9 @@
use std::task::{Context, Poll};
use std::{future::Future, marker::PhantomData};
use core::{
marker::PhantomData,
task::{Context, Poll},
};
use crate::and_then::{AndThenService, AndThenServiceFactory};
use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory};
use crate::map::{Map, MapServiceFactory};
use crate::map_err::{MapErr, MapErrServiceFactory};
use crate::map_init_err::MapInitErr;
@ -67,28 +68,6 @@ where
}
}
/// Apply function to specified service and use it as a next service in chain.
///
/// Short version of `pipeline_factory(...).and_then(apply_fn(...))`
pub fn and_then_apply_fn<I, S1, F, Fut, In, Res, Err>(
self,
service: I,
wrap_fn: F,
) -> Pipeline<impl Service<Req, Response = Res, Error = Err> + Clone, Req>
where
Self: Sized,
I: IntoService<S1, In>,
S1: Service<In>,
F: FnMut(S::Response, &mut S1) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<S::Error> + From<S1::Error>,
{
Pipeline {
service: AndThenApplyFn::new(self.service, service.into_service(), wrap_fn),
_phantom: PhantomData,
}
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `U`.
///
@ -219,39 +198,6 @@ where
}
}
/// Apply function to specified service and use it as a next service in chain.
///
/// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))`
pub fn and_then_apply_fn<I, SF1, Fut, F, In, Res, Err>(
self,
factory: I,
wrap_fn: F,
) -> PipelineFactory<
impl ServiceFactory<
Req,
Response = Res,
Error = Err,
Config = SF::Config,
InitError = SF::InitError,
Service = impl Service<Req, Response = Res, Error = Err> + Clone,
> + Clone,
Req,
>
where
Self: Sized,
SF::Config: Clone,
I: IntoServiceFactory<SF1, In>,
SF1: ServiceFactory<In, Config = SF::Config, InitError = SF::InitError>,
F: FnMut(SF::Response, &mut SF1::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>,
Err: From<SF::Error> + From<SF1::Error>,
{
PipelineFactory {
factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), wrap_fn),
_phantom: PhantomData,
}
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `U`.

View File

@ -1,8 +1,13 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{cell::RefCell, marker::PhantomData};
use alloc::rc::Rc;
use core::{
cell::RefCell,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
@ -50,30 +55,36 @@ where
fn call(&mut self, req: Req) -> Self::Future {
ThenServiceResponse {
state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())),
state: State::A {
fut: self.0.borrow_mut().0.call(req),
b: Some(self.0.clone()),
},
}
}
}
#[pin_project::pin_project]
pub(crate) struct ThenServiceResponse<A, B, Req>
where
A: Service<Req>,
B: Service<Result<A::Response, A::Error>>,
{
#[pin]
state: State<A, B, Req>,
pin_project! {
pub(crate) struct ThenServiceResponse<A, B, Req>
where
A: Service<Req>,
B: Service<Result<A::Response, A::Error>>,
{
#[pin]
state: State<A, B, Req>,
}
}
#[pin_project::pin_project(project = StateProj)]
enum State<A, B, Req>
where
A: Service<Req>,
B: Service<Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Option<Rc<RefCell<(A, B)>>>),
B(#[pin] B::Future),
Empty,
pin_project! {
#[project = StateProj]
enum State<A, B, Req>
where
A: Service<Req>,
B: Service<Result<A::Response, A::Error>>,
{
A { #[pin] fut: A::Future, b: Option<Rc<RefCell<(A, B)>>> },
B { #[pin] fut: B::Future },
Empty,
}
}
impl<A, B, Req> Future for ThenServiceResponse<A, B, Req>
@ -87,17 +98,17 @@ where
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
StateProj::A(fut, b) => match fut.poll(cx) {
StateProj::A { fut, b } => match fut.poll(cx) {
Poll::Ready(res) => {
let b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.borrow_mut().1.call(res);
this.state.set(State::B(fut));
this.state.set(State::B { fut });
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
StateProj::B(fut) => fut.poll(cx).map(|r| {
StateProj::B { fut } => fut.poll(cx).map(|r| {
this.state.set(State::Empty);
r
}),
@ -159,23 +170,24 @@ impl<A, B, Req> Clone for ThenServiceFactory<A, B, Req> {
}
}
#[pin_project::pin_project]
pub(crate) struct ThenServiceFactoryResponse<A, B, Req>
where
A: ServiceFactory<Req>,
B: ServiceFactory<
Result<A::Response, A::Error>,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
pin_project! {
pub(crate) struct ThenServiceFactoryResponse<A, B, Req>
where
A: ServiceFactory<Req>,
B: ServiceFactory<
Result<A::Response, A::Error>,
Config = A::Config,
Error = A::Error,
InitError = A::InitError,
>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
}
}
impl<A, B, Req> ThenServiceFactoryResponse<A, B, Req>
@ -236,9 +248,11 @@ where
#[cfg(test)]
mod tests {
use std::cell::Cell;
use std::rc::Rc;
use std::task::{Context, Poll};
use alloc::rc::Rc;
use core::{
cell::Cell,
task::{Context, Poll},
};
use futures_util::future::{err, lazy, ok, ready, Ready};

View File

@ -1,8 +1,12 @@
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{future::Future, marker::PhantomData};
use alloc::{rc::Rc, sync::Arc};
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use crate::transform_err::TransformMapInitErr;
use crate::{IntoServiceFactory, Service, ServiceFactory};
@ -185,30 +189,35 @@ where
fn new_service(&self, cfg: S::Config) -> Self::Future {
ApplyTransformFuture {
store: self.0.clone(),
state: ApplyTransformFutureState::A(self.0.as_ref().1.new_service(cfg)),
state: ApplyTransformFutureState::A {
fut: self.0.as_ref().1.new_service(cfg),
},
}
}
}
#[pin_project::pin_project]
pub struct ApplyTransformFuture<T, S, Req>
where
S: ServiceFactory<Req>,
T: Transform<S::Service, Req, InitError = S::InitError>,
{
store: Rc<(T, S)>,
#[pin]
state: ApplyTransformFutureState<T, S, Req>,
pin_project! {
pub struct ApplyTransformFuture<T, S, Req>
where
S: ServiceFactory<Req>,
T: Transform<S::Service, Req, InitError = S::InitError>,
{
store: Rc<(T, S)>,
#[pin]
state: ApplyTransformFutureState<T, S, Req>,
}
}
#[pin_project::pin_project(project = ApplyTransformFutureStateProj)]
pub enum ApplyTransformFutureState<T, S, Req>
where
S: ServiceFactory<Req>,
T: Transform<S::Service, Req, InitError = S::InitError>,
{
A(#[pin] S::Future),
B(#[pin] T::Future),
pin_project! {
#[project = ApplyTransformFutureStateProj]
pub enum ApplyTransformFutureState<T, S, Req>
where
S: ServiceFactory<Req>,
T: Transform<S::Service, Req, InitError = S::InitError>,
{
A { #[pin] fut: S::Future },
B { #[pin] fut: T::Future },
}
}
impl<T, S, Req> Future for ApplyTransformFuture<T, S, Req>
@ -222,15 +231,15 @@ where
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ApplyTransformFutureStateProj::A(fut) => match fut.poll(cx)? {
ApplyTransformFutureStateProj::A { fut } => match fut.poll(cx)? {
Poll::Ready(srv) => {
let fut = this.store.0.new_transform(srv);
this.state.set(ApplyTransformFutureState::B(fut));
this.state.set(ApplyTransformFutureState::B { fut });
self.poll(cx)
}
Poll::Pending => Poll::Pending,
},
ApplyTransformFutureStateProj::B(fut) => fut.poll(cx),
ApplyTransformFutureStateProj::B { fut } => fut.poll(cx),
}
}
}

View File

@ -1,7 +1,11 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
use super::Transform;
@ -63,15 +67,16 @@ where
}
}
#[pin_project::pin_project]
pub struct TransformMapInitErrFuture<T, S, F, E, Req>
where
pin_project! {
pub struct TransformMapInitErrFuture<T, S, F, E, Req>
where
T: Transform<S, Req>,
F: Fn(T::InitError) -> E,
{
#[pin]
fut: T::Future,
f: F,
{
#[pin]
fut: T::Future,
f: F,
}
}
impl<T, S, F, E, Req> Future for TransformMapInitErrFuture<T, S, F, E, Req>

View File

@ -4,8 +4,7 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::marker::PhantomData;
use std::task::{Context, Poll};
use core::marker::PhantomData;
use actix_service::{
apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
@ -36,9 +35,7 @@ where
type Error = S::Error;
type Future = Either<S::Future, Instrumented<S::Future>>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(ctx)
}
actix_service::forward_ready!(inner);
fn call(&mut self, req: Req) -> Self::Future {
let span = (self.make_span)(&req);

View File

@ -201,7 +201,7 @@ where
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::task::Poll;
use std::time::Duration;
use super::*;
@ -215,9 +215,7 @@ mod tests {
type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
actix_service::always_ready!();
fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::delay_for(self.0)