use std::fmt; use std::marker::PhantomData; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; use actix_connect::{ default_connector, Connect as TcpConnect, Connection as TcpConnection, }; use actix_service::{apply_fn, Service, ServiceExt}; use actix_utils::timeout::{TimeoutError, TimeoutService}; use http::Uri; use tokio_tcp::TcpStream; use super::connection::Connection; use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; use super::Connect; #[cfg(feature = "ssl")] use openssl::ssl::SslConnector; #[cfg(not(feature = "ssl"))] type SslConnector = (); /// Manages http client network connectivity /// The `Connector` type uses a builder-like combinator pattern for service /// construction that finishes by calling the `.finish()` method. /// /// ```rust,ignore /// use std::time::Duration; /// use actix_http::client::Connector; /// /// let connector = Connector::new() /// .timeout(Duration::from_secs(5)) /// .finish(); /// ``` pub struct Connector { connector: T, timeout: Duration, conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Duration, limit: usize, #[allow(dead_code)] ssl: SslConnector, _t: PhantomData, } impl Connector<(), ()> { pub fn new() -> Connector< impl Service< Request = TcpConnect, Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, TcpStream, > { let ssl = { #[cfg(feature = "ssl")] { use openssl::ssl::SslMethod; let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); let _ = ssl .set_alpn_protos(b"\x02h2\x08http/1.1") .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); ssl.build() } #[cfg(not(feature = "ssl"))] {} }; Connector { ssl, connector: default_connector(), timeout: Duration::from_secs(1), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), disconnect_timeout: Duration::from_millis(3000), limit: 100, _t: PhantomData, } } } impl Connector { /// Use custom connector. pub fn connector(self, connector: T1) -> Connector where U1: AsyncRead + AsyncWrite + fmt::Debug, T1: Service< Request = TcpConnect, Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { Connector { connector, timeout: self.timeout, conn_lifetime: self.conn_lifetime, conn_keep_alive: self.conn_keep_alive, disconnect_timeout: self.disconnect_timeout, limit: self.limit, ssl: self.ssl, _t: PhantomData, } } } impl Connector where U: AsyncRead + AsyncWrite + fmt::Debug + 'static, T: Service< Request = TcpConnect, Response = TcpConnection, Error = actix_connect::ConnectError, > + Clone, { /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. pub fn timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } #[cfg(feature = "ssl")] /// Use custom `SslConnector` instance. pub fn ssl(mut self, connector: SslConnector) -> Self { self.ssl = connector; self } /// Set total number of simultaneous connections per type of scheme. /// /// If limit is 0, the connector has no limit. /// The default limit size is 100. pub fn limit(mut self, limit: usize) -> Self { self.limit = limit; self } /// Set keep-alive period for opened connection. /// /// Keep-alive period is the period between connection usage. If /// the delay between repeated usages of the same connection /// exceeds this period, the connection is closed. /// Default keep-alive period is 15 seconds. pub fn conn_keep_alive(mut self, dur: Duration) -> Self { self.conn_keep_alive = dur; self } /// Set max lifetime period for connection. /// /// Connection lifetime is max lifetime of any opened connection /// until it is closed regardless of keep-alive period. /// Default lifetime period is 75 seconds. pub fn conn_lifetime(mut self, dur: Duration) -> Self { self.conn_lifetime = dur; self } /// Set server connection disconnect timeout in milliseconds. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the socket get dropped. This timeout affects only secure connections. /// /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3000 milliseconds. pub fn disconnect_timeout(mut self, dur: Duration) -> Self { self.disconnect_timeout = dur; self } /// Finish configuration process and create connector service. /// The Connector builder always concludes by calling `finish()` last in /// its combinator chain. pub fn finish( self, ) -> impl Service + Clone { #[cfg(not(feature = "ssl"))] { let connector = TimeoutService::new( self.timeout, apply_fn(self.connector, |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) }) .map_err(ConnectError::from) .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, TimeoutError::Timeout => ConnectError::Timeout, }); connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( connector, self.conn_lifetime, self.conn_keep_alive, None, self.limit, ), } } #[cfg(feature = "ssl")] { const H2: &[u8] = b"h2"; use actix_connect::ssl::OpensslConnector; let ssl_service = TimeoutService::new( self.timeout, apply_fn(self.connector.clone(), |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) }) .map_err(ConnectError::from) .and_then( OpensslConnector::service(self.ssl) .map_err(ConnectError::from) .map(|stream| { let sock = stream.into_parts().0; let h2 = sock .get_ref() .ssl() .selected_alpn_protocol() .map(|protos| protos.windows(2).any(|w| w == H2)) .unwrap_or(false); if h2 { (sock, Protocol::Http2) } else { (sock, Protocol::Http1) } }), ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, TimeoutError::Timeout => ConnectError::Timeout, }); let tcp_service = TimeoutService::new( self.timeout, apply_fn(self.connector.clone(), |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) }) .map_err(ConnectError::from) .map(|stream| (stream.into_parts().0, Protocol::Http1)), ) .map_err(|e| match e { TimeoutError::Service(e) => e, TimeoutError::Timeout => ConnectError::Timeout, }); connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, self.conn_lifetime, self.conn_keep_alive, None, self.limit, ), ssl_pool: ConnectionPool::new( ssl_service, self.conn_lifetime, self.conn_keep_alive, Some(self.disconnect_timeout), self.limit, ), } } } } #[cfg(not(feature = "ssl"))] mod connect_impl { use futures::future::{err, Either, FutureResult}; use futures::Poll; use super::*; use crate::client::connection::IoConnection; pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, T: Service, { pub(crate) tcp_pool: ConnectionPool, } impl Clone for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, T: Service + Clone, { fn clone(&self) -> Self { InnerConnector { tcp_pool: self.tcp_pool.clone(), } } } impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, T: Service, { type Request = Connect; type Response = IoConnection; type Error = ConnectError; type Future = Either< as Service>::Future, FutureResult, ConnectError>, >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.tcp_pool.poll_ready() } fn call(&mut self, req: Uri) -> Self::Future { match req.scheme_str() { Some("https") | Some("wss") => { Either::B(err(ConnectError::SslIsNotSupported)) } _ => Either::A(self.tcp_pool.call(req)), } } } } #[cfg(feature = "ssl")] mod connect_impl { use std::marker::PhantomData; use futures::future::{Either, FutureResult}; use futures::{Async, Future, Poll}; use super::*; use crate::client::connection::EitherConnection; pub(crate) struct InnerConnector where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, T1: Service, T2: Service, { pub(crate) tcp_pool: ConnectionPool, pub(crate) ssl_pool: ConnectionPool, } impl Clone for InnerConnector where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, T1: Service + Clone, T2: Service + Clone, { fn clone(&self) -> Self { InnerConnector { tcp_pool: self.tcp_pool.clone(), ssl_pool: self.ssl_pool.clone(), } } } impl Service for InnerConnector where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, T1: Service, T2: Service, { type Request = Connect; type Response = EitherConnection; type Error = ConnectError; type Future = Either< FutureResult, Either< InnerConnectorResponseA, InnerConnectorResponseB, >, >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.tcp_pool.poll_ready() } fn call(&mut self, req: Connect) -> Self::Future { match req.uri.scheme_str() { Some("https") | Some("wss") => { Either::B(Either::B(InnerConnectorResponseB { fut: self.ssl_pool.call(req), _t: PhantomData, })) } _ => Either::B(Either::A(InnerConnectorResponseA { fut: self.tcp_pool.call(req), _t: PhantomData, })), } } } pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, T: Service, { fut: as Service>::Future, _t: PhantomData, } impl Future for InnerConnectorResponseA where T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { type Item = EitherConnection; type Error = ConnectError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(res) => Ok(Async::Ready(EitherConnection::A(res))), } } } pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, T: Service, { fut: as Service>::Future, _t: PhantomData, } impl Future for InnerConnectorResponseB where T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { type Item = EitherConnection; type Error = ConnectError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(res) => Ok(Async::Ready(EitherConnection::B(res))), } } } }