mirror of
https://github.com/fafhrd91/actix-web
synced 2025-01-31 11:02:08 +01:00
Refactor actix_http::h2::service module. Reduce loc. (#2118)
This commit is contained in:
parent
6822bf2f58
commit
60f9cfbb2a
@ -11,8 +11,8 @@ use actix_service::{
|
|||||||
ServiceFactory,
|
ServiceFactory,
|
||||||
};
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_core::ready;
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
use futures_util::future::ok;
|
use futures_util::future::ready;
|
||||||
use h2::server::{handshake, Handshake};
|
use h2::server::{handshake, Handshake};
|
||||||
use log::error;
|
use log::error;
|
||||||
|
|
||||||
@ -65,6 +65,7 @@ where
|
|||||||
impl<S, B> H2Service<TcpStream, S, B>
|
impl<S, B> H2Service<TcpStream, S, B>
|
||||||
where
|
where
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
S: ServiceFactory<Request, Config = ()>,
|
||||||
|
S::Future: 'static,
|
||||||
S::Error: Into<Error> + 'static,
|
S::Error: Into<Error> + 'static,
|
||||||
S::Response: Into<Response<B>> + 'static,
|
S::Response: Into<Response<B>> + 'static,
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
@ -80,11 +81,11 @@ where
|
|||||||
Error = DispatchError,
|
Error = DispatchError,
|
||||||
InitError = S::InitError,
|
InitError = S::InitError,
|
||||||
> {
|
> {
|
||||||
pipeline_factory(fn_factory(|| async {
|
pipeline_factory(fn_factory(|| {
|
||||||
Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
|
ready(Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
|
||||||
let peer_addr = io.peer_addr().ok();
|
let peer_addr = io.peer_addr().ok();
|
||||||
ok::<_, DispatchError>((io, peer_addr))
|
ready(Ok::<_, DispatchError>((io, peer_addr)))
|
||||||
}))
|
})))
|
||||||
}))
|
}))
|
||||||
.and_then(self)
|
.and_then(self)
|
||||||
}
|
}
|
||||||
@ -101,6 +102,7 @@ mod openssl {
|
|||||||
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
|
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
|
||||||
where
|
where
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
S: ServiceFactory<Request, Config = ()>,
|
||||||
|
S::Future: 'static,
|
||||||
S::Error: Into<Error> + 'static,
|
S::Error: Into<Error> + 'static,
|
||||||
S::Response: Into<Response<B>> + 'static,
|
S::Response: Into<Response<B>> + 'static,
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
@ -123,10 +125,12 @@ mod openssl {
|
|||||||
.map_init_err(|_| panic!()),
|
.map_init_err(|_| panic!()),
|
||||||
)
|
)
|
||||||
.and_then(fn_factory(|| {
|
.and_then(fn_factory(|| {
|
||||||
ok::<_, S::InitError>(fn_service(|io: TlsStream<TcpStream>| {
|
ready(Ok::<_, S::InitError>(fn_service(
|
||||||
let peer_addr = io.get_ref().peer_addr().ok();
|
|io: TlsStream<TcpStream>| {
|
||||||
ok((io, peer_addr))
|
let peer_addr = io.get_ref().peer_addr().ok();
|
||||||
}))
|
ready(Ok((io, peer_addr)))
|
||||||
|
},
|
||||||
|
)))
|
||||||
}))
|
}))
|
||||||
.and_then(self.map_err(TlsError::Service))
|
.and_then(self.map_err(TlsError::Service))
|
||||||
}
|
}
|
||||||
@ -144,6 +148,7 @@ mod rustls {
|
|||||||
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
|
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
|
||||||
where
|
where
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
S: ServiceFactory<Request, Config = ()>,
|
||||||
|
S::Future: 'static,
|
||||||
S::Error: Into<Error> + 'static,
|
S::Error: Into<Error> + 'static,
|
||||||
S::Response: Into<Response<B>> + 'static,
|
S::Response: Into<Response<B>> + 'static,
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
@ -169,10 +174,12 @@ mod rustls {
|
|||||||
.map_init_err(|_| panic!()),
|
.map_init_err(|_| panic!()),
|
||||||
)
|
)
|
||||||
.and_then(fn_factory(|| {
|
.and_then(fn_factory(|| {
|
||||||
ok::<_, S::InitError>(fn_service(|io: TlsStream<TcpStream>| {
|
ready(Ok::<_, S::InitError>(fn_service(
|
||||||
let peer_addr = io.get_ref().0.peer_addr().ok();
|
|io: TlsStream<TcpStream>| {
|
||||||
ok((io, peer_addr))
|
let peer_addr = io.get_ref().0.peer_addr().ok();
|
||||||
}))
|
ready(Ok((io, peer_addr)))
|
||||||
|
},
|
||||||
|
)))
|
||||||
}))
|
}))
|
||||||
.and_then(self.map_err(TlsError::Service))
|
.and_then(self.map_err(TlsError::Service))
|
||||||
}
|
}
|
||||||
@ -181,8 +188,9 @@ mod rustls {
|
|||||||
|
|
||||||
impl<T, S, B> ServiceFactory<(T, Option<net::SocketAddr>)> for H2Service<T, S, B>
|
impl<T, S, B> ServiceFactory<(T, Option<net::SocketAddr>)> for H2Service<T, S, B>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
S: ServiceFactory<Request, Config = ()>,
|
||||||
|
S::Future: 'static,
|
||||||
S::Error: Into<Error> + 'static,
|
S::Error: Into<Error> + 'static,
|
||||||
S::Response: Into<Response<B>> + 'static,
|
S::Response: Into<Response<B>> + 'static,
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
@ -193,52 +201,16 @@ where
|
|||||||
type Config = ();
|
type Config = ();
|
||||||
type Service = H2ServiceHandler<T, S::Service, B>;
|
type Service = H2ServiceHandler<T, S::Service, B>;
|
||||||
type InitError = S::InitError;
|
type InitError = S::InitError;
|
||||||
type Future = H2ServiceResponse<T, S, B>;
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
fn new_service(&self, _: ()) -> Self::Future {
|
||||||
H2ServiceResponse {
|
let service = self.srv.new_service(());
|
||||||
fut: self.srv.new_service(()),
|
let cfg = self.cfg.clone();
|
||||||
cfg: Some(self.cfg.clone()),
|
let on_connect_ext = self.on_connect_ext.clone();
|
||||||
on_connect_ext: self.on_connect_ext.clone(),
|
|
||||||
_phantom: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
Box::pin(async move {
|
||||||
#[pin_project::pin_project]
|
let service = service.await?;
|
||||||
pub struct H2ServiceResponse<T, S, B>
|
Ok(H2ServiceHandler::new(cfg, on_connect_ext, service))
|
||||||
where
|
|
||||||
S: ServiceFactory<Request>,
|
|
||||||
{
|
|
||||||
#[pin]
|
|
||||||
fut: S::Future,
|
|
||||||
cfg: Option<ServiceConfig>,
|
|
||||||
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
|
|
||||||
_phantom: PhantomData<B>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, S, B> Future for H2ServiceResponse<T, S, B>
|
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
|
||||||
S::Error: Into<Error> + 'static,
|
|
||||||
S::Response: Into<Response<B>> + 'static,
|
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
|
||||||
B: MessageBody + 'static,
|
|
||||||
{
|
|
||||||
type Output = Result<H2ServiceHandler<T, S::Service, B>, S::InitError>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let this = self.as_mut().project();
|
|
||||||
|
|
||||||
this.fut.poll(cx).map_ok(|service| {
|
|
||||||
let this = self.as_mut().project();
|
|
||||||
H2ServiceHandler::new(
|
|
||||||
this.cfg.take().unwrap(),
|
|
||||||
this.on_connect_ext.clone(),
|
|
||||||
service,
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user