From 0b0d14d1ea08b41d5b69913e2c987af3cb974d11 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Oct 2018 22:14:02 -0700 Subject: [PATCH] refactor Connector service --- Cargo.toml | 2 +- src/connector.rs | 171 +++++++++++++++++++-------------------------- src/ssl/openssl.rs | 80 ++++++++------------- 3 files changed, 105 insertions(+), 148 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c685c130..3f4a04a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-net" -version = "0.1.1" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix net - framework for the compisible network services for Rust (experimental)" readme = "README.md" diff --git a/src/connector.rs b/src/connector.rs index 75bc5147..62fbe1ed 100644 --- a/src/connector.rs +++ b/src/connector.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::io; use std::net::SocketAddr; +use std::time::Duration; use futures::{ future::{ok, FutureResult}, @@ -24,6 +25,10 @@ pub enum ConnectorError { // #[fail(display = "Invalid input: {}", _0)] NoRecords, + /// Connecting took too long + // #[fail(display = "Timeout out while establishing connection")] + Timeout, + /// Connection io error // #[fail(display = "{}", _0)] IoError(io::Error), @@ -35,16 +40,50 @@ impl From for ConnectorError { } } -pub struct ConnectionInfo { +#[derive(Eq, PartialEq, Debug)] +pub struct Connect { pub host: String, - pub addr: SocketAddr, + pub port: Option, + pub timeout: Duration, } -pub struct Connector { - resolver: Resolver, +impl Connect { + pub fn host>(host: T) -> Connect { + Connect { + host: host.as_ref().to_owned(), + port: None, + timeout: Duration::from_secs(1), + } + } + + pub fn host_and_port>(host: T, port: u16) -> Connect { + Connect { + host: host.as_ref().to_owned(), + port: Some(port), + timeout: Duration::from_secs(1), + } + } + + /// Set connect timeout + /// + /// By default timeout is set to a 1 second. + pub fn timeout(mut self, timeout: Duration) -> Connect { + self.timeout = timeout; + self + } } -impl Default for Connector { +impl HostAware for Connect { + fn host(&self) -> &str { + &self.host + } +} + +pub struct Connector { + resolver: Resolver, +} + +impl Default for Connector { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -56,7 +95,7 @@ impl Default for Connector { } } -impl Connector { +impl Connector { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { Connector { resolver: Resolver::new(cfg, opts), @@ -64,43 +103,34 @@ impl Connector { } pub fn with_resolver( - resolver: Resolver, - ) -> impl Service< - Request = T, - Response = (T, ConnectionInfo, TcpStream), - Error = ConnectorError, - > + Clone { + resolver: Resolver, + ) -> impl Service + + Clone { Connector { resolver } } pub fn new_service() -> impl NewService< - Request = T, - Response = (T, ConnectionInfo, TcpStream), + Request = Connect, + Response = (Connect, TcpStream), Error = ConnectorError, InitError = E, > + Clone { - || -> FutureResult, E> { ok(Connector::default()) } + || -> FutureResult { ok(Connector::default()) } } pub fn new_service_with_config( cfg: ResolverConfig, opts: ResolverOpts, ) -> impl NewService< - Request = T, - Response = (T, ConnectionInfo, TcpStream), + Request = Connect, + Response = (Connect, TcpStream), Error = ConnectorError, InitError = E, > + Clone { - move || -> FutureResult, E> { ok(Connector::new(cfg.clone(), opts)) } - } - - pub fn change_request(&self) -> Connector { - Connector { - resolver: self.resolver.change_request(), - } + move || -> FutureResult { ok(Connector::new(cfg.clone(), opts)) } } } -impl Clone for Connector { +impl Clone for Connector { fn clone(&self) -> Self { Connector { resolver: self.resolver.clone(), @@ -108,11 +138,11 @@ impl Clone for Connector { } } -impl Service for Connector { - type Request = T; - type Response = (T, ConnectionInfo, TcpStream); +impl Service for Connector { + type Request = Connect; + type Response = (Connect, TcpStream); type Error = ConnectorError; - type Future = ConnectorFuture; + type Future = ConnectorFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) @@ -127,13 +157,13 @@ impl Service for Connector { } #[doc(hidden)] -pub struct ConnectorFuture { - fut: ResolverFuture, - fut2: Option>, +pub struct ConnectorFuture { + fut: ResolverFuture, + fut2: Option, } -impl Future for ConnectorFuture { - type Item = (T, ConnectionInfo, TcpStream); +impl Future for ConnectorFuture { + type Item = (Connect, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -141,11 +171,11 @@ impl Future for ConnectorFuture { return fut.poll(); } match self.fut.poll().map_err(ConnectorError::from)? { - Async::Ready((req, host, addrs)) => { + Async::Ready((req, _, addrs)) => { if addrs.is_empty() { Err(ConnectorError::NoRecords) } else { - self.fut2 = Some(TcpConnector::new(req, host, addrs)); + self.fut2 = Some(TcpConnector::new(req, addrs)); self.poll() } } @@ -154,76 +184,28 @@ impl Future for ConnectorFuture { } } -#[derive(Clone)] -pub struct DefaultConnector(Connector); - -impl Default for DefaultConnector { - fn default() -> Self { - DefaultConnector(Connector::default()) - } -} - -impl DefaultConnector { - pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { - DefaultConnector(Connector::new(cfg, opts)) - } -} - -impl Service for DefaultConnector { - type Request = T; - type Response = TcpStream; - type Error = ConnectorError; - type Future = DefaultConnectorFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - DefaultConnectorFuture { - fut: self.0.call(req), - } - } -} - -#[doc(hidden)] -pub struct DefaultConnectorFuture { - fut: ConnectorFuture, -} - -impl Future for DefaultConnectorFuture { - type Item = TcpStream; - type Error = ConnectorError; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).2)) - } -} - #[doc(hidden)] /// Tcp stream connector -pub struct TcpConnector { - req: Option, - host: Option, +pub struct TcpConnector { + req: Option, addr: Option, addrs: VecDeque, stream: Option, } -impl TcpConnector { - pub fn new(req: T, host: String, addrs: VecDeque) -> TcpConnector { +impl TcpConnector { + pub fn new(req: Connect, addrs: VecDeque) -> TcpConnector { TcpConnector { addrs, req: Some(req), - host: Some(host), addr: None, stream: None, } } } -impl Future for TcpConnector { - type Item = (T, ConnectionInfo, TcpStream); +impl Future for TcpConnector { + type Item = (Connect, TcpStream); type Error = ConnectorError; fn poll(&mut self) -> Poll { @@ -232,14 +214,7 @@ impl Future for TcpConnector { if let Some(new) = self.stream.as_mut() { match new.poll() { Ok(Async::Ready(sock)) => { - return Ok(Async::Ready(( - self.req.take().unwrap(), - ConnectionInfo { - host: self.host.take().unwrap(), - addr: self.addr.take().unwrap(), - }, - sock, - ))) + return Ok(Async::Ready((self.req.take().unwrap(), sock))) } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index 26a70011..0355e5ef 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -6,7 +6,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; use super::MAX_CONN_COUNTER; -use connector::ConnectionInfo; +use connector::Connect; use counter::{Counter, CounterGuard}; use service::{NewService, Service}; @@ -102,113 +102,95 @@ impl Future for OpensslAcceptorServiceFut { } /// Openssl connector factory -pub struct OpensslConnector { +pub struct OpensslConnector { connector: SslConnector, - t: PhantomData, - io: PhantomData, - _e: PhantomData, + _t: PhantomData<(T, E)>, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { OpensslConnector { connector, - t: PhantomData, - io: PhantomData, - _e: PhantomData, + _t: PhantomData, } } } -impl OpensslConnector { +impl OpensslConnector { pub fn service( connector: SslConnector, - ) -> impl Service< - Request = (T, ConnectionInfo, Io), - Response = (T, ConnectionInfo, SslStream), - Error = Error, - > { + ) -> impl Service), Error = Error> + { OpensslConnectorService { connector: connector, - t: PhantomData, - io: PhantomData, + _t: PhantomData, } } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - t: PhantomData, - io: PhantomData, - _e: PhantomData, + _t: PhantomData, } } } -impl NewService for OpensslConnector { - type Request = (T, ConnectionInfo, Io); - type Response = (T, ConnectionInfo, SslStream); +impl NewService for OpensslConnector { + type Request = (Connect, T); + type Response = (Connect, SslStream); type Error = Error; - type Service = OpensslConnectorService; + type Service = OpensslConnectorService; type InitError = E; type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(OpensslConnectorService { connector: self.connector.clone(), - t: PhantomData, - io: PhantomData, + _t: PhantomData, }) } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - t: PhantomData, - io: PhantomData, + _t: PhantomData, } -impl Service for OpensslConnectorService { - type Request = (T, ConnectionInfo, Io); - type Response = (T, ConnectionInfo, SslStream); +impl Service for OpensslConnectorService { + type Request = (Connect, T); + type Response = (Connect, SslStream); type Error = Error; - type Future = ConnectAsyncExt; + type Future = ConnectAsyncExt; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, (req, info, stream): Self::Request) -> Self::Future { + fn call(&mut self, (req, stream): Self::Request) -> Self::Future { ConnectAsyncExt { - fut: SslConnectorExt::connect_async(&self.connector, &info.host, stream), + fut: SslConnectorExt::connect_async(&self.connector, &req.host, stream), req: Some(req), - host: Some(info), } } } -pub struct ConnectAsyncExt { - fut: ConnectAsync, - req: Option, - host: Option, +pub struct ConnectAsyncExt { + fut: ConnectAsync, + req: Option, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where - Io: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite, { - type Item = (T, ConnectionInfo, SslStream); + type Item = (Connect, SslStream); type Error = Error; fn poll(&mut self) -> Poll { match self.fut.poll()? { - Async::Ready(stream) => Ok(Async::Ready(( - self.req.take().unwrap(), - self.host.take().unwrap(), - stream, - ))), + Async::Ready(stream) => Ok(Async::Ready((self.req.take().unwrap(), stream))), Async::NotReady => Ok(Async::NotReady), } }