From 10d2c675966b9ec05ffa73ee5238706286d7ac5c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 28 Aug 2018 16:24:36 -0700 Subject: [PATCH] better Connector impl --- examples/basic.rs | 20 ++++---- src/connector.rs | 116 +++++++++++++++++++++++++++++---------------- src/lib.rs | 2 +- src/ssl/openssl.rs | 19 ++++---- 4 files changed, 98 insertions(+), 59 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index e7c92c08..4e2039ca 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -64,14 +64,16 @@ fn main() { // bind socket address and start workers. By default server uses number of // available logical cpu as threads count. actix net start separate // instances of service pipeline in each worker. - Server::default().bind( - // configure service pipeline - "0.0.0.0:8443", move || { - let num = num.clone(); - let acceptor = acceptor.clone(); + Server::default() + .bind( + // configure service pipeline + "0.0.0.0:8443", + move || { + let num = num.clone(); + let acceptor = acceptor.clone(); - // service for converting incoming TcpStream to a SslStream - (move |stream| { + // service for converting incoming TcpStream to a SslStream + (move |stream| { SslAcceptorExt::accept_async(&acceptor, stream) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }) @@ -89,7 +91,9 @@ fn main() { .and_then((service, move || { Ok::<_, io::Error>(ServiceState { num: num.clone() }) })) - }).unwrap().start(); + }, + ).unwrap() + .start(); sys.run(); } diff --git a/src/connector.rs b/src/connector.rs index 4ea28712..f6ff75f7 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -2,14 +2,19 @@ use std::collections::VecDeque; use std::io; use std::net::SocketAddr; -use futures::{future::ok, Async, Future, Poll}; +use futures::{ + future::{ok, FutureResult}, + Async, Future, Poll, +}; use tokio; use tokio_tcp::{ConnectFuture, TcpStream}; -use tower_service::Service; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::lookup_ip::LookupIpFuture; +use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::{AsyncResolver, Background}; +use super::{NewService, Service}; + #[derive(Fail, Debug)] pub enum ConnectorError { /// Failed to resolve the hostname @@ -25,31 +30,52 @@ pub enum ConnectorError { IoError(io::Error), } +pub struct ConnectionInfo { + pub host: String, + pub addr: SocketAddr, +} + pub struct Connector { resolver: AsyncResolver, } -impl Connector { - pub fn new() -> Self { - let resolver = match AsyncResolver::from_system_conf() { - Ok((resolver, bg)) => { - tokio::spawn(bg); - resolver - } - Err(err) => { - warn!("Can not create system dns resolver: {}", err); - let (resolver, bg) = - AsyncResolver::new(ResolverConfig::default(), ResolverOpts::default()); - tokio::spawn(bg); - resolver - } +impl Default for Connector { + fn default() -> Self { + let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { + (cfg, opts) + } else { + (ResolverConfig::default(), ResolverOpts::default()) }; + Connector::new(cfg, opts) + } +} + +impl Connector { + pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { + let (resolver, bg) = AsyncResolver::new(cfg, opts); + tokio::spawn(bg); Connector { resolver } } - pub fn new_service() -> impl Future { - ok(Connector::new()) + pub fn new_service() -> impl NewService< + Request = String, + Response = (ConnectionInfo, TcpStream), + Error = ConnectorError, + InitError = E, + > + Clone { + || -> FutureResult { ok(Connector::default()) } + } + + pub fn new_service_with_config( + cfg: ResolverConfig, opts: ResolverOpts, + ) -> impl NewService< + Request = String, + Response = (ConnectionInfo, TcpStream), + Error = ConnectorError, + InitError = E, + > + Clone { + move || -> FutureResult { ok(Connector::new(cfg.clone(), opts.clone())) } } } @@ -63,7 +89,7 @@ impl Clone for Connector { impl Service for Connector { type Request = String; - type Response = (String, TcpStream); + type Response = (ConnectionInfo, TcpStream); type Error = ConnectorError; type Future = ConnectorFuture; @@ -72,36 +98,28 @@ impl Service for Connector { } fn call(&mut self, addr: String) -> Self::Future { - let fut = ResolveFut::new(&addr, 0, &self.resolver); + let fut = ResolveFut::new(addr, 0, &self.resolver); - ConnectorFuture { - fut, - addr: Some(addr), - fut2: None, - } + ConnectorFuture { fut, fut2: None } } } pub struct ConnectorFuture { - addr: Option, fut: ResolveFut, fut2: Option, } impl Future for ConnectorFuture { - type Item = (String, TcpStream); + type Item = (ConnectionInfo, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut2 { - return match fut.poll()? { - Async::Ready(stream) => Ok(Async::Ready((self.addr.take().unwrap(), stream))), - Async::NotReady => Ok(Async::NotReady), - }; + return fut.poll(); } match self.fut.poll()? { - Async::Ready(addrs) => { - self.fut2 = Some(TcpConnector::new(addrs)); + Async::Ready((host, addrs)) => { + self.fut2 = Some(TcpConnector::new(host, addrs)); self.poll() } Async::NotReady => Ok(Async::NotReady), @@ -111,19 +129,21 @@ impl Future for ConnectorFuture { /// Resolver future struct ResolveFut { - lookup: Option>, + host: Option, port: u16, + lookup: Option>, addrs: Option>, error: Option, error2: Option, } impl ResolveFut { - pub fn new(addr: &str, port: u16, resolver: &AsyncResolver) -> Self { + pub fn new(addr: String, port: u16, resolver: &AsyncResolver) -> Self { // we need to do dns resolution match ResolveFut::parse(addr.as_ref(), port) { Ok((host, port)) => ResolveFut { port, + host: Some(host.to_owned()), lookup: Some(resolver.lookup_ip(host)), addrs: None, error: None, @@ -131,6 +151,7 @@ impl ResolveFut { }, Err(err) => ResolveFut { port, + host: None, lookup: None, addrs: None, error: Some(err), @@ -160,7 +181,7 @@ impl ResolveFut { } impl Future for ResolveFut { - type Item = VecDeque; + type Item = (String, VecDeque); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -169,7 +190,7 @@ impl Future for ResolveFut { } else if let Some(err) = self.error2.take() { Err(ConnectorError::Resolver(err)) } else if let Some(addrs) = self.addrs.take() { - Ok(Async::Ready(addrs)) + Ok(Async::Ready((self.host.take().unwrap(), addrs))) } else { match self.lookup.as_mut().unwrap().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), @@ -183,7 +204,7 @@ impl Future for ResolveFut { "Expect at least one A dns record".to_owned(), )) } else { - Ok(Async::Ready(addrs)) + Ok(Async::Ready((self.host.take().unwrap(), addrs))) } } Err(err) => Err(ConnectorError::Resolver(format!("{}", err))), @@ -194,21 +215,25 @@ impl Future for ResolveFut { /// Tcp stream connector pub struct TcpConnector { + host: Option, + addr: Option, addrs: VecDeque, stream: Option, } impl TcpConnector { - pub fn new(addrs: VecDeque) -> TcpConnector { + pub fn new(host: String, addrs: VecDeque) -> TcpConnector { TcpConnector { addrs, + host: Some(host), + addr: None, stream: None, } } } impl Future for TcpConnector { - type Item = TcpStream; + type Item = (ConnectionInfo, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -216,7 +241,15 @@ impl Future for TcpConnector { loop { if let Some(new) = self.stream.as_mut() { match new.poll() { - Ok(Async::Ready(sock)) => return Ok(Async::Ready(sock)), + Ok(Async::Ready(sock)) => { + return Ok(Async::Ready(( + ConnectionInfo { + host: self.host.take().unwrap(), + addr: self.addr.take().unwrap(), + }, + sock, + ))) + } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { if self.addrs.is_empty() { @@ -229,6 +262,7 @@ impl Future for TcpConnector { // try to connect let addr = self.addrs.pop_front().unwrap(); self.stream = Some(TcpStream::connect(&addr)); + self.addr = Some(addr) } } } diff --git a/src/lib.rs b/src/lib.rs index 5f2faaca..79c38346 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,7 @@ pub mod ssl; mod worker; pub use configurable::{IntoNewConfigurableService, NewConfigurableService}; -pub use connector::{Connector, ConnectorError}; +pub use connector::{ConnectionInfo, Connector, ConnectorError}; pub use server::Server; pub use service::{IntoNewService, IntoService, NewServiceExt}; diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index 047b8bde..65a784b8 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -6,6 +6,7 @@ use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnect use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; +use connector::ConnectionInfo; use {NewService, Service}; /// Support `SSL` connections via openssl package @@ -115,8 +116,8 @@ impl Clone for OpensslConnector { } impl NewService for OpensslConnector { - type Request = (String, T); - type Response = (String, SslStream); + type Request = (ConnectionInfo, T); + type Response = (ConnectionInfo, SslStream); type Error = Error; type Service = OpensslConnectorService; type InitError = io::Error; @@ -136,8 +137,8 @@ pub struct OpensslConnectorService { } impl Service for OpensslConnectorService { - type Request = (String, T); - type Response = (String, SslStream); + type Request = (ConnectionInfo, T); + type Response = (ConnectionInfo, SslStream); type Error = Error; type Future = ConnectAsyncExt; @@ -145,24 +146,24 @@ impl Service for OpensslConnectorService { Ok(Async::Ready(())) } - fn call(&mut self, (host, stream): Self::Request) -> Self::Future { + fn call(&mut self, (info, stream): Self::Request) -> Self::Future { ConnectAsyncExt { - fut: SslConnectorExt::connect_async(&self.connector, &host, stream), - host: Some(host), + fut: SslConnectorExt::connect_async(&self.connector, &info.host, stream), + host: Some(info), } } } pub struct ConnectAsyncExt { fut: ConnectAsync, - host: Option, + host: Option, } impl Future for ConnectAsyncExt where T: AsyncRead + AsyncWrite, { - type Item = (String, SslStream); + type Item = (ConnectionInfo, SslStream); type Error = Error; fn poll(&mut self) -> Poll {