From b75b5114c30bf56692290775e012a152d13f0122 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 18 Mar 2021 10:53:22 -0700 Subject: [PATCH] refactor actix_http connection types and connector services (#2081) --- actix-http/CHANGES.md | 6 + actix-http/src/client/config.rs | 2 + actix-http/src/client/connection.rs | 491 +++++++++++++++--------- actix-http/src/client/connector.rs | 575 +++++++++++++++++++--------- actix-http/src/client/h1proto.rs | 136 ++----- actix-http/src/client/h2proto.rs | 38 +- actix-http/src/client/mod.rs | 2 +- actix-http/src/client/pool.rs | 120 +++--- awc/CHANGES.md | 4 + awc/src/builder.rs | 20 +- awc/src/connect.rs | 30 +- awc/src/frozen.rs | 22 +- awc/src/lib.rs | 9 +- awc/src/middleware/redirect.rs | 4 +- awc/src/request.rs | 14 +- awc/src/ws.rs | 5 +- 16 files changed, 867 insertions(+), 611 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index e96c22274..c4e0aec89 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,10 +1,16 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +* `client::Connector::handshake_timeout` method for customize tls connection handshake timeout. [#2081] +* `client::ConnectorService` as `client::Connector::finish` method's return type [#2081] +* `client::ConnectionIo` trait alias [#2081] + ### Chaged * `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063] [#2063]: https://github.com/actix/actix-web/pull/2063 +[#2081]: https://github.com/actix/actix-web/pull/2081 ## 3.0.0-beta.4 - 2021-03-08 diff --git a/actix-http/src/client/config.rs b/actix-http/src/client/config.rs index 0d54e1b49..1c0405cbc 100644 --- a/actix-http/src/client/config.rs +++ b/actix-http/src/client/config.rs @@ -8,6 +8,7 @@ const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB #[derive(Clone)] pub(crate) struct ConnectorConfig { pub(crate) timeout: Duration, + pub(crate) handshake_timeout: Duration, pub(crate) conn_lifetime: Duration, pub(crate) conn_keep_alive: Duration, pub(crate) disconnect_timeout: Option, @@ -21,6 +22,7 @@ impl Default for ConnectorConfig { fn default() -> Self { Self { timeout: Duration::from_secs(5), + handshake_timeout: Duration::from_secs(5), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), disconnect_timeout: Some(Duration::from_millis(3000)), diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 291148b5b..89dfd59de 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -1,14 +1,16 @@ -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, io, time}; +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, + time, +}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_rt::task::JoinHandle; use bytes::Bytes; use futures_core::future::LocalBoxFuture; use h2::client::SendRequest; -use pin_project::pin_project; use crate::body::MessageBody; use crate::h1::ClientCodec; @@ -19,32 +21,148 @@ use super::error::SendRequestError; use super::pool::Acquired; use super::{h1proto, h2proto}; +/// Trait alias for types impl [tokio::io::AsyncRead] and [tokio::io::AsyncWrite]. pub trait ConnectionIo: AsyncRead + AsyncWrite + Unpin + 'static {} impl ConnectionIo for T {} -pub(crate) enum ConnectionType { - H1(Io), - H2(H2Connection), +/// HTTP client connection +pub struct H1Connection { + io: Option, + created: time::Instant, + acquired: Acquired, } -/// `H2Connection` has two parts: `SendRequest` and `Connection`. +impl H1Connection { + /// close or release the connection to pool based on flag input + pub(super) fn on_release(&mut self, keep_alive: bool) { + if keep_alive { + self.release(); + } else { + self.close(); + } + } + + /// Close connection + fn close(&mut self) { + let io = self.io.take().unwrap(); + self.acquired.close(ConnectionInnerType::H1(io)); + } + + /// Release this connection to the connection pool + fn release(&mut self) { + let io = self.io.take().unwrap(); + self.acquired + .release(ConnectionInnerType::H1(io), self.created); + } + + fn io_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Io> { + Pin::new(self.get_mut().io.as_mut().unwrap()) + } +} + +impl AsyncRead for H1Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.io_pin_mut().poll_read(cx, buf) + } +} + +impl AsyncWrite for H1Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.io_pin_mut().poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.io_pin_mut().poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.io_pin_mut().poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + self.io_pin_mut().poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.io.as_ref().unwrap().is_write_vectored() + } +} + +/// HTTP2 client connection +pub struct H2Connection { + io: Option, + created: time::Instant, + acquired: Acquired, +} + +impl Deref for H2Connection { + type Target = SendRequest; + + fn deref(&self) -> &Self::Target { + &self.io.as_ref().unwrap().sender + } +} + +impl DerefMut for H2Connection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.io.as_mut().unwrap().sender + } +} + +impl H2Connection { + /// close or release the connection to pool based on flag input + pub(super) fn on_release(&mut self, close: bool) { + if close { + self.close(); + } else { + self.release(); + } + } + + /// Close connection + fn close(&mut self) { + let io = self.io.take().unwrap(); + self.acquired.close(ConnectionInnerType::H2(io)); + } + + /// Release this connection to the connection pool + fn release(&mut self) { + let io = self.io.take().unwrap(); + self.acquired + .release(ConnectionInnerType::H2(io), self.created); + } +} + +/// `H2ConnectionInner` has two parts: `SendRequest` and `Connection`. /// -/// `Connection` is spawned as an async task on runtime and `H2Connection` holds a handle for -/// this task. Therefore, it can wake up and quit the task when SendRequest is dropped. -pub(crate) struct H2Connection { +/// `Connection` is spawned as an async task on runtime and `H2ConnectionInner` holds a handle +/// for this task. Therefore, it can wake up and quit the task when SendRequest is dropped. +pub(super) struct H2ConnectionInner { handle: JoinHandle<()>, sender: SendRequest, } -impl H2Connection { - pub(crate) fn new( +impl H2ConnectionInner { + pub(super) fn new( sender: SendRequest, connection: h2::client::Connection, - ) -> Self - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - { + ) -> Self { let handle = actix_rt::spawn(async move { let _ = connection.await; }); @@ -53,143 +171,80 @@ impl H2Connection { } } -// cancel spawned connection task on drop. -impl Drop for H2Connection { +/// Cancel spawned connection task on drop. +impl Drop for H2ConnectionInner { fn drop(&mut self) { self.handle.abort(); } } -// only expose sender type to public. -impl Deref for H2Connection { - type Target = SendRequest; - - fn deref(&self) -> &Self::Target { - &self.sender - } -} - -impl DerefMut for H2Connection { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.sender - } -} - -pub trait Connection { - type Io: AsyncRead + AsyncWrite + Unpin; - - /// Send request and body - fn send_request( - self, - head: H, - body: B, - ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> - where - B: MessageBody + 'static, - H: Into + 'static; - - /// Send request, returns Response and Framed - fn open_tunnel + 'static>( - self, - head: H, - ) -> LocalBoxFuture< - 'static, - Result<(ResponseHead, Framed), SendRequestError>, - >; -} - -#[doc(hidden)] -/// HTTP client connection -pub struct IoConnection -where - T: AsyncWrite + Unpin + 'static, -{ - io: Option>, - created: time::Instant, - pool: Acquired, -} - -impl fmt::Debug for IoConnection -where - T: AsyncWrite + Unpin + fmt::Debug + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.io { - Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io), - Some(ConnectionType::H2(_)) => write!(f, "H2Connection"), - None => write!(f, "Connection(Empty)"), - } - } -} - -impl IoConnection { - pub(crate) fn new( - io: ConnectionType, - created: time::Instant, - pool: Acquired, - ) -> Self { - IoConnection { - pool, - created, - io: Some(io), - } - } - - #[cfg(test)] - pub(crate) fn into_parts(self) -> (ConnectionType, time::Instant, Acquired) { - (self.io.unwrap(), self.created, self.pool) - } - - async fn send_request>( - mut self, - head: H, - body: B, - ) -> Result<(ResponseHead, Payload), SendRequestError> { - match self.io.take().unwrap() { - ConnectionType::H1(io) => { - h1proto::send_request(io, head.into(), body, self.created, self.pool) - .await - } - ConnectionType::H2(io) => { - h2proto::send_request(io, head.into(), body, self.created, self.pool) - .await - } - } - } - - /// Send request, returns Response and Framed - async fn open_tunnel>( - mut self, - head: H, - ) -> Result<(ResponseHead, Framed), SendRequestError> { - match self.io.take().unwrap() { - ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await, - ConnectionType::H2(io) => { - self.pool.release(ConnectionType::H2(io), self.created); - Err(SendRequestError::TunnelNotSupported) - } - } - } -} - #[allow(dead_code)] -pub(crate) enum EitherIoConnection +/// Unified connection type cover Http1 Plain/Tls and Http2 protocols +pub enum Connection> where - A: AsyncRead + AsyncWrite + Unpin + 'static, - B: AsyncRead + AsyncWrite + Unpin + 'static, + A: ConnectionIo, + B: ConnectionIo, { - A(IoConnection), - B(IoConnection), + Tcp(ConnectionType), + Tls(ConnectionType), } -impl Connection for EitherIoConnection -where - A: AsyncRead + AsyncWrite + Unpin + 'static, - B: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Io = EitherIo; +/// Unified connection type cover Http1/2 protocols +pub enum ConnectionType { + H1(H1Connection), + H2(H2Connection), +} - fn send_request( +/// Helper type for storing connection types in pool. +pub(super) enum ConnectionInnerType { + H1(Io), + H2(H2ConnectionInner), +} + +impl ConnectionType { + pub(super) fn from_pool( + inner: ConnectionInnerType, + created: time::Instant, + acquired: Acquired, + ) -> Self { + match inner { + ConnectionInnerType::H1(io) => Self::from_h1(io, created, acquired), + ConnectionInnerType::H2(io) => Self::from_h2(io, created, acquired), + } + } + + pub(super) fn from_h1( + io: Io, + created: time::Instant, + acquired: Acquired, + ) -> Self { + Self::H1(H1Connection { + io: Some(io), + created, + acquired, + }) + } + + pub(super) fn from_h2( + io: H2ConnectionInner, + created: time::Instant, + acquired: Acquired, + ) -> Self { + Self::H2(H2Connection { + io: Some(io), + created, + acquired, + }) + } +} + +impl Connection +where + A: ConnectionIo, + B: ConnectionIo, +{ + /// Send a request through connection. + pub fn send_request( self, head: H, body: RB, @@ -198,76 +253,106 @@ where RB: MessageBody + 'static, H: Into + 'static, { - match self { - EitherIoConnection::A(con) => Box::pin(con.send_request(head, body)), - EitherIoConnection::B(con) => Box::pin(con.send_request(head, body)), - } + Box::pin(async move { + match self { + Connection::Tcp(ConnectionType::H1(conn)) => { + h1proto::send_request(conn, head.into(), body).await + } + Connection::Tls(ConnectionType::H1(conn)) => { + h1proto::send_request(conn, head.into(), body).await + } + Connection::Tls(ConnectionType::H2(conn)) => { + h2proto::send_request(conn, head.into(), body).await + } + _ => unreachable!( + "Plain Tcp connection can be used only in Http1 protocol" + ), + } + }) } - /// Send request, returns Response and Framed - fn open_tunnel + 'static>( + /// Send request, returns Response and Framed tunnel. + pub fn open_tunnel + 'static>( self, head: H, ) -> LocalBoxFuture< 'static, - Result<(ResponseHead, Framed), SendRequestError>, + Result<(ResponseHead, Framed, ClientCodec>), SendRequestError>, > { - match self { - EitherIoConnection::A(con) => Box::pin(async { - let (head, framed) = con.open_tunnel(head).await?; - Ok((head, framed.into_map_io(EitherIo::A))) - }), - EitherIoConnection::B(con) => Box::pin(async { - let (head, framed) = con.open_tunnel(head).await?; - Ok((head, framed.into_map_io(EitherIo::B))) - }), - } + Box::pin(async move { + match self { + Connection::Tcp(ConnectionType::H1(ref _conn)) => { + let (head, framed) = h1proto::open_tunnel(self, head.into()).await?; + Ok((head, framed)) + } + Connection::Tls(ConnectionType::H1(ref _conn)) => { + let (head, framed) = h1proto::open_tunnel(self, head.into()).await?; + Ok((head, framed)) + } + Connection::Tls(ConnectionType::H2(mut conn)) => { + conn.release(); + Err(SendRequestError::TunnelNotSupported) + } + Connection::Tcp(ConnectionType::H2(_)) => { + unreachable!( + "Plain Tcp connection can be used only in Http1 protocol" + ) + } + } + }) } } -#[pin_project(project = EitherIoProj)] -pub enum EitherIo { - A(#[pin] A), - B(#[pin] B), -} - -impl AsyncRead for EitherIo +impl AsyncRead for Connection where - A: AsyncRead, - B: AsyncRead, + A: ConnectionIo, + B: ConnectionIo, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - match self.project() { - EitherIoProj::A(val) => val.poll_read(cx, buf), - EitherIoProj::B(val) => val.poll_read(cx, buf), + match self.get_mut() { + Connection::Tcp(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_read(cx, buf) + } + Connection::Tls(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_read(cx, buf) + } + _ => unreachable!("H2Connection can not impl AsyncRead trait"), } } } -impl AsyncWrite for EitherIo +const H2_UNREACHABLE_WRITE: &'static str = "H2Connection can not impl AsyncWrite trait"; + +impl AsyncWrite for Connection where - A: AsyncWrite, - B: AsyncWrite, + A: ConnectionIo, + B: ConnectionIo, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - match self.project() { - EitherIoProj::A(val) => val.poll_write(cx, buf), - EitherIoProj::B(val) => val.poll_write(cx, buf), + match self.get_mut() { + Connection::Tcp(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_write(cx, buf) + } + Connection::Tls(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_write(cx, buf) + } + _ => unreachable!(H2_UNREACHABLE_WRITE), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.project() { - EitherIoProj::A(val) => val.poll_flush(cx), - EitherIoProj::B(val) => val.poll_flush(cx), + match self.get_mut() { + Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_flush(cx), + Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_flush(cx), + _ => unreachable!(H2_UNREACHABLE_WRITE), } } @@ -275,9 +360,38 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.project() { - EitherIoProj::A(val) => val.poll_shutdown(cx), - EitherIoProj::B(val) => val.poll_shutdown(cx), + match self.get_mut() { + Connection::Tcp(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_shutdown(cx) + } + Connection::Tls(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_shutdown(cx) + } + _ => unreachable!(H2_UNREACHABLE_WRITE), + } + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + match self.get_mut() { + Connection::Tcp(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_write_vectored(cx, bufs) + } + Connection::Tls(ConnectionType::H1(conn)) => { + Pin::new(conn).poll_write_vectored(cx, bufs) + } + _ => unreachable!(H2_UNREACHABLE_WRITE), + } + } + + fn is_write_vectored(&self) -> bool { + match *self { + Connection::Tcp(ConnectionType::H1(ref conn)) => conn.is_write_vectored(), + Connection::Tls(ConnectionType::H1(ref conn)) => conn.is_write_vectored(), + _ => unreachable!(H2_UNREACHABLE_WRITE), } } } @@ -300,10 +414,13 @@ mod test { let tcp = TcpStream::connect(local).await.unwrap(); let (sender, connection) = h2::client::handshake(tcp).await.unwrap(); - let conn = H2Connection::new(sender.clone(), connection); + let conn = H2ConnectionInner::new(sender.clone(), connection); assert!(sender.clone().ready().await.is_ok()); - assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok()); + assert!(h2::client::SendRequest::clone(&conn.sender) + .ready() + .await + .is_ok()); drop(conn); diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 0c9159b87..1a5f32880 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -3,22 +3,26 @@ use std::{ future::Future, net::IpAddr, pin::Pin, + rc::Rc, task::{Context, Poll}, time::Duration, }; -use actix_codec::{AsyncRead, AsyncWrite}; -use actix_rt::net::TcpStream; -use actix_service::{apply_fn, Service, ServiceExt}; -use actix_tls::connect::{ - new_connector, Connect as TcpConnect, Connection as TcpConnection, Resolver, +use actix_rt::{ + net::TcpStream, + time::{sleep, Sleep}, }; -use actix_utils::timeout::{TimeoutError, TimeoutService}; -use futures_core::ready; +use actix_service::Service; +use actix_tls::connect::{ + new_connector, Connect as TcpConnect, ConnectError as TcpConnectError, + Connection as TcpConnection, Resolver, +}; +use futures_core::{future::LocalBoxFuture, ready}; use http::Uri; +use pin_project::pin_project; use super::config::ConnectorConfig; -use super::connection::{Connection, ConnectionIo, EitherIoConnection}; +use super::connection::{Connection, ConnectionIo}; use super::error::ConnectError; use super::pool::ConnectionPool; use super::Connect; @@ -28,18 +32,15 @@ use super::Protocol; use actix_tls::connect::ssl::openssl::SslConnector as OpensslConnector; #[cfg(feature = "rustls")] use actix_tls::connect::ssl::rustls::ClientConfig; -#[cfg(feature = "rustls")] -use std::sync::Arc; -#[cfg(any(feature = "openssl", feature = "rustls"))] enum SslConnector { + #[allow(dead_code)] + None, #[cfg(feature = "openssl")] Openssl(OpensslConnector), #[cfg(feature = "rustls")] - Rustls(Arc), + Rustls(std::sync::Arc), } -#[cfg(not(any(feature = "openssl", feature = "rustls")))] -type SslConnector = (); /// Manages HTTP client network connectivity. /// @@ -104,23 +105,25 @@ impl Connector<()> { config.root_store.add_server_trust_anchors( &actix_tls::connect::ssl::rustls::TLS_SERVER_ROOTS, ); - SslConnector::Rustls(Arc::new(config)) + SslConnector::Rustls(std::sync::Arc::new(config)) } // ssl turned off, provides empty ssl connector #[cfg(not(any(feature = "openssl", feature = "rustls")))] - fn build_ssl(_: Vec>) -> SslConnector {} + fn build_ssl(_: Vec>) -> SslConnector { + SslConnector::None + } } -impl Connector { +impl Connector { /// Use custom connector. - pub fn connector(self, connector: T1) -> Connector + pub fn connector(self, connector: S1) -> Connector where - U1: AsyncRead + AsyncWrite + Unpin + fmt::Debug, - T1: Service< + Io1: ConnectionIo + fmt::Debug, + S1: Service< TcpConnect, - Response = TcpConnection, - Error = actix_tls::connect::ConnectError, + Response = TcpConnection, + Error = TcpConnectError, > + Clone, { Connector { @@ -131,23 +134,30 @@ impl Connector { } } -impl Connector +impl Connector where - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, - T: Service< + Io: ConnectionIo + fmt::Debug, + S: Service< TcpConnect, - Response = TcpConnection, - Error = actix_tls::connect::ConnectError, + Response = TcpConnection, + Error = TcpConnectError, > + Clone + 'static, { - /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. - /// Set to 5 second by default. + /// Tcp connection timeout, i.e. max time to connect to remote host including dns name + /// resolution. Set to 5 second by default. pub fn timeout(mut self, timeout: Duration) -> Self { self.config.timeout = timeout; self } + /// Tls handshake timeout, i.e. max time to do tls handshake with remote host after tcp + /// connection established. Set to 5 second by default. + pub fn handshake_timeout(mut self, timeout: Duration) -> Self { + self.config.handshake_timeout = timeout; + self + } + #[cfg(feature = "openssl")] /// Use custom `SslConnector` instance. pub fn ssl(mut self, connector: OpensslConnector) -> Self { @@ -157,7 +167,7 @@ where #[cfg(feature = "rustls")] /// Use custom `SslConnector` instance. - pub fn rustls(mut self, connector: Arc) -> Self { + pub fn rustls(mut self, connector: std::sync::Arc) -> Self { self.ssl = SslConnector::Rustls(connector); self } @@ -247,158 +257,369 @@ where /// Finish configuration process and create connector service. /// The Connector builder always concludes by calling `finish()` last in /// its combinator chain. - pub fn finish( - self, - ) -> impl Service { + pub fn finish(self) -> ConnectorService { let local_address = self.config.local_address; let timeout = self.config.timeout; - let tcp_service = TimeoutService::new( - timeout, - apply_fn(self.connector.clone(), move |msg: Connect, srv| { - let mut req = TcpConnect::new(msg.uri).set_addr(msg.addr); + let tcp_service_inner = + TcpConnectorInnerService::new(self.connector, timeout, local_address); - if let Some(local_addr) = local_address { - req = req.set_local_addr(local_addr); + #[allow(clippy::redundant_clone)] + let tcp_service = TcpConnectorService { + service: tcp_service_inner.clone(), + }; + + let tls_service = match self.ssl { + SslConnector::None => None, + #[cfg(feature = "openssl")] + SslConnector::Openssl(tls) => { + const H2: &[u8] = b"h2"; + + use actix_tls::connect::ssl::openssl::{OpensslConnector, SslStream}; + + impl IntoConnectionIo for TcpConnection> { + fn into_connection_io(self) -> (Box, Protocol) { + let sock = self.into_parts().0; + let h2 = sock + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (Box::new(sock), Protocol::Http2) + } else { + (Box::new(sock), Protocol::Http1) + } + } } - srv.call(req) - }) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); + let handshake_timeout = self.config.handshake_timeout; - #[cfg(not(any(feature = "openssl", feature = "rustls")))] - { - // A dummy service for annotate tls pool's type signature. - pub type DummyService = Box< - dyn Service< - Connect, - Response = (Box, Protocol), - Error = ConnectError, - Future = futures_core::future::LocalBoxFuture< - 'static, - Result<(Box, Protocol), ConnectError>, - >, - >, - >; + let tls_service = TlsConnectorService { + tcp_service: tcp_service_inner, + tls_service: OpensslConnector::service(tls), + timeout: handshake_timeout, + }; - InnerConnector::<_, DummyService, _, Box> { - tcp_pool: ConnectionPool::new( - tcp_service, - self.config.no_disconnect_timeout(), - ), - tls_pool: None, + Some(actix_service::boxed::rc_service(tls_service)) } - } - - #[cfg(any(feature = "openssl", feature = "rustls"))] - { - const H2: &[u8] = b"h2"; - use actix_service::{boxed::service, pipeline}; - #[cfg(feature = "openssl")] - use actix_tls::connect::ssl::openssl::OpensslConnector; #[cfg(feature = "rustls")] - use actix_tls::connect::ssl::rustls::{RustlsConnector, Session}; + SslConnector::Rustls(tls) => { + const H2: &[u8] = b"h2"; - let ssl_service = TimeoutService::new( - timeout, - pipeline( - apply_fn(self.connector.clone(), move |msg: Connect, srv| { - let mut req = TcpConnect::new(msg.uri).set_addr(msg.addr); + use actix_tls::connect::ssl::rustls::{ + RustlsConnector, Session, TlsStream, + }; - if let Some(local_addr) = local_address { - req = req.set_local_addr(local_addr); + impl IntoConnectionIo for TcpConnection> { + fn into_connection_io(self) -> (Box, Protocol) { + let sock = self.into_parts().0; + let h2 = sock + .get_ref() + .1 + .get_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (Box::new(sock), Protocol::Http2) + } else { + (Box::new(sock), Protocol::Http1) } + } + } - srv.call(req) - }) - .map_err(ConnectError::from), - ) - .and_then(match self.ssl { - #[cfg(feature = "openssl")] - SslConnector::Openssl(ssl) => service( - OpensslConnector::service(ssl) - .map(|stream| { - let sock = stream.into_parts().0; - let h2 = sock - .ssl() - .selected_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - ( - Box::new(sock) as Box, - Protocol::Http2, - ) - } else { - (Box::new(sock) as _, Protocol::Http1) - } - }) - .map_err(ConnectError::from), - ), - #[cfg(feature = "rustls")] - SslConnector::Rustls(ssl) => service( - RustlsConnector::service(ssl) - .map_err(ConnectError::from) - .map(|stream| { - let sock = stream.into_parts().0; - let h2 = sock - .get_ref() - .1 - .get_alpn_protocol() - .map(|protos| protos.windows(2).any(|w| w == H2)) - .unwrap_or(false); - if h2 { - (Box::new(sock) as _, Protocol::Http2) - } else { - (Box::new(sock) as _, Protocol::Http1) - } - }), - ), - }), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); + let handshake_timeout = self.config.handshake_timeout; - InnerConnector { - tcp_pool: ConnectionPool::new( - tcp_service, - self.config.no_disconnect_timeout(), - ), - tls_pool: Some(ConnectionPool::new(ssl_service, self.config)), + let tls_service = TlsConnectorService { + tcp_service: tcp_service_inner, + tls_service: RustlsConnector::service(tls), + timeout: handshake_timeout, + }; + + Some(actix_service::boxed::rc_service(tls_service)) } + }; + + let tcp_config = self.config.no_disconnect_timeout(); + + let tcp_pool = ConnectionPool::new(tcp_service, tcp_config); + + let tls_config = self.config; + let tls_pool = tls_service + .map(move |tls_service| ConnectionPool::new(tls_service, tls_config)); + + ConnectorServicePriv { tcp_pool, tls_pool } + } +} + +/// tcp service for map `TcpConnection` type to `(Io, Protocol)` +#[derive(Clone)] +pub struct TcpConnectorService { + service: S, +} + +impl Service for TcpConnectorService +where + S: Service, Error = ConnectError> + + Clone + + 'static, +{ + type Response = (Io, Protocol); + type Error = ConnectError; + type Future = TcpConnectorFuture; + + actix_service::forward_ready!(service); + + fn call(&self, req: Connect) -> Self::Future { + TcpConnectorFuture { + fut: self.service.call(req), } } } -struct InnerConnector +#[pin_project] +pub struct TcpConnectorFuture { + #[pin] + fut: Fut, +} + +impl Future for TcpConnectorFuture where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, + Fut: Future, ConnectError>>, +{ + type Output = Result<(Io, Protocol), ConnectError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project() + .fut + .poll(cx) + .map_ok(|res| (res.into_parts().0, Protocol::Http1)) + } +} + +/// service for establish tcp connection and do client tls handshake. +/// operation is canceled when timeout limit reached. +struct TlsConnectorService { + /// tcp connection is canceled on `TcpConnectorInnerService`'s timeout setting. + tcp_service: S, + /// tls connection is canceled on `TlsConnectorService`'s timeout setting. + tls_service: St, + timeout: Duration, +} + +impl Service for TlsConnectorService +where + S: Service, Error = ConnectError> + + Clone + + 'static, + St: Service, Response = Res, Error = std::io::Error> + + Clone + + 'static, + Io: ConnectionIo, + Res: IntoConnectionIo, +{ + type Response = (Box, Protocol); + type Error = ConnectError; + type Future = TlsConnectorFuture; + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + ready!(self.tcp_service.poll_ready(cx))?; + ready!(self.tls_service.poll_ready(cx))?; + Poll::Ready(Ok(())) + } + + fn call(&self, req: Connect) -> Self::Future { + let fut = self.tcp_service.call(req); + let tls_service = self.tls_service.clone(); + let timeout = self.timeout; + + TlsConnectorFuture::TcpConnect { + fut, + tls_service: Some(tls_service), + timeout, + } + } +} + +#[pin_project(project = TlsConnectorProj)] +#[allow(clippy::large_enum_variant)] +enum TlsConnectorFuture { + TcpConnect { + #[pin] + fut: Fut1, + tls_service: Option, + timeout: Duration, + }, + TlsConnect { + #[pin] + fut: Fut2, + #[pin] + timeout: Sleep, + }, +} + +/// helper trait for generic over different TlsStream types between tls crates. +trait IntoConnectionIo { + fn into_connection_io(self) -> (Box, Protocol); +} + +impl Future for TlsConnectorFuture +where + S: Service< + TcpConnection, + Response = Res, + Error = std::io::Error, + Future = Fut2, + >, + Fut1: Future, ConnectError>>, + Fut2: Future>, + Io: ConnectionIo, + Res: IntoConnectionIo, +{ + type Output = Result<(Box, Protocol), ConnectError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project() { + TlsConnectorProj::TcpConnect { + fut, + tls_service, + timeout, + } => { + let res = ready!(fut.poll(cx))?; + let fut = tls_service + .take() + .expect("TlsConnectorFuture polled after complete") + .call(res); + let timeout = sleep(*timeout); + self.set(TlsConnectorFuture::TlsConnect { fut, timeout }); + self.poll(cx) + } + TlsConnectorProj::TlsConnect { fut, timeout } => match fut.poll(cx)? { + Poll::Ready(res) => Poll::Ready(Ok(res.into_connection_io())), + Poll::Pending => timeout.poll(cx).map(|_| Err(ConnectError::Timeout)), + }, + } + } +} + +/// service for establish tcp connection. +/// operation is canceled when timeout limit reached. +#[derive(Clone)] +pub struct TcpConnectorInnerService { + service: S, + timeout: Duration, + local_address: Option, +} + +impl TcpConnectorInnerService { + fn new( + service: S, + timeout: Duration, + local_address: Option, + ) -> Self { + Self { + service, + timeout, + local_address, + } + } +} + +impl Service for TcpConnectorInnerService +where + S: Service< + TcpConnect, + Response = TcpConnection, + Error = TcpConnectError, + > + Clone + + 'static, +{ + type Response = S::Response; + type Error = ConnectError; + type Future = TcpConnectorInnerFuture; + + actix_service::forward_ready!(service); + + fn call(&self, req: Connect) -> Self::Future { + let mut req = TcpConnect::new(req.uri).set_addr(req.addr); + + if let Some(local_addr) = self.local_address { + req = req.set_local_addr(local_addr); + } + + TcpConnectorInnerFuture { + fut: self.service.call(req), + timeout: sleep(self.timeout), + } + } +} + +#[pin_project] +pub struct TcpConnectorInnerFuture { + #[pin] + fut: Fut, + #[pin] + timeout: Sleep, +} + +impl Future for TcpConnectorInnerFuture +where + Fut: Future, TcpConnectError>>, +{ + type Output = Result, ConnectError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.fut.poll(cx) { + Poll::Ready(res) => Poll::Ready(res.map_err(ConnectError::from)), + Poll::Pending => this.timeout.poll(cx).map(|_| Err(ConnectError::Timeout)), + } + } +} + +/// Connector service for pooled Plain/Tls Tcp connections. +pub type ConnectorService = ConnectorServicePriv< + TcpConnectorService>, + Rc< + dyn Service< + Connect, + Response = (Box, Protocol), + Error = ConnectError, + Future = LocalBoxFuture< + 'static, + Result<(Box, Protocol), ConnectError>, + >, + >, + >, + Io, + Box, +>; + +pub struct ConnectorServicePriv +where + S1: Service, + S2: Service, + Io1: ConnectionIo, + Io2: ConnectionIo, { tcp_pool: ConnectionPool, tls_pool: Option>, } -impl Service for InnerConnector +impl Service for ConnectorServicePriv where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, + S1: Service + + Clone + + 'static, + S2: Service + + Clone + + 'static, + Io1: ConnectionIo, + Io2: ConnectionIo, { - type Response = EitherIoConnection; + type Response = Connection; type Error = ConnectError; - type Future = InnerConnectorResponse; + type Future = ConnectorServiceFuture; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { ready!(self.tcp_pool.poll_ready(cx))?; @@ -411,41 +632,49 @@ where fn call(&self, req: Connect) -> Self::Future { match req.uri.scheme_str() { Some("https") | Some("wss") => match self.tls_pool { - None => InnerConnectorResponse::SslIsNotSupported, - Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)), + None => ConnectorServiceFuture::SslIsNotSupported, + Some(ref pool) => ConnectorServiceFuture::Tls(pool.call(req)), }, - _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), + _ => ConnectorServiceFuture::Tcp(self.tcp_pool.call(req)), } } } -#[pin_project::pin_project(project = InnerConnectorProj)] -enum InnerConnectorResponse +#[pin_project(project = ConnectorServiceProj)] +pub enum ConnectorServiceFuture where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, + S1: Service + + Clone + + 'static, + S2: Service + + Clone + + 'static, + Io1: ConnectionIo, + Io2: ConnectionIo, { - Io1(#[pin] as Service>::Future), - Io2(#[pin] as Service>::Future), + Tcp(#[pin] as Service>::Future), + Tls(#[pin] as Service>::Future), SslIsNotSupported, } -impl Future for InnerConnectorResponse +impl Future for ConnectorServiceFuture where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, + S1: Service + + Clone + + 'static, + S2: Service + + Clone + + 'static, + Io1: ConnectionIo, + Io2: ConnectionIo, { - type Output = Result, ConnectError>; + type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { - InnerConnectorProj::Io1(fut) => fut.poll(cx).map_ok(EitherIoConnection::A), - InnerConnectorProj::Io2(fut) => fut.poll(cx).map_ok(EitherIoConnection::B), - InnerConnectorProj::SslIsNotSupported => { + ConnectorServiceProj::Tcp(fut) => fut.poll(cx).map_ok(Connection::Tcp), + ConnectorServiceProj::Tls(fut) => fut.poll(cx).map_ok(Connection::Tls), + ConnectorServiceProj::SslIsNotSupported => { Poll::Ready(Err(ConnectError::SslIsNotSupported)) } } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 75a3c1f3d..01a6e1edf 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -1,9 +1,10 @@ -use std::io::Write; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{io, time}; +use std::{ + io::Write, + pin::Pin, + task::{Context, Poll}, +}; -use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; +use actix_codec::Framed; use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; use futures_core::Stream; @@ -11,28 +12,24 @@ use futures_util::{future::poll_fn, SinkExt as _}; use crate::error::PayloadError; use crate::h1; -use crate::header::HeaderMap; use crate::http::{ - header::{IntoHeaderValue, EXPECT, HOST}, + header::{HeaderMap, IntoHeaderValue, EXPECT, HOST}, StatusCode, }; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::{Payload, PayloadStream}; -use super::connection::ConnectionType; +use super::connection::{ConnectionIo, H1Connection}; use super::error::{ConnectError, SendRequestError}; -use super::pool::Acquired; use crate::body::{BodySize, MessageBody}; -pub(crate) async fn send_request( - io: T, +pub(crate) async fn send_request( + io: H1Connection, mut head: RequestHeadType, body: B, - created: time::Instant, - acquired: Acquired, ) -> Result<(ResponseHead, Payload), SendRequestError> where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, B: MessageBody, { // set request host header @@ -62,12 +59,6 @@ where } } - let io = H1Connection { - created, - acquired, - io: Some(io), - }; - // create Framed and prepare sending request let mut framed = Framed::new(io, h1::ClientCodec::default()); @@ -138,18 +129,18 @@ where } } -pub(crate) async fn open_tunnel( - io: T, +pub(crate) async fn open_tunnel( + io: Io, head: RequestHeadType, -) -> Result<(ResponseHead, Framed), SendRequestError> +) -> Result<(ResponseHead, Framed), SendRequestError> where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, { - // create Framed and send request + // create Framed and send request. let mut framed = Framed::new(io, h1::ClientCodec::default()); framed.send((head, BodySize::None).into()).await?; - // read response + // read response head. let head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx)) .await .ok_or(ConnectError::Disconnected)??; @@ -158,12 +149,12 @@ where } /// send request body to the peer -pub(crate) async fn send_body( +pub(crate) async fn send_body( body: B, - mut framed: Pin<&mut Framed>, + mut framed: Pin<&mut Framed>, ) -> Result<(), SendRequestError> where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, B: MessageBody, { actix_rt::pin!(body); @@ -202,92 +193,16 @@ where Ok(()) } -#[doc(hidden)] -/// HTTP client connection -pub struct H1Connection -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - /// T should be `Unpin` - io: Option, - created: time::Instant, - acquired: Acquired, -} - -impl H1Connection -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn on_release(&mut self, keep_alive: bool) { - if keep_alive { - self.release(); - } else { - self.close(); - } - } - - /// Close connection - fn close(&mut self) { - if let Some(io) = self.io.take() { - self.acquired.close(ConnectionType::H1(io)); - } - } - - /// Release this connection to the connection pool - fn release(&mut self) { - if let Some(io) = self.io.take() { - self.acquired.release(ConnectionType::H1(io), self.created); - } - } -} - -impl AsyncRead for H1Connection { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.io.as_mut().unwrap()).poll_read(cx, buf) - } -} - -impl AsyncWrite for H1Connection { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.io.as_mut().unwrap()).poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(self.io.as_mut().unwrap()).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(self.io.as_mut().unwrap()).poll_shutdown(cx) - } -} - #[pin_project::pin_project] -pub(crate) struct PlStream +pub(crate) struct PlStream where - Io: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, { #[pin] framed: Option, h1::ClientPayloadCodec>>, } -impl PlStream -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ +impl PlStream { fn new(framed: Framed, h1::ClientCodec>) -> Self { let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); @@ -297,10 +212,7 @@ where } } -impl Stream for PlStream -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ +impl Stream for PlStream { type Item = Result; fn poll_next( diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 82e81c7ff..437b9ae76 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -1,7 +1,5 @@ use std::future::Future; -use std::time; -use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures_util::future::poll_fn; use h2::{ @@ -17,20 +15,16 @@ use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; use super::config::ConnectorConfig; -use super::connection::ConnectionType; +use super::connection::{ConnectionIo, H2Connection}; use super::error::SendRequestError; -use super::pool::Acquired; -use crate::client::connection::H2Connection; -pub(crate) async fn send_request( - mut io: H2Connection, +pub(crate) async fn send_request( + mut io: H2Connection, head: RequestHeadType, body: B, - created: time::Instant, - acquired: Acquired, ) -> Result<(ResponseHead, Payload), SendRequestError> where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, B: MessageBody, { trace!("Sending client request: {:?} {:?}", head, body.size()); @@ -103,13 +97,13 @@ where let res = poll_fn(|cx| io.poll_ready(cx)).await; if let Err(e) = res { - release(io, acquired, created, e.is_io()); + io.on_release(e.is_io()); return Err(SendRequestError::from(e)); } let resp = match io.send_request(req, eof) { Ok((fut, send)) => { - release(io, acquired, created, false); + io.on_release(false); if !eof { send_body(body, send).await?; @@ -117,7 +111,7 @@ where fut.await.map_err(SendRequestError::from)? } Err(e) => { - release(io, acquired, created, e.is_io()); + io.on_release(e.is_io()); return Err(e.into()); } }; @@ -178,26 +172,10 @@ async fn send_body( } } -/// release SendRequest object -fn release( - io: H2Connection, - acquired: Acquired, - created: time::Instant, - close: bool, -) { - if close { - acquired.close(ConnectionType::H2(io)); - } else { - acquired.release(ConnectionType::H2(io), created); - } -} - -pub(crate) fn handshake( +pub(crate) fn handshake( io: Io, config: &ConnectorConfig, ) -> impl Future, Connection), h2::Error>> -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, { let mut builder = Builder::new(); builder diff --git a/actix-http/src/client/mod.rs b/actix-http/src/client/mod.rs index ce6aa3bc1..41d5fef2a 100644 --- a/actix-http/src/client/mod.rs +++ b/actix-http/src/client/mod.rs @@ -15,7 +15,7 @@ pub use actix_tls::connect::{ }; pub use self::connection::{Connection, ConnectionIo}; -pub use self::connector::Connector; +pub use self::connector::{Connector, ConnectorService}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use crate::Protocol; diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 79adcf3e2..88188038f 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -1,34 +1,38 @@ //! Client connection pooling keyed on the authority part of the connection URI. -use std::collections::VecDeque; -use std::future::Future; -use std::ops::Deref; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use std::{cell::RefCell, io}; +use std::{ + cell::RefCell, + collections::VecDeque, + future::Future, + io, + ops::Deref, + pin::Pin, + rc::Rc, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; -use actix_codec::{AsyncRead, AsyncWrite}; +use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use ahash::AHashMap; use futures_core::future::LocalBoxFuture; use http::uri::Authority; use pin_project::pin_project; -use tokio::io::ReadBuf; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::config::ConnectorConfig; -use super::connection::{ConnectionType, H2Connection, IoConnection}; +use super::connection::{ + ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner, +}; use super::error::ConnectError; use super::h2proto::handshake; use super::Connect; use super::Protocol; #[derive(Hash, Eq, PartialEq, Clone, Debug)] -pub(crate) struct Key { +pub struct Key { authority: Authority, } @@ -38,17 +42,18 @@ impl From for Key { } } +#[doc(hidden)] /// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key. -pub(crate) struct ConnectionPool +pub struct ConnectionPool where Io: AsyncWrite + Unpin + 'static, { - connector: Rc, + connector: S, inner: ConnectionPoolInner, } /// wrapper type for check the ref count of Rc. -struct ConnectionPoolInner(Rc>) +pub struct ConnectionPoolInner(Rc>) where Io: AsyncWrite + Unpin + 'static; @@ -56,10 +61,21 @@ impl ConnectionPoolInner where Io: AsyncWrite + Unpin + 'static, { + fn new(config: ConnectorConfig) -> Self { + let permits = Arc::new(Semaphore::new(config.limit)); + let available = RefCell::new(AHashMap::default()); + + Self(Rc::new(ConnectionPoolInnerPriv { + config, + available, + permits, + })) + } + /// spawn a async for graceful shutdown h1 Io type with a timeout. - fn close(&self, conn: ConnectionType) { + fn close(&self, conn: ConnectionInnerType) { if let Some(timeout) = self.config.disconnect_timeout { - if let ConnectionType::H1(io) = conn { + if let ConnectionInnerType::H1(io) = conn { actix_rt::spawn(CloseConnection::new(io, timeout)); } } @@ -104,7 +120,7 @@ where } } -struct ConnectionPoolInnerPriv +pub struct ConnectionPoolInnerPriv where Io: AsyncWrite + Unpin + 'static, { @@ -128,15 +144,7 @@ where /// Any requests beyond limit would be wait in fifo order and get notified in async manner /// by [`tokio::sync::Semaphore`] pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self { - let permits = Arc::new(Semaphore::new(config.limit)); - let available = RefCell::new(AHashMap::default()); - let connector = Rc::new(connector); - - let inner = ConnectionPoolInner(Rc::new(ConnectionPoolInnerPriv { - config, - available, - permits, - })); + let inner = ConnectionPoolInner::new(config); Self { connector, inner } } @@ -144,12 +152,14 @@ where impl Service for ConnectionPool where - S: Service + 'static, - Io: AsyncRead + AsyncWrite + Unpin + 'static, + S: Service + + Clone + + 'static, + Io: ConnectionIo, { - type Response = IoConnection; + type Response = ConnectionType; type Error = ConnectError; - type Future = LocalBoxFuture<'static, Result, ConnectError>>; + type Future = LocalBoxFuture<'static, Result>; actix_service::forward_ready!(connector); @@ -193,7 +203,7 @@ where inner.close(c.conn); } else { // check if the connection is still usable - if let ConnectionType::H1(ref mut io) = c.conn { + if let ConnectionInnerType::H1(ref mut io) = c.conn { let check = ConnectionCheckFuture { io }; match check.await { ConnectionState::Tainted => { @@ -221,7 +231,9 @@ where // match the connection and spawn new one if did not get anything. match conn { - Some(conn) => Ok(IoConnection::new(conn.conn, conn.created, acquired)), + Some(conn) => { + Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired)) + } None => { let (io, proto) = connector.call(req).await?; @@ -229,19 +241,12 @@ where assert!(proto != Protocol::Http3); if proto == Protocol::Http1 { - Ok(IoConnection::new( - ConnectionType::H1(io), - Instant::now(), - acquired, - )) + Ok(ConnectionType::from_h1(io, Instant::now(), acquired)) } else { let config = &acquired.inner.config; let (sender, connection) = handshake(io, config).await?; - Ok(IoConnection::new( - ConnectionType::H2(H2Connection::new(sender, connection)), - Instant::now(), - acquired, - )) + let inner = H2ConnectionInner::new(sender, connection); + Ok(ConnectionType::from_h2(inner, Instant::now(), acquired)) } } } @@ -292,7 +297,7 @@ where } struct PooledConnection { - conn: ConnectionType, + conn: ConnectionInnerType, used: Instant, created: Instant, } @@ -332,26 +337,26 @@ where } } -pub(crate) struct Acquired +pub struct Acquired where Io: AsyncWrite + Unpin + 'static, { + /// authority key for identify connection. key: Key, + /// handle to connection pool. inner: ConnectionPoolInner, + /// permit for limit concurrent in-flight connection for a Client object. permit: OwnedSemaphorePermit, } -impl Acquired -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ +impl Acquired { /// Close the IO. - pub(crate) fn close(&self, conn: ConnectionType) { + pub(super) fn close(&self, conn: ConnectionInnerType) { self.inner.close(conn); } /// Release IO back into pool. - pub(crate) fn release(&self, conn: ConnectionType, created: Instant) { + pub(super) fn release(&self, conn: ConnectionInnerType, created: Instant) { let Acquired { key, inner, .. } = self; inner @@ -376,7 +381,7 @@ mod test { use http::Uri; use super::*; - use crate::client::connection::IoConnection; + use crate::client::connection::ConnectionType; /// A stream type that always returns pending on async read. /// @@ -423,6 +428,7 @@ mod test { } } + #[derive(Clone)] struct TestPoolConnector { generated: Rc>, } @@ -441,12 +447,14 @@ mod test { } } - fn release(conn: IoConnection) + fn release(conn: ConnectionType) where T: AsyncRead + AsyncWrite + Unpin + 'static, { - let (conn, created, acquired) = conn.into_parts(); - acquired.release(conn, created); + match conn { + ConnectionType::H1(mut conn) => conn.on_release(true), + ConnectionType::H2(mut conn) => conn.on_release(false), + } } #[actix_rt::test] diff --git a/awc/CHANGES.md b/awc/CHANGES.md index bf7bffc49..d2cb7c009 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,6 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +* `ConnectorService` type is renamed to `BoxConnectorService` [#2081] + +[#2081]: https://github.com/actix/actix-web/pull/2081 ## 3.0.0-beta.3 - 2021-03-08 diff --git a/awc/src/builder.rs b/awc/src/builder.rs index 33f43ab93..925d9ae2a 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -6,7 +6,7 @@ use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::{ - client::{Connector, TcpConnect, TcpConnectError, TcpConnection}, + client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection}, http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri}, }; use actix_rt::net::TcpStream; @@ -15,7 +15,7 @@ use actix_service::{boxed, Service}; use crate::connect::DefaultConnector; use crate::error::SendRequestError; use crate::middleware::{NestTransform, Redirect, Transform}; -use crate::{Client, ClientConfig, ConnectRequest, ConnectResponse, ConnectorService}; +use crate::{Client, ClientConfig, ConnectRequest, ConnectResponse}; /// An HTTP Client builder /// @@ -234,7 +234,7 @@ where /// Finish build process and create `Client` instance. pub fn finish(self) -> Client where - M: Transform + 'static, + M: Transform>, ConnectRequest> + 'static, M::Transform: Service, { @@ -250,7 +250,7 @@ where fn _finish(self) -> Client where - M: Transform + 'static, + M: Transform>, ConnectRequest> + 'static, M::Transform: Service, { @@ -269,16 +269,14 @@ where connector = connector.local_address(val); } - let connector = boxed::service(DefaultConnector::new(connector.finish())); - let connector = boxed::service(self.middleware.new_transform(connector)); + let connector = DefaultConnector::new(connector.finish()); + let connector = boxed::rc_service(self.middleware.new_transform(connector)); - let config = ClientConfig { - headers: self.headers, + Client(ClientConfig { + headers: Rc::new(self.headers), timeout: self.timeout, connector, - }; - - Client(Rc::new(config)) + }) } } diff --git a/awc/src/connect.rs b/awc/src/connect.rs index d3bc01ed6..6a9fc4630 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -2,6 +2,7 @@ use std::{ future::Future, net, pin::Pin, + rc::Rc, task::{Context, Poll}, }; @@ -19,7 +20,7 @@ use futures_core::{future::LocalBoxFuture, ready}; use crate::response::ClientResponse; -pub type ConnectorService = Box< +pub type BoxConnectorService = Rc< dyn Service< ConnectRequest, Response = ConnectResponse, @@ -28,6 +29,8 @@ pub type ConnectorService = Box< >, >; +pub type BoxedSocket = Box; + pub enum ConnectRequest { Client(RequestHeadType, Body, Option), Tunnel(RequestHead, Option), @@ -58,7 +61,7 @@ impl ConnectResponse { } } -pub(crate) struct DefaultConnector { +pub struct DefaultConnector { connector: S, } @@ -68,15 +71,14 @@ impl DefaultConnector { } } -impl Service for DefaultConnector +impl Service for DefaultConnector where - S: Service, - S::Response: Connection, - ::Io: 'static, + S: Service>, + Io: ConnectionIo, { type Response = ConnectResponse; type Error = SendRequestError; - type Future = ConnectRequestFuture::Io>; + type Future = ConnectRequestFuture; actix_service::forward_ready!(connector); @@ -102,7 +104,10 @@ where pin_project_lite::pin_project! { #[project = ConnectRequestProj] - pub(crate) enum ConnectRequestFuture { + pub enum ConnectRequestFuture + where + Io: ConnectionIo + { Connection { #[pin] fut: Fut, @@ -114,16 +119,15 @@ pin_project_lite::pin_project! { Tunnel { fut: LocalBoxFuture< 'static, - Result<(ResponseHead, Framed), SendRequestError>, + Result<(ResponseHead, Framed, ClientCodec>), SendRequestError>, >, } } } -impl Future for ConnectRequestFuture +impl Future for ConnectRequestFuture where - Fut: Future>, - C: Connection, + Fut: Future, ConnectError>>, Io: ConnectionIo, { type Output = Result; @@ -165,5 +169,3 @@ where } } } - -pub type BoxedSocket = Box; diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs index 46b4063a0..5fe8edb19 100644 --- a/awc/src/frozen.rs +++ b/awc/src/frozen.rs @@ -23,7 +23,7 @@ pub struct FrozenClientRequest { pub(crate) addr: Option, pub(crate) response_decompress: bool, pub(crate) timeout: Option, - pub(crate) config: Rc, + pub(crate) config: ClientConfig, } impl FrozenClientRequest { @@ -51,7 +51,7 @@ impl FrozenClientRequest { self.addr, self.response_decompress, self.timeout, - self.config.as_ref(), + &self.config, body, ) } @@ -62,7 +62,7 @@ impl FrozenClientRequest { self.addr, self.response_decompress, self.timeout, - self.config.as_ref(), + &self.config, value, ) } @@ -73,7 +73,7 @@ impl FrozenClientRequest { self.addr, self.response_decompress, self.timeout, - self.config.as_ref(), + &self.config, value, ) } @@ -88,7 +88,7 @@ impl FrozenClientRequest { self.addr, self.response_decompress, self.timeout, - self.config.as_ref(), + &self.config, stream, ) } @@ -99,7 +99,7 @@ impl FrozenClientRequest { self.addr, self.response_decompress, self.timeout, - self.config.as_ref(), + &self.config, ) } @@ -168,7 +168,7 @@ impl FrozenSendBuilder { self.req.addr, self.req.response_decompress, self.req.timeout, - self.req.config.as_ref(), + &self.req.config, body, ) } @@ -183,7 +183,7 @@ impl FrozenSendBuilder { self.req.addr, self.req.response_decompress, self.req.timeout, - self.req.config.as_ref(), + &self.req.config, value, ) } @@ -198,7 +198,7 @@ impl FrozenSendBuilder { self.req.addr, self.req.response_decompress, self.req.timeout, - self.req.config.as_ref(), + &self.req.config, value, ) } @@ -217,7 +217,7 @@ impl FrozenSendBuilder { self.req.addr, self.req.response_decompress, self.req.timeout, - self.req.config.as_ref(), + &self.req.config, stream, ) } @@ -232,7 +232,7 @@ impl FrozenSendBuilder { self.req.addr, self.req.response_decompress, self.req.timeout, - self.req.config.as_ref(), + &self.req.config, ) } } diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 77e08fbbd..8addf4d8b 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -121,7 +121,7 @@ pub mod test; pub mod ws; pub use self::builder::ClientBuilder; -pub use self::connect::{BoxedSocket, ConnectRequest, ConnectResponse, ConnectorService}; +pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; pub use self::response::{ClientResponse, JsonBody, MessageBody}; @@ -147,11 +147,12 @@ pub use self::sender::SendClientRequest; /// } /// ``` #[derive(Clone)] -pub struct Client(Rc); +pub struct Client(ClientConfig); +#[derive(Clone)] pub(crate) struct ClientConfig { - pub(crate) connector: ConnectorService, - pub(crate) headers: HeaderMap, + pub(crate) connector: BoxConnectorService, + pub(crate) headers: Rc, pub(crate) timeout: Option, } diff --git a/awc/src/middleware/redirect.rs b/awc/src/middleware/redirect.rs index f8bdd2def..62ea1d0ac 100644 --- a/awc/src/middleware/redirect.rs +++ b/awc/src/middleware/redirect.rs @@ -189,7 +189,7 @@ where // remove body .call(ConnectRequest::Client(head, Body::None, addr)); - self.as_mut().set(RedirectServiceFuture::Client { + self.set(RedirectServiceFuture::Client { fut, max_redirect_times, uri: Some(uri), @@ -236,7 +236,7 @@ where .unwrap() .call(ConnectRequest::Client(head, body_new, addr)); - self.as_mut().set(RedirectServiceFuture::Client { + self.set(RedirectServiceFuture::Client { fut, max_redirect_times, uri: Some(uri), diff --git a/awc/src/request.rs b/awc/src/request.rs index 1b63f3687..3e9babb7d 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -57,7 +57,7 @@ pub struct ClientRequest { addr: Option, response_decompress: bool, timeout: Option, - config: Rc, + config: ClientConfig, #[cfg(feature = "cookies")] cookies: Option, @@ -65,7 +65,7 @@ pub struct ClientRequest { impl ClientRequest { /// Create new client request builder. - pub(crate) fn new(method: Method, uri: U, config: Rc) -> Self + pub(crate) fn new(method: Method, uri: U, config: ClientConfig) -> Self where Uri: TryFrom, >::Error: Into, @@ -398,7 +398,7 @@ impl ClientRequest { slf.addr, slf.response_decompress, slf.timeout, - slf.config.as_ref(), + &slf.config, body, ) } @@ -414,7 +414,7 @@ impl ClientRequest { slf.addr, slf.response_decompress, slf.timeout, - slf.config.as_ref(), + &slf.config, value, ) } @@ -432,7 +432,7 @@ impl ClientRequest { slf.addr, slf.response_decompress, slf.timeout, - slf.config.as_ref(), + &slf.config, value, ) } @@ -452,7 +452,7 @@ impl ClientRequest { slf.addr, slf.response_decompress, slf.timeout, - slf.config.as_ref(), + &slf.config, stream, ) } @@ -468,7 +468,7 @@ impl ClientRequest { slf.addr, slf.response_decompress, slf.timeout, - slf.config.as_ref(), + &slf.config, ) } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index f64e9e19a..df25b7289 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -28,7 +28,6 @@ use std::convert::TryFrom; use std::net::SocketAddr; -use std::rc::Rc; use std::{fmt, str}; use actix_codec::Framed; @@ -56,7 +55,7 @@ pub struct WebsocketsRequest { addr: Option, max_size: usize, server_mode: bool, - config: Rc, + config: ClientConfig, #[cfg(feature = "cookies")] cookies: Option, @@ -64,7 +63,7 @@ pub struct WebsocketsRequest { impl WebsocketsRequest { /// Create new WebSocket connection - pub(crate) fn new(uri: U, config: Rc) -> Self + pub(crate) fn new(uri: U, config: ClientConfig) -> Self where Uri: TryFrom, >::Error: Into,