1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 18:44:36 +01:00

optimize service combinators memory layout

This commit is contained in:
Nikolay Kim 2019-12-05 12:37:26 +06:00
parent c6eb318536
commit 6f41b80cb4
24 changed files with 366 additions and 399 deletions

View File

@ -33,7 +33,7 @@ openssl = ["open-ssl", "tokio-openssl"]
uri = ["http"] uri = ["http"]
[dependencies] [dependencies]
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.2" actix-codec = "0.2.0-alpha.2"
actix-utils = "1.0.0-alpha.2" actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"

View File

@ -18,7 +18,7 @@ name = "actix_ioframe"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.2" actix-codec = "0.2.0-alpha.2"
actix-utils = "1.0.0-alpha.2" actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"

View File

@ -21,7 +21,7 @@ path = "src/lib.rs"
default = [] default = []
[dependencies] [dependencies]
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"
actix-codec = "0.2.0-alpha.2" actix-codec = "0.2.0-alpha.2"
actix-utils = "1.0.0-alpha.2" actix-utils = "1.0.0-alpha.2"

View File

@ -2,17 +2,24 @@
## [1.0.0-alpha.3] - 2019-12-xx ## [1.0.0-alpha.3] - 2019-12-xx
### Add missing Clone impls ### Changed
### Restore `Transform::map_init_err()` combinator * Add missing Clone impls
* Restore `Transform::map_init_err()` combinator
* Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()`
* Optimize service combinators and futures memory layout
### Restore `Service/Factory::apply_fn()` in form of `Pipeline/Factory::and_then_apply_fn()`
## [1.0.0-alpha.2] - 2019-12-02 ## [1.0.0-alpha.2] - 2019-12-02
### Use owned config value for service factory ### Changed
### Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService * Use owned config value for service factory
* Renamed BoxedNewService/BoxedService to BoxServiceFactory/BoxService
## [1.0.0-alpha.1] - 2019-11-25 ## [1.0.0-alpha.1] - 2019-11-25

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "1.0.0-alpha.2" version = "1.0.0-alpha.3"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service" description = "Actix Service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -23,8 +23,8 @@ name = "actix_service"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
futures = "0.3.1" futures-util = "0.3.1"
pin-project-lite = "0.1.1" pin-project = "0.4.6"
[dev-dependencies] [dev-dependencies]
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"

View File

@ -9,10 +9,7 @@ use crate::cell::Cell;
/// of another service which completes successfully. /// of another service which completes successfully.
/// ///
/// This is created by the `ServiceExt::and_then` method. /// This is created by the `ServiceExt::and_then` method.
pub struct AndThenService<A, B> { pub struct AndThenService<A, B>(Cell<(A, B)>);
a: A,
b: Cell<B>,
}
impl<A, B> AndThenService<A, B> { impl<A, B> AndThenService<A, B> {
/// Create new `AndThen` combinator /// Create new `AndThen` combinator
@ -21,19 +18,13 @@ impl<A, B> AndThenService<A, B> {
A: Service, A: Service,
B: Service<Request = A::Response, Error = A::Error>, B: Service<Request = A::Response, Error = A::Error>,
{ {
Self { a, b: Cell::new(b) } Self(Cell::new((a, b)))
} }
} }
impl<A, B> Clone for AndThenService<A, B> impl<A, B> Clone for AndThenService<A, B> {
where
A: Clone,
{
fn clone(&self) -> Self { fn clone(&self) -> Self {
AndThenService { AndThenService(self.0.clone())
a: self.a.clone(),
b: self.b.clone(),
}
} }
} }
@ -48,8 +39,10 @@ where
type Future = AndThenServiceResponse<A, B>; type Future = AndThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.a.poll_ready(cx)?.is_ready(); let srv = self.0.get_mut();
if !self.b.get_mut().poll_ready(cx)?.is_ready() || not_ready {
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending Poll::Pending
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -57,36 +50,30 @@ where
} }
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
AndThenServiceResponse::new(self.a.call(req), self.b.clone()) AndThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), self.0.clone()),
}
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct AndThenServiceResponse<A, B> pub struct AndThenServiceResponse<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
b: Cell<B>,
#[pin]
fut_b: Option<B::Future>,
#[pin]
fut_a: Option<A::Future>,
}
}
impl<A, B> AndThenServiceResponse<A, B>
where where
A: Service, A: Service,
B: Service<Request = A::Response, Error = A::Error>, B: Service<Request = A::Response, Error = A::Error>,
{ {
fn new(a: A::Future, b: Cell<B>) -> Self { #[pin]
AndThenServiceResponse { state: State<A, B>,
b, }
fut_a: Some(a),
fut_b: None, #[pin_project::pin_project]
} enum State<A, B>
} where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Cell<(A, B)>),
B(#[pin] B::Future),
} }
impl<A, B> Future for AndThenServiceResponse<A, B> impl<A, B> Future for AndThenServiceResponse<A, B>
@ -96,28 +83,21 @@ where
{ {
type Output = Result<B::Response, A::Error>; type Output = Result<B::Response, A::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
loop { #[project]
if let Some(fut) = this.fut_b.as_pin_mut() { match this.state.as_mut().project() {
return fut.poll(cx); State::A(fut, b) => match fut.poll(cx)? {
} Poll::Ready(res) => {
let fut = b.get_mut().1.call(res);
match this this.state.set(State::B(fut));
.fut_a self.poll(cx)
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)
{
Poll::Ready(Ok(resp)) => {
this = self.as_mut().project();
this.fut_a.set(None);
this.fut_b.set(Some(this.b.get_mut().call(resp)));
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
} }
Poll::Pending => Poll::Pending,
},
State::B(fut) => fut.poll(cx),
} }
} }
} }
@ -202,20 +182,19 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct AndThenServiceFactoryResponse<A, B> pub struct AndThenServiceFactoryResponse<A, B>
where where
A: ServiceFactory, A: ServiceFactory,
B: ServiceFactory<Request = A::Response>, B: ServiceFactory<Request = A::Response>,
{ {
#[pin]
fut_b: B::Future,
#[pin] #[pin]
fut_a: A::Future, fut_a: A::Future,
#[pin]
fut_b: B::Future,
a: Option<A::Service>, a: Option<A::Service>,
b: Option<B::Service>, b: Option<B::Service>,
}
} }
impl<A, B> AndThenServiceFactoryResponse<A, B> impl<A, B> AndThenServiceFactoryResponse<A, B>
@ -270,7 +249,7 @@ mod tests {
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::future::{lazy, ok, ready, Ready}; use futures_util::future::{lazy, ok, ready, Ready};
use crate::{factory_fn, pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{factory_fn, pipeline, pipeline_factory, Service, ServiceFactory};

View File

@ -3,8 +3,6 @@ use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::ready;
use crate::cell::Cell; use crate::cell::Cell;
use crate::{Service, ServiceFactory}; use crate::{Service, ServiceFactory};
@ -18,8 +16,7 @@ where
Err: From<A::Error> + From<B::Error>, Err: From<A::Error> + From<B::Error>,
{ {
a: A, a: A,
b: Cell<B>, b: Cell<(B, F)>,
f: Cell<F>,
r: PhantomData<(Fut, Res, Err)>, r: PhantomData<(Fut, Res, Err)>,
} }
@ -35,8 +32,7 @@ where
pub(crate) fn new(a: A, b: B, f: F) -> Self { pub(crate) fn new(a: A, b: B, f: F) -> Self {
Self { Self {
a, a,
f: Cell::new(f), b: Cell::new((b, f)),
b: Cell::new(b),
r: PhantomData, r: PhantomData,
} }
} }
@ -54,7 +50,6 @@ where
AndThenApplyFn { AndThenApplyFn {
a: self.a.clone(), a: self.a.clone(),
b: self.b.clone(), b: self.b.clone(),
f: self.f.clone(),
r: PhantomData, r: PhantomData,
} }
} }
@ -75,7 +70,7 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = self.a.poll_ready(cx)?.is_pending(); let not_ready = self.a.poll_ready(cx)?.is_pending();
if self.b.get_mut().poll_ready(cx)?.is_pending() || not_ready { if self.b.get_mut().0.poll_ready(cx)?.is_pending() || not_ready {
Poll::Pending Poll::Pending
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@ -84,31 +79,37 @@ where
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
AndThenApplyFnFuture { AndThenApplyFnFuture {
b: self.b.clone(), state: State::A(self.a.call(req), self.b.clone()),
f: self.f.clone(),
fut_a: Some(self.a.call(req)),
fut_b: None,
} }
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct AndThenApplyFnFuture<A, B, F, Fut, Res, Err> pub struct AndThenApplyFnFuture<A, B, F, Fut, Res, Err>
where where
A: Service, A: Service,
B: Service, B: Service,
F: FnMut(A::Response, &mut B) -> Fut, F: FnMut(A::Response, &mut B) -> Fut,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>, Err: From<A::Error>,
Err: From<B::Error>, Err: From<B::Error>,
{ {
b: Cell<B>,
f: Cell<F>,
#[pin] #[pin]
fut_a: Option<A::Future>, state: State<A, B, F, Fut, Res, Err>,
#[pin] }
fut_b: Option<Fut>,
} #[pin_project::pin_project]
enum State<A, B, F, Fut, Res, Err>
where
A: Service,
B: Service,
F: FnMut(A::Response, &mut B) -> Fut,
Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>,
Err: From<B::Error>,
{
A(#[pin] A::Future, Cell<(B, F)>),
B(#[pin] Fut),
} }
impl<A, B, F, Fut, Res, Err> Future for AndThenApplyFnFuture<A, B, F, Fut, Res, Err> impl<A, B, F, Fut, Res, Err> Future for AndThenApplyFnFuture<A, B, F, Fut, Res, Err>
@ -121,28 +122,22 @@ where
{ {
type Output = Result<Res, Err>; type Output = Result<Res, Err>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
if let Some(fut) = this.fut_b.as_pin_mut() { #[project]
return Poll::Ready(ready!(fut.poll(cx)).map_err(|e| e.into())); match this.state.as_mut().project() {
} State::A(fut, b) => match fut.poll(cx)? {
Poll::Ready(res) => {
match this let b = b.get_mut();
.fut_a let fut = (&mut b.1)(res, &mut b.0);
.as_pin_mut() this.state.set(State::B(fut));
.expect("Bug in actix-service")
.poll(cx)
{
Poll::Ready(Ok(resp)) => {
this = self.as_mut().project();
this.fut_b
.set(Some((&mut *this.f.get_mut())(resp, this.b.get_mut())));
this.fut_a.set(None);
self.poll(cx) self.poll(cx)
} }
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), },
State::B(fut) => fut.poll(cx),
} }
} }
} }
@ -151,7 +146,7 @@ where
pub struct AndThenApplyFnFactory<A, B, F, Fut, Res, Err> { pub struct AndThenApplyFnFactory<A, B, F, Fut, Res, Err> {
a: A, a: A,
b: B, b: B,
f: Cell<F>, f: F,
r: PhantomData<(Fut, Res, Err)>, r: PhantomData<(Fut, Res, Err)>,
} }
@ -159,7 +154,7 @@ impl<A, B, F, Fut, Res, Err> AndThenApplyFnFactory<A, B, F, Fut, Res, Err>
where where
A: ServiceFactory, A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>, B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut, F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>, Err: From<A::Error> + From<B::Error>,
{ {
@ -168,7 +163,7 @@ where
Self { Self {
a: a, a: a,
b: b, b: b,
f: Cell::new(f), f: f,
r: PhantomData, r: PhantomData,
} }
} }
@ -178,6 +173,7 @@ impl<A, B, F, Fut, Res, Err> Clone for AndThenApplyFnFactory<A, B, F, Fut, Res,
where where
A: Clone, A: Clone,
B: Clone, B: Clone,
F: Clone,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
@ -194,7 +190,7 @@ where
A: ServiceFactory, A: ServiceFactory,
A::Config: Clone, A::Config: Clone,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>, B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut, F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>, Err: From<A::Error> + From<B::Error>,
{ {
@ -217,31 +213,30 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err> pub struct AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>
where where
A: ServiceFactory, A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>, B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut, F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error>, Err: From<A::Error>,
Err: From<B::Error>, Err: From<B::Error>,
{ {
#[pin] #[pin]
fut_b: B::Future, fut_b: B::Future,
#[pin] #[pin]
fut_a: A::Future, fut_a: A::Future,
f: Cell<F>, f: F,
a: Option<A::Service>, a: Option<A::Service>,
b: Option<B::Service>, b: Option<B::Service>,
}
} }
impl<A, B, F, Fut, Res, Err> Future for AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err> impl<A, B, F, Fut, Res, Err> Future for AndThenApplyFnFactoryResponse<A, B, F, Fut, Res, Err>
where where
A: ServiceFactory, A: ServiceFactory,
B: ServiceFactory<Config = A::Config, InitError = A::InitError>, B: ServiceFactory<Config = A::Config, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Fut, F: FnMut(A::Response, &mut B::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<A::Error> + From<B::Error>, Err: From<A::Error> + From<B::Error>,
{ {
@ -265,9 +260,8 @@ where
if this.a.is_some() && this.b.is_some() { if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApplyFn { Poll::Ready(Ok(AndThenApplyFn {
f: this.f.clone(),
a: this.a.take().unwrap(), a: this.a.take().unwrap(),
b: Cell::new(this.b.take().unwrap()), b: Cell::new((this.b.take().unwrap(), this.f.clone())),
r: PhantomData, r: PhantomData,
})) }))
} else { } else {
@ -280,7 +274,7 @@ where
mod tests { mod tests {
use super::*; use super::*;
use futures::future::{lazy, ok, Ready, TryFutureExt}; use futures_util::future::{lazy, ok, Ready, TryFutureExt};
use crate::{pipeline, pipeline_factory, service_fn2, Service, ServiceFactory}; use crate::{pipeline, pipeline_factory, service_fn2, Service, ServiceFactory};

View File

@ -67,8 +67,8 @@ where
type Error = Err; type Error = Err;
type Future = R; type Future = R;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(futures::ready!(self.service.poll_ready(ctx))) Poll::Ready(futures_util::ready!(self.service.poll_ready(cx)))
} }
fn call(&mut self, req: In) -> Self::Future { fn call(&mut self, req: In) -> Self::Future {
@ -139,24 +139,23 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct ApplyServiceFactoryResponse<T, F, R, In, Out, Err> pub struct ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where where
T: ServiceFactory<Error = Err>, T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R, F: FnMut(In, &mut T::Service) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
#[pin] #[pin]
fut: T::Future, fut: T::Future,
f: Option<F>, f: Option<F>,
r: PhantomData<(In, Out)>, r: PhantomData<(In, Out)>,
}
} }
impl<T, F, R, In, Out, Err> ApplyServiceFactoryResponse<T, F, R, In, Out, Err> impl<T, F, R, In, Out, Err> ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where where
T: ServiceFactory<Error = Err>, T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone, F: FnMut(In, &mut T::Service) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
fn new(fut: T::Future, f: F) -> Self { fn new(fut: T::Future, f: F) -> Self {
@ -171,7 +170,7 @@ where
impl<T, F, R, In, Out, Err> Future for ApplyServiceFactoryResponse<T, F, R, In, Out, Err> impl<T, F, R, In, Out, Err> Future for ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
where where
T: ServiceFactory<Error = Err>, T: ServiceFactory<Error = Err>,
F: FnMut(In, &mut T::Service) -> R + Clone, F: FnMut(In, &mut T::Service) -> R,
R: Future<Output = Result<Out, Err>>, R: Future<Output = Result<Out, Err>>,
{ {
type Output = Result<Apply<T::Service, F, R, In, Out, Err>, T::InitError>; type Output = Result<Apply<T::Service, F, R, In, Out, Err>, T::InitError>;
@ -191,7 +190,7 @@ where
mod tests { mod tests {
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::future::{lazy, ok, Ready}; use futures_util::future::{lazy, ok, Ready};
use super::*; use super::*;
use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{pipeline, pipeline_factory, Service, ServiceFactory};

View File

@ -15,8 +15,7 @@ where
S: Service, S: Service,
{ {
ApplyConfigService { ApplyConfigService {
f: Cell::new(f), srv: Cell::new((srv, f)),
srv: Cell::new(srv),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -35,8 +34,7 @@ where
S: Service, S: Service,
{ {
ApplyConfigServiceFactory { ApplyConfigServiceFactory {
f: Cell::new(f), srv: Cell::new((srv, f)),
srv: Cell::new(srv),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -49,8 +47,7 @@ where
R: Future<Output = Result<S, E>>, R: Future<Output = Result<S, E>>,
S: Service, S: Service,
{ {
f: Cell<F>, srv: Cell<(T, F)>,
srv: Cell<T>,
_t: PhantomData<(C, R, S)>, _t: PhantomData<(C, R, S)>,
} }
@ -63,7 +60,6 @@ where
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ApplyConfigService { ApplyConfigService {
f: self.f.clone(),
srv: self.srv.clone(), srv: self.srv.clone(),
_t: PhantomData, _t: PhantomData,
} }
@ -87,7 +83,10 @@ where
type Future = R; type Future = R;
fn new_service(&self, cfg: C) -> Self::Future { fn new_service(&self, cfg: C) -> Self::Future {
unsafe { (self.f.get_mut_unsafe())(cfg, self.srv.get_mut_unsafe()) } unsafe {
let srv = self.srv.get_mut_unsafe();
(srv.1)(cfg, &mut srv.0)
}
} }
} }
@ -99,8 +98,7 @@ where
R: Future<Output = Result<S, T::InitError>>, R: Future<Output = Result<S, T::InitError>>,
S: Service, S: Service,
{ {
f: Cell<F>, srv: Cell<(T, F)>,
srv: Cell<T>,
_t: PhantomData<(C, R, S)>, _t: PhantomData<(C, R, S)>,
} }
@ -113,7 +111,6 @@ where
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
f: self.f.clone(),
srv: self.srv.clone(), srv: self.srv.clone(),
_t: PhantomData, _t: PhantomData,
} }
@ -139,34 +136,39 @@ where
fn new_service(&self, cfg: C) -> Self::Future { fn new_service(&self, cfg: C) -> Self::Future {
ApplyConfigServiceFactoryResponse { ApplyConfigServiceFactoryResponse {
f: self.f.clone(),
cfg: Some(cfg), cfg: Some(cfg),
fut: None, store: self.srv.clone(),
srv: None, state: State::A(self.srv.get_ref().0.new_service(())),
srv_fut: Some(self.srv.get_ref().new_service(())),
_t: PhantomData,
} }
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct ApplyConfigServiceFactoryResponse<F, C, T, R, S> pub struct ApplyConfigServiceFactoryResponse<F, C, T, R, S>
where where
F: FnMut(C, &mut T::Service) -> R, F: FnMut(C, &mut T::Service) -> R,
T: ServiceFactory<Config = ()>, T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>, T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>, R: Future<Output = Result<S, T::InitError>>,
S: Service, S: Service,
{ {
cfg: Option<C>, cfg: Option<C>,
f: Cell<F>, store: Cell<(T, F)>,
srv: Option<T::Service>,
#[pin] #[pin]
srv_fut: Option<T::Future>, state: State<T, R, S>,
#[pin] }
fut: Option<R>,
_t: PhantomData<(S,)>, #[pin_project::pin_project]
} enum State<T, R, S>
where
T: ServiceFactory<Config = ()>,
T::InitError: From<T::Error>,
R: Future<Output = Result<S, T::InitError>>,
S: Service,
{
A(#[pin] T::Future),
B(T::Service),
C(#[pin] R),
} }
impl<F, C, T, R, S> Future for ApplyConfigServiceFactoryResponse<F, C, T, R, S> impl<F, C, T, R, S> Future for ApplyConfigServiceFactoryResponse<F, C, T, R, S>
@ -179,37 +181,28 @@ where
{ {
type Output = Result<S, T::InitError>; type Output = Result<S, T::InitError>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
loop { #[project]
if let Some(fut) = this.srv_fut.as_pin_mut() { match this.state.as_mut().project() {
match fut.poll(cx)? { State::A(fut) => match fut.poll(cx)? {
Poll::Pending => return Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(srv) => { Poll::Ready(srv) => {
this = self.as_mut().project(); this.state.set(State::B(srv));
this.srv_fut.set(None); self.poll(cx)
*this.srv = Some(srv);
continue;
} }
} },
} State::B(srv) => match srv.poll_ready(cx)? {
if let Some(fut) = this.fut.as_pin_mut() {
return fut.poll(cx);
} else if let Some(srv) = this.srv {
match srv.poll_ready(cx)? {
Poll::Ready(_) => { Poll::Ready(_) => {
let fut = this.f.get_mut()(this.cfg.take().unwrap(), srv); let fut = (this.store.get_mut().1)(this.cfg.take().unwrap(), srv);
this = self.as_mut().project(); this.state.set(State::C(fut));
this.fut.set(Some(fut)); self.poll(cx)
continue;
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Pending;
} }
Poll::Pending => Poll::Pending,
},
State::C(fut) => fut.poll(cx),
} }
} }
} }

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::future::FutureExt; use futures_util::future::FutureExt;
use crate::{Service, ServiceFactory}; use crate::{Service, ServiceFactory};

View File

@ -1,4 +1,5 @@
//! Custom cell impl, internal use only //! Custom cell impl, internal use only
use std::task::{Context, Poll};
use std::{cell::UnsafeCell, fmt, rc::Rc}; use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> { pub(crate) struct Cell<T> {
@ -39,3 +40,18 @@ impl<T> Cell<T> {
&mut *self.inner.as_ref().get() &mut *self.inner.as_ref().get()
} }
} }
impl<T: crate::Service> crate::Service for Cell<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().poll_ready(cx)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.get_mut().call(req)
}
}

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::future::{ok, Ready}; use futures_util::future::{ok, Ready};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};

View File

@ -1,9 +1,6 @@
#![deny(rust_2018_idioms, warnings)] #![deny(rust_2018_idioms, warnings)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#[macro_use]
extern crate pin_project_lite;
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future; use std::future::Future;
use std::rc::Rc; use std::rc::Rc;

View File

@ -62,16 +62,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct MapFuture<A, F, Response> pub struct MapFuture<A, F, Response>
where where
A: Service, A: Service,
F: FnMut(A::Response) -> Response, F: FnMut(A::Response) -> Response,
{ {
f: F, f: F,
#[pin] #[pin]
fut: A::Future, fut: A::Future,
}
} }
impl<A, F, Response> MapFuture<A, F, Response> impl<A, F, Response> MapFuture<A, F, Response>
@ -157,16 +156,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct MapServiceFuture<A, F, Res> pub struct MapServiceFuture<A, F, Res>
where where
A: ServiceFactory, A: ServiceFactory,
F: FnMut(A::Response) -> Res, F: FnMut(A::Response) -> Res,
{ {
#[pin] #[pin]
fut: A::Future, fut: A::Future,
f: Option<F>, f: Option<F>,
}
} }
impl<A, F, Res> MapServiceFuture<A, F, Res> impl<A, F, Res> MapServiceFuture<A, F, Res>
@ -199,7 +197,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::future::{lazy, ok, Ready}; use futures_util::future::{lazy, ok, Ready};
use super::*; use super::*;
use crate::{IntoServiceFactory, Service, ServiceFactory}; use crate::{IntoServiceFactory, Service, ServiceFactory};

View File

@ -63,16 +63,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct MapErrFuture<A, F, E> pub struct MapErrFuture<A, F, E>
where where
A: Service, A: Service,
F: Fn(A::Error) -> E, F: Fn(A::Error) -> E,
{ {
f: F, f: F,
#[pin] #[pin]
fut: A::Future, fut: A::Future,
}
} }
impl<A, F, E> MapErrFuture<A, F, E> impl<A, F, E> MapErrFuture<A, F, E>
@ -160,16 +159,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct MapErrServiceFuture<A, F, E> pub struct MapErrServiceFuture<A, F, E>
where where
A: ServiceFactory, A: ServiceFactory,
F: Fn(A::Error) -> E, F: Fn(A::Error) -> E,
{ {
#[pin] #[pin]
fut: A::Future, fut: A::Future,
f: F, f: F,
}
} }
impl<A, F, E> MapErrServiceFuture<A, F, E> impl<A, F, E> MapErrServiceFuture<A, F, E>
@ -201,7 +199,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use futures::future::{err, lazy, ok, Ready}; use futures_util::future::{err, lazy, ok, Ready};
use super::*; use super::*;
use crate::{IntoServiceFactory, Service, ServiceFactory}; use crate::{IntoServiceFactory, Service, ServiceFactory};

View File

@ -60,16 +60,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct MapInitErrFuture<A, F, E> pub struct MapInitErrFuture<A, F, E>
where where
A: ServiceFactory, A: ServiceFactory,
F: Fn(A::InitError) -> E, F: Fn(A::InitError) -> E,
{ {
f: F, f: F,
#[pin] #[pin]
fut: A::Future, fut: A::Future,
}
} }
impl<A, F, E> MapInitErrFuture<A, F, E> impl<A, F, E> MapInitErrFuture<A, F, E>

View File

@ -197,7 +197,7 @@ impl<T: ServiceFactory> PipelineFactory<T> {
T::Config: Clone, T::Config: Clone,
I: IntoServiceFactory<U>, I: IntoServiceFactory<U>,
U: ServiceFactory<Config = T::Config, InitError = T::InitError>, U: ServiceFactory<Config = T::Config, InitError = T::InitError>,
F: FnMut(T::Response, &mut U::Service) -> Fut, F: FnMut(T::Response, &mut U::Service) -> Fut + Clone,
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
Err: From<T::Error> + From<U::Error>, Err: From<T::Error> + From<U::Error>,
{ {

View File

@ -57,36 +57,30 @@ where
} }
fn call(&mut self, req: A::Request) -> Self::Future { fn call(&mut self, req: A::Request) -> Self::Future {
ThenServiceResponse::new(self.a.call(req), self.b.clone()) ThenServiceResponse {
state: ThenServiceResponseState::A(self.a.call(req), self.b.clone()),
}
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct ThenServiceResponse<A, B> pub struct ThenServiceResponse<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
b: Cell<B>,
#[pin]
fut_b: Option<B::Future>,
#[pin]
fut_a: Option<A::Future>,
}
}
impl<A, B> ThenServiceResponse<A, B>
where where
A: Service, A: Service,
B: Service<Request = Result<A::Response, A::Error>>, B: Service<Request = Result<A::Response, A::Error>>,
{ {
fn new(a: A::Future, b: Cell<B>) -> Self { #[pin]
ThenServiceResponse { state: ThenServiceResponseState<A, B>,
b, }
fut_a: Some(a),
fut_b: None, #[pin_project::pin_project]
} enum ThenServiceResponseState<A, B>
} where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Cell<B>),
B(#[pin] B::Future),
} }
impl<A, B> Future for ThenServiceResponse<A, B> impl<A, B> Future for ThenServiceResponse<A, B>
@ -96,27 +90,21 @@ where
{ {
type Output = Result<B::Response, B::Error>; type Output = Result<B::Response, B::Error>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
loop { #[project]
if let Some(fut) = this.fut_b.as_pin_mut() { match this.state.as_mut().project() {
return fut.poll(cx); ThenServiceResponseState::A(fut, b) => match fut.poll(cx) {
} Poll::Ready(res) => {
let fut = b.get_mut().call(res);
match this this.state.set(ThenServiceResponseState::B(fut));
.fut_a self.poll(cx)
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)
{
Poll::Ready(r) => {
this = self.as_mut().project();
this.fut_b.set(Some(this.b.get_mut().call(r)));
}
Poll::Pending => return Poll::Pending,
} }
Poll::Pending => Poll::Pending,
},
ThenServiceResponseState::B(fut) => fut.poll(cx),
} }
} }
} }
@ -185,23 +173,23 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct ThenServiceFactoryResponse<A, B> pub struct ThenServiceFactoryResponse<A, B>
where where
A: ServiceFactory, A: ServiceFactory,
B: ServiceFactory< B: ServiceFactory<
Config = A::Config, Config = A::Config,
Request = Result<A::Response, A::Error>, Request = Result<A::Response, A::Error>,
Error = A::Error, Error = A::Error,
InitError = A::InitError> InitError = A::InitError,
{ >,
{
#[pin] #[pin]
fut_b: B::Future, fut_b: B::Future,
#[pin] #[pin]
fut_a: A::Future, fut_a: A::Future,
a: Option<A::Service>, a: Option<A::Service>,
b: Option<B::Service>, b: Option<B::Service>,
}
} }
impl<A, B> ThenServiceFactoryResponse<A, B> impl<A, B> ThenServiceFactoryResponse<A, B>
@ -266,7 +254,7 @@ mod tests {
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::future::{err, lazy, ok, ready, Ready}; use futures_util::future::{err, lazy, ok, ready, Ready};
use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; use crate::{pipeline, pipeline_factory, Service, ServiceFactory};

View File

@ -94,10 +94,7 @@ where
} }
/// `Apply` transform to new service /// `Apply` transform to new service
pub struct ApplyTransform<T, S> { pub struct ApplyTransform<T, S>(Rc<(T, S)>);
s: Rc<S>,
t: Rc<T>,
}
impl<T, S> ApplyTransform<T, S> impl<T, S> ApplyTransform<T, S>
where where
@ -106,19 +103,13 @@ where
{ {
/// Create new `ApplyTransform` new service instance /// Create new `ApplyTransform` new service instance
fn new(t: T, service: S) -> Self { fn new(t: T, service: S) -> Self {
Self { Self(Rc::new((t, service)))
s: Rc::new(service),
t: Rc::new(t),
}
} }
} }
impl<T, S> Clone for ApplyTransform<T, S> { impl<T, S> Clone for ApplyTransform<T, S> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ApplyTransform { ApplyTransform(self.0.clone())
s: self.s.clone(),
t: self.t.clone(),
}
} }
} }
@ -138,25 +129,31 @@ where
fn new_service(&self, cfg: S::Config) -> Self::Future { fn new_service(&self, cfg: S::Config) -> Self::Future {
ApplyTransformFuture { ApplyTransformFuture {
t_cell: self.t.clone(), store: self.0.clone(),
fut_a: self.s.new_service(cfg), state: ApplyTransformFutureState::A(self.0.as_ref().1.new_service(cfg)),
fut_t: None,
} }
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct ApplyTransformFuture<T, S> pub struct ApplyTransformFuture<T, S>
where where
S: ServiceFactory, S: ServiceFactory,
T: Transform<S::Service, InitError = S::InitError>, T: Transform<S::Service, InitError = S::InitError>,
{ {
store: Rc<(T, S)>,
#[pin] #[pin]
fut_a: S::Future, state: ApplyTransformFutureState<T, S>,
#[pin] }
fut_t: Option<T::Future>,
t_cell: Rc<T>, #[pin_project::pin_project]
} pub enum ApplyTransformFutureState<T, S>
where
S: ServiceFactory,
T: Transform<S::Service, InitError = S::InitError>,
{
A(#[pin] S::Future),
B(#[pin] T::Future),
} }
impl<T, S> Future for ApplyTransformFuture<T, S> impl<T, S> Future for ApplyTransformFuture<T, S>
@ -166,20 +163,21 @@ where
{ {
type Output = Result<T::Transform, T::InitError>; type Output = Result<T::Transform, T::InitError>;
#[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
if let Some(fut) = this.fut_t.as_pin_mut() { #[project]
return fut.poll(cx); match this.state.as_mut().project() {
ApplyTransformFutureState::A(fut) => match fut.poll(cx)? {
Poll::Ready(srv) => {
let fut = this.store.0.new_transform(srv);
this.state.set(ApplyTransformFutureState::B(fut));
self.poll(cx)
} }
Poll::Pending => Poll::Pending,
if let Poll::Ready(service) = this.fut_a.poll(cx)? { },
let fut = this.t_cell.new_transform(service); ApplyTransformFutureState::B(fut) => fut.poll(cx),
this = self.as_mut().project();
this.fut_t.set(Some(fut));
this.fut_t.as_pin_mut().unwrap().poll(cx)
} else {
Poll::Pending
} }
} }
} }

View File

@ -64,16 +64,15 @@ where
} }
} }
pin_project! { #[pin_project::pin_project]
pub struct TransformMapInitErrFuture<T, S, F, E> pub struct TransformMapInitErrFuture<T, S, F, E>
where where
T: Transform<S>, T: Transform<S>,
F: Fn(T::InitError) -> E, F: Fn(T::InitError) -> E,
{ {
#[pin] #[pin]
fut: T::Future, fut: T::Future,
f: F, f: F,
}
} }
impl<T, S, F, E> Future for TransformMapInitErrFuture<T, S, F, E> impl<T, S, F, E> Future for TransformMapInitErrFuture<T, S, F, E>

View File

@ -20,7 +20,7 @@ path = "src/lib.rs"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"
actix-macros = "0.1.0-alpha.1" actix-macros = "0.1.0-alpha.1"
actix-server = "1.0.0-alpha.2" actix-server = "1.0.0-alpha.2"
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
log = "0.4" log = "0.4"
net2 = "0.2" net2 = "0.2"

View File

@ -29,7 +29,7 @@ openssl = ["open-ssl", "tokio-openssl"]
rustls = ["rust-tls", "webpki"] rustls = ["rust-tls", "webpki"]
[dependencies] [dependencies]
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
actix-codec = "0.2.0-alpha.2" actix-codec = "0.2.0-alpha.2"
actix-utils = "1.0.0-alpha.2" actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"

View File

@ -1,5 +1,7 @@
# Changes # Changes
## [1.0.0-alpha.3] - 2019-12-xx
* Fix oneshot * Fix oneshot
## [1.0.0-alpha.2] - 2019-12-02 ## [1.0.0-alpha.2] - 2019-12-02

View File

@ -18,7 +18,7 @@ name = "actix_utils"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "1.0.0-alpha.2" actix-service = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0-alpha.2"
actix-codec = "0.2.0-alpha.2" actix-codec = "0.2.0-alpha.2"
bytes = "0.4" bytes = "0.4"