use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use futures_core::ready; use futures_util::future::ready; use crate::body::MessageBody; use crate::cloneable::CloneableService; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; use crate::{ConnectCallback, Extensions}; use super::codec::Codec; use super::dispatcher::Dispatcher; use super::{ExpectHandler, UpgradeHandler}; /// `ServiceFactory` implementation for HTTP1 transport pub struct H1Service { srv: S, cfg: ServiceConfig, expect: X, upgrade: Option, on_connect_ext: Option>>, _phantom: PhantomData, } impl H1Service where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, { /// Create new `HttpService` instance with config. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, ) -> Self { H1Service { cfg, srv: service.into_factory(), expect: ExpectHandler, upgrade: None, on_connect_ext: None, _phantom: PhantomData, } } } impl H1Service where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create simple tcp stream service pub fn tcp( self, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = DispatchError, InitError = (), > { pipeline_factory(|io: TcpStream| { let peer_addr = io.peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self) } } #[cfg(feature = "openssl")] mod openssl { use super::*; use actix_service::ServiceFactoryExt; use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, SslStream}; use actix_tls::accept::TlsError; impl H1Service, S, B, X, U> where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, Codec>), Config = (), Response = (), >, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create openssl based service pub fn openssl( self, acceptor: SslAcceptor, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { pipeline_factory( Acceptor::new(acceptor) .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) .and_then(|io: SslStream| { let peer_addr = io.get_ref().peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self.map_err(TlsError::Service)) } } } #[cfg(feature = "rustls")] mod rustls { use super::*; use actix_service::ServiceFactoryExt; use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream}; use actix_tls::accept::TlsError; use std::{fmt, io}; impl H1Service, S, B, X, U> where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< (Request, Framed, Codec>), Config = (), Response = (), >, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { /// Create rustls based service pub fn rustls( self, config: ServerConfig, ) -> impl ServiceFactory< TcpStream, Config = (), Response = (), Error = TlsError, InitError = (), > { pipeline_factory( Acceptor::new(config) .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) .and_then(|io: TlsStream| { let peer_addr = io.get_ref().0.peer_addr().ok(); ready(Ok((io, peer_addr))) }) .and_then(self.map_err(TlsError::Service)) } } } impl H1Service where S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, { pub fn expect(self, expect: X1) -> H1Service where X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, { H1Service { expect, cfg: self.cfg, srv: self.srv, upgrade: self.upgrade, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } pub fn upgrade(self, upgrade: Option) -> H1Service where U1: ServiceFactory<(Request, Framed), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, { H1Service { upgrade, cfg: self.cfg, srv: self.srv, expect: self.expect, on_connect_ext: self.on_connect_ext, _phantom: PhantomData, } } /// Set on connect callback. pub(crate) fn on_connect_ext(mut self, f: Option>>) -> Self { self.on_connect_ext = f; self } } impl ServiceFactory<(T, Option)> for H1Service where T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { type Response = (); type Error = DispatchError; type Config = (); type Service = H1ServiceHandler; type InitError = (); type Future = H1ServiceResponse; fn new_service(&self, _: ()) -> Self::Future { H1ServiceResponse { fut: self.srv.new_service(()), fut_ex: Some(self.expect.new_service(())), fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), expect: None, upgrade: None, on_connect_ext: self.on_connect_ext.clone(), cfg: Some(self.cfg.clone()), _phantom: PhantomData, } } } #[doc(hidden)] #[pin_project::pin_project] pub struct H1ServiceResponse where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, { #[pin] fut: S::Future, #[pin] fut_ex: Option, #[pin] fut_upg: Option, expect: Option, upgrade: Option, on_connect_ext: Option>>, cfg: Option, _phantom: PhantomData<(T, B)>, } impl Future for H1ServiceResponse where T: AsyncRead + AsyncWrite + Unpin, S: ServiceFactory, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, { type Output = Result, ()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); if let Some(fut) = this.fut_ex.as_pin_mut() { let expect = ready!(fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.expect = Some(expect); this.fut_ex.set(None); } if let Some(fut) = this.fut_upg.as_pin_mut() { let upgrade = ready!(fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.upgrade = Some(upgrade); this.fut_ex.set(None); } let result = ready!(this .fut .poll(cx) .map_err(|e| log::error!("Init http service error: {:?}", e))); Poll::Ready(result.map(|service| { let this = self.as_mut().project(); H1ServiceHandler::new( this.cfg.take().unwrap(), service, this.expect.take().unwrap(), this.upgrade.take(), this.on_connect_ext.clone(), ) })) } } /// `Service` implementation for HTTP/1 transport pub struct H1ServiceHandler where S: Service, X: Service, U: Service<(Request, Framed)>, { srv: CloneableService, expect: CloneableService, upgrade: Option>, on_connect_ext: Option>>, cfg: ServiceConfig, _phantom: PhantomData, } impl H1ServiceHandler where S: Service, S::Error: Into, S::Response: Into>, B: MessageBody, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { fn new( cfg: ServiceConfig, srv: S, expect: X, upgrade: Option, on_connect_ext: Option>>, ) -> H1ServiceHandler { H1ServiceHandler { srv: CloneableService::new(srv), expect: CloneableService::new(expect), upgrade: upgrade.map(CloneableService::new), cfg, on_connect_ext, _phantom: PhantomData, } } } impl Service<(T, Option)> for H1ServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into, S::Response: Into>, B: MessageBody, X: Service, X::Error: Into, U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into, { type Response = (); type Error = DispatchError; type Future = Dispatcher; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let ready = self .expect .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready(); let ready = self .srv .poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready() && ready; let ready = if let Some(ref mut upg) = self.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); log::error!("Http service readiness error: {:?}", e); DispatchError::Service(e) })? .is_ready() && ready } else { ready }; if ready { Poll::Ready(Ok(())) } else { Poll::Pending } } fn call(&mut self, (io, addr): (T, Option)) -> Self::Future { let mut connect_extensions = Extensions::new(); if let Some(ref handler) = self.on_connect_ext { // run on_connect_ext callback, populating connect extensions handler(&io, &mut connect_extensions); } Dispatcher::new( io, self.cfg.clone(), self.srv.clone(), self.expect.clone(), self.upgrade.clone(), connect_extensions, addr, ) } }