1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-31 11:32:10 +01:00

redesign actix-connector

This commit is contained in:
Nikolay Kim 2019-03-13 12:40:11 -07:00
parent 1fcc0734b5
commit 52a45fda53
7 changed files with 521 additions and 431 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-connector" name = "actix-connect"
version = "0.3.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service" description = "Actix Connector - tcp connector service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -27,11 +27,15 @@ default = []
ssl = ["openssl", "tokio-openssl"] ssl = ["openssl", "tokio-openssl"]
[dependencies] [dependencies]
actix-service = "0.3.3" actix-service = "0.3.4"
actix-codec = "0.1.1" actix-codec = "0.1.1"
futures = "0.1" actix-utils = "0.3.4"
tokio-tcp = "0.1" derive_more = "0.14.0"
tokio-current-thread = "0.1" either = "1.5.1"
futures = "0.1.25"
log = "0.4"
tokio-tcp = "0.1.3"
tokio-current-thread = "0.1.5"
trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false } trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false }
# openssl # openssl

View File

@ -0,0 +1,161 @@
use std::collections::VecDeque;
use std::fmt;
use std::net::SocketAddr;
use either::Either;
use crate::error::ConnectError;
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub enum Connect {
/// Host name
Host { host: String, port: u16 },
/// Host name with address of this host
Addr {
host: String,
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
},
}
impl Connect {
/// Create new `Connect` instance.
pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect {
Connect::Host {
host: host.as_ref().to_owned(),
port,
}
}
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectError> {
let mut parts_iter = host.as_ref().splitn(2, ':');
let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?;
let port_str = parts_iter.next().unwrap_or("");
let port = port_str
.parse::<u16>()
.map_err(|_| ConnectError::InvalidInput)?;
Ok(Connect::Host {
host: host.to_owned(),
port,
})
}
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect {
Connect::Addr {
addr: Either::Left(addr),
host: host.into(),
}
}
/// Host name
fn host(&self) -> &str {
match self {
Connect::Host { ref host, .. } => host,
Connect::Addr { ref host, .. } => host,
}
}
}
impl fmt::Display for Connect {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{}", self.host(), 0)
}
}
pub struct Stream<T, P = ()> {
io: T,
host: String,
params: P,
}
impl<T> Stream<T, ()> {
pub fn new(io: T, host: String) -> Self {
Self {
io,
host,
params: (),
}
}
}
impl<T, P> Stream<T, P> {
/// Reconstruct from a parts.
pub fn from_parts(io: T, host: String, params: P) -> Self {
Self { io, params, host }
}
/// Deconstruct into a parts.
pub fn into_parts(self) -> (T, String, P) {
(self.io, self.host, self.params)
}
/// Replace inclosed object, return new Stream and old object
pub fn replace<U>(self, io: U) -> (T, Stream<U, P>) {
(
self.io,
Stream {
io,
host: self.host,
params: self.params,
},
)
}
/// Returns a shared reference to the underlying stream.
pub fn get_ref(&self) -> &T {
&self.io
}
/// Returns a mutable reference to the underlying stream.
pub fn get_mut(&mut self) -> &mut T {
&mut self.io
}
/// Get host name
pub fn host(&self) -> &str {
&self.host
}
/// Return new Io object with new parameter.
pub fn set<U>(self, params: U) -> Stream<T, U> {
Stream {
io: self.io,
host: self.host,
params: params,
}
}
/// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value.
pub fn map<U, F>(self, op: F) -> Stream<T, U>
where
F: FnOnce(P) -> U,
{
Stream {
io: self.io,
host: self.host,
params: op(self.params),
}
}
}
impl<T, P> std::ops::Deref for Stream<T, P> {
type Target = T;
fn deref(&self) -> &T {
&self.io
}
}
impl<T, P> std::ops::DerefMut for Stream<T, P> {
fn deref_mut(&mut self) -> &mut T {
&mut self.io
}
}
impl<T: fmt::Debug, P> fmt::Debug for Stream<T, P> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Stream {{{:?}}}", self.io)
}
}

View File

