1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-31 03:57:00 +02:00

Fixed unsoundness in .and_then()/.then() service combinators

This commit is contained in:
Nikolay Kim
2020-01-16 16:58:11 -08:00
parent e7c2439543
commit dbfa13d6be
8 changed files with 164 additions and 116 deletions

View File

@@ -1,5 +1,6 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
@@ -8,11 +9,8 @@ use crate::cell::Cell;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
pub struct ThenService<A, B> {
a: A,
b: Cell<B>,
}
/// This is created by the `Pipeline::then` method.
pub(crate) struct ThenService<A, B>(Cell<(A, B)>);
impl<A, B> ThenService<A, B> {
/// Create new `.then()` combinator
@@ -21,19 +19,13 @@ impl<A, B> ThenService<A, B> {
A: Service,
B: Service<Request = Result<A::Response, A::Error>, Error = A::Error>,
{
Self { a, b: Cell::new(b) }
Self(Cell::new((a, b)))
}
}
impl<A, B> Clone for ThenService<A, B>
where
A: Clone,
{
impl<A, B> Clone for ThenService<A, B> {
fn clone(&self) -> Self {
ThenService {
a: self.a.clone(),
b: self.b.clone(),
}
ThenService(self.0.clone())
}
}
@@ -47,9 +39,10 @@ where
type Error = B::Error;
type Future = ThenServiceResponse<A, B>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.a.poll_ready(ctx)?.is_ready();
if !self.b.get_mut().poll_ready(ctx)?.is_ready() || not_ready {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@@ -58,13 +51,13 @@ where
fn call(&mut self, req: A::Request) -> Self::Future {
ThenServiceResponse {
state: State::A(self.a.call(req), Some(self.b.clone())),
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
}
}
}
#[pin_project::pin_project]
pub struct ThenServiceResponse<A, B>
pub(crate) struct ThenServiceResponse<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
@@ -79,7 +72,7 @@ where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
A(#[pin] A::Future, Option<Cell<B>>),
A(#[pin] A::Future, Option<Cell<(A, B)>>),
B(#[pin] B::Future),
Empty,
}
@@ -101,7 +94,7 @@ where
Poll::Ready(res) => {
let mut b = b.take().unwrap();
this.state.set(State::Empty); // drop fut A
let fut = b.get_mut().call(res);
let fut = b.get_mut().1.call(res);
this.state.set(State::B(fut));
self.poll(cx)
}
@@ -117,10 +110,7 @@ where
}
/// `.then()` service factory combinator
pub struct ThenServiceFactory<A, B> {
a: A,
b: B,
}
pub(crate) struct ThenServiceFactory<A, B>(Rc<(A, B)>);
impl<A, B> ThenServiceFactory<A, B>
where
@@ -135,7 +125,7 @@ where
{
/// Create new `AndThen` combinator
pub(crate) fn new(a: A, b: B) -> Self {
Self { a, b }
Self(Rc::new((a, b)))
}
}
@@ -160,28 +150,19 @@ where
type Future = ThenServiceFactoryResponse<A, B>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
ThenServiceFactoryResponse::new(
self.a.new_service(cfg.clone()),
self.b.new_service(cfg),
)
let srv = &*self.0;
ThenServiceFactoryResponse::new(srv.0.new_service(cfg.clone()), srv.1.new_service(cfg))
}
}
impl<A, B> Clone for ThenServiceFactory<A, B>
where
A: Clone,
B: Clone,
{
impl<A, B> Clone for ThenServiceFactory<A, B> {
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
}
Self(self.0.clone())
}
}
#[pin_project::pin_project]
pub struct ThenServiceFactoryResponse<A, B>
pub(crate) struct ThenServiceFactoryResponse<A, B>
where
A: ServiceFactory,
B: ServiceFactory<