use std::{ fmt, future::Future, marker::PhantomData, net, pin::Pin, rc::Rc, task::{Context, Poll}, }; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; use futures_core::ready; use h2::server::{handshake, Handshake}; use pin_project::pin_project; use crate::body::MessageBody; use crate::builder::HttpServiceBuilder; use crate::config::{KeepAlive, ServiceConfig}; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; use crate::{h1, h2::Dispatcher, ConnectCallback, OnConnectData, Protocol}; /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. pub struct HttpService { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect_ext: Option>>, _phantom: PhantomData, } impl HttpService where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, { /// Create builder for `HttpService` instance. pub fn build() -> HttpServiceBuilder { HttpServiceBuilder::new() } } impl HttpService where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, { /// Create new `HttpService` instance. pub fn new>(service: F) -> Self { let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None); HttpService { cfg, srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } /// Create new `HttpService` instance with config. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { HttpService { cfg, srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } } impl HttpService where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody, { /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. /// Service must return request in case of success, in that case /// request will be forwarded to main service. pub fn expect(self, expect: X1) -> HttpService where X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, >::Future: 'static, { HttpService { expect, cfg: self.cfg, srv: self.srv, upgrade: self.upgrade, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Provide service for custom `Connection: UPGRADE` support. /// /// If service is provided then normal requests handling get halted /// and this service get called with original request and framed object. pub fn upgrade(self, upgrade: Option) -> HttpService where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, )>>::Future: 'static, { HttpService { upgrade, cfg: self.cfg, srv: self.srv, expect: self.expect, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Set connect callback with mutable access to request data container. pub(crate) fn on_connect_ext(mut self, f: Option>>) -> Self { self.on_connect_ext = f; self } } impl HttpService where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, >::Future: 'static, U: ServiceFactory< (Request, Framed), Config = (), Response = (), >, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, )>>::Future: 'static, { /// Create simple tcp stream service pub fn tcp( self, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = DispatchError, InitError = (), > { pipeline_factory(|io: TcpStream| async { let peer_addr = io.peer_addr().ok(); Ok((io, Protocol::Http1, peer_addr)) }) .and_then(self) } } #[cfg(feature = "openssl")] mod openssl { use super::*; use actix_service::ServiceFactoryExt; use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, TlsStream}; use actix_tls::accept::TlsError; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, >::Future: 'static, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, , h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn openssl( self, acceptor: SslAcceptor, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { pipeline_factory( Acceptor::new(acceptor) .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) .and_then(|io: TlsStream| async { let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().peer_addr().ok(); Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } } } #[cfg(feature = "rustls")] mod rustls { use std::io; use actix_tls::accept::rustls::{Acceptor, ServerConfig, Session, TlsStream}; use actix_tls::accept::TlsError; use super::*; use actix_service::ServiceFactoryExt; impl HttpService, S, B, X, U> where S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, >::Future: 'static, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, , h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn rustls( self, mut config: ServerConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { let protos = vec!["h2".to_string().into(), "http/1.1".to_string().into()]; config.set_protocols(&protos); pipeline_factory( Acceptor::new(config) .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) .and_then(|io: TlsStream| async { let proto = if let Some(protos) = io.get_ref().1.get_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 } else { Protocol::Http1 } } else { Protocol::Http1 }; let peer_addr = io.get_ref().0.peer_addr().ok(); Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } } } impl ServiceFactory<(T, Protocol, Option)> for HttpService where T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, >::Future: 'static, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, )>>::Future: 'static, { type Response = (); type Error = DispatchError; type Config = (); type Service = HttpServiceHandler; type InitError = (); type Future = HttpServiceResponse; fn new_service(&self, _: ()) -> Self::Future { HttpServiceResponse { fut: self.srv.new_service(()), fut_ex: Some(self.expect.new_service(())), fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), expect: None, upgrade: None, on_connect_ext: self.on_connect_ext.clone(), cfg: self.cfg.clone(), _phantom: PhantomData, } } } #[doc(hidden)] #[pin_project] pub struct HttpServiceResponse where S: ServiceFactory, X: ServiceFactory, U: ServiceFactory<(Request, Framed)>, { #[pin] fut: S::Future, #[pin] fut_ex: Option, #[pin] fut_upg: Option, expect: Option, upgrade: Option, on_connect_ext: Option>>, cfg: ServiceConfig, _phantom: PhantomData, } impl Future for HttpServiceResponse where T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, >::Future: 'static, U: ServiceFactory<(Request, Framed), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, )>>::Future: 'static, { type Output = Result, ()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); if let Some(fut) = this.fut_ex.as_pin_mut() { let expect = ready!(fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.expect = Some(expect); this.fut_ex.set(None); } if let Some(fut) = this.fut_upg.as_pin_mut() { let upgrade = ready!(fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.upgrade = Some(upgrade); this.fut_upg.set(None); } let result = ready!(this .fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e))); Poll::Ready(result.map(|service| { let this = self.as_mut().project(); HttpServiceHandler::new( this.cfg.clone(), service, this.expect.take().unwrap(), this.upgrade.take(), this.on_connect_ext.clone(), ) })) } } /// `Service` implementation for HTTP transport pub struct HttpServiceHandler where S: Service, X: Service, U: Service<(Request, Framed)>, { flow: Rc>, cfg: ServiceConfig, on_connect_ext: Option>>, _phantom: PhantomData, } /// A collection of services that describe an HTTP request flow. pub(super) struct HttpFlow { pub(super) service: S, pub(super) expect: X, pub(super) upgrade: Option, } impl HttpFlow { pub(super) fn new(service: S, expect: X, upgrade: Option) -> Rc { Rc::new(Self { service, expect, upgrade, }) } } impl HttpServiceHandler where S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { fn new( cfg: ServiceConfig, service: S, expect: X, upgrade: Option, on_connect_ext: Option>>, ) -> HttpServiceHandler { HttpServiceHandler { cfg, on_connect_ext, flow: HttpFlow::new(service, expect, upgrade), _phantom: PhantomData, } } } impl Service<(T, Protocol, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into, { type Response = (); type Error = DispatchError; type Future = HttpServiceHandlerResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { let ready = self .flow .expect .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready(); let ready = self .flow .service .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready() && ready; let ready = if let Some(ref upg) = self.flow.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready() && ready } else { ready }; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } fn call( &self, (io, proto, peer_addr): (T, Protocol, Option), ) -> Self::Future { let on_connect_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); match proto { Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake(Some(( handshake(io), self.cfg.clone(), self.flow.clone(), on_connect_data, peer_addr, ))), }, Protocol::Http1 => HttpServiceHandlerResponse { state: State::H1(h1::Dispatcher::new( io, self.cfg.clone(), self.flow.clone(), on_connect_data, peer_addr, )), }, proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), } } } #[pin_project(project = StateProj)] enum State where S: Service, S::Future: 'static, S::Error: Into, T: AsyncRead + AsyncWrite + Unpin, B: MessageBody, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { H1(#[pin] h1::Dispatcher), H2(#[pin] Dispatcher), H2Handshake( Option<( Handshake, ServiceConfig, Rc>, OnConnectData, Option, )>, ), } #[pin_project] pub struct HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { #[pin] state: State, } impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, B: MessageBody, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { type Output = Result<(), DispatchError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project().state.project() { StateProj::H1(disp) => disp.poll(cx), StateProj::H2(disp) => disp.poll(cx), StateProj::H2Handshake(data) => { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { Ok(conn) => { let (_, cfg, srv, on_connect_data, peer_addr) = data.take().unwrap(); self.as_mut().project().state.set(State::H2(Dispatcher::new( srv, conn, on_connect_data, cfg, peer_addr, ))); self.poll(cx) } Err(err) => { trace!("H2 handshake error: {}", err); Poll::Ready(Err(err.into())) } } } } } }