diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 707d5551b..778083a1c 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -16,7 +16,7 @@ use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; use super::error::SendRequestError; -use super::pool::{Acquired, Protocol}; +use super::pool::Acquired; use super::{h1proto, h2proto}; pub(crate) enum ConnectionType { @@ -49,7 +49,7 @@ impl H2Connection { } } -// wake up waker when drop +// cancel spawned connection task on drop. impl Drop for H2Connection { fn drop(&mut self) { self.handle.abort(); @@ -74,8 +74,6 @@ impl DerefMut for H2Connection { pub trait Connection { type Io: AsyncRead + AsyncWrite + Unpin; - fn protocol(&self) -> Protocol; - /// Send request and body fn send_request>( self, @@ -154,14 +152,6 @@ where { type Io = T; - fn protocol(&self) -> Protocol { - match self.io { - Some(ConnectionType::H1(_)) => Protocol::Http1, - Some(ConnectionType::H2(_)) => Protocol::Http2, - None => Protocol::Http1, - } - } - fn send_request>( mut self, head: H, @@ -210,7 +200,7 @@ where } #[allow(dead_code)] -pub(crate) enum EitherConnection +pub(crate) enum EitherIoConnection where A: AsyncRead + AsyncWrite + Unpin + 'static, B: AsyncRead + AsyncWrite + Unpin + 'static, @@ -219,28 +209,21 @@ where B(IoConnection), } -impl Connection for EitherConnection +impl Connection for EitherIoConnection where A: AsyncRead + AsyncWrite + Unpin + 'static, B: AsyncRead + AsyncWrite + Unpin + 'static, { type Io = EitherIo; - fn protocol(&self) -> Protocol { - match self { - EitherConnection::A(con) => con.protocol(), - EitherConnection::B(con) => con.protocol(), - } - } - fn send_request>( self, head: H, body: RB, ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { match self { - EitherConnection::A(con) => con.send_request(head, body), - EitherConnection::B(con) => con.send_request(head, body), + EitherIoConnection::A(con) => con.send_request(head, body), + EitherIoConnection::B(con) => con.send_request(head, body), } } @@ -253,11 +236,11 @@ where Result<(ResponseHead, Framed), SendRequestError>, > { match self { - EitherConnection::A(con) => Box::pin(async { + EitherIoConnection::A(con) => Box::pin(async { let (head, framed) = con.open_tunnel(head).await?; Ok((head, framed.into_map_io(EitherIo::A))) }), - EitherConnection::B(con) => Box::pin(async { + EitherIoConnection::B(con) => Box::pin(async { let (head, framed) = con.open_tunnel(head).await?; Ok((head, framed.into_map_io(EitherIo::B))) }), diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 3bf424d49..65536f257 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -356,7 +356,7 @@ where mod connect_impl { use std::task::{Context, Poll}; - use futures_util::future::{err, Either, Ready}; + use futures_core::future::LocalBoxFuture; use super::*; use crate::client::connection::IoConnection; @@ -388,10 +388,7 @@ mod connect_impl { { type Response = IoConnection; type Error = ConnectError; - type Future = Either< - as Service>::Future, - Ready, ConnectError>>, - >; + type Future = LocalBoxFuture<'static, Result, ConnectError>>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.tcp_pool.poll_ready(cx) @@ -400,9 +397,9 @@ mod connect_impl { fn call(&self, req: Connect) -> Self::Future { match req.uri.scheme_str() { Some("https") | Some("wss") => { - Either::Right(err(ConnectError::SslIsNotSupported)) + Box::pin(async { Err(ConnectError::SslIsNotSupported) }) } - _ => Either::Left(self.tcp_pool.call(req)), + _ => self.tcp_pool.call(req), } } } @@ -411,33 +408,29 @@ mod connect_impl { #[cfg(any(feature = "openssl", feature = "rustls"))] mod connect_impl { use std::future::Future; - use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; - use futures_core::ready; - use futures_util::future::Either; - use super::*; - use crate::client::connection::EitherConnection; + use crate::client::connection::EitherIoConnection; - pub(crate) struct InnerConnector + pub(crate) struct InnerConnector where + S1: Service + 'static, + S2: Service + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service, - T2: Service, { - pub(crate) tcp_pool: ConnectionPool, - pub(crate) ssl_pool: ConnectionPool, + pub(crate) tcp_pool: ConnectionPool, + pub(crate) ssl_pool: ConnectionPool, } - impl Clone for InnerConnector + impl Clone for InnerConnector where + S1: Service + 'static, + S2: Service + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service + 'static, - T2: Service + 'static, { fn clone(&self) -> Self { InnerConnector { @@ -447,19 +440,16 @@ mod connect_impl { } } - impl Service for InnerConnector + impl Service for InnerConnector where + S1: Service + 'static, + S2: Service + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T1: Service + 'static, - T2: Service + 'static, { - type Response = EitherConnection; + type Response = EitherIoConnection; type Error = ConnectError; - type Future = Either< - InnerConnectorResponseA, - InnerConnectorResponseB, - >; + type Future = InnerConnectorResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.tcp_pool.poll_ready(cx) @@ -467,69 +457,44 @@ mod connect_impl { fn call(&self, req: Connect) -> Self::Future { match req.uri.scheme_str() { - Some("https") | Some("wss") => Either::Right(InnerConnectorResponseB { - fut: self.ssl_pool.call(req), - _phantom: PhantomData, - }), - _ => Either::Left(InnerConnectorResponseA { - fut: self.tcp_pool.call(req), - _phantom: PhantomData, - }), + Some("https") | Some("wss") => { + InnerConnectorResponse::Io2(self.ssl_pool.call(req)) + } + _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), } } } - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseA + #[pin_project::pin_project(project = InnerConnectorProj)] + pub(crate) enum InnerConnectorResponse where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - #[pin] - fut: as Service>::Future, - _phantom: PhantomData, - } - - impl Future for InnerConnectorResponseA - where - T: Service + 'static, + S1: Service + 'static, + S2: Service + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static, { - type Output = Result, ConnectError>; + Io1(#[pin] as Service>::Future), + Io2(#[pin] as Service>::Future), + } + + impl Future for InnerConnectorResponse + where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, + { + type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready( - ready!(Pin::new(&mut self.get_mut().fut).poll(cx)) - .map(EitherConnection::A), - ) - } - } - - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseB - where - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - #[pin] - fut: as Service>::Future, - _phantom: PhantomData, - } - - impl Future for InnerConnectorResponseB - where - T: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - type Output = Result, ConnectError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready( - ready!(Pin::new(&mut self.get_mut().fut).poll(cx)) - .map(EitherConnection::B), - ) + match self.project() { + InnerConnectorProj::Io1(fut) => { + fut.poll(cx).map_ok(EitherIoConnection::A) + } + InnerConnectorProj::Io2(fut) => { + fut.poll(cx).map_ok(EitherIoConnection::B) + } + } } } }