use std::{ collections::VecDeque, future::Future, io, net::SocketAddr, pin::Pin, task::{Context, Poll}, }; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; use futures_core::future::LocalBoxFuture; use log::{error, trace}; use super::connect::{Address, Connect, ConnectAddrs, Connection}; use super::error::ConnectError; /// TCP connector service factory #[derive(Copy, Clone, Debug)] pub struct TcpConnectorFactory; impl TcpConnectorFactory { /// Create TCP connector service pub fn service(&self) -> TcpConnector { TcpConnector } } impl ServiceFactory> for TcpConnectorFactory { type Response = Connection; type Error = ConnectError; type Config = (); type Service = TcpConnector; type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { let service = self.service(); Box::pin(async move { Ok(service) }) } } /// TCP connector service #[derive(Copy, Clone, Debug)] pub struct TcpConnector; impl Service> for TcpConnector { type Response = Connection; type Error = ConnectError; type Future = TcpConnectorResponse; actix_service::always_ready!(); fn call(&mut self, req: Connect) -> Self::Future { let port = req.port(); let Connect { req, addr, .. } = req; TcpConnectorResponse::new(req, port, addr) } } #[doc(hidden)] /// TCP stream connector response future pub enum TcpConnectorResponse { Response { req: Option, port: u16, addrs: Option>, stream: Option>>, }, Error(Option), } impl TcpConnectorResponse { pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse { trace!( "TCP connector - connecting to {:?} port:{}", req.host(), port ); match addr { ConnectAddrs::One(None) => { error!("TCP connector: got unresolved address"); TcpConnectorResponse::Error(Some(ConnectError::Unresolved)) } ConnectAddrs::One(Some(addr)) => TcpConnectorResponse::Response { req: Some(req), port, addrs: None, stream: Some(Box::pin(TcpStream::connect(addr))), }, // when resolver returns multiple socket addr for request they would be popped from // front end of queue and returns with the first successful tcp connection. ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response { req: Some(req), port, addrs: Some(addrs), stream: None, }, } } } impl Future for TcpConnectorResponse { type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { TcpConnectorResponse::Error(e) => Poll::Ready(Err(e.take().unwrap())), // connect TcpConnectorResponse::Response { req, port, addrs, stream, } => loop { if let Some(new) = stream.as_mut() { match new.as_mut().poll(cx) { Poll::Ready(Ok(sock)) => { let req = req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", req.host(), sock.peer_addr() ); return Poll::Ready(Ok(Connection::new(sock, req))); } Poll::Pending => return Poll::Pending, Poll::Ready(Err(err)) => { trace!( "TCP connector - failed to connect to connecting to {:?} port: {}", req.as_ref().unwrap().host(), port, ); if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { return Poll::Ready(Err(ConnectError::Io(err))); } } } } // try to connect let addr = addrs.as_mut().unwrap().pop_front().unwrap(); *stream = Some(Box::pin(TcpStream::connect(addr))); }, } } }