@ -1,318 +1,89 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData; use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use std::{fmt, io};
use actix_service::{fn_factory, NewService, Service}; use actix_service::{NewService, Service};
use futures::future::{ok, Either}; use futures::future::{err, ok, Either, FutureResult};
use futures::{try_ready, Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_tcp::{ConnectFuture, TcpStream}; use tokio_tcp::{ConnectFuture, TcpStream};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::system_conf::read_system_conf;
use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture}; use super::connect::{Connect, Stream};
use super::error::ConnectError;
/// Port of the request /// Tcp connector service factory
pub trait RequestPort { #[derive(Copy, Clone, Debug)]
fn port(&self) -> u16; pub struct ConnectorFactory;
}
// #[derive(Fail, Debug)] impl NewService for ConnectorFactory {
#[derive(Debug)] type Request = Connect;
pub enum ConnectorError { type Response = Stream<TcpStream>;
/// Failed to resolve the hostname type Error = ConnectError;
// #[fail(display = "Failed resolving hostname: {}", _0)] type Service = Connector;
Resolver(ResolveError), type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
/// No dns records fn new_service(&self, _: &()) -> Self::Future {
// #[fail(display = "No dns records found for the input")] ok(Connector)
NoRecords,
/// Connecting took too long
// #[fail(display = "Timeout out while establishing connection")]
Timeout,
/// Invalid input
InvalidInput,
/// Connection io error
// #[fail(display = "{}", _0)]
IoError(io::Error),
}
impl From<ResolveError> for ConnectorError {
fn from(err: ResolveError) -> Self {
ConnectorError::Resolver(err)
} }
} }
impl From<io::Error> for ConnectorError { /// Tcp connector service
fn from(err: io::Error) -> Self { #[derive(Copy, Clone, Debug)]
ConnectorError::IoError(err) pub struct Connector;
}
}
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect {
pub kind: ConnectKind,
pub timeout: Duration,
}
#[derive(Eq, PartialEq, Debug, Hash)]
pub enum ConnectKind {
Host { host: String, port: u16 },
Addr { host: String, addr: SocketAddr },
}
impl Connect {
/// Create new `Connect` instance.
pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect {
Connect {
kind: ConnectKind::Host {
host: host.as_ref().to_owned(),
port,
},
timeout: Duration::from_secs(1),
}
}
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectorError> {
let mut parts_iter = host.as_ref().splitn(2, ':');
let host = parts_iter.next().ok_or(ConnectorError::InvalidInput)?;
let port_str = parts_iter.next().unwrap_or("");
let port = port_str
.parse::<u16>()
.map_err(|_| ConnectorError::InvalidInput)?;
Ok(Connect {
kind: ConnectKind::Host {
host: host.to_owned(),
port,
},
timeout: Duration::from_secs(1),
})
}
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect {
Connect {
kind: ConnectKind::Addr {
addr,
host: host.into(),
},
timeout: Duration::from_secs(1),
}
}
/// Set connect timeout
///
/// By default timeout is set to a 1 second.
pub fn timeout(mut self, timeout: Duration) -> Connect {
self.timeout = timeout;
self
}
}
impl RequestHost for Connect {
fn host(&self) -> &str {
match self.kind {
ConnectKind::Host { ref host, .. } => host,
ConnectKind::Addr { ref host, .. } => host,
}
}
}
impl RequestPort for Connect {
fn port(&self) -> u16 {
match self.kind {
ConnectKind::Host { port, .. } => port,
ConnectKind::Addr { addr, .. } => addr.port(),
}
}
}
impl fmt::Display for Connect {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{}", self.host(), self.port())
}
}
/// Tcp connector
pub struct Connector {
resolver: Resolver<Connect>,
}
impl Default for Connector {
fn default() -> Self {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts)
} else {
(ResolverConfig::default(), ResolverOpts::default())
};
Connector::new(cfg, opts)
}
}
impl Connector {
/// Create new connector with resolver configuration
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
Connector {
resolver: Resolver::new(cfg, opts),
}
}
/// Create new connector with custom resolver
pub fn with_resolver(
resolver: Resolver<Connect>,
) -> impl Service<Request = Connect, Response = (Connect, TcpStream), Error = ConnectorError>
+ Clone {
Connector { resolver }
}
/// Create new default connector service
pub fn new_service_with_config<E>(
cfg: ResolverConfig,
opts: ResolverOpts,
) -> impl NewService<
(),
Request = Connect,
Response = (Connect, TcpStream),
Error = ConnectorError,
InitError = E,
> + Clone {
fn_factory(move || ok(Connector::new(cfg.clone(), opts)))
}
}
impl Clone for Connector {
fn clone(&self) -> Self {
Connector {
resolver: self.resolver.clone(),
}
}
}
impl Service for Connector { impl Service for Connector {
type Request = Connect; type Request = Connect;
type Response = (Connect, TcpStream); type Response = Stream<TcpStream>;
type Error = ConnectorError; type Error = ConnectError;
type Future = Either<ConnectorFuture, ConnectorTcpFuture>; type Future = Either<ConnectorResponse, FutureResult<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, req: Connect) -> Self::Future { fn call(&mut self, req: Connect) -> Self::Future {
match req.kind { match req {
ConnectKind::Host { .. } => Either::A(ConnectorFuture { Connect::Host { .. } => {
fut: self.resolver.call(req), error!("TCP connector: got unresolved address");
fut2: None, Either::B(err(ConnectError::Unresolverd))
}),
ConnectKind::Addr { addr, .. } => {
let mut addrs = VecDeque::new();
addrs.push_back(addr.ip());
Either::B(ConnectorTcpFuture {
fut: TcpConnectorResponse::new(req, addrs),
})
} }
Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)),
} }
} }
} }
#[doc(hidden)]
pub struct ConnectorFuture {
fut: ResolverFuture<Connect>,
fut2: Option<TcpConnectorResponse<Connect>>,
}
impl Future for ConnectorFuture {
type Item = (Connect, TcpStream);
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut2 {
return fut.poll().map_err(ConnectorError::from);
}
match self.fut.poll().map_err(ConnectorError::from)? {
Async::Ready((req, addrs)) => {
if addrs.is_empty() {
Err(ConnectorError::NoRecords)
} else {
self.fut2 = Some(TcpConnectorResponse::new(req, addrs));
self.poll()
}
}
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[doc(hidden)]
pub struct ConnectorTcpFuture {
fut: TcpConnectorResponse<Connect>,
}
impl Future for ConnectorTcpFuture {
type Item = (Connect, TcpStream);
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(ConnectorError::IoError)
}
}
/// Tcp stream connector service
pub struct TcpConnector<T: RequestPort>(PhantomData<T>);
impl<T: RequestPort> Default for TcpConnector<T> {
fn default() -> TcpConnector<T> {
TcpConnector(PhantomData)
}
}
impl<T: RequestPort> Service for TcpConnector<T> {
type Request = (T, VecDeque<IpAddr>);
type Response = (T, TcpStream);
type Error = io::Error;
type Future = TcpConnectorResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, (req, addrs): (T, VecDeque<IpAddr>)) -> Self::Future {
TcpConnectorResponse::new(req, addrs)
}
}
#[doc(hidden)] #[doc(hidden)]
/// Tcp stream connector response future /// Tcp stream connector response future
pub struct TcpConnectorResponse<T: RequestPort> { pub struct ConnectorResponse {
port: u16, host: Option<String>,
req: Option<T>, addrs: Option<VecDeque<SocketAddr>>,
addr: Option<SocketAddr>,
addrs: VecDeque<IpAddr>,
stream: Option<ConnectFuture>, stream: Option<ConnectFuture>,
} }
impl<T: RequestPort> TcpConnectorResponse<T> { impl ConnectorResponse {
pub fn new(req: T, addrs: VecDeque<IpAddr>) -> TcpConnectorResponse<T> { pub fn new(
TcpConnectorResponse { host: String,
addrs, addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
port: req.port(), ) -> ConnectorResponse {
req: Some(req), trace!("TCP connector - connecting to {:?}", host);
addr: None,
stream: None, match addr {
either::Either::Left(addr) => ConnectorResponse {
host: Some(host),
addrs: None,
stream: Some(TcpStream::connect(&addr)),
},
either::Either::Right(addrs) => ConnectorResponse {
host: Some(host),
addrs: Some(addrs),
stream: None,
},
} }
} }
} }
impl<T: RequestPort> Future for TcpConnectorResponse<T> { impl Future for ConnectorResponse {
type Item = (T, TcpStream); type Item = Stream<TcpStream>;
type Error = io::Error; type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// connect // connect
@ -320,67 +91,76 @@ impl<T: RequestPort> Future for TcpConnectorResponse<T> {
if let Some(new) = self.stream.as_mut() { if let Some(new) = self.stream.as_mut() {
match new.poll() { match new.poll() {
Ok(Async::Ready(sock)) => { Ok(Async::Ready(sock)) => {
return Ok(Async::Ready((self.req.take().unwrap(), sock))); let host = self.host.take().unwrap();
trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}",
host, sock.peer_addr()
);
return Ok(Async::Ready(Stream::new(sock, self.host.take().unwrap())));
} }
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => { Err(err) => {
if self.addrs.is_empty() { trace!(
return Err(err); "TCP connector - failed to connect to connecting to {:?}",
self.host.as_ref().unwrap()
);
if self.addrs.as_ref().unwrap().is_empty() {
return Err(err.into());
} }
} }
} }
} }
// try to connect // try to connect
let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port); self.stream = Some(TcpStream::connect(
self.stream = Some(TcpStream::connect(&addr)); &self.addrs.as_mut().unwrap().pop_front().unwrap(),
self.addr = Some(addr) ));
} }
} }
} }
#[derive(Clone)] // #[derive(Clone)]
pub struct DefaultConnector(Connector); // pub struct DefaultConnector(Connector);
impl Default for DefaultConnector { // impl Default for DefaultConnector {
fn default() -> Self { // fn default() -> Self {
DefaultConnector(Connector::default()) // DefaultConnector(Connector::default())
} // }
} // }
impl DefaultConnector { // impl DefaultConnector {
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { // pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
DefaultConnector(Connector::new(cfg, opts)) // DefaultConnector(Connector::new(cfg, opts))
} // }
} // }
impl Service for DefaultConnector { // impl Service for DefaultConnector {
type Request = Connect; // type Request = Connect;
type Response = TcpStream; // type Response = TcpStream;
type Error = ConnectorError; // type Error = ConnectorError;
type Future = DefaultConnectorFuture; // type Future = DefaultConnectorFuture;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { // fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready() // self.0.poll_ready()
} // }
fn call(&mut self, req: Connect) -> Self::Future { // fn call(&mut self, req: Connect) -> Self::Future {
DefaultConnectorFuture { // DefaultConnectorFuture {
fut: self.0.call(req), // fut: self.0.call(req),
} // }
} // }
} // }
#[doc(hidden)] // #[doc(hidden)]
pub struct DefaultConnectorFuture { // pub struct DefaultConnectorFuture {
fut: Either<ConnectorFuture, ConnectorTcpFuture>, // fut: Either<ConnectorFuture, ConnectorTcpFuture>,
} // }
impl Future for DefaultConnectorFuture { // impl Future for DefaultConnectorFuture {
type Item = TcpStream; // type Item = TcpStream;
type Error = ConnectorError; // type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { // fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(try_ready!(self.fut.poll()).1)) // Ok(Async::Ready(try_ready!(self.fut.poll()).1))
} // }
} // }

