use std::{ fmt, io, net, pin::Pin, task::{Context, Poll}, }; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_http::{ body::Body, client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, h1::ClientCodec, RequestHead, RequestHeadType, ResponseHead, }; use actix_service::Service; use futures_core::future::LocalBoxFuture; use crate::response::ClientResponse; pub(crate) struct ConnectorWrapper(pub T); type TunnelResponse = (ResponseHead, Framed); pub(crate) trait Connect { fn send_request( &self, head: RequestHeadType, body: Body, addr: Option, ) -> LocalBoxFuture<'static, Result>; /// Send request, returns Response and Framed fn open_tunnel( &self, head: RequestHead, addr: Option, ) -> LocalBoxFuture<'static, Result>; } impl Connect for ConnectorWrapper where T: Service, T::Response: Connection, ::Io: 'static, ::Future: 'static, ::TunnelFuture: 'static, T::Future: 'static, { fn send_request( &self, head: RequestHeadType, body: Body, addr: Option, ) -> LocalBoxFuture<'static, Result> { // connect to the host let fut = self.0.call(ClientConnect { uri: head.as_ref().uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // send request let (head, payload) = connection.send_request(head, body).await?; Ok(ClientResponse::new(head, payload)) }) } fn open_tunnel( &self, head: RequestHead, addr: Option, ) -> LocalBoxFuture<'static, Result> { // connect to the host let fut = self.0.call(ClientConnect { uri: head.uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // send request let (head, framed) = connection.open_tunnel(RequestHeadType::from(head)).await?; let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } } trait AsyncSocket { fn as_read(&self) -> &(dyn AsyncRead + Unpin); fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin); fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin); } struct Socket(T); impl AsyncSocket for Socket { fn as_read(&self) -> &(dyn AsyncRead + Unpin) { &self.0 } fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) { &mut self.0 } fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) { &mut self.0 } } pub struct BoxedSocket(Box); impl fmt::Debug for BoxedSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "BoxedSocket") } } impl AsyncRead for BoxedSocket { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf) } } impl AsyncWrite for BoxedSocket { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_flush(cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx) } }