|
|
|
@ -15,8 +15,8 @@ use actix_rt::{
|
|
|
|
|
};
|
|
|
|
|
use actix_service::Service;
|
|
|
|
|
use actix_tls::connect::{
|
|
|
|
|
new_connector, Connect as TcpConnect, ConnectError as TcpConnectError,
|
|
|
|
|
Connection as TcpConnection, Resolver,
|
|
|
|
|
ConnectError as TcpConnectError, ConnectInfo, Connection as TcpConnection,
|
|
|
|
|
Connector as TcpConnector, Resolver,
|
|
|
|
|
};
|
|
|
|
|
use futures_core::{future::LocalBoxFuture, ready};
|
|
|
|
|
use http::Uri;
|
|
|
|
@ -28,13 +28,15 @@ use super::error::ConnectError;
|
|
|
|
|
use super::pool::ConnectionPool;
|
|
|
|
|
use super::Connect;
|
|
|
|
|
|
|
|
|
|
enum SslConnector {
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
enum OurTlsConnector {
|
|
|
|
|
#[allow(dead_code)] // only dead when no TLS feature is enabled
|
|
|
|
|
None,
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "openssl")]
|
|
|
|
|
Openssl(actix_tls::connect::ssl::openssl::SslConnector),
|
|
|
|
|
Openssl(actix_tls::connect::openssl::reexports::SslConnector),
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "rustls")]
|
|
|
|
|
Rustls(std::sync::Arc<actix_tls::connect::ssl::rustls::ClientConfig>),
|
|
|
|
|
Rustls(std::sync::Arc<actix_tls::connect::rustls::reexports::ClientConfig>),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Manages HTTP client network connectivity.
|
|
|
|
@ -53,21 +55,22 @@ enum SslConnector {
|
|
|
|
|
pub struct Connector<T> {
|
|
|
|
|
connector: T,
|
|
|
|
|
config: ConnectorConfig,
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
ssl: SslConnector,
|
|
|
|
|
|
|
|
|
|
#[allow(dead_code)] // only dead when no TLS feature is enabled
|
|
|
|
|
ssl: OurTlsConnector,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Connector<()> {
|
|
|
|
|
#[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
|
|
|
|
|
pub fn new() -> Connector<
|
|
|
|
|
impl Service<
|
|
|
|
|
TcpConnect<Uri>,
|
|
|
|
|
ConnectInfo<Uri>,
|
|
|
|
|
Response = TcpConnection<Uri, TcpStream>,
|
|
|
|
|
Error = actix_tls::connect::ConnectError,
|
|
|
|
|
> + Clone,
|
|
|
|
|
> {
|
|
|
|
|
Connector {
|
|
|
|
|
connector: new_connector(resolver::resolver()),
|
|
|
|
|
connector: TcpConnector::new(resolver::resolver()).service(),
|
|
|
|
|
config: ConnectorConfig::default(),
|
|
|
|
|
ssl: Self::build_ssl(vec![b"h2".to_vec(), b"http/1.1".to_vec()]),
|
|
|
|
|
}
|
|
|
|
@ -75,16 +78,16 @@ impl Connector<()> {
|
|
|
|
|
|
|
|
|
|
/// Provides an empty TLS connector when no TLS feature is enabled.
|
|
|
|
|
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
|
|
|
|
fn build_ssl(_: Vec<Vec<u8>>) -> SslConnector {
|
|
|
|
|
SslConnector::None
|
|
|
|
|
fn build_ssl(_: Vec<Vec<u8>>) -> OurTlsConnector {
|
|
|
|
|
OurTlsConnector::None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build TLS connector with rustls, based on supplied ALPN protocols
|
|
|
|
|
///
|
|
|
|
|
/// Note that if both `openssl` and `rustls` features are enabled, rustls will be used.
|
|
|
|
|
#[cfg(feature = "rustls")]
|
|
|
|
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
|
|
|
|
|
use actix_tls::connect::tls::rustls::{webpki_roots_cert_store, ClientConfig};
|
|
|
|
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
|
|
|
|
|
use actix_tls::connect::rustls::{reexports::ClientConfig, webpki_roots_cert_store};
|
|
|
|
|
|
|
|
|
|
let mut config = ClientConfig::builder()
|
|
|
|
|
.with_safe_defaults()
|
|
|
|
@ -93,13 +96,13 @@ impl Connector<()> {
|
|
|
|
|
|
|
|
|
|
config.alpn_protocols = protocols;
|
|
|
|
|
|
|
|
|
|
SslConnector::Rustls(std::sync::Arc::new(config))
|
|
|
|
|
OurTlsConnector::Rustls(std::sync::Arc::new(config))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build TLS connector with openssl, based on supplied ALPN protocols
|
|
|
|
|
#[cfg(all(feature = "openssl", not(feature = "rustls")))]
|
|
|
|
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
|
|
|
|
|
use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
|
|
|
|
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> OurTlsConnector {
|
|
|
|
|
use actix_tls::connect::openssl::reexports::{SslConnector, SslMethod};
|
|
|
|
|
use bytes::{BufMut, BytesMut};
|
|
|
|
|
|
|
|
|
|
let mut alpn = BytesMut::with_capacity(20);
|
|
|
|
@ -108,12 +111,12 @@ impl Connector<()> {
|
|
|
|
|
alpn.put(proto.as_slice());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
|
|
|
|
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
|
|
|
|
if let Err(err) = ssl.set_alpn_protos(&alpn) {
|
|
|
|
|
log::error!("Can not set ALPN protocol: {:?}", err);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SslConnector::Openssl(ssl.build())
|
|
|
|
|
OurTlsConnector::Openssl(ssl.build())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -123,7 +126,7 @@ impl<S> Connector<S> {
|
|
|
|
|
where
|
|
|
|
|
Io1: ActixStream + fmt::Debug + 'static,
|
|
|
|
|
S1: Service<
|
|
|
|
|
TcpConnect<Uri>,
|
|
|
|
|
ConnectInfo<Uri>,
|
|
|
|
|
Response = TcpConnection<Uri, Io1>,
|
|
|
|
|
Error = TcpConnectError,
|
|
|
|
|
> + Clone,
|
|
|
|
@ -136,7 +139,7 @@ impl<S> Connector<S> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S, Io> Connector<S>
|
|
|
|
|
impl<S, IO> Connector<S>
|
|
|
|
|
where
|
|
|
|
|
// Note:
|
|
|
|
|
// Input Io type is bound to ActixStream trait but internally in client module they
|
|
|
|
@ -145,8 +148,8 @@ where
|
|
|
|
|
//
|
|
|
|
|
// This remap is to hide ActixStream's trait methods. They are not meant to be called
|
|
|
|
|
// from user code.
|
|
|
|
|
Io: ActixStream + fmt::Debug + 'static,
|
|
|
|
|
S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
|
|
|
|
|
IO: ActixStream + fmt::Debug + 'static,
|
|
|
|
|
S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, IO>, Error = TcpConnectError>
|
|
|
|
|
+ Clone
|
|
|
|
|
+ 'static,
|
|
|
|
|
{
|
|
|
|
@ -166,18 +169,21 @@ where
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "openssl")]
|
|
|
|
|
/// Use custom `SslConnector` instance.
|
|
|
|
|
pub fn ssl(mut self, connector: actix_tls::connect::ssl::openssl::SslConnector) -> Self {
|
|
|
|
|
self.ssl = SslConnector::Openssl(connector);
|
|
|
|
|
pub fn ssl(
|
|
|
|
|
mut self,
|
|
|
|
|
connector: actix_tls::connect::openssl::reexports::SslConnector,
|
|
|
|
|
) -> Self {
|
|
|
|
|
self.ssl = OurTlsConnector::Openssl(connector);
|
|
|
|
|
self
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "rustls")]
|
|
|
|
|
/// Use custom `SslConnector` instance.
|
|
|
|
|
/// Use custom `ClientConfig` instance.
|
|
|
|
|
pub fn rustls(
|
|
|
|
|
mut self,
|
|
|
|
|
connector: std::sync::Arc<actix_tls::connect::ssl::rustls::ClientConfig>,
|
|
|
|
|
connector: std::sync::Arc<actix_tls::connect::rustls::reexports::ClientConfig>,
|
|
|
|
|
) -> Self {
|
|
|
|
|
self.ssl = SslConnector::Rustls(connector);
|
|
|
|
|
self.ssl = OurTlsConnector::Rustls(connector);
|
|
|
|
|
self
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -266,7 +272,7 @@ where
|
|
|
|
|
/// Finish configuration process and create connector service.
|
|
|
|
|
/// The Connector builder always concludes by calling `finish()` last in
|
|
|
|
|
/// its combinator chain.
|
|
|
|
|
pub fn finish(self) -> ConnectorService<S, Io> {
|
|
|
|
|
pub fn finish(self) -> ConnectorService<S, IO> {
|
|
|
|
|
let local_address = self.config.local_address;
|
|
|
|
|
let timeout = self.config.timeout;
|
|
|
|
|
|
|
|
|
@ -279,11 +285,12 @@ where
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let tls_service = match self.ssl {
|
|
|
|
|
SslConnector::None => {
|
|
|
|
|
OurTlsConnector::None => {
|
|
|
|
|
#[cfg(not(feature = "dangerous-h2c"))]
|
|
|
|
|
{
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "dangerous-h2c")]
|
|
|
|
|
{
|
|
|
|
|
use std::io;
|
|
|
|
@ -305,17 +312,17 @@ where
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct NoOpTlsConnectorService;
|
|
|
|
|
|
|
|
|
|
impl<T, U> Service<Connection<T, U>> for NoOpTlsConnectorService
|
|
|
|
|
impl<R, IO> Service<Connection<R, IO>> for NoOpTlsConnectorService
|
|
|
|
|
where
|
|
|
|
|
U: ActixStream + 'static,
|
|
|
|
|
IO: ActixStream + 'static,
|
|
|
|
|
{
|
|
|
|
|
type Response = Connection<T, Box<dyn ConnectionIo>>;
|
|
|
|
|
type Response = Connection<R, Box<dyn ConnectionIo>>;
|
|
|
|
|
type Error = io::Error;
|
|
|
|
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
|
|
|
|
|
|
|
|
|
actix_service::always_ready!();
|
|
|
|
|
|
|
|
|
|
fn call(&self, connection: Connection<T, U>) -> Self::Future {
|
|
|
|
|
fn call(&self, connection: Connection<R, IO>) -> Self::Future {
|
|
|
|
|
let (io, connection) = connection.replace_io(());
|
|
|
|
|
let (_, connection) = connection.replace_io(Box::new(io) as _);
|
|
|
|
|
|
|
|
|
@ -334,13 +341,14 @@ where
|
|
|
|
|
Some(actix_service::boxed::rc_service(tls_service))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "openssl")]
|
|
|
|
|
SslConnector::Openssl(tls) => {
|
|
|
|
|
OurTlsConnector::Openssl(tls) => {
|
|
|
|
|
const H2: &[u8] = b"h2";
|
|
|
|
|
|
|
|
|
|
use actix_tls::connect::ssl::openssl::{OpensslConnector, SslStream};
|
|
|
|
|
use actix_tls::connect::openssl::{reexports::AsyncSslStream, TlsConnector};
|
|
|
|
|
|
|
|
|
|
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, SslStream<Io>> {
|
|
|
|
|
impl<IO: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncSslStream<IO>> {
|
|
|
|
|
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
|
|
|
|
|
let sock = self.into_parts().0;
|
|
|
|
|
let h2 = sock
|
|
|
|
@ -359,19 +367,20 @@ where
|
|
|
|
|
|
|
|
|
|
let tls_service = TlsConnectorService {
|
|
|
|
|
tcp_service: tcp_service_inner,
|
|
|
|
|
tls_service: OpensslConnector::service(tls),
|
|
|
|
|
tls_service: TlsConnector::service(tls),
|
|
|
|
|
timeout: handshake_timeout,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Some(actix_service::boxed::rc_service(tls_service))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(feature = "rustls")]
|
|
|
|
|
SslConnector::Rustls(tls) => {
|
|
|
|
|
OurTlsConnector::Rustls(tls) => {
|
|
|
|
|
const H2: &[u8] = b"h2";
|
|
|
|
|
|
|
|
|
|
use actix_tls::connect::ssl::rustls::{RustlsConnector, TlsStream};
|
|
|
|
|
use actix_tls::connect::rustls::{reexports::AsyncTlsStream, TlsConnector};
|
|
|
|
|
|
|
|
|
|
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> {
|
|
|
|
|
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, AsyncTlsStream<Io>> {
|
|
|
|
|
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
|
|
|
|
|
let sock = self.into_parts().0;
|
|
|
|
|
let h2 = sock
|
|
|
|
@ -391,7 +400,7 @@ where
|
|
|
|
|
|
|
|
|
|
let tls_service = TlsConnectorService {
|
|
|
|
|
tcp_service: tcp_service_inner,
|
|
|
|
|
tls_service: RustlsConnector::service(tls),
|
|
|
|
|
tls_service: TlsConnector::service(tls),
|
|
|
|
|
timeout: handshake_timeout,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -460,26 +469,28 @@ where
|
|
|
|
|
|
|
|
|
|
/// service for establish tcp connection and do client tls handshake.
|
|
|
|
|
/// operation is canceled when timeout limit reached.
|
|
|
|
|
struct TlsConnectorService<S, St> {
|
|
|
|
|
/// tcp connection is canceled on `TcpConnectorInnerService`'s timeout setting.
|
|
|
|
|
tcp_service: S,
|
|
|
|
|
/// tls connection is canceled on `TlsConnectorService`'s timeout setting.
|
|
|
|
|
tls_service: St,
|
|
|
|
|
struct TlsConnectorService<Tcp, Tls> {
|
|
|
|
|
/// TCP connection is canceled on `TcpConnectorInnerService`'s timeout setting.
|
|
|
|
|
tcp_service: Tcp,
|
|
|
|
|
|
|
|
|
|
/// TLS connection is canceled on `TlsConnectorService`'s timeout setting.
|
|
|
|
|
tls_service: Tls,
|
|
|
|
|
|
|
|
|
|
timeout: Duration,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S, St, Io> Service<Connect> for TlsConnectorService<S, St>
|
|
|
|
|
impl<Tcp, Tls, IO> Service<Connect> for TlsConnectorService<Tcp, Tls>
|
|
|
|
|
where
|
|
|
|
|
S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError>
|
|
|
|
|
Tcp: Service<Connect, Response = TcpConnection<Uri, IO>, Error = ConnectError>
|
|
|
|
|
+ Clone
|
|
|
|
|
+ 'static,
|
|
|
|
|
St: Service<TcpConnection<Uri, Io>, Error = std::io::Error> + Clone + 'static,
|
|
|
|
|
Io: ConnectionIo,
|
|
|
|
|
St::Response: IntoConnectionIo,
|
|
|
|
|
Tls: Service<TcpConnection<Uri, IO>, Error = std::io::Error> + Clone + 'static,
|
|
|
|
|
Tls::Response: IntoConnectionIo,
|
|
|
|
|
IO: ConnectionIo,
|
|
|
|
|
{
|
|
|
|
|
type Response = (Box<dyn ConnectionIo>, Protocol);
|
|
|
|
|
type Error = ConnectError;
|
|
|
|
|
type Future = TlsConnectorFuture<St, S::Future, St::Future>;
|
|
|
|
|
type Future = TlsConnectorFuture<Tls, Tcp::Future, Tls::Future>;
|
|
|
|
|
|
|
|
|
|
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
|
ready!(self.tcp_service.poll_ready(cx))?;
|
|
|
|
@ -579,7 +590,7 @@ impl<S: Clone> TcpConnectorInnerService<S> {
|
|
|
|
|
|
|
|
|
|
impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
|
|
|
|
|
where
|
|
|
|
|
S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
|
|
|
|
|
S: Service<ConnectInfo<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
|
|
|
|
|
+ Clone
|
|
|
|
|
+ 'static,
|
|
|
|
|
{
|
|
|
|
@ -590,7 +601,7 @@ where
|
|
|
|
|
actix_service::forward_ready!(service);
|
|
|
|
|
|
|
|
|
|
fn call(&self, req: Connect) -> Self::Future {
|
|
|
|
|
let mut req = TcpConnect::new(req.uri).set_addr(req.addr);
|
|
|
|
|
let mut req = ConnectInfo::new(req.uri).set_addr(req.addr);
|
|
|
|
|
|
|
|
|
|
if let Some(local_addr) = self.local_address {
|
|
|
|
|
req = req.set_local_addr(local_addr);
|
|
|
|
@ -629,8 +640,8 @@ where
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Connector service for pooled Plain/Tls Tcp connections.
|
|
|
|
|
pub type ConnectorService<S, Io> = ConnectorServicePriv<
|
|
|
|
|
TcpConnectorService<TcpConnectorInnerService<S>>,
|
|
|
|
|
pub type ConnectorService<Svc, IO> = ConnectorServicePriv<
|
|
|
|
|
TcpConnectorService<TcpConnectorInnerService<Svc>>,
|
|
|
|
|
Rc<
|
|
|
|
|
dyn Service<
|
|
|
|
|
Connect,
|
|
|
|
@ -642,7 +653,7 @@ pub type ConnectorService<S, Io> = ConnectorServicePriv<
|
|
|
|
|
>,
|
|
|
|
|
>,
|
|
|
|
|
>,
|
|
|
|
|
Io,
|
|
|
|
|
IO,
|
|
|
|
|
Box<dyn ConnectionIo>,
|
|
|
|
|
>;
|
|
|
|
|
|
|
|
|
@ -741,7 +752,7 @@ mod resolver {
|
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
|
|
pub(super) fn resolver() -> Resolver {
|
|
|
|
|
Resolver::Default
|
|
|
|
|
Resolver::default()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -783,8 +794,7 @@ mod resolver {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// dns struct is cached in thread local.
|
|
|
|
|
// so new client constructor can reuse the existing dns resolver.
|
|
|
|
|
// resolver struct is cached in thread local so new clients can reuse the existing instance
|
|
|
|
|
thread_local! {
|
|
|
|
|
static TRUST_DNS_RESOLVER: RefCell<Option<Resolver>> = RefCell::new(None);
|
|
|
|
|
}
|
|
|
|
@ -792,8 +802,10 @@ mod resolver {
|
|
|
|
|
// get from thread local or construct a new trust-dns resolver.
|
|
|
|
|
TRUST_DNS_RESOLVER.with(|local| {
|
|
|
|
|
let resolver = local.borrow().as_ref().map(Clone::clone);
|
|
|
|
|
|
|
|
|
|
match resolver {
|
|
|
|
|
Some(resolver) => resolver,
|
|
|
|
|
|
|
|
|
|
None => {
|
|
|
|
|
let (cfg, opts) = match read_system_conf() {
|
|
|
|
|
Ok((cfg, opts)) => (cfg, opts),
|
|
|
|
@ -806,8 +818,9 @@ mod resolver {
|
|
|
|
|
let resolver = TokioAsyncResolver::tokio(cfg, opts).unwrap();
|
|
|
|
|
|
|
|
|
|
// box trust dns resolver and put it in thread local.
|
|
|
|
|
let resolver = Resolver::new_custom(TrustDnsResolver(resolver));
|
|
|
|
|
let resolver = Resolver::custom(TrustDnsResolver(resolver));
|
|
|
|
|
*local.borrow_mut() = Some(resolver.clone());
|
|
|
|
|
|
|
|
|
|
resolver
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -838,9 +851,9 @@ mod tests {
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
let connector = Connector {
|
|
|
|
|
connector: new_connector(resolver::resolver()),
|
|
|
|
|
connector: TcpConnector::new(resolver::resolver()).service(),
|
|
|
|
|
config: ConnectorConfig::default(),
|
|
|
|
|
ssl: SslConnector::None,
|
|
|
|
|
ssl: OurTlsConnector::None,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let client = Client::builder().connector(connector).finish();
|
|
|
|
|