View File

@ -0,0 +1,30 @@
use std::io;
use derive_more::{Display, From};
use trust_dns_resolver::error::ResolveError;
#[derive(Debug, From, Display)]
pub enum ConnectError {
/// Failed to resolve the hostname
#[display(fmt = "Failed resolving hostname: {}", _0)]
Resolver(ResolveError),
/// No dns records
#[display(fmt = "No dns records found for the input")]
NoRecords,
/// Connecting took too long
#[display(fmt = "Timeout out while establishing connection")]
Timeout,
/// Invalid input
InvalidInput,
/// Unresolved host name
#[display(fmt = "Connector received `Connect` method with unresolved host")]
Unresolverd,
/// Connection io error
#[display(fmt = "{}", _0)]
IoError(io::Error),
}

View File

@ -1,16 +1,65 @@
//! Actix Connector - tcp connector service //! Actix connect - tcp connector service
//! //!
//! ## Package feature //! ## Package feature
//! //!
//! * `tls` - enables ssl support via `native-tls` crate
//! * `ssl` - enables ssl support via `openssl` crate //! * `ssl` - enables ssl support via `openssl` crate
//! * `rust-tls` - enables ssl support via `rustls` crate //! * `rust-tls` - enables ssl support via `rustls` crate
#[macro_use]
extern crate log;
mod connect;
mod connector; mod connector;
mod error;
mod resolver; mod resolver;
pub mod ssl; pub mod ssl;
pub use self::connector::{ pub use trust_dns_resolver::error::ResolveError;
Connect, Connector, ConnectorError, DefaultConnector, RequestPort, TcpConnector,
}; pub use self::connect::{Connect, Stream};
pub use self::resolver::{RequestHost, Resolver}; pub use self::connector::{Connector, ConnectorFactory};
pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory};
use actix_service::{NewService, Service, ServiceExt};
use tokio_tcp::TcpStream;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
/// Create tcp connector service
pub fn new_connector(
cfg: ResolverConfig,
opts: ResolverOpts,
) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone
{
Resolver::new(cfg, opts).and_then(Connector)
}
/// Create tcp connector service
pub fn new_connector_factory(
cfg: ResolverConfig,
opts: ResolverOpts,
) -> impl NewService<
Request = Connect,
Response = Stream<TcpStream>,
Error = ConnectError,
InitError = (),
> + Clone {
ResolverFactory::new(cfg, opts).and_then(ConnectorFactory)
}
/// Create connector service with default parameters
pub fn default_connector(
) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone
{
Resolver::default().and_then(Connector)
}
/// Create connector service factory with default parameters
pub fn default_connector_factory() -> impl NewService<
Request = Connect,
Response = Stream<TcpStream>,
Error = ConnectError,
InitError = (),
> + Clone {
ResolverFactory::default().and_then(ConnectorFactory)
}

