From 88d8c99e317b7a0d8e9cf6f8bc11a0a8514a224c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 10 Sep 2018 19:39:55 -0700 Subject: [PATCH] use Resolver service for Connector --- src/connector.rs | 165 +++++++++-------------------------------------- src/lib.rs | 2 +- src/resolver.rs | 32 +++++++-- 3 files changed, 55 insertions(+), 144 deletions(-) diff --git a/src/connector.rs b/src/connector.rs index fe2646ad..2da44f3e 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,54 +1,47 @@ use std::collections::VecDeque; use std::io; -use std::marker::PhantomData; use std::net::SocketAddr; use futures::{ future::{ok, FutureResult}, Async, Future, Poll, }; -use tokio; use tokio_tcp::{ConnectFuture, TcpStream}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::system_conf::read_system_conf; -use trust_dns_resolver::{AsyncResolver, Background}; +use super::resolver::{HostAware, Resolver, ResolverError, ResolverFuture}; use super::{NewService, Service}; -pub trait HostAware { - fn host(&self) -> &str; -} +// #[derive(Fail, Debug)] -impl HostAware for String { - fn host(&self) -> &str { - self.as_ref() - } -} - -#[derive(Fail, Debug)] pub enum ConnectorError { /// Failed to resolve the hostname - #[fail(display = "Failed resolving hostname: {}", _0)] - Resolver(String), + // #[fail(display = "Failed resolving hostname: {}", _0)] + Resolver(ResolverError), - /// Address is invalid - #[fail(display = "Invalid input: {}", _0)] - InvalidInput(&'static str), + /// Not dns records + // #[fail(display = "Invalid input: {}", _0)] + NoRecords, /// Connection io error - #[fail(display = "{}", _0)] + // #[fail(display = "{}", _0)] IoError(io::Error), } +impl From for ConnectorError { + fn from(err: ResolverError) -> Self { + ConnectorError::Resolver(err) + } +} + pub struct ConnectionInfo { pub host: String, pub addr: SocketAddr, } pub struct Connector { - resolver: AsyncResolver, - req: PhantomData, + resolver: Resolver, } impl Default for Connector { @@ -65,11 +58,8 @@ impl Default for Connector { impl Connector { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - let (resolver, bg) = AsyncResolver::new(cfg, opts); - tokio::spawn(bg); Connector { - resolver, - req: PhantomData, + resolver: Resolver::new(cfg, opts), } } @@ -95,8 +85,7 @@ impl Connector { pub fn change_request(&self) -> Connector { Connector { - resolver: self.resolver.clone(), - req: PhantomData, + resolver: self.resolver.change_request(), } } } @@ -105,7 +94,6 @@ impl Clone for Connector { fn clone(&self) -> Self { Connector { resolver: self.resolver.clone(), - req: PhantomData, } } } @@ -121,14 +109,15 @@ impl Service for Connector { } fn call(&mut self, req: Self::Request) -> Self::Future { - let fut = ResolveFut::new(req, 0, &self.resolver); - - ConnectorFuture { fut, fut2: None } + ConnectorFuture { + fut: self.resolver.call(req), + fut2: None, + } } } pub struct ConnectorFuture { - fut: ResolveFut, + fut: ResolverFuture, fut2: Option>, } @@ -140,10 +129,14 @@ impl Future for ConnectorFuture { if let Some(ref mut fut) = self.fut2 { return fut.poll(); } - match self.fut.poll()? { + match self.fut.poll().map_err(ConnectorError::from)? { Async::Ready((req, host, addrs)) => { - self.fut2 = Some(TcpConnector::new(req, host, addrs)); - self.poll() + if addrs.is_empty() { + Err(ConnectorError::NoRecords) + } else { + self.fut2 = Some(TcpConnector::new(req, host, addrs)); + self.poll() + } } Async::NotReady => Ok(Async::NotReady), } @@ -196,106 +189,6 @@ impl Future for DefaultConnectorFuture { } } -/// Resolver future -struct ResolveFut { - req: Option, - host: Option, - port: u16, - lookup: Option>, - addrs: Option>, - error: Option, - error2: Option, -} - -impl ResolveFut { - pub fn new(addr: T, port: u16, resolver: &AsyncResolver) -> Self { - // we need to do dns resolution - match ResolveFut::::parse(addr.host(), port) { - Ok((host, port)) => { - let lookup = Some(resolver.lookup_ip(host.as_str())); - ResolveFut { - port, - lookup, - req: Some(addr), - host: Some(host), - addrs: None, - error: None, - error2: None, - } - } - Err(err) => ResolveFut { - port, - req: None, - host: None, - lookup: None, - addrs: None, - error: Some(err), - error2: None, - }, - } - } - - fn parse(addr: &str, port: u16) -> Result<(String, u16), ConnectorError> { - macro_rules! try_opt { - ($e:expr, $msg:expr) => { - match $e { - Some(r) => r, - None => return Err(ConnectorError::InvalidInput($msg)), - } - }; - } - - // split the string by ':' and convert the second part to u16 - let mut parts_iter = addr.splitn(2, ':'); - let host = try_opt!(parts_iter.next(), "invalid socket address"); - let port_str = parts_iter.next().unwrap_or(""); - let port: u16 = port_str.parse().unwrap_or(port); - - Ok((host.to_owned(), port)) - } -} - -impl Future for ResolveFut { - type Item = (T, String, VecDeque); - type Error = ConnectorError; - - fn poll(&mut self) -> Poll { - if let Some(err) = self.error.take() { - Err(err) - } else if let Some(err) = self.error2.take() { - Err(ConnectorError::Resolver(err)) - } else if let Some(addrs) = self.addrs.take() { - Ok(Async::Ready(( - self.req.take().unwrap(), - self.host.take().unwrap(), - addrs, - ))) - } else { - match self.lookup.as_mut().unwrap().poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(ips)) => { - let addrs: VecDeque<_> = ips - .iter() - .map(|ip| SocketAddr::new(ip, self.port)) - .collect(); - if addrs.is_empty() { - Err(ConnectorError::Resolver( - "Expect at least one A dns record".to_owned(), - )) - } else { - Ok(Async::Ready(( - self.req.take().unwrap(), - self.host.take().unwrap(), - addrs, - ))) - } - } - Err(err) => Err(ConnectorError::Resolver(format!("{}", err))), - } - } - } -} - /// Tcp stream connector pub struct TcpConnector { req: Option, diff --git a/src/lib.rs b/src/lib.rs index 2db03f08..f425f318 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ #[macro_use] extern crate log; extern crate bytes; -#[macro_use] +// #[macro_use] extern crate failure; #[macro_use] extern crate futures; diff --git a/src/resolver.rs b/src/resolver.rs index 9b58e48d..ddd3c62e 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -54,6 +54,13 @@ impl Resolver { req: PhantomData, } } + + pub fn change_request(&self) -> Resolver { + Resolver { + resolver: self.resolver.clone(), + req: PhantomData, + } + } } impl Clone for Resolver { @@ -67,7 +74,7 @@ impl Clone for Resolver { impl Service for Resolver { type Request = T; - type Response = VecDeque; + type Response = (T, String, VecDeque); type Error = ResolverError; type Future = ResolverFuture; @@ -83,11 +90,12 @@ impl Service for Resolver { #[doc(hidden)] /// Resolver future pub struct ResolverFuture { + req: Option, port: u16, lookup: Option>, addrs: Option>, error: Option, - req: PhantomData, + host: Option, } impl ResolverFuture { @@ -99,17 +107,19 @@ impl ResolverFuture { ResolverFuture { port, lookup, + req: Some(addr), + host: Some(host.to_owned()), addrs: None, error: None, - req: PhantomData, } } Err(err) => ResolverFuture { port, + req: None, + host: None, lookup: None, addrs: None, error: Some(err), - req: PhantomData, }, } } @@ -126,14 +136,18 @@ impl ResolverFuture { } impl Future for ResolverFuture { - type Item = VecDeque; + type Item = (T, String, VecDeque); type Error = ResolverError; fn poll(&mut self) -> Poll { if let Some(err) = self.error.take() { Err(err) } else if let Some(addrs) = self.addrs.take() { - Ok(Async::Ready(addrs)) + Ok(Async::Ready(( + self.req.take().unwrap(), + self.host.take().unwrap(), + addrs, + ))) } else { match self.lookup.as_mut().unwrap().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -142,7 +156,11 @@ impl Future for ResolverFuture { .iter() .map(|ip| SocketAddr::new(ip, self.port)) .collect(); - Ok(Async::Ready(addrs)) + Ok(Async::Ready(( + self.req.take().unwrap(), + self.host.take().unwrap(), + addrs, + ))) } Err(err) => Err(ResolverError::Resolve(err)), }