use std::{
    fmt,
    future::Future,
    marker::PhantomData,
    net,
    pin::Pin,
    rc::Rc,
    task::{Context, Poll},
};

use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream;
use actix_service::{
    fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
};
use futures_core::{future::LocalBoxFuture, ready};
use pin_project_lite::pin_project;
use tracing::error;

use crate::{
    body::{BoxBody, MessageBody},
    builder::HttpServiceBuilder,
    error::DispatchError,
    h1, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig,
};

/// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol.
///
/// Type parameters:
/// - `T`: connection I/O stream type.
/// - `S`: factory for the main request-handling service.
/// - `B`: response body type produced by `S`.
/// - `X`: factory for the `Expect: 100-Continue` service.
/// - `U`: factory for the `Connection: Upgrade` service.
// NOTE(review): `new`/`with_config` require the default `X = h1::ExpectHandler`; the default for
// `U` is assumed to be `h1::UpgradeHandler` — confirm against `crate::h1`.
pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler> {
    /// Factory for the main request-handling service.
    srv: S,
    /// Configuration cloned into every service handler this factory creates.
    cfg: ServiceConfig,
    /// Factory for the expect service.
    expect: X,
    /// Optional factory for the upgrade service.
    upgrade: Option<U>,
    /// Optional connect callback passed on to created handlers.
    on_connect_ext: Option<Rc<ConnectCallback<T>>>,
    _phantom: PhantomData<B>,
}

impl<T, S, B> HttpService<T, S, B>
where
    S: ServiceFactory<Request, Config = ()>,
    S::Error: Into<Response<BoxBody>> + 'static,
    S::InitError: fmt::Debug,
    S::Response: Into<Response<B>> + 'static,
    <S::Service as Service<Request>>::Future: 'static,
    B: MessageBody + 'static,
{
    /// Constructs builder for `HttpService` instance.
    pub fn build() -> HttpServiceBuilder<T, S> {
        HttpServiceBuilder::default()
    }
}

impl<T, S, B> HttpService<T, S, B>
where
    S: ServiceFactory<Request, Config = ()>,
    S::Error: Into<Response<BoxBody>> + 'static,
    S::InitError: fmt::Debug,
    S::Response: Into<Response<B>> + 'static,
    <S::Service as Service<Request>>::Future: 'static,
    B: MessageBody + 'static,
{
    /// Constructs new `HttpService` instance from service with default config.
    ///
    /// The expect handler is `h1::ExpectHandler`; no upgrade service or connect callback is set.
    pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
        HttpService {
            cfg: ServiceConfig::default(),
            srv: service.into_factory(),
            expect: h1::ExpectHandler,
            upgrade: None,
            on_connect_ext: None,
            _phantom: PhantomData,
        }
    }

    /// Constructs new `HttpService` instance from config and service.
    ///
    /// Identical to [`HttpService::new`] except that `cfg` replaces the default configuration.
    pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
        cfg: ServiceConfig,
        service: F,
    ) -> Self {
        HttpService {
            cfg,
            srv: service.into_factory(),
            expect: h1::ExpectHandler,
            upgrade: None,
            on_connect_ext: None,
            _phantom: PhantomData,
        }
    }
}

impl<T, S, B, X, U> HttpService<T, S, B, X, U>
where
    S: ServiceFactory<Request, Config = ()>,
    S::Error: Into<Response<BoxBody>> + 'static,
    S::InitError: fmt::Debug,
    S::Response: Into<Response<B>> + 'static,
    <S::Service as Service<Request>>::Future: 'static,
    B: MessageBody,
{
    /// Sets service for `Expect: 100-Continue` handling.
    ///
    /// An expect service is called with requests that contain an `Expect` header. A successful
    /// response type is also a request which will be forwarded to the main service.
    ///
    /// All other settings are carried over unchanged.
    pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
    where
        X1: ServiceFactory<Request, Config = (), Response = Request>,
        X1::Error: Into<Response<BoxBody>>,
        X1::InitError: fmt::Debug,
    {
        HttpService {
            expect,
            cfg: self.cfg,
            srv: self.srv,
            upgrade: self.upgrade,
            on_connect_ext: self.on_connect_ext,
            _phantom: PhantomData,
        }
    }

    /// Sets service for custom `Connection: Upgrade` handling.
    ///
    /// If a service is provided then normal request handling is halted and this service is called
    /// with the original request and the framed I/O object.
    ///
    /// All other settings are carried over unchanged.
    pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
    where
        U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
        U1::Error: fmt::Display,
        U1::InitError: fmt::Debug,
    {
        HttpService {
            upgrade,
            cfg: self.cfg,
            srv: self.srv,
            expect: self.expect,
            on_connect_ext: self.on_connect_ext,
            _phantom: PhantomData,
        }
    }

    /// Set connect callback with mutable access to request data container.
    ///
    /// Passing `None` clears any previously set callback.
    pub(crate) fn on_connect_ext(mut self, f: Option<Rc<ConnectCallback<T>>>) -> Self {
        self.on_connect_ext = f;
        self
    }
}

impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
where
    S: ServiceFactory<Request, Config = ()>,
    S::Future: 'static,
    S::Error: Into<Response<BoxBody>> + 'static,
    S::InitError: fmt::Debug,
    S::Response: Into<Response<B>> + 'static,
    <S::Service as Service<Request>>::Future: 'static,

    B: MessageBody + 'static,

    X: ServiceFactory<Request, Config = (), Response = Request>,
    X::Future: 'static,
    X::Error: Into<Response<BoxBody>>,
    X::InitError: fmt::Debug,

    U: ServiceFactory<(Request, Framed<TcpStream, h1::Codec>), Config = (), Response = ()>,
    U::Future: 'static,
    U::Error: fmt::Display + Into<Response<BoxBody>>,
    U::InitError: fmt::Debug,
{
    /// Creates a plain TCP stream service from this HTTP service.
    ///
    /// Every accepted stream is tagged as `Protocol::Http1` together with its peer address
    /// (`None` when the address lookup fails) and then handed to this service.
    pub fn tcp(
        self,
    ) -> impl ServiceFactory<
        TcpStream,
        Config = (),
        Response = (),
        Error = DispatchError,
        InitError = (),
    > {
        fn_service(|io: TcpStream| async {
            let peer_addr = io.peer_addr().ok();
            Ok((io, Protocol::Http1, peer_addr))
        })
        .and_then(self)
    }
}

/// Configuration options used when accepting TLS connection.
#[cfg(any(feature = "openssl", feature = "rustls"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "openssl", feature = "rustls"))))] #[derive(Debug, Default)] pub struct TlsAcceptorConfig { pub(crate) handshake_timeout: Option, } #[cfg(any(feature = "openssl", feature = "rustls"))] impl TlsAcceptorConfig { /// Set TLS handshake timeout duration. pub fn handshake_timeout(self, dur: std::time::Duration) -> Self { Self { handshake_timeout: Some(dur), // ..self } } } #[cfg(feature = "openssl")] mod openssl { use actix_service::ServiceFactoryExt as _; use actix_tls::accept::{ openssl::{ reexports::{Error as SslError, SslAcceptor}, Acceptor, TlsStream, }, TlsError, }; use super::*; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { /// Create OpenSSL based service. #[cfg_attr(docsrs, doc(cfg(feature = "openssl")))] pub fn openssl( self, acceptor: SslAcceptor, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { self.openssl_with_config(acceptor, TlsAcceptorConfig::default()) } /// Create OpenSSL based service with custom TLS acceptor configuration. 
#[cfg_attr(docsrs, doc(cfg(feature = "openssl")))] pub fn openssl_with_config( self, acceptor: SslAcceptor, tls_acceptor_config: TlsAcceptorConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { let mut acceptor = Acceptor::new(acceptor); if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout { acceptor.set_handshake_timeout(handshake_timeout); } acceptor .map_init_err(|_| { unreachable!("TLS acceptor service factory does not error on init") }) .map_err(TlsError::into_service_error) .map(|io: TlsStream| { let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().peer_addr().ok(); (io, proto, peer_addr) }) .and_then(self.map_err(TlsError::Service)) } } } #[cfg(feature = "rustls")] mod rustls { use std::io; use actix_service::ServiceFactoryExt as _; use actix_tls::accept::{ rustls::{reexports::ServerConfig, Acceptor, TlsStream}, TlsError, }; use super::*; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { /// Create Rustls based service. #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] pub fn rustls( self, config: ServerConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { self.rustls_with_config(config, TlsAcceptorConfig::default()) } /// Create Rustls based service with custom TLS acceptor configuration. 
#[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] pub fn rustls_with_config( self, mut config: ServerConfig, tls_acceptor_config: TlsAcceptorConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; protos.extend_from_slice(&config.alpn_protocols); config.alpn_protocols = protos; let mut acceptor = Acceptor::new(config); if let Some(handshake_timeout) = tls_acceptor_config.handshake_timeout { acceptor.set_handshake_timeout(handshake_timeout); } acceptor .map_init_err(|_| { unreachable!("TLS acceptor service factory does not error on init") }) .map_err(TlsError::into_service_error) .and_then(|io: TlsStream| async { let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().0.peer_addr().ok(); Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } } } impl ServiceFactory<(T, Protocol, Option)> for HttpService where T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { type Response = (); type Error = DispatchError; type Config = (); type Service = HttpServiceHandler; type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { let service = self.srv.new_service(()); let expect = self.expect.new_service(()); let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); let on_connect_ext = self.on_connect_ext.clone(); let cfg = 
self.cfg.clone(); Box::pin(async move { let expect = expect .await .map_err(|e| error!("Init http expect service error: {:?}", e))?; let upgrade = match upgrade { Some(upgrade) => { let upgrade = upgrade .await .map_err(|e| error!("Init http upgrade service error: {:?}", e))?; Some(upgrade) } None => None, }; let service = service .await .map_err(|e| error!("Init http service error: {:?}", e))?; Ok(HttpServiceHandler::new( cfg, service, expect, upgrade, on_connect_ext, )) }) } } /// `Service` implementation for HTTP/1 and HTTP/2 transport pub struct HttpServiceHandler where S: Service, X: Service, U: Service<(Request, Framed)>, { pub(super) flow: Rc>, pub(super) cfg: ServiceConfig, pub(super) on_connect_ext: Option>>, _phantom: PhantomData, } impl HttpServiceHandler where S: Service, S::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed)>, U::Error: Into>, { pub(super) fn new( cfg: ServiceConfig, service: S, expect: X, upgrade: Option, on_connect_ext: Option>>, ) -> HttpServiceHandler { HttpServiceHandler { cfg, on_connect_ext, flow: HttpFlow::new(service, expect, upgrade), _phantom: PhantomData, } } pub(super) fn _poll_ready( &self, cx: &mut Context<'_>, ) -> Poll>> { ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?; ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?; if let Some(ref upg) = self.flow.upgrade { ready!(upg.poll_ready(cx).map_err(Into::into))?; }; Poll::Ready(Ok(())) } } /// A collection of services that describe an HTTP request flow. 
pub(super) struct HttpFlow { pub(super) service: S, pub(super) expect: X, pub(super) upgrade: Option, } impl HttpFlow { pub(super) fn new(service: S, expect: X, upgrade: Option) -> Rc { Rc::new(Self { service, expect, upgrade, }) } } impl Service<(T, Protocol, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into> + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into>, { type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self._poll_ready(cx).map_err(|err| { error!("HTTP service readiness error: {:?}", err); DispatchError::Service(err) }) } fn call( &self, (io, proto, peer_addr): (T, Protocol, Option), ) -> Self::Future { let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); match proto { #[cfg(feature = "http2")] Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake { handshake: Some(( crate::h2::handshake_with_timeout(io, &self.cfg), self.cfg.clone(), self.flow.clone(), conn_data, peer_addr, )), }, }, #[cfg(not(feature = "http2"))] Protocol::Http2 => { panic!("HTTP/2 support is disabled (enable with the `http2` feature flag)") } Protocol::Http1 => HttpServiceHandlerResponse { state: State::H1 { dispatcher: h1::Dispatcher::new( io, self.flow.clone(), self.cfg.clone(), peer_addr, conn_data, ), }, }, proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), } } } #[cfg(not(feature = "http2"))] pin_project! { #[project = StateProj] enum State where T: AsyncRead, T: AsyncWrite, T: Unpin, S: Service, S::Future: 'static, S::Error: Into>, B: MessageBody, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { H1 { #[pin] dispatcher: h1::Dispatcher }, } } #[cfg(feature = "http2")] pin_project! 
{ #[project = StateProj] enum State where T: AsyncRead, T: AsyncWrite, T: Unpin, S: Service, S::Future: 'static, S::Error: Into>, B: MessageBody, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { H1 { #[pin] dispatcher: h1::Dispatcher }, H2 { #[pin] dispatcher: crate::h2::Dispatcher }, H2Handshake { handshake: Option<( crate::h2::HandshakeWithTimeout, ServiceConfig, Rc>, OnConnectData, Option, )>, }, } } pin_project! { pub struct HttpServiceHandlerResponse where T: AsyncRead, T: AsyncWrite, T: Unpin, S: Service, S::Error: Into>, S::Error: 'static, S::Future: 'static, S::Response: Into>, S::Response: 'static, B: MessageBody, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { #[pin] state: State, } } impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into> + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { type Output = Result<(), DispatchError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project().state.project() { StateProj::H1 { dispatcher } => dispatcher.poll(cx), #[cfg(feature = "http2")] StateProj::H2 { dispatcher } => dispatcher.poll(cx), #[cfg(feature = "http2")] StateProj::H2Handshake { handshake: data } => { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { Ok((conn, timer)) => { let (_, config, flow, conn_data, peer_addr) = data.take().unwrap(); self.as_mut().project().state.set(State::H2 { dispatcher: crate::h2::Dispatcher::new( conn, flow, config, peer_addr, conn_data, timer, ), }); self.poll(cx) } Err(err) => { tracing::trace!("H2 handshake error: {}", err); Poll::Ready(Err(err)) } } } } } }