use std::{ error::Error as StdError, fmt, future::Future, marker::PhantomData, net, pin::Pin, rc::Rc, task::{Context, Poll}, }; use ::h2::server::{handshake as h2_handshake, Handshake as H2Handshake}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{ fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _, }; use bytes::Bytes; use futures_core::{future::LocalBoxFuture, ready}; use pin_project::pin_project; use crate::{ body::{AnyBody, MessageBody}, builder::HttpServiceBuilder, config::{KeepAlive, ServiceConfig}, error::DispatchError, h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, }; /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. pub struct HttpService { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect_ext: Option>>, _phantom: PhantomData, } impl HttpService where S: ServiceFactory, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, { /// Create builder for `HttpService` instance. pub fn build() -> HttpServiceBuilder { HttpServiceBuilder::new() } } impl HttpService where S: ServiceFactory, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, B::Error: Into>, { /// Create new `HttpService` instance. pub fn new>(service: F) -> Self { let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None); HttpService { cfg, srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } /// Create new `HttpService` instance with config. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { HttpService { cfg, srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } } impl HttpService where S: ServiceFactory, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody, { /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. /// Service must return request in case of success, in that case /// request will be forwarded to main service. pub fn expect(self, expect: X1) -> HttpService where X1: ServiceFactory, X1::Error: Into>, X1::InitError: fmt::Debug, { HttpService { expect, cfg: self.cfg, srv: self.srv, upgrade: self.upgrade, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Provide service for custom `Connection: UPGRADE` support. /// /// If service is provided then normal requests handling get halted /// and this service get called with original request and framed object. pub fn upgrade(self, upgrade: Option) -> HttpService where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, { HttpService { upgrade, cfg: self.cfg, srv: self.srv, expect: self.expect, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Set connect callback with mutable access to request data container. pub(crate) fn on_connect_ext(mut self, f: Option>>) -> Self { self.on_connect_ext = f; self } } impl HttpService where S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, B::Error: Into>, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { /// Create simple tcp stream service pub fn tcp( self, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = DispatchError, InitError = (), > { fn_service(|io: TcpStream| async { let peer_addr = io.peer_addr().ok(); Ok((io, Protocol::Http1, peer_addr)) }) .and_then(self) } } #[cfg(feature = "openssl")] mod openssl { use actix_service::ServiceFactoryExt as _; use actix_tls::accept::{ openssl::{ reexports::{Error as SslError, SslAcceptor}, Acceptor, TlsStream, }, TlsError, }; use super::*; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, B::Error: Into>, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { /// Create OpenSSL based service. pub fn openssl( self, acceptor: SslAcceptor, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { Acceptor::new(acceptor) .map_init_err(|_| { unreachable!("TLS acceptor service factory does not error on init") }) .map_err(TlsError::into_service_error) .map(|io: TlsStream| { let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().peer_addr().ok(); (io, proto, peer_addr) }) .and_then(self.map_err(TlsError::Service)) } } } #[cfg(feature = "rustls")] mod rustls { use std::io; use actix_service::ServiceFactoryExt as _; use actix_tls::accept::{ rustls::{reexports::ServerConfig, Acceptor, TlsStream}, TlsError, }; use super::*; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, B::Error: Into>, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { /// Create Rustls based service. pub fn rustls( self, mut config: ServerConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { let mut protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; protos.extend_from_slice(&config.alpn_protocols); config.alpn_protocols = protos; Acceptor::new(config) .map_init_err(|_| { unreachable!("TLS acceptor service factory does not error on init") }) .map_err(TlsError::into_service_error) .and_then(|io: TlsStream| async { let proto = if let Some(protos) = io.get_ref().1.alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().0.peer_addr().ok(); Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } } } impl ServiceFactory<(T, Protocol, Option)> for HttpService where T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, S::Future: 'static, S::Error: Into> + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, B::Error: Into>, X: ServiceFactory, X::Future: 'static, X::Error: Into>, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into>, U::InitError: fmt::Debug, { type Response = (); type Error = DispatchError; type Config = (); type Service = HttpServiceHandler; type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { let service = self.srv.new_service(()); let expect = self.expect.new_service(()); let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); let on_connect_ext = self.on_connect_ext.clone(); let cfg = self.cfg.clone(); Box::pin(async move { let expect = expect .await .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; let upgrade = match upgrade { Some(upgrade) => { let upgrade = upgrade.await.map_err(|e| { log::error!("Init http upgrade service error: {:?}", e) })?; Some(upgrade) } None => None, }; let service = service .await .map_err(|e| log::error!("Init http service error: {:?}", e))?; Ok(HttpServiceHandler::new( cfg, service, expect, upgrade, on_connect_ext, )) }) } } /// `Service` implementation for HTTP/1 and HTTP/2 transport pub struct HttpServiceHandler where S: Service, X: Service, U: Service<(Request, Framed)>, { pub(super) flow: Rc>, pub(super) cfg: ServiceConfig, pub(super) on_connect_ext: Option>>, _phantom: PhantomData, } impl HttpServiceHandler where S: Service, S::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed)>, U::Error: Into>, { pub(super) fn new( cfg: ServiceConfig, service: S, expect: X, upgrade: Option, on_connect_ext: Option>>, ) -> HttpServiceHandler { HttpServiceHandler { cfg, on_connect_ext, flow: HttpFlow::new(service, expect, upgrade), _phantom: PhantomData, } } pub(super) fn _poll_ready( &self, cx: &mut Context<'_>, ) -> Poll>> { ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?; ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?; if let Some(ref upg) = self.flow.upgrade { ready!(upg.poll_ready(cx).map_err(Into::into))?; }; Poll::Ready(Ok(())) } } /// A collection of services that describe an HTTP request flow. pub(super) struct HttpFlow { pub(super) service: S, pub(super) expect: X, pub(super) upgrade: Option, } impl HttpFlow { pub(super) fn new(service: S, expect: X, upgrade: Option) -> Rc { Rc::new(Self { service, expect, upgrade, }) } } impl Service<(T, Protocol, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into> + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, B::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into>, { type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self._poll_ready(cx).map_err(|e| { log::error!("HTTP service readiness error: {:?}", e); DispatchError::Service(e) }) } fn call( &self, (io, proto, peer_addr): (T, Protocol, Option), ) -> Self::Future { let on_connect_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); match proto { Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake(Some(( h2_handshake(io), self.cfg.clone(), self.flow.clone(), on_connect_data, peer_addr, ))), }, Protocol::Http1 => HttpServiceHandlerResponse { state: State::H1(h1::Dispatcher::new( io, self.cfg.clone(), self.flow.clone(), on_connect_data, peer_addr, )), }, proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), } } } #[pin_project(project = StateProj)] enum State where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Future: 'static, S::Error: Into>, B: MessageBody, B::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { H1(#[pin] h1::Dispatcher), H2(#[pin] h2::Dispatcher), H2Handshake( Option<( H2Handshake, ServiceConfig, Rc>, OnConnectData, Option, )>, ), } #[pin_project] pub struct HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into> + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody, B::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { #[pin] state: State, } impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into> + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, B::Error: Into>, X: Service, X::Error: Into>, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { type Output = Result<(), DispatchError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project().state.project() { StateProj::H1(disp) => disp.poll(cx), StateProj::H2(disp) => disp.poll(cx), StateProj::H2Handshake(data) => { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { Ok(conn) => { let (_, cfg, srv, on_connect_data, peer_addr) = data.take().unwrap(); self.as_mut().project().state.set(State::H2( h2::Dispatcher::new( srv, conn, on_connect_data, cfg, peer_addr, ), )); self.poll(cx) } Err(err) => { trace!("H2 handshake error: {}", err); Poll::Ready(Err(err.into())) } } } } } }