View File

@ -1,32 +1,72 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData; use std::net::SocketAddr;
use std::net::IpAddr;
use actix_service::Service; use actix_service::{NewService, Service};
use futures::future::{ok, Either, FutureResult};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::ResolveError;
use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::lookup_ip::LookupIpFuture;
use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::system_conf::read_system_conf;
use trust_dns_resolver::{AsyncResolver, Background}; use trust_dns_resolver::{AsyncResolver, Background};
/// Host name of the request use crate::connect::Connect;
pub trait RequestHost { use crate::error::ConnectError;
fn host(&self) -> &str;
/// DNS Resolver Service factory
pub struct ResolverFactory {
resolver: AsyncResolver,
} }
impl RequestHost for String { impl Default for ResolverFactory {
fn host(&self) -> &str { fn default() -> Self {
self.as_ref() let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts)
} else {
(ResolverConfig::default(), ResolverOpts::default())
};
ResolverFactory::new(cfg, opts)
} }
} }
pub struct Resolver<T = String> { impl ResolverFactory {
resolver: AsyncResolver, /// Create new resolver instance with custom configuration and options.
req: PhantomData<T>, pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg);
ResolverFactory { resolver }
}
} }
impl<T: RequestHost> Default for Resolver<T> { impl Clone for ResolverFactory {
fn clone(&self) -> Self {
ResolverFactory {
resolver: self.resolver.clone(),
}
}
}
impl NewService for ResolverFactory {
type Request = Connect;
type Response = Connect;
type Error = ConnectError;
type Service = Resolver;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
ok(Resolver {
resolver: self.resolver.clone(),
})
}
}
/// DNS Resolver Service
pub struct Resolver {
resolver: AsyncResolver,
}
impl Default for Resolver {
fn default() -> Self { fn default() -> Self {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts) (cfg, opts)
@ -38,91 +78,101 @@ impl<T: RequestHost> Default for Resolver<T> {
} }
} }
impl<T: RequestHost> Resolver<T> { impl Resolver {
/// Create new resolver instance with custom configuration and options. /// Create new resolver instance with custom configuration and options.
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_current_thread::spawn(bg);
Resolver { Resolver { resolver }
resolver,
req: PhantomData,
}
}
/// Change type of resolver request.
pub fn into_request<T2: RequestHost>(&self) -> Resolver<T2> {
Resolver {
resolver: self.resolver.clone(),
req: PhantomData,
}
} }
} }
impl<T> Clone for Resolver<T> { impl Clone for Resolver {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Resolver { Resolver {
resolver: self.resolver.clone(), resolver: self.resolver.clone(),
req: PhantomData,
} }
} }
} }
impl<T: RequestHost> Service for Resolver<T> { impl Service for Resolver {
type Request = T; type Request = Connect;
type Response = (T, VecDeque<IpAddr>); type Response = Connect;
type Error = ResolveError; type Error = ConnectError;
type Future = ResolverFuture<T>; type Future = Either<ResolverFuture, FutureResult<Connect, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, req: T) -> Self::Future { fn call(&mut self, req: Connect) -> Self::Future {
if let Ok(ip) = req.host().parse() { match req {
let mut addrs = VecDeque::new(); Connect::Host { host, port } => {
addrs.push_back(ip); if let Ok(ip) = host.parse() {
ResolverFuture::new(req, &self.resolver, Some(addrs)) Either::B(ok(Connect::Addr {
} else { host: host,
ResolverFuture::new(req, &self.resolver, None) addr: either::Either::Left(SocketAddr::new(ip, port)),
}))
} else {
trace!("DNS resolver: resolving host {:?}", host);
Either::A(ResolverFuture::new(host, port, &self.resolver))
}
}
other => Either::B(ok(other)),
} }
} }
} }
#[doc(hidden)] #[doc(hidden)]
/// Resolver future /// Resolver future
pub struct ResolverFuture<T> { pub struct ResolverFuture {
req: Option<T>, host: Option<String>,
port: u16,
lookup: Option<Background<LookupIpFuture>>, lookup: Option<Background<LookupIpFuture>>,
addrs: Option<VecDeque<IpAddr>>,
} }
impl<T: RequestHost> ResolverFuture<T> { impl ResolverFuture {
pub fn new(addr: T, resolver: &AsyncResolver, addrs: Option<VecDeque<IpAddr>>) -> Self { pub fn new(host: String, port: u16, resolver: &AsyncResolver) -> Self {
// we need to do dns resolution
let lookup = Some(resolver.lookup_ip(addr.host()));
ResolverFuture { ResolverFuture {
lookup, lookup: Some(resolver.lookup_ip(host.as_str())),
addrs, host: Some(host),
req: Some(addr), port,
} }
} }
} }
impl<T: RequestHost> Future for ResolverFuture<T> { impl Future for ResolverFuture {
type Item = (T, VecDeque<IpAddr>); type Item = Connect;
type Error = ResolveError; type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(addrs) = self.addrs.take() { match self.lookup.as_mut().unwrap().poll().map_err(|e| {
Ok(Async::Ready((self.req.take().unwrap(), addrs))) trace!(
} else { "DNS resolver: failed to resolve host {:?} err: {}",
match self.lookup.as_mut().unwrap().poll() { self.host.as_ref().unwrap(),
Ok(Async::NotReady) => Ok(Async::NotReady), e
Ok(Async::Ready(ips)) => Ok(Async::Ready(( );
self.req.take().unwrap(), e
ips.iter().collect(), })? {
))), Async::NotReady => Ok(Async::NotReady),
Err(err) => Err(err), Async::Ready(ips) => {
let host = self.host.take().unwrap();
let mut addrs: VecDeque<_> = ips
.iter()
.map(|ip| SocketAddr::new(ip, self.port))
.collect();
trace!("DNS resolver: host {:?} resolved to {:?}", host, addrs);
if addrs.len() == 1 {
Ok(Async::Ready(Connect::Addr {
addr: either::Either::Left(addrs.pop_front().unwrap()),
host,
}))
} else {
Ok(Async::Ready(Connect::Addr {
addr: either::Either::Right(addrs),
host,
}))
}
} }
} }
} }

