1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

Refactor actix_http::h1::service (#2117)

This commit is contained in:
fakeshadow 2021-03-26 09:15:04 -07:00 committed by GitHub
parent 2f7f1fa97a
commit 6822bf2f58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,4 @@
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, net}; use std::{fmt, net};
@ -8,7 +6,7 @@ use std::{fmt, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use futures_core::ready; use futures_core::{future::LocalBoxFuture, ready};
use futures_util::future::ready; use futures_util::future::ready;
use crate::body::MessageBody; use crate::body::MessageBody;
@ -60,14 +58,17 @@ where
impl<S, B, X, U> H1Service<TcpStream, S, B, X, U> impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
where where
S: ServiceFactory<Request, Config = ()>, S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>, U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
@ -94,17 +95,21 @@ mod openssl {
use super::*; use super::*;
use actix_service::ServiceFactoryExt; use actix_service::ServiceFactoryExt;
use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, TlsStream}; use actix_tls::accept::{
use actix_tls::accept::TlsError; openssl::{Acceptor, SslAcceptor, SslError, TlsStream},
TlsError,
};
impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U> impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
where where
S: ServiceFactory<Request, Config = ()>, S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
U: ServiceFactory< U: ServiceFactory<
@ -112,6 +117,7 @@ mod openssl {
Config = (), Config = (),
Response = (), Response = (),
>, >,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
@ -143,19 +149,25 @@ mod openssl {
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
mod rustls { mod rustls {
use super::*; use super::*;
use std::io;
use actix_service::ServiceFactoryExt; use actix_service::ServiceFactoryExt;
use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream}; use actix_tls::accept::{
use actix_tls::accept::TlsError; rustls::{Acceptor, ServerConfig, TlsStream},
use std::{fmt, io}; TlsError,
};
impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U> impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
where where
S: ServiceFactory<Request, Config = ()>, S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
U: ServiceFactory< U: ServiceFactory<
@ -163,6 +175,7 @@ mod rustls {
Config = (), Config = (),
Response = (), Response = (),
>, >,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
@ -241,16 +254,19 @@ where
impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)> impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)>
for H1Service<T, S, B, X, U> for H1Service<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin + 'static,
S: ServiceFactory<Request, Config = ()>, S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
B: MessageBody, B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>, X::Error: Into<Error>,
X::InitError: fmt::Debug, X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>, U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>, U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
@ -259,103 +275,42 @@ where
type Config = (); type Config = ();
type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>; type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = (); type InitError = ();
type Future = H1ServiceResponse<T, S, B, X, U>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
H1ServiceResponse { let service = self.srv.new_service(());
fut: self.srv.new_service(()), let expect = self.expect.new_service(());
fut_ex: Some(self.expect.new_service(())), let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), let on_connect_ext = self.on_connect_ext.clone();
expect: None, let cfg = self.cfg.clone();
upgrade: None,
on_connect_ext: self.on_connect_ext.clone(), Box::pin(async move {
cfg: Some(self.cfg.clone()), let expect = expect
_phantom: PhantomData, .await
.map_err(|e| log::error!("Init http expect service error: {:?}", e))?;
let upgrade = match upgrade {
Some(upgrade) => {
let upgrade = upgrade.await.map_err(|e| {
log::error!("Init http upgrade service error: {:?}", e)
})?;
Some(upgrade)
} }
} None => None,
} };
#[doc(hidden)] let service = service
#[pin_project::pin_project] .await
pub struct H1ServiceResponse<T, S, B, X, U> .map_err(|e| log::error!("Init http service error: {:?}", e))?;
where
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
#[pin]
fut: S::Future,
#[pin]
fut_ex: Option<X::Future>,
#[pin]
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: Option<ServiceConfig>,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> Future for H1ServiceResponse<T, S, B, X, U> Ok(H1ServiceHandler::new(
where cfg,
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
type Output = Result<H1ServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if let Some(fut) = this.fut_ex.as_pin_mut() {
let expect = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.expect = Some(expect);
this.fut_ex.set(None);
}
if let Some(fut) = this.fut_upg.as_pin_mut() {
let upgrade = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.upgrade = Some(upgrade);
this.fut_upg.set(None);
}
let result = ready!(this
.fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Poll::Ready(result.map(|service| {
let this = self.as_mut().project();
H1ServiceHandler::new(
this.cfg.take().unwrap(),
service, service,
this.expect.take().unwrap(), expect,
this.upgrade.take(), upgrade,
this.on_connect_ext.clone(), on_connect_ext,
) ))
})) })
} }
} }
@ -417,47 +372,27 @@ where
type Future = Dispatcher<T, S, B, X, U>; type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self ready!(self.flow.expect.poll_ready(cx)).map_err(|e| {
.flow
.expect
.poll_ready(cx)
.map_err(|e| {
let e = e.into(); let e = e.into();
log::error!("Http service readiness error: {:?}", e); log::error!("Http expect service readiness error: {:?}", e);
DispatchError::Service(e) DispatchError::Service(e)
})? })?;
.is_ready();
let ready = self if let Some(ref upg) = self.flow.upgrade {
.flow ready!(upg.poll_ready(cx)).map_err(|e| {
.service
.poll_ready(cx)
.map_err(|e| {
let e = e.into(); let e = e.into();
log::error!("Http service readiness error: {:?}", e); log::error!("Http upgrade service readiness error: {:?}", e);
DispatchError::Service(e) DispatchError::Service(e)
})? })?;
.is_ready()
&& ready;
let ready = if let Some(ref upg) = self.flow.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
}; };
if ready { ready!(self.flow.service.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else {
Poll::Pending
}
} }
fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future { fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {