From 38545dedc71d98b25b988c18eebb1c4a9c3fcc18 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 13 Mar 2019 22:51:31 -0700 Subject: [PATCH] refactor Connect type and add tests --- actix-connect/Cargo.toml | 8 +++ actix-connect/src/connect.rs | 99 ++++++++++++----------------- actix-connect/src/connector.rs | 63 ++++-------------- actix-connect/src/lib.rs | 35 +++++++--- actix-connect/src/resolver.rs | 50 +++++---------- actix-connect/tests/test_connect.rs | 83 ++++++++++++++++++++++++ actix-test-server/Cargo.toml | 1 + actix-test-server/src/lib.rs | 1 + 8 files changed, 188 insertions(+), 152 deletions(-) create mode 100644 actix-connect/tests/test_connect.rs diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 137728fb..30b998de 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -41,3 +41,11 @@ trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false } # openssl openssl = { version="0.10", optional = true } tokio-openssl = { version="0.3", optional = true } + +[dev-dependencies] +bytes = "0.4" +actix-connect = { path=".", features=["ssl"] } +actix-test-server = { version="0.2.0", features=["ssl"] } +actix-server-config = "0.1.0" +actix-utils = "0.3.4" +tokio-tcp = "0.1" \ No newline at end of file diff --git a/actix-connect/src/connect.rs b/actix-connect/src/connect.rs index 168e5542..a82298de 100644 --- a/actix-connect/src/connect.rs +++ b/actix-connect/src/connect.rs @@ -4,34 +4,32 @@ use std::net::SocketAddr; use either::Either; -use crate::error::ConnectError; - /// Connect request pub trait Address { /// Host name of the request fn host(&self) -> &str; /// Port of the request - fn port(&self) -> u16; + fn port(&self) -> Option; } -impl Address for (String, u16) { +impl Address for String { fn host(&self) -> &str { - &self.0 + &self } - fn port(&self) -> u16 { - self.1 + fn port(&self) -> Option { + None } } -impl Address for (&'static str, u16) { +impl Address for &'static str { fn host(&self) -> &str { - self.0 + self } - fn port(&self) -> u16 { - self.1 + fn port(&self) -> Option { + None } } @@ -39,61 +37,26 @@ impl Address for (&'static str, u16) { #[derive(Eq, PartialEq, Debug, Hash)] pub struct Connect { pub(crate) req: T, + pub(crate) port: u16, pub(crate) addr: Option>>, } -impl Connect<(&'static str, u16)> { - /// Create new `Connect` instance. - pub fn new(host: &'static str, port: u16) -> Connect<(&'static str, u16)> { - Connect { - req: (host, port), - addr: None, - } - } -} - -impl Connect<()> { - /// Create new `Connect` instance. - pub fn from_string(host: String, port: u16) -> Connect<(String, u16)> { - Connect { - req: (host, port), - addr: None, - } - } - - /// Create new `Connect` instance. - pub fn from_static(host: &'static str, port: u16) -> Connect<(&'static str, u16)> { - Connect { - req: (host, port), - addr: None, - } - } - - /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 - pub fn from_str>(host: T) -> Result, ConnectError> { - let mut parts_iter = host.as_ref().splitn(2, ':'); - let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?; - let port_str = parts_iter.next().unwrap_or(""); - let port = port_str - .parse::() - .map_err(|_| ConnectError::InvalidInput)?; - Ok(Connect { - req: (host.to_owned(), port), - addr: None, - }) - } -} - impl Connect { - /// Create new `Connect` instance. - pub fn with(req: T) -> Connect { - Connect { req, addr: None } + /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 + pub fn new(req: T) -> Connect { + let (_, port) = parse(req.host()); + Connect { + req, + port: port.unwrap_or(0), + addr: None, + } } /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. - pub fn with_address(req: T, addr: SocketAddr) -> Connect { + pub fn with(req: T, addr: SocketAddr) -> Connect { Connect { req, + port: 0, addr: Some(Either::Left(addr)), } } @@ -105,7 +68,13 @@ impl Connect { /// Port of the request pub fn port(&self) -> u16 { - self.req.port() + self.req.port().unwrap_or(self.port) + } +} + +impl From for Connect { + fn from(addr: T) -> Self { + Connect::new(addr) } } @@ -115,6 +84,20 @@ impl fmt::Display for Connect { } } +fn parse(host: &str) -> (&str, Option) { + let mut parts_iter = host.splitn(2, ':'); + if let Some(host) = parts_iter.next() { + let port_str = parts_iter.next().unwrap_or(""); + if let Ok(port) = port_str.parse::() { + (host, Some(port)) + } else { + (host, None) + } + } else { + (host, None) + } +} + pub struct Connection { io: U, req: T, diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index f4e8837f..d7ba944a 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -66,10 +66,11 @@ impl Service for Connector { } fn call(&mut self, req: Connect) -> Self::Future { - let Connect { req, addr } = req; + let port = req.port(); + let Connect { req, addr, .. } = req; if let Some(addr) = addr { - Either::A(ConnectorResponse::new(req, addr)) + Either::A(ConnectorResponse::new(req, port, addr)) } else { error!("TCP connector: got unresolved address"); Either::B(err(ConnectError::Unresolverd)) @@ -81,6 +82,7 @@ impl Service for Connector { /// Tcp stream connector response future pub struct ConnectorResponse { req: Option, + port: u16, addrs: Option>, stream: Option, } @@ -88,18 +90,25 @@ pub struct ConnectorResponse { impl ConnectorResponse { pub fn new( req: T, + port: u16, addr: either::Either>, ) -> ConnectorResponse { - trace!("TCP connector - connecting to {:?}", req.host()); + trace!( + "TCP connector - connecting to {:?} port:{}", + req.host(), + port + ); match addr { either::Either::Left(addr) => ConnectorResponse { req: Some(req), + port, addrs: None, stream: Some(TcpStream::connect(&addr)), }, either::Either::Right(addrs) => ConnectorResponse { req: Some(req), + port, addrs: Some(addrs), stream: None, }, @@ -129,7 +138,7 @@ impl Future for ConnectorResponse { trace!( "TCP connector - failed to connect to connecting to {:?} port: {}", self.req.as_ref().unwrap().host(), - self.req.as_ref().unwrap().port(), + self.port, ); if self.addrs.as_ref().unwrap().is_empty() { return Err(err.into()); @@ -145,49 +154,3 @@ impl Future for ConnectorResponse { } } } - -// #[derive(Clone)] -// pub struct DefaultConnector(Connector); - -// impl Default for DefaultConnector { -// fn default() -> Self { -// DefaultConnector(Connector::default()) -// } -// } - -// impl DefaultConnector { -// pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { -// DefaultConnector(Connector::new(cfg, opts)) -// } -// } - -// impl Service for DefaultConnector { -// type Request = Connect; -// type Response = TcpStream; -// type Error = ConnectorError; -// type Future = DefaultConnectorFuture; - -// fn poll_ready(&mut self) -> Poll<(), Self::Error> { -// self.0.poll_ready() -// } - -// fn call(&mut self, req: Connect) -> Self::Future { -// DefaultConnectorFuture { -// fut: self.0.call(req), -// } -// } -// } - -// #[doc(hidden)] -// pub struct DefaultConnectorFuture { -// fut: Either, -// } - -// impl Future for DefaultConnectorFuture { -// type Item = TcpStream; -// type Error = ConnectorError; - -// fn poll(&mut self) -> Poll { -// Ok(Async::Ready(try_ready!(self.fut.poll()).1)) -// } -// } diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index 542c68e6..51bc3f79 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -14,7 +14,7 @@ mod error; mod resolver; pub mod ssl; -pub use trust_dns_resolver::error::ResolveError; +pub use trust_dns_resolver::{error::ResolveError, AsyncResolver}; pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{Connector, ConnectorFactory}; @@ -24,34 +24,51 @@ pub use self::resolver::{Resolver, ResolverFactory}; use actix_service::{NewService, Service, ServiceExt}; use tokio_tcp::TcpStream; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::system_conf::read_system_conf; + +pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver { + let (resolver, bg) = AsyncResolver::new(cfg, opts); + tokio_current_thread::spawn(bg); + resolver +} + +pub fn start_default_resolver() -> AsyncResolver { + let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { + (cfg, opts) + } else { + (ResolverConfig::default(), ResolverOpts::default()) + }; + + let (resolver, bg) = AsyncResolver::new(cfg, opts); + tokio_current_thread::spawn(bg); + resolver +} /// Create tcp connector service pub fn new_connector( - cfg: ResolverConfig, - opts: ResolverOpts, + resolver: AsyncResolver, ) -> impl Service, Response = Connection, Error = ConnectError> + Clone { - Resolver::new(cfg, opts).and_then(Connector::new()) + Resolver::new(resolver).and_then(Connector::new()) } /// Create tcp connector service pub fn new_connector_factory( - cfg: ResolverConfig, - opts: ResolverOpts, + resolver: AsyncResolver, ) -> impl NewService< Request = Connect, Response = Connection, Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::new(cfg, opts).and_then(ConnectorFactory::new()) + ResolverFactory::new(resolver).and_then(ConnectorFactory::new()) } /// Create connector service with default parameters pub fn default_connector( ) -> impl Service, Response = Connection, Error = ConnectError> + Clone { - Resolver::default().and_then(Connector::new()) + Resolver::new(start_default_resolver()).and_then(Connector::new()) } /// Create connector service factory with default parameters @@ -61,5 +78,5 @@ pub fn default_connector_factory() -> impl NewService< Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::default().and_then(ConnectorFactory::new()) + ResolverFactory::new(start_default_resolver()).and_then(ConnectorFactory::new()) } diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 0be30ed1..7c85d1b2 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -5,9 +5,7 @@ use std::net::SocketAddr; use actix_service::{NewService, Service}; use futures::future::{ok, Either, FutureResult}; use futures::{Async, Future, Poll}; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::lookup_ip::LookupIpFuture; -use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::{AsyncResolver, Background}; use crate::connect::{Address, Connect}; @@ -19,28 +17,18 @@ pub struct ResolverFactory { _t: PhantomData, } -impl Default for ResolverFactory { - fn default() -> Self { - let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { - (cfg, opts) - } else { - (ResolverConfig::default(), ResolverOpts::default()) - }; - - ResolverFactory::new(cfg, opts) - } -} - impl ResolverFactory { /// Create new resolver instance with custom configuration and options. - pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - let (resolver, bg) = AsyncResolver::new(cfg, opts); - tokio_current_thread::spawn(bg); + pub fn new(resolver: AsyncResolver) -> Self { ResolverFactory { resolver, _t: PhantomData, } } + + pub fn resolver(&self) -> &AsyncResolver { + &self.resolver + } } impl Clone for ResolverFactory { @@ -74,23 +62,9 @@ pub struct Resolver { _t: PhantomData, } -impl Default for Resolver { - fn default() -> Self { - let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { - (cfg, opts) - } else { - (ResolverConfig::default(), ResolverOpts::default()) - }; - - Resolver::new(cfg, opts) - } -} - impl Resolver { /// Create new resolver instance with custom configuration and options. - pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - let (resolver, bg) = AsyncResolver::new(cfg, opts); - tokio_current_thread::spawn(bg); + pub fn new(resolver: AsyncResolver) -> Self { Resolver { resolver, _t: PhantomData, @@ -136,13 +110,19 @@ impl Service for Resolver { /// Resolver future pub struct ResolverFuture { req: Option>, - lookup: Option>, + lookup: Background, } impl ResolverFuture { pub fn new(req: Connect, resolver: &AsyncResolver) -> Self { + let lookup = if let Some(host) = req.host().splitn(2, ':').next() { + resolver.lookup_ip(host) + } else { + resolver.lookup_ip(req.host()) + }; + ResolverFuture { - lookup: Some(resolver.lookup_ip(req.host())), + lookup, req: Some(req), } } @@ -153,7 +133,7 @@ impl Future for ResolverFuture { type Error = ConnectError; fn poll(&mut self) -> Poll { - match self.lookup.as_mut().unwrap().poll().map_err(|e| { + match self.lookup.poll().map_err(|e| { trace!( "DNS resolver: failed to resolve host {:?} err: {}", self.req.as_ref().unwrap().host(), diff --git a/actix-connect/tests/test_connect.rs b/actix-connect/tests/test_connect.rs new file mode 100644 index 00000000..908b26e5 --- /dev/null +++ b/actix-connect/tests/test_connect.rs @@ -0,0 +1,83 @@ +use actix_codec::{BytesCodec, Framed}; +use actix_server_config::Io; +use actix_service::{fn_service, NewService, Service}; +use actix_test_server::TestServer; +use bytes::Bytes; +use futures::{future::lazy, Future, Sink}; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; + +use actix_connect::{default_connector, Connect}; + +#[test] +fn test_string() { + let mut srv = TestServer::with(|| { + fn_service(|io: Io| { + Framed::new(io.into_parts().0, BytesCodec) + .send(Bytes::from_static(b"test")) + .then(|_| Ok::<_, ()>(())) + }) + }); + + let mut conn = srv + .block_on(lazy(|| Ok::<_, ()>(default_connector()))) + .unwrap(); + let addr = format!("localhost:{}", srv.port()); + let con = srv.block_on(conn.call(addr.into())).unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[test] +fn test_static_str() { + let mut srv = TestServer::with(|| { + fn_service(|io: Io| { + Framed::new(io.into_parts().0, BytesCodec) + .send(Bytes::from_static(b"test")) + .then(|_| Ok::<_, ()>(())) + }) + }); + + let resolver = srv + .block_on(lazy( + || Ok::<_, ()>(actix_connect::start_default_resolver()), + )) + .unwrap(); + let mut conn = srv + .block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver)))) + .unwrap(); + + let con = srv + .block_on(conn.call(Connect::with("10", srv.addr()))) + .unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[test] +fn test_new_service() { + let mut srv = TestServer::with(|| { + fn_service(|io: Io| { + Framed::new(io.into_parts().0, BytesCodec) + .send(Bytes::from_static(b"test")) + .then(|_| Ok::<_, ()>(())) + }) + }); + + let resolver = srv + .block_on(lazy(|| { + Ok::<_, ()>(actix_connect::start_resolver( + ResolverConfig::default(), + ResolverOpts::default(), + )) + })) + .unwrap(); + let factory = srv + .block_on(lazy(|| { + Ok::<_, ()>(actix_connect::new_connector_factory(resolver)) + })) + .unwrap(); + + let mut conn = srv.block_on(factory.new_service(&())).unwrap(); + let con = srv + .block_on(conn.call(Connect::with("10", srv.addr()))) + .unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} diff --git a/actix-test-server/Cargo.toml b/actix-test-server/Cargo.toml index 53c24f4f..010ec1eb 100644 --- a/actix-test-server/Cargo.toml +++ b/actix-test-server/Cargo.toml @@ -35,6 +35,7 @@ rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] [dependencies] actix-rt = "0.2.1" actix-server = "0.4.0" +actix-server-config = "0.1.0" log = "0.4" net2 = "0.2" diff --git a/actix-test-server/src/lib.rs b/actix-test-server/src/lib.rs index ce087a05..49608cf6 100644 --- a/actix-test-server/src/lib.rs +++ b/actix-test-server/src/lib.rs @@ -4,6 +4,7 @@ use std::{net, thread}; use actix_rt::{Runtime, System}; use actix_server::{Server, StreamServiceFactory}; +pub use actix_server_config::{Io, ServerConfig}; use futures::Future; use net2::TcpBuilder;