1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-02-20 03:14:21 +01:00

rename client io trait. reduce duplicate code (#2079)

This commit is contained in:
fakeshadow 2021-03-16 09:31:14 -07:00 committed by GitHub
parent 69dd1a9bd6
commit c8f6d37290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 75 deletions

View File

@ -19,6 +19,10 @@ use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
use super::{h1proto, h2proto}; use super::{h1proto, h2proto};
pub trait ConnectionIo: AsyncRead + AsyncWrite + Unpin + 'static {}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ConnectionIo for T {}
pub(crate) enum ConnectionType<Io> { pub(crate) enum ConnectionType<Io> {
H1(Io), H1(Io),
H2(H2Connection), H2(H2Connection),

View File

@ -18,7 +18,7 @@ use futures_core::ready;
use http::Uri; use http::Uri;
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::{Connection, EitherIoConnection}; use super::connection::{Connection, ConnectionIo, EitherIoConnection};
use super::error::ConnectError; use super::error::ConnectError;
use super::pool::ConnectionPool; use super::pool::ConnectionPool;
use super::Connect; use super::Connect;
@ -61,9 +61,6 @@ pub struct Connector<T> {
ssl: SslConnector, ssl: SslConnector,
} }
pub trait Io: AsyncRead + AsyncWrite + Unpin {}
impl<T: AsyncRead + AsyncWrite + Unpin> Io for T {}
impl Connector<()> { impl Connector<()> {
#[allow(clippy::new_ret_no_self, clippy::let_unit_value)] #[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
pub fn new() -> Connector< pub fn new() -> Connector<
@ -281,16 +278,16 @@ where
pub type DummyService = Box< pub type DummyService = Box<
dyn Service< dyn Service<
Connect, Connect,
Response = (Box<dyn Io>, Protocol), Response = (Box<dyn ConnectionIo>, Protocol),
Error = ConnectError, Error = ConnectError,
Future = futures_core::future::LocalBoxFuture< Future = futures_core::future::LocalBoxFuture<
'static, 'static,
Result<(Box<dyn Io>, Protocol), ConnectError>, Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
>, >,
>, >,
>; >;
InnerConnector::<_, DummyService, _, Box<dyn Io>> { InnerConnector::<_, DummyService, _, Box<dyn ConnectionIo>> {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
tcp_service, tcp_service,
self.config.no_disconnect_timeout(), self.config.no_disconnect_timeout(),
@ -334,9 +331,12 @@ where
.map(|protos| protos.windows(2).any(|w| w == H2)) .map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false); .unwrap_or(false);
if h2 { if h2 {
(Box::new(sock) as Box<dyn Io>, Protocol::Http2) (
Box::new(sock) as Box<dyn ConnectionIo>,
Protocol::Http2,
)
} else { } else {
(Box::new(sock) as Box<dyn Io>, Protocol::Http1) (Box::new(sock) as _, Protocol::Http1)
} }
}) })
.map_err(ConnectError::from), .map_err(ConnectError::from),
@ -354,9 +354,9 @@ where
.map(|protos| protos.windows(2).any(|w| w == H2)) .map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false); .unwrap_or(false);
if h2 { if h2 {
(Box::new(sock) as Box<dyn Io>, Protocol::Http2) (Box::new(sock) as _, Protocol::Http2)
} else { } else {
(Box::new(sock) as Box<dyn Io>, Protocol::Http1) (Box::new(sock) as _, Protocol::Http1)
} }
}), }),
), ),

View File

@ -14,7 +14,7 @@ pub use actix_tls::connect::{
Connect as TcpConnect, ConnectError as TcpConnectError, Connection as TcpConnection, Connect as TcpConnect, ConnectError as TcpConnectError, Connection as TcpConnection,
}; };
pub use self::connection::Connection; pub use self::connection::{Connection, ConnectionIo};
pub use self::connector::Connector; pub use self::connector::Connector;
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
pub use crate::Protocol; pub use crate::Protocol;

View File

@ -1,15 +1,16 @@
use std::{ use std::{
fmt,
future::Future, future::Future,
io, net, net,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_codec::Framed;
use actix_http::{ use actix_http::{
body::Body, body::Body,
client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, client::{
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
},
h1::ClientCodec, h1::ClientCodec,
Payload, RequestHead, RequestHeadType, ResponseHead, Payload, RequestHead, RequestHeadType, ResponseHead,
}; };
@ -123,7 +124,7 @@ impl<Fut, C, Io> Future for ConnectRequestFuture<Fut, Io>
where where
Fut: Future<Output = Result<C, ConnectError>>, Fut: Future<Output = Result<C, ConnectError>>,
C: Connection<Io = Io>, C: Connection<Io = Io>,
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: ConnectionIo,
{ {
type Output = Result<ConnectResponse, SendRequestError>; type Output = Result<ConnectResponse, SendRequestError>;
@ -138,14 +139,14 @@ where
let fut = ConnectRequestFuture::Client { let fut = ConnectRequestFuture::Client {
fut: connection.send_request(head, body), fut: connection.send_request(head, body),
}; };
self.as_mut().set(fut); self.set(fut);
} }
ConnectRequest::Tunnel(head, ..) => { ConnectRequest::Tunnel(head, ..) => {
// send request // send request
let fut = ConnectRequestFuture::Tunnel { let fut = ConnectRequestFuture::Tunnel {
fut: connection.open_tunnel(RequestHeadType::from(head)), fut: connection.open_tunnel(RequestHeadType::from(head)),
}; };
self.as_mut().set(fut); self.set(fut);
} }
} }
self.poll(cx) self.poll(cx)
@ -158,65 +159,11 @@ where
} }
ConnectRequestProj::Tunnel { fut } => { ConnectRequestProj::Tunnel { fut } => {
let (head, framed) = ready!(fut.as_mut().poll(cx))?; let (head, framed) = ready!(fut.as_mut().poll(cx))?;
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); let framed = framed.into_map_io(|io| Box::new(io) as _);
Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed))) Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
} }
} }
} }
} }
trait AsyncSocket { pub type BoxedSocket = Box<dyn ConnectionIo>;
fn as_read(&self) -> &(dyn AsyncRead + Unpin);
fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin);
fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin);
}
struct Socket<T: AsyncRead + AsyncWrite + Unpin>(T);
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncSocket for Socket<T> {
fn as_read(&self) -> &(dyn AsyncRead + Unpin) {
&self.0
}
fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) {
&mut self.0
}
fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) {
&mut self.0
}
}
pub struct BoxedSocket(Box<dyn AsyncSocket>);
impl fmt::Debug for BoxedSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BoxedSocket")
}
}
impl AsyncRead for BoxedSocket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf)
}
}
impl AsyncWrite for BoxedSocket {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx)
}
}