use std::marker::PhantomData; use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{ fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _, }; use actix_utils::future::ready; use futures_core::future::LocalBoxFuture; use crate::body::MessageBody; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; use crate::service::HttpServiceHandler; use crate::{ConnectCallback, OnConnectData}; use super::codec::Codec; use super::dispatcher::Dispatcher; use super::{ExpectHandler, UpgradeHandler}; /// `ServiceFactory` implementation for HTTP1 transport pub struct H1Service { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect_ext: Option>>, _phantom: PhantomData, } impl H1Service where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, { /// Create new `HttpService` instance with config. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { H1Service { cfg, srv: service.into_factory(), expect: ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } } impl H1Service where S: ServiceFactory, S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, B::Error: Into, X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create simple tcp stream service pub fn tcp( self, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = DispatchError, InitError = (), > { fn_service(|io: TcpStream| { let peer_addr = io.peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self) } } #[cfg(feature = "openssl")] mod openssl { use super::*; use actix_service::ServiceFactoryExt; use actix_tls::accept::{ openssl::{Acceptor, SslAcceptor, SslError, TlsStream}, TlsError, }; impl H1Service, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, B::Error: Into, X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create openssl based service pub fn openssl( self, acceptor: SslAcceptor, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { Acceptor::new(acceptor) .map_err(TlsError::Tls) .map_init_err(|_| panic!()) .and_then(|io: TlsStream| { let peer_addr = io.get_ref().peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self.map_err(TlsError::Service)) } } } #[cfg(feature = "rustls")] mod rustls { use super::*; use std::io; use actix_service::ServiceFactoryExt; use actix_tls::accept::{ rustls::{Acceptor, ServerConfig, TlsStream}, TlsError, }; impl H1Service, S, B, X, U> where S: ServiceFactory, S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, B::Error: Into, X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, Codec>), Config = (), Response = (), >, U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create rustls based service pub fn rustls( self, config: ServerConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { Acceptor::new(config) .map_err(TlsError::Tls) .map_init_err(|_| panic!()) .and_then(|io: TlsStream| { let peer_addr = io.get_ref().0.peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self.map_err(TlsError::Service)) } } } impl H1Service where S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, { pub fn expect(self, expect: X1) -> H1Service where X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, { H1Service { expect, cfg: self.cfg, srv: self.srv, upgrade: self.upgrade, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } pub fn upgrade(self, upgrade: Option) -> H1Service where U1: ServiceFactory<(Request, Framed), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, { H1Service { upgrade, cfg: self.cfg, srv: self.srv, expect: self.expect, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Set on connect callback. pub(crate) fn on_connect_ext(mut self, f: Option>>) -> Self { self.on_connect_ext = f; self } } impl ServiceFactory<(T, Option)> for H1Service where T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, S::Future: 'static, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, B::Error: Into, X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { type Response = (); type Error = DispatchError; type Config = (); type Service = H1ServiceHandler; type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { let service = self.srv.new_service(()); let expect = self.expect.new_service(()); let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); let on_connect_ext = self.on_connect_ext.clone(); let cfg = self.cfg.clone(); Box::pin(async move { let expect = expect .await .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; let upgrade = match upgrade { Some(upgrade) => { let upgrade = upgrade.await.map_err(|e| { log::error!("Init http upgrade service error: {:?}", e) })?; Some(upgrade) } None => None, }; let service = service .await .map_err(|e| log::error!("Init http service error: {:?}", e))?; Ok(H1ServiceHandler::new( cfg, service, expect, upgrade, on_connect_ext, )) }) } } /// `Service` implementation for HTTP/1 transport pub type H1ServiceHandler = HttpServiceHandler; impl Service<(T, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, B: MessageBody, B::Error: Into, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into, { type Response = (); type Error = DispatchError; type Future = Dispatcher; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self._poll_ready(cx).map_err(|e| { log::error!("HTTP/1 service readiness error: {:?}", e); DispatchError::Service(e) }) } fn call(&self, (io, addr): (T, Option)) -> Self::Future { let on_connect_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); Dispatcher::new( io, self.cfg.clone(), self.flow.clone(), on_connect_data, addr, ) } }