View File

@ -1,3 +1,4 @@
use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
@ -6,15 +7,15 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslConnector}; use openssl::ssl::{HandshakeError, SslConnector};
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use crate::resolver::RequestHost; use crate::Stream;
/// Openssl connector factory /// Openssl connector factory
pub struct OpensslConnector<R, T, E> { pub struct OpensslConnector<T, P, E> {
connector: SslConnector, connector: SslConnector,
_t: PhantomData<(R, T, E)>, _t: PhantomData<(T, P, E)>,
} }
impl<R, T, E> OpensslConnector<R, T, E> { impl<T, P, E> OpensslConnector<T, P, E> {
pub fn new(connector: SslConnector) -> Self { pub fn new(connector: SslConnector) -> Self {
OpensslConnector { OpensslConnector {
connector, connector,
@ -23,11 +24,14 @@ impl<R, T, E> OpensslConnector<R, T, E> {
} }
} }
impl<R: RequestHost, T: AsyncRead + AsyncWrite> OpensslConnector<R, T, ()> { impl<T: AsyncRead + AsyncWrite + fmt::Debug, P> OpensslConnector<T, P, ()> {
pub fn service( pub fn service(
connector: SslConnector, connector: SslConnector,
) -> impl Service<Request = (R, T), Response = (R, SslStream<T>), Error = HandshakeError<T>> ) -> impl Service<
{ Request = Stream<T, P>,
Response = Stream<SslStream<T>, P>,
Error = HandshakeError<T>,
> {
OpensslConnectorService { OpensslConnectorService {
connector: connector, connector: connector,
_t: PhantomData, _t: PhantomData,
@ -35,7 +39,7 @@ impl<R: RequestHost, T: AsyncRead + AsyncWrite> OpensslConnector<R, T, ()> {
} }
} }
impl<R, T, E> Clone for OpensslConnector<R, T, E> { impl<T, P, E> Clone for OpensslConnector<T, P, E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
connector: self.connector.clone(), connector: self.connector.clone(),
@ -44,13 +48,14 @@ impl<R, T, E> Clone for OpensslConnector<R, T, E> {
} }
} }
impl<R: RequestHost, T: AsyncRead + AsyncWrite, E> NewService<()> impl<T, P, E> NewService<()> for OpensslConnector<T, P, E>
for OpensslConnector<R, T, E> where
T: AsyncRead + AsyncWrite + fmt::Debug,
{ {
type Request = (R, T); type Request = Stream<T, P>;
type Response = (R, SslStream<T>); type Response = Stream<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
type Service = OpensslConnectorService<R, T>; type Service = OpensslConnectorService<T, P>;
type InitError = E; type InitError = E;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
@ -62,45 +67,56 @@ impl<R: RequestHost, T: AsyncRead + AsyncWrite, E> NewService<()>
} }
} }
pub struct OpensslConnectorService<R, T> { pub struct OpensslConnectorService<T, P> {
connector: SslConnector, connector: SslConnector,
_t: PhantomData<(R, T)>, _t: PhantomData<(T, P)>,
} }
impl<R: RequestHost, T: AsyncRead + AsyncWrite> Service for OpensslConnectorService<R, T> { impl<T, P> Service for OpensslConnectorService<T, P>
type Request = (R, T); where
type Response = (R, SslStream<T>); T: AsyncRead + AsyncWrite + fmt::Debug,
{
type Request = Stream<T, P>;
type Response = Stream<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
type Future = ConnectAsyncExt<R, T>; type Future = ConnectAsyncExt<T, P>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, (req, stream): (R, T)) -> Self::Future { fn call(&mut self, stream: Stream<T, P>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(());
ConnectAsyncExt { ConnectAsyncExt {
fut: SslConnectorExt::connect_async(&self.connector, req.host(), stream), fut: SslConnectorExt::connect_async(&self.connector, stream.host(), io),
req: Some(req), stream: Some(stream),
} }
} }
} }
pub struct ConnectAsyncExt<R, T> { pub struct ConnectAsyncExt<T, P> {
req: Option<R>,
fut: ConnectAsync<T>, fut: ConnectAsync<T>,
stream: Option<Stream<(), P>>,
} }
impl<R, T> Future for ConnectAsyncExt<R, T> impl<T, P> Future for ConnectAsyncExt<T, P>
where where
R: RequestHost, T: AsyncRead + AsyncWrite + fmt::Debug,
T: AsyncRead + AsyncWrite,
{ {
type Item = (R, SslStream<T>); type Item = Stream<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? { match self.fut.poll().map_err(|e| {
Async::Ready(stream) => Ok(Async::Ready((self.req.take().unwrap(), stream))), trace!("SSL Handshake error: {:?}", e);
e
})? {
Async::Ready(stream) => {
let s = self.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Ok(Async::Ready(s.replace(stream).1))
}
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
} }
} }