From a7c74c53ea1feafc4f582915b8b259f0ae752283 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 13 Mar 2019 15:37:12 -0700 Subject: [PATCH] store request in Connect request --- actix-connect/src/connect.rs | 183 ++++++++++++++++--------------- actix-connect/src/connector.rs | 99 +++++++++++------ actix-connect/src/lib.rs | 34 +++--- actix-connect/src/resolver.rs | 135 ++++++++++++----------- actix-connect/src/ssl/openssl.rs | 66 +++++------ 5 files changed, 281 insertions(+), 236 deletions(-) diff --git a/actix-connect/src/connect.rs b/actix-connect/src/connect.rs index e6c8be28..92be3fa6 100644 --- a/actix-connect/src/connect.rs +++ b/actix-connect/src/connect.rs @@ -7,154 +7,161 @@ use either::Either; use crate::error::ConnectError; /// Connect request -#[derive(Eq, PartialEq, Debug, Hash)] -pub enum Connect { - /// Host name - Host { host: String, port: u16 }, - /// Host name with address of this host - Addr { - host: String, - addr: Either>, - }, +pub trait Address { + /// Host name of the request + fn host(&self) -> &str; + + /// Port of the request + fn port(&self) -> u16; } -impl Connect { +impl Address for (String, u16) { + fn host(&self) -> &str { + &self.0 + } + + fn port(&self) -> u16 { + self.1 + } +} + +impl Address for (&'static str, u16) { + fn host(&self) -> &str { + self.0 + } + + fn port(&self) -> u16 { + self.1 + } +} + +/// Connect request +#[derive(Eq, PartialEq, Debug, Hash)] +pub struct Connect { + pub(crate) req: T, + pub(crate) addr: Option>>, +} + +impl Connect<(&'static str, u16)> { /// Create new `Connect` instance. - pub fn new>(host: T, port: u16) -> Connect { - Connect::Host { - host: host.as_ref().to_owned(), - port, + pub fn new(host: &'static str, port: u16) -> Connect<(&'static str, u16)> { + Connect { + req: (host, port), + addr: None, + } + } +} + +impl Connect<(String, u16)> { + /// Create new `Connect` instance. + pub fn new>(host: T, port: u16) -> Connect<(String, u16)> { + Connect { + req: (host.as_ref().to_owned(), port), + addr: None, } } /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 - pub fn with>(host: T) -> Result { + pub fn with>(host: T) -> Result, ConnectError> { let mut parts_iter = host.as_ref().splitn(2, ':'); let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?; let port_str = parts_iter.next().unwrap_or(""); let port = port_str .parse::() .map_err(|_| ConnectError::InvalidInput)?; - Ok(Connect::Host { - host: host.to_owned(), - port, + Ok(Connect { + req: (host.to_owned(), port), + addr: None, }) } +} +impl Connect { /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. - pub fn with_address>(host: T, addr: SocketAddr) -> Connect { - Connect::Addr { - addr: Either::Left(addr), - host: host.into(), + pub fn with_address(req: T, addr: SocketAddr) -> Connect { + Connect { + req, + addr: Some(Either::Left(addr)), } } /// Host name - fn host(&self) -> &str { - match self { - Connect::Host { ref host, .. } => host, - Connect::Addr { ref host, .. } => host, - } + pub fn host(&self) -> &str { + self.req.host() + } + + /// Port of the request + pub fn port(&self) -> u16 { + self.req.port() } } -impl fmt::Display for Connect { +impl fmt::Display for Connect { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", self.host(), 0) + write!(f, "{}:{}", self.host(), self.port()) } } -pub struct Stream { - io: T, - host: String, - params: P, +pub struct Connection { + io: U, + req: T, } -impl Stream { - pub fn new(io: T, host: String) -> Self { - Self { - io, - host, - params: (), - } +impl Connection { + pub fn new(io: U, req: T) -> Self { + Self { io, req } } } -impl Stream { +impl Connection { /// Reconstruct from a parts. - pub fn from_parts(io: T, host: String, params: P) -> Self { - Self { io, params, host } + pub fn from_parts(io: U, req: T) -> Self { + Self { io, req } } /// Deconstruct into a parts. - pub fn into_parts(self) -> (T, String, P) { - (self.io, self.host, self.params) + pub fn into_parts(self) -> (U, T) { + (self.io, self.req) } /// Replace inclosed object, return new Stream and old object - pub fn replace(self, io: U) -> (T, Stream) { - ( - self.io, - Stream { - io, - host: self.host, - params: self.params, - }, - ) + pub fn replace(self, io: Y) -> (U, Connection) { + (self.io, Connection { io, req: self.req }) } /// Returns a shared reference to the underlying stream. - pub fn get_ref(&self) -> &T { + pub fn get_ref(&self) -> &U { &self.io } /// Returns a mutable reference to the underlying stream. - pub fn get_mut(&mut self) -> &mut T { + pub fn get_mut(&mut self) -> &mut U { &mut self.io } - - /// Get host name - pub fn host(&self) -> &str { - &self.host - } - - /// Return new Io object with new parameter. - pub fn set(self, params: U) -> Stream { - Stream { - io: self.io, - host: self.host, - params: params, - } - } - - /// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value. - pub fn map(self, op: F) -> Stream - where - F: FnOnce(P) -> U, - { - Stream { - io: self.io, - host: self.host, - params: op(self.params), - } - } } -impl std::ops::Deref for Stream { - type Target = T; +impl Connection { + /// Get request + pub fn host(&self) -> &str { + &self.req.host() + } +} - fn deref(&self) -> &T { +impl std::ops::Deref for Connection { + type Target = U; + + fn deref(&self) -> &U { &self.io } } -impl std::ops::DerefMut for Stream { - fn deref_mut(&mut self) -> &mut T { +impl std::ops::DerefMut for Connection { + fn deref_mut(&mut self) -> &mut U { &mut self.io } } -impl fmt::Debug for Stream { +impl fmt::Debug for Connection { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Stream {{{:?}}}", self.io) } diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index ae8be957..f4e8837f 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::marker::PhantomData; use std::net::SocketAddr; use actix_service::{NewService, Service}; @@ -6,74 +7,99 @@ use futures::future::{err, ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use tokio_tcp::{ConnectFuture, TcpStream}; -use super::connect::{Connect, Stream}; +use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; /// Tcp connector service factory -#[derive(Copy, Clone, Debug)] -pub struct ConnectorFactory; +#[derive(Debug)] +pub struct ConnectorFactory(PhantomData); -impl NewService for ConnectorFactory { - type Request = Connect; - type Response = Stream; +impl ConnectorFactory { + pub fn new() -> Self { + ConnectorFactory(PhantomData) + } +} + +impl Clone for ConnectorFactory { + fn clone(&self) -> Self { + ConnectorFactory(PhantomData) + } +} + +impl NewService for ConnectorFactory { + type Request = Connect; + type Response = Connection; type Error = ConnectError; - type Service = Connector; + type Service = Connector; type InitError = (); type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { - ok(Connector) + ok(Connector(PhantomData)) } } /// Tcp connector service -#[derive(Copy, Clone, Debug)] -pub struct Connector; +#[derive(Debug)] +pub struct Connector(PhantomData); -impl Service for Connector { - type Request = Connect; - type Response = Stream; +impl Connector { + pub fn new() -> Self { + Connector(PhantomData) + } +} + +impl Clone for Connector { + fn clone(&self) -> Self { + Connector(PhantomData) + } +} + +impl Service for Connector { + type Request = Connect; + type Response = Connection; type Error = ConnectError; - type Future = Either>; + type Future = Either, FutureResult>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, req: Connect) -> Self::Future { - match req { - Connect::Host { .. } => { - error!("TCP connector: got unresolved address"); - Either::B(err(ConnectError::Unresolverd)) - } - Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)), + fn call(&mut self, req: Connect) -> Self::Future { + let Connect { req, addr } = req; + + if let Some(addr) = addr { + Either::A(ConnectorResponse::new(req, addr)) + } else { + error!("TCP connector: got unresolved address"); + Either::B(err(ConnectError::Unresolverd)) } } } #[doc(hidden)] /// Tcp stream connector response future -pub struct ConnectorResponse { - host: Option, +pub struct ConnectorResponse { + req: Option, addrs: Option>, stream: Option, } -impl ConnectorResponse { +impl ConnectorResponse { pub fn new( - host: String, + req: T, addr: either::Either>, - ) -> ConnectorResponse { - trace!("TCP connector - connecting to {:?}", host); + ) -> ConnectorResponse { + trace!("TCP connector - connecting to {:?}", req.host()); match addr { either::Either::Left(addr) => ConnectorResponse { - host: Some(host), + req: Some(req), addrs: None, stream: Some(TcpStream::connect(&addr)), }, either::Either::Right(addrs) => ConnectorResponse { - host: Some(host), + req: Some(req), addrs: Some(addrs), stream: None, }, @@ -81,8 +107,8 @@ impl ConnectorResponse { } } -impl Future for ConnectorResponse { - type Item = Stream; +impl Future for ConnectorResponse { + type Item = Connection; type Error = ConnectError; fn poll(&mut self) -> Poll { @@ -91,18 +117,19 @@ impl Future for ConnectorResponse { if let Some(new) = self.stream.as_mut() { match new.poll() { Ok(Async::Ready(sock)) => { - let host = self.host.take().unwrap(); + let req = self.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", - host, sock.peer_addr() + req.host(), sock.peer_addr() ); - return Ok(Async::Ready(Stream::new(sock, host))); + return Ok(Async::Ready(Connection::new(sock, req))); } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { trace!( - "TCP connector - failed to connect to connecting to {:?}", - self.host.as_ref().unwrap() + "TCP connector - failed to connect to connecting to {:?} port: {}", + self.req.as_ref().unwrap().host(), + self.req.as_ref().unwrap().port(), ); if self.addrs.as_ref().unwrap().is_empty() { return Err(err.into()); diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index 257f5994..542c68e6 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -16,7 +16,7 @@ pub mod ssl; pub use trust_dns_resolver::error::ResolveError; -pub use self::connect::{Connect, Stream}; +pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{Connector, ConnectorFactory}; pub use self::error::ConnectError; pub use self::resolver::{Resolver, ResolverFactory}; @@ -26,40 +26,40 @@ use tokio_tcp::TcpStream; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; /// Create tcp connector service -pub fn new_connector( +pub fn new_connector( cfg: ResolverConfig, opts: ResolverOpts, -) -> impl Service, Error = ConnectError> + Clone -{ - Resolver::new(cfg, opts).and_then(Connector) +) -> impl Service, Response = Connection, Error = ConnectError> + + Clone { + Resolver::new(cfg, opts).and_then(Connector::new()) } /// Create tcp connector service -pub fn new_connector_factory( +pub fn new_connector_factory( cfg: ResolverConfig, opts: ResolverOpts, ) -> impl NewService< - Request = Connect, - Response = Stream, + Request = Connect, + Response = Connection, Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::new(cfg, opts).and_then(ConnectorFactory) + ResolverFactory::new(cfg, opts).and_then(ConnectorFactory::new()) } /// Create connector service with default parameters -pub fn default_connector( -) -> impl Service, Error = ConnectError> + Clone -{ - Resolver::default().and_then(Connector) +pub fn default_connector( +) -> impl Service, Response = Connection, Error = ConnectError> + + Clone { + Resolver::default().and_then(Connector::new()) } /// Create connector service factory with default parameters -pub fn default_connector_factory() -> impl NewService< - Request = Connect, - Response = Stream, +pub fn default_connector_factory() -> impl NewService< + Request = Connect, + Response = Connection, Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::default().and_then(ConnectorFactory) + ResolverFactory::default().and_then(ConnectorFactory::new()) } diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 5dc750b9..0be30ed1 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::marker::PhantomData; use std::net::SocketAddr; use actix_service::{NewService, Service}; @@ -9,15 +10,16 @@ use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::{AsyncResolver, Background}; -use crate::connect::Connect; +use crate::connect::{Address, Connect}; use crate::error::ConnectError; /// DNS Resolver Service factory -pub struct ResolverFactory { +pub struct ResolverFactory { resolver: AsyncResolver, + _t: PhantomData, } -impl Default for ResolverFactory { +impl Default for ResolverFactory { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -29,44 +31,50 @@ impl Default for ResolverFactory { } } -impl ResolverFactory { +impl ResolverFactory { /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_current_thread::spawn(bg); - ResolverFactory { resolver } - } -} - -impl Clone for ResolverFactory { - fn clone(&self) -> Self { ResolverFactory { - resolver: self.resolver.clone(), + resolver, + _t: PhantomData, } } } -impl NewService for ResolverFactory { - type Request = Connect; - type Response = Connect; +impl Clone for ResolverFactory { + fn clone(&self) -> Self { + ResolverFactory { + resolver: self.resolver.clone(), + _t: PhantomData, + } + } +} + +impl NewService for ResolverFactory { + type Request = Connect; + type Response = Connect; type Error = ConnectError; - type Service = Resolver; + type Service = Resolver; type InitError = (); type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { ok(Resolver { resolver: self.resolver.clone(), + _t: PhantomData, }) } } /// DNS Resolver Service -pub struct Resolver { +pub struct Resolver { resolver: AsyncResolver, + _t: PhantomData, } -impl Default for Resolver { +impl Default for Resolver { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -78,100 +86,99 @@ impl Default for Resolver { } } -impl Resolver { +impl Resolver { /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_current_thread::spawn(bg); - Resolver { resolver } - } -} - -impl Clone for Resolver { - fn clone(&self) -> Self { Resolver { - resolver: self.resolver.clone(), + resolver, + _t: PhantomData, } } } -impl Service for Resolver { - type Request = Connect; - type Response = Connect; +impl Clone for Resolver { + fn clone(&self) -> Self { + Resolver { + resolver: self.resolver.clone(), + _t: PhantomData, + } + } +} + +impl Service for Resolver { + type Request = Connect; + type Response = Connect; type Error = ConnectError; - type Future = Either>; + type Future = Either, FutureResult, Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, req: Connect) -> Self::Future { - match req { - Connect::Host { host, port } => { - if let Ok(ip) = host.parse() { - Either::B(ok(Connect::Addr { - host: host, - addr: either::Either::Left(SocketAddr::new(ip, port)), - })) - } else { - trace!("DNS resolver: resolving host {:?}", host); - Either::A(ResolverFuture::new(host, port, &self.resolver)) - } + fn call(&mut self, mut req: Connect) -> Self::Future { + if req.addr.is_some() { + Either::B(ok(req)) + } else { + if let Ok(ip) = req.host().parse() { + req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); + Either::B(ok(req)) + } else { + trace!("DNS resolver: resolving host {:?}", req.host()); + Either::A(ResolverFuture::new(req, &self.resolver)) } - other => Either::B(ok(other)), } } } #[doc(hidden)] /// Resolver future -pub struct ResolverFuture { - host: Option, - port: u16, +pub struct ResolverFuture { + req: Option>, lookup: Option>, } -impl ResolverFuture { - pub fn new(host: String, port: u16, resolver: &AsyncResolver) -> Self { +impl ResolverFuture { + pub fn new(req: Connect, resolver: &AsyncResolver) -> Self { ResolverFuture { - lookup: Some(resolver.lookup_ip(host.as_str())), - host: Some(host), - port, + lookup: Some(resolver.lookup_ip(req.host())), + req: Some(req), } } } -impl Future for ResolverFuture { - type Item = Connect; +impl Future for ResolverFuture { + type Item = Connect; type Error = ConnectError; fn poll(&mut self) -> Poll { match self.lookup.as_mut().unwrap().poll().map_err(|e| { trace!( "DNS resolver: failed to resolve host {:?} err: {}", - self.host.as_ref().unwrap(), + self.req.as_ref().unwrap().host(), e ); e })? { Async::NotReady => Ok(Async::NotReady), Async::Ready(ips) => { - let host = self.host.take().unwrap(); + let mut req = self.req.take().unwrap(); let mut addrs: VecDeque<_> = ips .iter() - .map(|ip| SocketAddr::new(ip, self.port)) + .map(|ip| SocketAddr::new(ip, req.port())) .collect(); - trace!("DNS resolver: host {:?} resolved to {:?}", host, addrs); + trace!( + "DNS resolver: host {:?} resolved to {:?}", + req.host(), + addrs + ); if addrs.len() == 1 { - Ok(Async::Ready(Connect::Addr { - addr: either::Either::Left(addrs.pop_front().unwrap()), - host, - })) + req.addr = Some(either::Either::Left(addrs.pop_front().unwrap())); + Ok(Async::Ready(req)) } else { - Ok(Async::Ready(Connect::Addr { - addr: either::Either::Right(addrs), - host, - })) + req.addr = Some(either::Either::Right(addrs)); + Ok(Async::Ready(req)) } } } diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index 59ccdb0f..daaf05a7 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -7,15 +7,15 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{HandshakeError, SslConnector}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; -use crate::Stream; +use crate::{Address, Connection}; /// Openssl connector factory -pub struct OpensslConnector { +pub struct OpensslConnector { connector: SslConnector, - _t: PhantomData<(T, P, E)>, + _t: PhantomData<(T, U, E)>, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { OpensslConnector { connector, @@ -24,13 +24,17 @@ impl OpensslConnector { } } -impl OpensslConnector { +impl OpensslConnector +where + T: Address, + U: AsyncRead + AsyncWrite + fmt::Debug, +{ pub fn service( connector: SslConnector, ) -> impl Service< - Request = Stream, - Response = Stream, P>, - Error = HandshakeError, + Request = Connection, + Response = Connection>, + Error = HandshakeError, > { OpensslConnectorService { connector: connector, @@ -39,7 +43,7 @@ impl OpensslConnector { } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), @@ -48,14 +52,14 @@ impl Clone for OpensslConnector { } } -impl NewService<()> for OpensslConnector +impl NewService<()> for OpensslConnector where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Request = Stream; - type Response = Stream, P>; - type Error = HandshakeError; - type Service = OpensslConnectorService; + type Request = Connection; + type Response = Connection>; + type Error = HandshakeError; + type Service = OpensslConnectorService; type InitError = E; type Future = FutureResult; @@ -67,25 +71,25 @@ where } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - _t: PhantomData<(T, P)>, + _t: PhantomData<(T, U)>, } -impl Service for OpensslConnectorService +impl Service for OpensslConnectorService where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Request = Stream; - type Response = Stream, P>; - type Error = HandshakeError; - type Future = ConnectAsyncExt; + type Request = Connection; + type Response = Connection>; + type Error = HandshakeError; + type Future = ConnectAsyncExt; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, stream: Stream) -> Self::Future { + fn call(&mut self, stream: Connection) -> Self::Future { trace!("SSL Handshake start for: {:?}", stream.host()); let (io, stream) = stream.replace(()); ConnectAsyncExt { @@ -95,17 +99,17 @@ where } } -pub struct ConnectAsyncExt { - fut: ConnectAsync, - stream: Option>, +pub struct ConnectAsyncExt { + fut: ConnectAsync, + stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Item = Stream, P>; - type Error = HandshakeError; + type Item = Connection>; + type Error = HandshakeError; fn poll(&mut self) -> Poll { match self.fut.poll().map_err(|e| {