From 52a45fda53626273b4596a81fa9fb422dd8ffa60 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 13 Mar 2019 12:40:11 -0700 Subject: [PATCH] redesign actix-connector --- actix-connector/Cargo.toml | 16 +- actix-connector/src/connect.rs | 161 +++++++++++ actix-connector/src/connector.rs | 430 +++++++---------------------- actix-connector/src/error.rs | 30 ++ actix-connector/src/lib.rs | 61 +++- actix-connector/src/resolver.rs | 178 +++++++----- actix-connector/src/ssl/openssl.rs | 76 +++-- 7 files changed, 521 insertions(+), 431 deletions(-) create mode 100644 actix-connector/src/connect.rs create mode 100644 actix-connector/src/error.rs diff --git a/actix-connector/Cargo.toml b/actix-connector/Cargo.toml index b2a2f9cf..49e77bf1 100644 --- a/actix-connector/Cargo.toml +++ b/actix-connector/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "actix-connector" -version = "0.3.0" +name = "actix-connect" +version = "0.1.0" authors = ["Nikolay Kim "] description = "Actix Connector - tcp connector service" keywords = ["network", "framework", "async", "futures"] @@ -27,11 +27,15 @@ default = [] ssl = ["openssl", "tokio-openssl"] [dependencies] -actix-service = "0.3.3" +actix-service = "0.3.4" actix-codec = "0.1.1" -futures = "0.1" -tokio-tcp = "0.1" -tokio-current-thread = "0.1" +actix-utils = "0.3.4" +derive_more = "0.14.0" +either = "1.5.1" +futures = "0.1.25" +log = "0.4" +tokio-tcp = "0.1.3" +tokio-current-thread = "0.1.5" trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false } # openssl diff --git a/actix-connector/src/connect.rs b/actix-connector/src/connect.rs new file mode 100644 index 00000000..e6c8be28 --- /dev/null +++ b/actix-connector/src/connect.rs @@ -0,0 +1,161 @@ +use std::collections::VecDeque; +use std::fmt; +use std::net::SocketAddr; + +use either::Either; + +use crate::error::ConnectError; + +/// Connect request +#[derive(Eq, PartialEq, Debug, Hash)] +pub enum Connect { + /// Host name + Host { host: String, port: u16 }, + /// Host name with address of this host + Addr { + host: String, + addr: Either>, + }, +} + +impl Connect { + /// Create new `Connect` instance. + pub fn new>(host: T, port: u16) -> Connect { + Connect::Host { + host: host.as_ref().to_owned(), + port, + } + } + + /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 + pub fn with>(host: T) -> Result { + let mut parts_iter = host.as_ref().splitn(2, ':'); + let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?; + let port_str = parts_iter.next().unwrap_or(""); + let port = port_str + .parse::() + .map_err(|_| ConnectError::InvalidInput)?; + Ok(Connect::Host { + host: host.to_owned(), + port, + }) + } + + /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. + pub fn with_address>(host: T, addr: SocketAddr) -> Connect { + Connect::Addr { + addr: Either::Left(addr), + host: host.into(), + } + } + + /// Host name + fn host(&self) -> &str { + match self { + Connect::Host { ref host, .. } => host, + Connect::Addr { ref host, .. } => host, + } + } +} + +impl fmt::Display for Connect { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}:{}", self.host(), 0) + } +} + +pub struct Stream { + io: T, + host: String, + params: P, +} + +impl Stream { + pub fn new(io: T, host: String) -> Self { + Self { + io, + host, + params: (), + } + } +} + +impl Stream { + /// Reconstruct from a parts. + pub fn from_parts(io: T, host: String, params: P) -> Self { + Self { io, params, host } + } + + /// Deconstruct into a parts. + pub fn into_parts(self) -> (T, String, P) { + (self.io, self.host, self.params) + } + + /// Replace inclosed object, return new Stream and old object + pub fn replace(self, io: U) -> (T, Stream) { + ( + self.io, + Stream { + io, + host: self.host, + params: self.params, + }, + ) + } + + /// Returns a shared reference to the underlying stream. + pub fn get_ref(&self) -> &T { + &self.io + } + + /// Returns a mutable reference to the underlying stream. + pub fn get_mut(&mut self) -> &mut T { + &mut self.io + } + + /// Get host name + pub fn host(&self) -> &str { + &self.host + } + + /// Return new Io object with new parameter. + pub fn set(self, params: U) -> Stream { + Stream { + io: self.io, + host: self.host, + params: params, + } + } + + /// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value. + pub fn map(self, op: F) -> Stream + where + F: FnOnce(P) -> U, + { + Stream { + io: self.io, + host: self.host, + params: op(self.params), + } + } +} + +impl std::ops::Deref for Stream { + type Target = T; + + fn deref(&self) -> &T { + &self.io + } +} + +impl std::ops::DerefMut for Stream { + fn deref_mut(&mut self) -> &mut T { + &mut self.io + } +} + +impl fmt::Debug for Stream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Stream {{{:?}}}", self.io) + } +} diff --git a/actix-connector/src/connector.rs b/actix-connector/src/connector.rs index e3c12cc2..11849a46 100644 --- a/actix-connector/src/connector.rs +++ b/actix-connector/src/connector.rs @@ -1,318 +1,89 @@ use std::collections::VecDeque; -use std::marker::PhantomData; -use std::net::{IpAddr, SocketAddr}; -use std::time::Duration; -use std::{fmt, io}; +use std::net::SocketAddr; -use actix_service::{fn_factory, NewService, Service}; -use futures::future::{ok, Either}; -use futures::{try_ready, Async, Future, Poll}; +use actix_service::{NewService, Service}; +use futures::future::{err, ok, Either, FutureResult}; +use futures::{Async, Future, Poll}; use tokio_tcp::{ConnectFuture, TcpStream}; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use trust_dns_resolver::system_conf::read_system_conf; -use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture}; +use super::connect::{Connect, Stream}; +use super::error::ConnectError; -/// Port of the request -pub trait RequestPort { - fn port(&self) -> u16; -} +/// Tcp connector service factory +#[derive(Copy, Clone, Debug)] +pub struct ConnectorFactory; -// #[derive(Fail, Debug)] -#[derive(Debug)] -pub enum ConnectorError { - /// Failed to resolve the hostname - // #[fail(display = "Failed resolving hostname: {}", _0)] - Resolver(ResolveError), +impl NewService for ConnectorFactory { + type Request = Connect; + type Response = Stream; + type Error = ConnectError; + type Service = Connector; + type InitError = (); + type Future = FutureResult; - /// No dns records - // #[fail(display = "No dns records found for the input")] - NoRecords, - - /// Connecting took too long - // #[fail(display = "Timeout out while establishing connection")] - Timeout, - - /// Invalid input - InvalidInput, - - /// Connection io error - // #[fail(display = "{}", _0)] - IoError(io::Error), -} - -impl From for ConnectorError { - fn from(err: ResolveError) -> Self { - ConnectorError::Resolver(err) + fn new_service(&self, _: &()) -> Self::Future { + ok(Connector) } } -impl From for ConnectorError { - fn from(err: io::Error) -> Self { - ConnectorError::IoError(err) - } -} - -/// Connect request -#[derive(Eq, PartialEq, Debug, Hash)] -pub struct Connect { - pub kind: ConnectKind, - pub timeout: Duration, -} - -#[derive(Eq, PartialEq, Debug, Hash)] -pub enum ConnectKind { - Host { host: String, port: u16 }, - Addr { host: String, addr: SocketAddr }, -} - -impl Connect { - /// Create new `Connect` instance. - pub fn new>(host: T, port: u16) -> Connect { - Connect { - kind: ConnectKind::Host { - host: host.as_ref().to_owned(), - port, - }, - timeout: Duration::from_secs(1), - } - } - - /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 - pub fn with>(host: T) -> Result { - let mut parts_iter = host.as_ref().splitn(2, ':'); - let host = parts_iter.next().ok_or(ConnectorError::InvalidInput)?; - let port_str = parts_iter.next().unwrap_or(""); - let port = port_str - .parse::() - .map_err(|_| ConnectorError::InvalidInput)?; - Ok(Connect { - kind: ConnectKind::Host { - host: host.to_owned(), - port, - }, - timeout: Duration::from_secs(1), - }) - } - - /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. - pub fn with_address>(host: T, addr: SocketAddr) -> Connect { - Connect { - kind: ConnectKind::Addr { - addr, - host: host.into(), - }, - timeout: Duration::from_secs(1), - } - } - - /// Set connect timeout - /// - /// By default timeout is set to a 1 second. - pub fn timeout(mut self, timeout: Duration) -> Connect { - self.timeout = timeout; - self - } -} - -impl RequestHost for Connect { - fn host(&self) -> &str { - match self.kind { - ConnectKind::Host { ref host, .. } => host, - ConnectKind::Addr { ref host, .. } => host, - } - } -} - -impl RequestPort for Connect { - fn port(&self) -> u16 { - match self.kind { - ConnectKind::Host { port, .. } => port, - ConnectKind::Addr { addr, .. } => addr.port(), - } - } -} - -impl fmt::Display for Connect { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", self.host(), self.port()) - } -} - -/// Tcp connector -pub struct Connector { - resolver: Resolver, -} - -impl Default for Connector { - fn default() -> Self { - let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { - (cfg, opts) - } else { - (ResolverConfig::default(), ResolverOpts::default()) - }; - - Connector::new(cfg, opts) - } -} - -impl Connector { - /// Create new connector with resolver configuration - pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - Connector { - resolver: Resolver::new(cfg, opts), - } - } - - /// Create new connector with custom resolver - pub fn with_resolver( - resolver: Resolver, - ) -> impl Service - + Clone { - Connector { resolver } - } - - /// Create new default connector service - pub fn new_service_with_config( - cfg: ResolverConfig, - opts: ResolverOpts, - ) -> impl NewService< - (), - Request = Connect, - Response = (Connect, TcpStream), - Error = ConnectorError, - InitError = E, - > + Clone { - fn_factory(move || ok(Connector::new(cfg.clone(), opts))) - } -} - -impl Clone for Connector { - fn clone(&self) -> Self { - Connector { - resolver: self.resolver.clone(), - } - } -} +/// Tcp connector service +#[derive(Copy, Clone, Debug)] +pub struct Connector; impl Service for Connector { type Request = Connect; - type Response = (Connect, TcpStream); - type Error = ConnectorError; - type Future = Either; + type Response = Stream; + type Error = ConnectError; + type Future = Either>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Connect) -> Self::Future { - match req.kind { - ConnectKind::Host { .. } => Either::A(ConnectorFuture { - fut: self.resolver.call(req), - fut2: None, - }), - ConnectKind::Addr { addr, .. } => { - let mut addrs = VecDeque::new(); - addrs.push_back(addr.ip()); - Either::B(ConnectorTcpFuture { - fut: TcpConnectorResponse::new(req, addrs), - }) + match req { + Connect::Host { .. } => { + error!("TCP connector: got unresolved address"); + Either::B(err(ConnectError::Unresolverd)) } + Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)), } } } -#[doc(hidden)] -pub struct ConnectorFuture { - fut: ResolverFuture, - fut2: Option>, -} - -impl Future for ConnectorFuture { - type Item = (Connect, TcpStream); - type Error = ConnectorError; - - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut2 { - return fut.poll().map_err(ConnectorError::from); - } - match self.fut.poll().map_err(ConnectorError::from)? { - Async::Ready((req, addrs)) => { - if addrs.is_empty() { - Err(ConnectorError::NoRecords) - } else { - self.fut2 = Some(TcpConnectorResponse::new(req, addrs)); - self.poll() - } - } - Async::NotReady => Ok(Async::NotReady), - } - } -} - -#[doc(hidden)] -pub struct ConnectorTcpFuture { - fut: TcpConnectorResponse, -} - -impl Future for ConnectorTcpFuture { - type Item = (Connect, TcpStream); - type Error = ConnectorError; - - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(ConnectorError::IoError) - } -} - -/// Tcp stream connector service -pub struct TcpConnector(PhantomData); - -impl Default for TcpConnector { - fn default() -> TcpConnector { - TcpConnector(PhantomData) - } -} - -impl Service for TcpConnector { - type Request = (T, VecDeque); - type Response = (T, TcpStream); - type Error = io::Error; - type Future = TcpConnectorResponse; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, (req, addrs): (T, VecDeque)) -> Self::Future { - TcpConnectorResponse::new(req, addrs) - } -} - #[doc(hidden)] /// Tcp stream connector response future -pub struct TcpConnectorResponse { - port: u16, - req: Option, - addr: Option, - addrs: VecDeque, +pub struct ConnectorResponse { + host: Option, + addrs: Option>, stream: Option, } -impl TcpConnectorResponse { - pub fn new(req: T, addrs: VecDeque) -> TcpConnectorResponse { - TcpConnectorResponse { - addrs, - port: req.port(), - req: Some(req), - addr: None, - stream: None, +impl ConnectorResponse { + pub fn new( + host: String, + addr: either::Either>, + ) -> ConnectorResponse { + trace!("TCP connector - connecting to {:?}", host); + + match addr { + either::Either::Left(addr) => ConnectorResponse { + host: Some(host), + addrs: None, + stream: Some(TcpStream::connect(&addr)), + }, + either::Either::Right(addrs) => ConnectorResponse { + host: Some(host), + addrs: Some(addrs), + stream: None, + }, } } } -impl Future for TcpConnectorResponse { - type Item = (T, TcpStream); - type Error = io::Error; +impl Future for ConnectorResponse { + type Item = Stream; + type Error = ConnectError; fn poll(&mut self) -> Poll { // connect @@ -320,67 +91,76 @@ impl Future for TcpConnectorResponse { if let Some(new) = self.stream.as_mut() { match new.poll() { Ok(Async::Ready(sock)) => { - return Ok(Async::Ready((self.req.take().unwrap(), sock))); + let host = self.host.take().unwrap(); + trace!( + "TCP connector - successfully connected to connecting to {:?} - {:?}", + host, sock.peer_addr() + ); + return Ok(Async::Ready(Stream::new(sock, self.host.take().unwrap()))); } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { - if self.addrs.is_empty() { - return Err(err); + trace!( + "TCP connector - failed to connect to connecting to {:?}", + self.host.as_ref().unwrap() + ); + if self.addrs.as_ref().unwrap().is_empty() { + return Err(err.into()); } } } } // try to connect - let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port); - self.stream = Some(TcpStream::connect(&addr)); - self.addr = Some(addr) + self.stream = Some(TcpStream::connect( + &self.addrs.as_mut().unwrap().pop_front().unwrap(), + )); } } } -#[derive(Clone)] -pub struct DefaultConnector(Connector); +// #[derive(Clone)] +// pub struct DefaultConnector(Connector); -impl Default for DefaultConnector { - fn default() -> Self { - DefaultConnector(Connector::default()) - } -} +// impl Default for DefaultConnector { +// fn default() -> Self { +// DefaultConnector(Connector::default()) +// } +// } -impl DefaultConnector { - pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - DefaultConnector(Connector::new(cfg, opts)) - } -} +// impl DefaultConnector { +// pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { +// DefaultConnector(Connector::new(cfg, opts)) +// } +// } -impl Service for DefaultConnector { - type Request = Connect; - type Response = TcpStream; - type Error = ConnectorError; - type Future = DefaultConnectorFuture; +// impl Service for DefaultConnector { +// type Request = Connect; +// type Response = TcpStream; +// type Error = ConnectorError; +// type Future = DefaultConnectorFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() - } +// fn poll_ready(&mut self) -> Poll<(), Self::Error> { +// self.0.poll_ready() +// } - fn call(&mut self, req: Connect) -> Self::Future { - DefaultConnectorFuture { - fut: self.0.call(req), - } - } -} +// fn call(&mut self, req: Connect) -> Self::Future { +// DefaultConnectorFuture { +// fut: self.0.call(req), +// } +// } +// } -#[doc(hidden)] -pub struct DefaultConnectorFuture { - fut: Either, -} +// #[doc(hidden)] +// pub struct DefaultConnectorFuture { +// fut: Either, +// } -impl Future for DefaultConnectorFuture { - type Item = TcpStream; - type Error = ConnectorError; +// impl Future for DefaultConnectorFuture { +// type Item = TcpStream; +// type Error = ConnectorError; - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).1)) - } -} +// fn poll(&mut self) -> Poll { +// Ok(Async::Ready(try_ready!(self.fut.poll()).1)) +// } +// } diff --git a/actix-connector/src/error.rs b/actix-connector/src/error.rs new file mode 100644 index 00000000..01e5ae2d --- /dev/null +++ b/actix-connector/src/error.rs @@ -0,0 +1,30 @@ +use std::io; + +use derive_more::{Display, From}; +use trust_dns_resolver::error::ResolveError; + +#[derive(Debug, From, Display)] +pub enum ConnectError { + /// Failed to resolve the hostname + #[display(fmt = "Failed resolving hostname: {}", _0)] + Resolver(ResolveError), + + /// No dns records + #[display(fmt = "No dns records found for the input")] + NoRecords, + + /// Connecting took too long + #[display(fmt = "Timeout out while establishing connection")] + Timeout, + + /// Invalid input + InvalidInput, + + /// Unresolved host name + #[display(fmt = "Connector received `Connect` method with unresolved host")] + Unresolverd, + + /// Connection io error + #[display(fmt = "{}", _0)] + IoError(io::Error), +} diff --git a/actix-connector/src/lib.rs b/actix-connector/src/lib.rs index be92d5e7..257f5994 100644 --- a/actix-connector/src/lib.rs +++ b/actix-connector/src/lib.rs @@ -1,16 +1,65 @@ -//! Actix Connector - tcp connector service +//! Actix connect - tcp connector service //! //! ## Package feature //! -//! * `tls` - enables ssl support via `native-tls` crate //! * `ssl` - enables ssl support via `openssl` crate //! * `rust-tls` - enables ssl support via `rustls` crate +#[macro_use] +extern crate log; + +mod connect; mod connector; +mod error; mod resolver; pub mod ssl; -pub use self::connector::{ - Connect, Connector, ConnectorError, DefaultConnector, RequestPort, TcpConnector, -}; -pub use self::resolver::{RequestHost, Resolver}; +pub use trust_dns_resolver::error::ResolveError; + +pub use self::connect::{Connect, Stream}; +pub use self::connector::{Connector, ConnectorFactory}; +pub use self::error::ConnectError; +pub use self::resolver::{Resolver, ResolverFactory}; + +use actix_service::{NewService, Service, ServiceExt}; +use tokio_tcp::TcpStream; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; + +/// Create tcp connector service +pub fn new_connector( + cfg: ResolverConfig, + opts: ResolverOpts, +) -> impl Service, Error = ConnectError> + Clone +{ + Resolver::new(cfg, opts).and_then(Connector) +} + +/// Create tcp connector service +pub fn new_connector_factory( + cfg: ResolverConfig, + opts: ResolverOpts, +) -> impl NewService< + Request = Connect, + Response = Stream, + Error = ConnectError, + InitError = (), +> + Clone { + ResolverFactory::new(cfg, opts).and_then(ConnectorFactory) +} + +/// Create connector service with default parameters +pub fn default_connector( +) -> impl Service, Error = ConnectError> + Clone +{ + Resolver::default().and_then(Connector) +} + +/// Create connector service factory with default parameters +pub fn default_connector_factory() -> impl NewService< + Request = Connect, + Response = Stream, + Error = ConnectError, + InitError = (), +> + Clone { + ResolverFactory::default().and_then(ConnectorFactory) +} diff --git a/actix-connector/src/resolver.rs b/actix-connector/src/resolver.rs index 443fdd75..5dc750b9 100644 --- a/actix-connector/src/resolver.rs +++ b/actix-connector/src/resolver.rs @@ -1,32 +1,72 @@ use std::collections::VecDeque; -use std::marker::PhantomData; -use std::net::IpAddr; +use std::net::SocketAddr; -use actix_service::Service; +use actix_service::{NewService, Service}; +use futures::future::{ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -pub use trust_dns_resolver::error::ResolveError; use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::{AsyncResolver, Background}; -/// Host name of the request -pub trait RequestHost { - fn host(&self) -> &str; +use crate::connect::Connect; +use crate::error::ConnectError; + +/// DNS Resolver Service factory +pub struct ResolverFactory { + resolver: AsyncResolver, } -impl RequestHost for String { - fn host(&self) -> &str { - self.as_ref() +impl Default for ResolverFactory { + fn default() -> Self { + let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { + (cfg, opts) + } else { + (ResolverConfig::default(), ResolverOpts::default()) + }; + + ResolverFactory::new(cfg, opts) } } -pub struct Resolver { - resolver: AsyncResolver, - req: PhantomData, +impl ResolverFactory { + /// Create new resolver instance with custom configuration and options. + pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { + let (resolver, bg) = AsyncResolver::new(cfg, opts); + tokio_current_thread::spawn(bg); + ResolverFactory { resolver } + } } -impl Default for Resolver { +impl Clone for ResolverFactory { + fn clone(&self) -> Self { + ResolverFactory { + resolver: self.resolver.clone(), + } + } +} + +impl NewService for ResolverFactory { + type Request = Connect; + type Response = Connect; + type Error = ConnectError; + type Service = Resolver; + type InitError = (); + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(Resolver { + resolver: self.resolver.clone(), + }) + } +} + +/// DNS Resolver Service +pub struct Resolver { + resolver: AsyncResolver, +} + +impl Default for Resolver { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -38,91 +78,101 @@ impl Default for Resolver { } } -impl Resolver { +impl Resolver { /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_current_thread::spawn(bg); - Resolver { - resolver, - req: PhantomData, - } - } - - /// Change type of resolver request. - pub fn into_request(&self) -> Resolver { - Resolver { - resolver: self.resolver.clone(), - req: PhantomData, - } + Resolver { resolver } } } -impl Clone for Resolver { +impl Clone for Resolver { fn clone(&self) -> Self { Resolver { resolver: self.resolver.clone(), - req: PhantomData, } } } -impl Service for Resolver { - type Request = T; - type Response = (T, VecDeque); - type Error = ResolveError; - type Future = ResolverFuture; +impl Service for Resolver { + type Request = Connect; + type Response = Connect; + type Error = ConnectError; + type Future = Either>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, req: T) -> Self::Future { - if let Ok(ip) = req.host().parse() { - let mut addrs = VecDeque::new(); - addrs.push_back(ip); - ResolverFuture::new(req, &self.resolver, Some(addrs)) - } else { - ResolverFuture::new(req, &self.resolver, None) + fn call(&mut self, req: Connect) -> Self::Future { + match req { + Connect::Host { host, port } => { + if let Ok(ip) = host.parse() { + Either::B(ok(Connect::Addr { + host: host, + addr: either::Either::Left(SocketAddr::new(ip, port)), + })) + } else { + trace!("DNS resolver: resolving host {:?}", host); + Either::A(ResolverFuture::new(host, port, &self.resolver)) + } + } + other => Either::B(ok(other)), } } } #[doc(hidden)] /// Resolver future -pub struct ResolverFuture { - req: Option, +pub struct ResolverFuture { + host: Option, + port: u16, lookup: Option>, - addrs: Option>, } -impl ResolverFuture { - pub fn new(addr: T, resolver: &AsyncResolver, addrs: Option>) -> Self { - // we need to do dns resolution - let lookup = Some(resolver.lookup_ip(addr.host())); +impl ResolverFuture { + pub fn new(host: String, port: u16, resolver: &AsyncResolver) -> Self { ResolverFuture { - lookup, - addrs, - req: Some(addr), + lookup: Some(resolver.lookup_ip(host.as_str())), + host: Some(host), + port, } } } -impl Future for ResolverFuture { - type Item = (T, VecDeque); - type Error = ResolveError; +impl Future for ResolverFuture { + type Item = Connect; + type Error = ConnectError; fn poll(&mut self) -> Poll { - if let Some(addrs) = self.addrs.take() { - Ok(Async::Ready((self.req.take().unwrap(), addrs))) - } else { - match self.lookup.as_mut().unwrap().poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(ips)) => Ok(Async::Ready(( - self.req.take().unwrap(), - ips.iter().collect(), - ))), - Err(err) => Err(err), + match self.lookup.as_mut().unwrap().poll().map_err(|e| { + trace!( + "DNS resolver: failed to resolve host {:?} err: {}", + self.host.as_ref().unwrap(), + e + ); + e + })? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(ips) => { + let host = self.host.take().unwrap(); + let mut addrs: VecDeque<_> = ips + .iter() + .map(|ip| SocketAddr::new(ip, self.port)) + .collect(); + trace!("DNS resolver: host {:?} resolved to {:?}", host, addrs); + if addrs.len() == 1 { + Ok(Async::Ready(Connect::Addr { + addr: either::Either::Left(addrs.pop_front().unwrap()), + host, + })) + } else { + Ok(Async::Ready(Connect::Addr { + addr: either::Either::Right(addrs), + host, + })) + } } } } diff --git a/actix-connector/src/ssl/openssl.rs b/actix-connector/src/ssl/openssl.rs index 3df9cab5..59ccdb0f 100644 --- a/actix-connector/src/ssl/openssl.rs +++ b/actix-connector/src/ssl/openssl.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite}; @@ -6,15 +7,15 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{HandshakeError, SslConnector}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; -use crate::resolver::RequestHost; +use crate::Stream; /// Openssl connector factory -pub struct OpensslConnector { +pub struct OpensslConnector { connector: SslConnector, - _t: PhantomData<(R, T, E)>, + _t: PhantomData<(T, P, E)>, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { OpensslConnector { connector, @@ -23,11 +24,14 @@ impl OpensslConnector { } } -impl OpensslConnector { +impl OpensslConnector { pub fn service( connector: SslConnector, - ) -> impl Service), Error = HandshakeError> - { + ) -> impl Service< + Request = Stream, + Response = Stream, P>, + Error = HandshakeError, + > { OpensslConnectorService { connector: connector, _t: PhantomData, @@ -35,7 +39,7 @@ impl OpensslConnector { } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), @@ -44,13 +48,14 @@ impl Clone for OpensslConnector { } } -impl NewService<()> - for OpensslConnector +impl NewService<()> for OpensslConnector +where + T: AsyncRead + AsyncWrite + fmt::Debug, { - type Request = (R, T); - type Response = (R, SslStream); + type Request = Stream; + type Response = Stream, P>; type Error = HandshakeError; - type Service = OpensslConnectorService; + type Service = OpensslConnectorService; type InitError = E; type Future = FutureResult; @@ -62,45 +67,56 @@ impl NewService<()> } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - _t: PhantomData<(R, T)>, + _t: PhantomData<(T, P)>, } -impl Service for OpensslConnectorService { - type Request = (R, T); - type Response = (R, SslStream); +impl Service for OpensslConnectorService +where + T: AsyncRead + AsyncWrite + fmt::Debug, +{ + type Request = Stream; + type Response = Stream, P>; type Error = HandshakeError; - type Future = ConnectAsyncExt; + type Future = ConnectAsyncExt; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, (req, stream): (R, T)) -> Self::Future { + fn call(&mut self, stream: Stream) -> Self::Future { + trace!("SSL Handshake start for: {:?}", stream.host()); + let (io, stream) = stream.replace(()); ConnectAsyncExt { - fut: SslConnectorExt::connect_async(&self.connector, req.host(), stream), - req: Some(req), + fut: SslConnectorExt::connect_async(&self.connector, stream.host(), io), + stream: Some(stream), } } } -pub struct ConnectAsyncExt { - req: Option, +pub struct ConnectAsyncExt { fut: ConnectAsync, + stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where - R: RequestHost, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + fmt::Debug, { - type Item = (R, SslStream); + type Item = Stream, P>; type Error = HandshakeError; fn poll(&mut self) -> Poll { - match self.fut.poll()? { - Async::Ready(stream) => Ok(Async::Ready((self.req.take().unwrap(), stream))), + match self.fut.poll().map_err(|e| { + trace!("SSL Handshake error: {:?}", e); + e + })? { + Async::Ready(stream) => { + let s = self.stream.take().unwrap(); + trace!("SSL Handshake success: {:?}", s.host()); + Ok(Async::Ready(s.replace(stream).1)) + } Async::NotReady => Ok(Async::NotReady), } }