1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 16:32:59 +01:00
actix-web/actix-http/src/h2/service.rs

387 lines
11 KiB
Rust
Raw Normal View History

use std::future::Future;
2019-02-02 05:18:44 +01:00
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{net, rc::Rc};
2019-02-02 05:18:44 +01:00
2019-12-13 05:59:02 +01:00
use actix_codec::{AsyncRead, AsyncWrite};
2019-12-02 12:33:11 +01:00
use actix_rt::net::TcpStream;
use actix_service::{
2019-12-08 14:25:24 +01:00
fn_factory, fn_service, pipeline_factory, IntoServiceFactory, Service,
2019-12-02 12:33:11 +01:00
ServiceFactory,
};
2019-02-02 05:18:44 +01:00
use bytes::Bytes;
2019-12-13 06:24:57 +01:00
use futures_core::ready;
use futures_util::future::ok;
2019-12-13 05:59:02 +01:00
use h2::server::{self, Handshake};
2019-02-02 05:18:44 +01:00
use log::error;
use crate::body::MessageBody;
use crate::cloneable::CloneableService;
2019-12-13 05:59:02 +01:00
use crate::config::ServiceConfig;
use crate::error::{DispatchError, Error};
2019-02-02 05:18:44 +01:00
use crate::request::Request;
use crate::response::Response;
use crate::{ConnectCallback, Extensions};
2019-02-02 05:18:44 +01:00
2019-02-06 20:44:15 +01:00
use super::dispatcher::Dispatcher;
2019-02-02 05:18:44 +01:00
/// `ServiceFactory` implementation for HTTP2 transport
2019-12-02 12:33:11 +01:00
pub struct H2Service<T, S, B> {
2019-02-02 05:18:44 +01:00
srv: S,
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
2019-12-02 12:33:11 +01:00
_t: PhantomData<(T, B)>,
2019-02-02 05:18:44 +01:00
}
2019-12-02 12:33:11 +01:00
impl<T, S, B> H2Service<T, S, B>
2019-02-02 05:18:44 +01:00
where
S: ServiceFactory<Request, Config = ()>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
/// Create new `HttpService` instance with config.
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
cfg: ServiceConfig,
service: F,
) -> Self {
H2Service {
cfg,
on_connect_ext: None,
srv: service.into_factory(),
_t: PhantomData,
}
}
2019-06-28 10:34:26 +02:00
/// Set on connect callback.
pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
self.on_connect_ext = f;
self
}
2019-02-02 05:18:44 +01:00
}
2019-12-02 12:33:11 +01:00
impl<S, B> H2Service<TcpStream, S, B>
where
S: ServiceFactory<Request, Config = ()>,
2019-12-02 12:33:11 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-12-02 12:33:11 +01:00
B: MessageBody + 'static,
{
/// Create simple tcp based service
pub fn tcp(
self,
) -> impl ServiceFactory<
TcpStream,
2019-12-02 12:33:11 +01:00
Config = (),
Response = (),
Error = DispatchError,
InitError = S::InitError,
> {
2020-02-27 03:10:55 +01:00
pipeline_factory(fn_factory(|| async {
Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
let peer_addr = io.peer_addr().ok();
ok::<_, DispatchError>((io, peer_addr))
}))
2019-12-02 12:33:11 +01:00
}))
.and_then(self)
}
}
#[cfg(feature = "openssl")]
mod openssl {
use actix_service::{fn_factory, fn_service, ServiceFactoryExt};
use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, SslStream};
use actix_tls::accept::TlsError;
2019-12-02 12:33:11 +01:00
use super::*;
impl<S, B> H2Service<SslStream<TcpStream>, S, B>
where
S: ServiceFactory<Request, Config = ()>,
2019-12-02 12:33:11 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-12-02 12:33:11 +01:00
B: MessageBody + 'static,
{
/// Create ssl based service
pub fn openssl(
self,
acceptor: SslAcceptor,
) -> impl ServiceFactory<
TcpStream,
2019-12-02 12:33:11 +01:00
Config = (),
Response = (),
Error = TlsError<SslError, DispatchError>,
2019-12-02 12:33:11 +01:00
InitError = S::InitError,
> {
pipeline_factory(
Acceptor::new(acceptor)
2020-09-09 10:20:54 +02:00
.map_err(TlsError::Tls)
2019-12-02 12:33:11 +01:00
.map_init_err(|_| panic!()),
)
2019-12-08 14:25:24 +01:00
.and_then(fn_factory(|| {
ok::<_, S::InitError>(fn_service(|io: SslStream<TcpStream>| {
2019-12-02 12:33:11 +01:00
let peer_addr = io.get_ref().peer_addr().ok();
ok((io, peer_addr))
}))
}))
2020-09-09 10:20:54 +02:00
.and_then(self.map_err(TlsError::Service))
2019-12-02 12:33:11 +01:00
}
}
}
2019-12-05 18:35:43 +01:00
#[cfg(feature = "rustls")]
mod rustls {
use super::*;
use actix_service::ServiceFactoryExt;
use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream};
use actix_tls::accept::TlsError;
2019-12-13 06:24:57 +01:00
use std::io;
2019-12-05 18:35:43 +01:00
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
where
S: ServiceFactory<Request, Config = ()>,
2019-12-05 18:35:43 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-12-05 18:35:43 +01:00
B: MessageBody + 'static,
{
/// Create openssl based service
pub fn rustls(
self,
mut config: ServerConfig,
) -> impl ServiceFactory<
TcpStream,
2019-12-05 18:35:43 +01:00
Config = (),
Response = (),
2020-09-09 10:20:54 +02:00
Error = TlsError<io::Error, DispatchError>,
2019-12-05 18:35:43 +01:00
InitError = S::InitError,
> {
let protos = vec!["h2".to_string().into()];
config.set_protocols(&protos);
pipeline_factory(
Acceptor::new(config)
2020-09-09 10:20:54 +02:00
.map_err(TlsError::Tls)
2019-12-05 18:35:43 +01:00
.map_init_err(|_| panic!()),
)
2019-12-08 14:25:24 +01:00
.and_then(fn_factory(|| {
ok::<_, S::InitError>(fn_service(|io: TlsStream<TcpStream>| {
2019-12-05 18:35:43 +01:00
let peer_addr = io.get_ref().0.peer_addr().ok();
ok((io, peer_addr))
}))
}))
2020-09-09 10:20:54 +02:00
.and_then(self.map_err(TlsError::Service))
2019-12-05 18:35:43 +01:00
}
}
}
impl<T, S, B> ServiceFactory<(T, Option<net::SocketAddr>)> for H2Service<T, S, B>
2019-02-02 05:18:44 +01:00
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request, Config = ()>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
2019-02-06 20:44:15 +01:00
type Response = ();
2019-03-07 07:56:34 +01:00
type Error = DispatchError;
type Config = ();
2019-12-02 12:33:11 +01:00
type Service = H2ServiceHandler<T, S::Service, B>;
type InitError = S::InitError;
2019-12-02 12:33:11 +01:00
type Future = H2ServiceResponse<T, S, B>;
2019-02-02 05:18:44 +01:00
2019-12-02 16:37:13 +01:00
fn new_service(&self, _: ()) -> Self::Future {
2019-02-02 05:18:44 +01:00
H2ServiceResponse {
2019-12-02 16:37:13 +01:00
fut: self.srv.new_service(()),
2019-02-02 05:18:44 +01:00
cfg: Some(self.cfg.clone()),
on_connect_ext: self.on_connect_ext.clone(),
2019-02-02 05:18:44 +01:00
_t: PhantomData,
}
}
}
#[doc(hidden)]
2019-11-19 13:54:19 +01:00
#[pin_project::pin_project]
pub struct H2ServiceResponse<T, S, B>
where
S: ServiceFactory<Request>,
{
2019-11-19 13:54:19 +01:00
#[pin]
fut: S::Future,
2019-02-02 05:18:44 +01:00
cfg: Option<ServiceConfig>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_t: PhantomData<B>,
2019-02-02 05:18:44 +01:00
}
2019-12-02 12:33:11 +01:00
impl<T, S, B> Future for H2ServiceResponse<T, S, B>
2019-02-02 05:18:44 +01:00
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request, Config = ()>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
2019-12-02 12:33:11 +01:00
type Output = Result<H2ServiceHandler<T, S::Service, B>, S::InitError>;
2019-02-02 05:18:44 +01:00
2019-12-07 19:46:51 +01:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019-11-19 13:54:19 +01:00
let this = self.as_mut().project();
2019-11-19 13:54:19 +01:00
Poll::Ready(ready!(this.fut.poll(cx)).map(|service| {
let this = self.as_mut().project();
H2ServiceHandler::new(
this.cfg.take().unwrap(),
this.on_connect_ext.clone(),
service,
)
}))
2019-02-02 05:18:44 +01:00
}
}
/// `Service` implementation for http/2 transport
pub struct H2ServiceHandler<T, S, B>
where
S: Service<Request>,
{
srv: CloneableService<S>,
2019-02-02 05:18:44 +01:00
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_t: PhantomData<B>,
2019-02-02 05:18:44 +01:00
}
2019-12-02 12:33:11 +01:00
impl<T, S, B> H2ServiceHandler<T, S, B>
2019-02-02 05:18:44 +01:00
where
S: Service<Request>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
2019-06-28 10:34:26 +02:00
fn new(
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
2019-06-28 10:34:26 +02:00
srv: S,
2019-12-02 12:33:11 +01:00
) -> H2ServiceHandler<T, S, B> {
2019-02-02 05:18:44 +01:00
H2ServiceHandler {
cfg,
on_connect_ext,
srv: CloneableService::new(srv),
2019-02-02 05:18:44 +01:00
_t: PhantomData,
}
}
}
impl<T, S, B> Service<(T, Option<net::SocketAddr>)> for H2ServiceHandler<T, S, B>
2019-02-02 05:18:44 +01:00
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
2019-02-06 20:44:15 +01:00
type Response = ();
2019-03-07 07:56:34 +01:00
type Error = DispatchError;
2019-02-02 05:18:44 +01:00
type Future = H2ServiceHandlerResponse<T, S, B>;
2019-12-07 19:46:51 +01:00
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.srv.poll_ready(cx).map_err(|e| {
2019-04-06 01:46:44 +02:00
let e = e.into();
2019-02-06 20:44:15 +01:00
error!("Service readiness error: {:?}", e);
2019-04-06 01:46:44 +02:00
DispatchError::Service(e)
2019-02-06 20:44:15 +01:00
})
2019-02-02 05:18:44 +01:00
}
fn call(&mut self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {
let mut connect_extensions = Extensions::new();
if let Some(ref handler) = self.on_connect_ext {
// run on_connect_ext callback, populating connect extensions
handler(&io, &mut connect_extensions);
}
2019-06-28 10:34:26 +02:00
2019-02-02 05:18:44 +01:00
H2ServiceHandlerResponse {
2019-02-06 20:44:15 +01:00
state: State::Handshake(
Some(self.srv.clone()),
Some(self.cfg.clone()),
2019-12-02 12:33:11 +01:00
addr,
Some(connect_extensions),
server::handshake(io),
2019-02-06 20:44:15 +01:00
),
2019-02-02 05:18:44 +01:00
}
}
}
enum State<T, S: Service<Request>, B: MessageBody>
2019-04-04 19:59:34 +02:00
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
2019-04-04 19:59:34 +02:00
S::Future: 'static,
{
2019-02-06 20:44:15 +01:00
Incoming(Dispatcher<T, S, B>),
Handshake(
Option<CloneableService<S>>,
Option<ServiceConfig>,
Option<net::SocketAddr>,
Option<Extensions>,
Handshake<T, Bytes>,
),
2019-02-02 05:18:44 +01:00
}
pub struct H2ServiceHandlerResponse<T, S, B>
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
2019-02-06 20:44:15 +01:00
B: MessageBody + 'static,
2019-02-02 05:18:44 +01:00
{
2019-02-06 20:44:15 +01:00
state: State<T, S, B>,
2019-02-02 05:18:44 +01:00
}
impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where
2019-12-02 12:33:11 +01:00
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
2019-11-19 13:54:19 +01:00
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
2019-02-02 05:18:44 +01:00
B: MessageBody,
{
type Output = Result<(), DispatchError>;
2019-02-02 05:18:44 +01:00
2019-12-07 19:46:51 +01:00
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019-02-06 20:44:15 +01:00
match self.state {
State::Incoming(ref mut disp) => Pin::new(disp).poll(cx),
State::Handshake(
ref mut srv,
ref mut config,
ref peer_addr,
ref mut on_connect_data,
ref mut handshake,
) => match Pin::new(handshake).poll(cx) {
Poll::Ready(Ok(conn)) => {
self.state = State::Incoming(Dispatcher::new(
srv.take().unwrap(),
conn,
on_connect_data.take().unwrap(),
config.take().unwrap(),
None,
*peer_addr,
));
self.poll(cx)
2019-02-06 20:44:15 +01:00
}
Poll::Ready(Err(err)) => {
trace!("H2 handshake error: {}", err);
Poll::Ready(Err(err.into()))
}
Poll::Pending => Poll::Pending,
},
2019-02-06 20:44:15 +01:00
}
2019-02-02 05:18:44 +01:00
}
}