use std::collections::VecDeque; use std::future::Future; use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; use futures::future::{err, ok, BoxFuture, Either, FutureExt, Ready}; use tokio_net::tcp::TcpStream; use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; /// Tcp connector service factory #[derive(Debug)] pub struct TcpConnectorFactory(PhantomData); impl TcpConnectorFactory { pub fn new() -> Self { TcpConnectorFactory(PhantomData) } /// Create tcp connector service pub fn service(&self) -> TcpConnector { TcpConnector(PhantomData) } } impl Default for TcpConnectorFactory { fn default() -> Self { TcpConnectorFactory(PhantomData) } } impl Clone for TcpConnectorFactory { fn clone(&self) -> Self { TcpConnectorFactory(PhantomData) } } impl ServiceFactory for TcpConnectorFactory { type Request = Connect; type Response = Connection; type Error = ConnectError; type Config = (); type Service = TcpConnector; type InitError = (); type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) } } /// Tcp connector service #[derive(Default, Debug)] pub struct TcpConnector(PhantomData); impl TcpConnector { pub fn new() -> Self { TcpConnector(PhantomData) } } impl Clone for TcpConnector { fn clone(&self) -> Self { TcpConnector(PhantomData) } } impl Service for TcpConnector { type Request = Connect; type Response = Connection; type Error = ConnectError; type Future = Either, Ready>>; fn poll_ready(&mut self, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: Connect) -> Self::Future { let port = req.port(); let Connect { req, addr, .. } = req; if let Some(addr) = addr { Either::Left(TcpConnectorResponse::new(req, port, addr)) } else { error!("TCP connector: got unresolved address"); Either::Right(err(ConnectError::Unresolverd)) } } } #[doc(hidden)] /// Tcp stream connector response future pub struct TcpConnectorResponse { req: Option, port: u16, addrs: Option>, stream: Option>>, } impl TcpConnectorResponse { pub fn new( req: T, port: u16, addr: either::Either>, ) -> TcpConnectorResponse { trace!( "TCP connector - connecting to {:?} port:{}", req.host(), port ); match addr { either::Either::Left(addr) => TcpConnectorResponse { req: Some(req), port, addrs: None, stream: Some(TcpStream::connect(addr).boxed()), }, either::Either::Right(addrs) => TcpConnectorResponse { req: Some(req), port, addrs: Some(addrs), stream: None, }, } } } impl Future for TcpConnectorResponse { type Output = Result, ConnectError>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.get_mut(); // connect loop { if let Some(new) = this.stream.as_mut() { match new.as_mut().poll(cx) { Poll::Ready(Ok(sock)) => { let req = this.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", req.host(), sock.peer_addr() ); return Poll::Ready(Ok(Connection::new(sock, req))); } Poll::Pending => return Poll::Pending, Poll::Ready(Err(err)) => { trace!( "TCP connector - failed to connect to connecting to {:?} port: {}", this.req.as_ref().unwrap().host(), this.port, ); if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() { return Poll::Ready(Err(err.into())); } } } } // try to connect let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); this.stream = Some(TcpStream::connect(addr).boxed()); } } }