1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-07 14:44:24 +01:00

store request in Connect request

This commit is contained in:
Nikolay Kim 2019-03-13 15:37:12 -07:00
parent 3e7d737e73
commit a7c74c53ea
5 changed files with 281 additions and 236 deletions

View File

@ -7,154 +7,161 @@ use either::Either;
use crate::error::ConnectError; use crate::error::ConnectError;
/// Connect request /// Connect request
#[derive(Eq, PartialEq, Debug, Hash)] pub trait Address {
pub enum Connect { /// Host name of the request
/// Host name fn host(&self) -> &str;
Host { host: String, port: u16 },
/// Host name with address of this host /// Port of the request
Addr { fn port(&self) -> u16;
host: String,
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
},
} }
impl Connect { impl Address for (String, u16) {
fn host(&self) -> &str {
&self.0
}
fn port(&self) -> u16 {
self.1
}
}
impl Address for (&'static str, u16) {
fn host(&self) -> &str {
self.0
}
fn port(&self) -> u16 {
self.1
}
}
/// Connect request
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct Connect<T> {
pub(crate) req: T,
pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
}
impl Connect<(&'static str, u16)> {
/// Create new `Connect` instance. /// Create new `Connect` instance.
pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect { pub fn new(host: &'static str, port: u16) -> Connect<(&'static str, u16)> {
Connect::Host { Connect {
host: host.as_ref().to_owned(), req: (host, port),
port, addr: None,
}
}
}
impl Connect<(String, u16)> {
/// Create new `Connect` instance.
pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect<(String, u16)> {
Connect {
req: (host.as_ref().to_owned(), port),
addr: None,
} }
} }
/// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectError> { pub fn with<T: AsRef<str>>(host: T) -> Result<Connect<(String, u16)>, ConnectError> {
let mut parts_iter = host.as_ref().splitn(2, ':'); let mut parts_iter = host.as_ref().splitn(2, ':');
let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?; let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?;
let port_str = parts_iter.next().unwrap_or(""); let port_str = parts_iter.next().unwrap_or("");
let port = port_str let port = port_str
.parse::<u16>() .parse::<u16>()
.map_err(|_| ConnectError::InvalidInput)?; .map_err(|_| ConnectError::InvalidInput)?;
Ok(Connect::Host { Ok(Connect {
host: host.to_owned(), req: (host.to_owned(), port),
port, addr: None,
}) })
} }
}
impl<T: Address> Connect<T> {
/// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect { pub fn with_address(req: T, addr: SocketAddr) -> Connect<T> {
Connect::Addr { Connect {
addr: Either::Left(addr), req,
host: host.into(), addr: Some(Either::Left(addr)),
} }
} }
/// Host name /// Host name
fn host(&self) -> &str { pub fn host(&self) -> &str {
match self { self.req.host()
Connect::Host { ref host, .. } => host,
Connect::Addr { ref host, .. } => host,
} }
/// Port of the request
pub fn port(&self) -> u16 {
self.req.port()
} }
} }
impl fmt::Display for Connect { impl<T: Address> fmt::Display for Connect<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{}", self.host(), 0) write!(f, "{}:{}", self.host(), self.port())
} }
} }
pub struct Stream<T, P = ()> { pub struct Connection<T, U> {
io: T, io: U,
host: String, req: T,
params: P,
} }
impl<T> Stream<T, ()> { impl<T, U> Connection<T, U> {
pub fn new(io: T, host: String) -> Self { pub fn new(io: U, req: T) -> Self {
Self { Self { io, req }
io,
host,
params: (),
}
} }
} }
impl<T, P> Stream<T, P> { impl<T, U> Connection<T, U> {
/// Reconstruct from a parts. /// Reconstruct from a parts.
pub fn from_parts(io: T, host: String, params: P) -> Self { pub fn from_parts(io: U, req: T) -> Self {
Self { io, params, host } Self { io, req }
} }
/// Deconstruct into a parts. /// Deconstruct into a parts.
pub fn into_parts(self) -> (T, String, P) { pub fn into_parts(self) -> (U, T) {
(self.io, self.host, self.params) (self.io, self.req)
} }
/// Replace inclosed object, return new Stream and old object /// Replace inclosed object, return new Stream and old object
pub fn replace<U>(self, io: U) -> (T, Stream<U, P>) { pub fn replace<Y>(self, io: Y) -> (U, Connection<T, Y>) {
( (self.io, Connection { io, req: self.req })
self.io,
Stream {
io,
host: self.host,
params: self.params,
},
)
} }
/// Returns a shared reference to the underlying stream. /// Returns a shared reference to the underlying stream.
pub fn get_ref(&self) -> &T { pub fn get_ref(&self) -> &U {
&self.io &self.io
} }
/// Returns a mutable reference to the underlying stream. /// Returns a mutable reference to the underlying stream.
pub fn get_mut(&mut self) -> &mut T { pub fn get_mut(&mut self) -> &mut U {
&mut self.io &mut self.io
} }
/// Get host name
pub fn host(&self) -> &str {
&self.host
}
/// Return new Io object with new parameter.
pub fn set<U>(self, params: U) -> Stream<T, U> {
Stream {
io: self.io,
host: self.host,
params: params,
}
}
/// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value.
pub fn map<U, F>(self, op: F) -> Stream<T, U>
where
F: FnOnce(P) -> U,
{
Stream {
io: self.io,
host: self.host,
params: op(self.params),
}
}
} }
impl<T, P> std::ops::Deref for Stream<T, P> { impl<T: Address, U> Connection<T, U> {
type Target = T; /// Get request
pub fn host(&self) -> &str {
&self.req.host()
}
}
fn deref(&self) -> &T { impl<T, U> std::ops::Deref for Connection<T, U> {
type Target = U;
fn deref(&self) -> &U {
&self.io &self.io
} }
} }
impl<T, P> std::ops::DerefMut for Stream<T, P> { impl<T, U> std::ops::DerefMut for Connection<T, U> {
fn deref_mut(&mut self) -> &mut T { fn deref_mut(&mut self) -> &mut U {
&mut self.io &mut self.io
} }
} }
impl<T: fmt::Debug, P> fmt::Debug for Stream<T, P> { impl<T, U: fmt::Debug> fmt::Debug for Connection<T, U> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Stream {{{:?}}}", self.io) write!(f, "Stream {{{:?}}}", self.io)
} }

View File

@ -1,4 +1,5 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
@ -6,74 +7,99 @@ use futures::future::{err, ok, Either, FutureResult};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_tcp::{ConnectFuture, TcpStream}; use tokio_tcp::{ConnectFuture, TcpStream};
use super::connect::{Connect, Stream}; use super::connect::{Address, Connect, Connection};
use super::error::ConnectError; use super::error::ConnectError;
/// Tcp connector service factory /// Tcp connector service factory
#[derive(Copy, Clone, Debug)] #[derive(Debug)]
pub struct ConnectorFactory; pub struct ConnectorFactory<T>(PhantomData<T>);
impl NewService for ConnectorFactory { impl<T> ConnectorFactory<T> {
type Request = Connect; pub fn new() -> Self {
type Response = Stream<TcpStream>; ConnectorFactory(PhantomData)
}
}
impl<T> Clone for ConnectorFactory<T> {
fn clone(&self) -> Self {
ConnectorFactory(PhantomData)
}
}
impl<T: Address> NewService for ConnectorFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Service = Connector; type Service = Connector<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(Connector) ok(Connector(PhantomData))
} }
} }
/// Tcp connector service /// Tcp connector service
#[derive(Copy, Clone, Debug)] #[derive(Debug)]
pub struct Connector; pub struct Connector<T>(PhantomData<T>);
impl Service for Connector { impl<T> Connector<T> {
type Request = Connect; pub fn new() -> Self {
type Response = Stream<TcpStream>; Connector(PhantomData)
}
}
impl<T> Clone for Connector<T> {
fn clone(&self) -> Self {
Connector(PhantomData)
}
}
impl<T: Address> Service for Connector<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either<ConnectorResponse, FutureResult<Self::Response, Self::Error>>; type Future = Either<ConnectorResponse<T>, FutureResult<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, req: Connect) -> Self::Future { fn call(&mut self, req: Connect<T>) -> Self::Future {
match req { let Connect { req, addr } = req;
Connect::Host { .. } => {
if let Some(addr) = addr {
Either::A(ConnectorResponse::new(req, addr))
} else {
error!("TCP connector: got unresolved address"); error!("TCP connector: got unresolved address");
Either::B(err(ConnectError::Unresolverd)) Either::B(err(ConnectError::Unresolverd))
} }
Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)),
}
} }
} }
#[doc(hidden)] #[doc(hidden)]
/// Tcp stream connector response future /// Tcp stream connector response future
pub struct ConnectorResponse { pub struct ConnectorResponse<T> {
host: Option<String>, req: Option<T>,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ConnectFuture>, stream: Option<ConnectFuture>,
} }
impl ConnectorResponse { impl<T: Address> ConnectorResponse<T> {
pub fn new( pub fn new(
host: String, req: T,
addr: either::Either<SocketAddr, VecDeque<SocketAddr>>, addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
) -> ConnectorResponse { ) -> ConnectorResponse<T> {
trace!("TCP connector - connecting to {:?}", host); trace!("TCP connector - connecting to {:?}", req.host());
match addr { match addr {
either::Either::Left(addr) => ConnectorResponse { either::Either::Left(addr) => ConnectorResponse {
host: Some(host), req: Some(req),
addrs: None, addrs: None,
stream: Some(TcpStream::connect(&addr)), stream: Some(TcpStream::connect(&addr)),
}, },
either::Either::Right(addrs) => ConnectorResponse { either::Either::Right(addrs) => ConnectorResponse {
host: Some(host), req: Some(req),
addrs: Some(addrs), addrs: Some(addrs),
stream: None, stream: None,
}, },
@ -81,8 +107,8 @@ impl ConnectorResponse {
} }
} }
impl Future for ConnectorResponse { impl<T: Address> Future for ConnectorResponse<T> {
type Item = Stream<TcpStream>; type Item = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -91,18 +117,19 @@ impl Future for ConnectorResponse {
if let Some(new) = self.stream.as_mut() { if let Some(new) = self.stream.as_mut() {
match new.poll() { match new.poll() {
Ok(Async::Ready(sock)) => { Ok(Async::Ready(sock)) => {
let host = self.host.take().unwrap(); let req = self.req.take().unwrap();
trace!( trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}", "TCP connector - successfully connected to connecting to {:?} - {:?}",
host, sock.peer_addr() req.host(), sock.peer_addr()
); );
return Ok(Async::Ready(Stream::new(sock, host))); return Ok(Async::Ready(Connection::new(sock, req)));
} }
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => { Err(err) => {
trace!( trace!(
"TCP connector - failed to connect to connecting to {:?}", "TCP connector - failed to connect to connecting to {:?} port: {}",
self.host.as_ref().unwrap() self.req.as_ref().unwrap().host(),
self.req.as_ref().unwrap().port(),
); );
if self.addrs.as_ref().unwrap().is_empty() { if self.addrs.as_ref().unwrap().is_empty() {
return Err(err.into()); return Err(err.into());

View File

@ -16,7 +16,7 @@ pub mod ssl;
pub use trust_dns_resolver::error::ResolveError; pub use trust_dns_resolver::error::ResolveError;
pub use self::connect::{Connect, Stream}; pub use self::connect::{Address, Connect, Connection};
pub use self::connector::{Connector, ConnectorFactory}; pub use self::connector::{Connector, ConnectorFactory};
pub use self::error::ConnectError; pub use self::error::ConnectError;
pub use self::resolver::{Resolver, ResolverFactory}; pub use self::resolver::{Resolver, ResolverFactory};
@ -26,40 +26,40 @@ use tokio_tcp::TcpStream;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
/// Create tcp connector service /// Create tcp connector service
pub fn new_connector( pub fn new_connector<T: Address>(
cfg: ResolverConfig, cfg: ResolverConfig,
opts: ResolverOpts, opts: ResolverOpts,
) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
{ + Clone {
Resolver::new(cfg, opts).and_then(Connector) Resolver::new(cfg, opts).and_then(Connector::new())
} }
/// Create tcp connector service /// Create tcp connector service
pub fn new_connector_factory( pub fn new_connector_factory<T: Address>(
cfg: ResolverConfig, cfg: ResolverConfig,
opts: ResolverOpts, opts: ResolverOpts,
) -> impl NewService< ) -> impl NewService<
Request = Connect, Request = Connect<T>,
Response = Stream<TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
> + Clone { > + Clone {
ResolverFactory::new(cfg, opts).and_then(ConnectorFactory) ResolverFactory::new(cfg, opts).and_then(ConnectorFactory::new())
} }
/// Create connector service with default parameters /// Create connector service with default parameters
pub fn default_connector( pub fn default_connector<T: Address>(
) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
{ + Clone {
Resolver::default().and_then(Connector) Resolver::default().and_then(Connector::new())
} }
/// Create connector service factory with default parameters /// Create connector service factory with default parameters
pub fn default_connector_factory() -> impl NewService< pub fn default_connector_factory<T: Address>() -> impl NewService<
Request = Connect, Request = Connect<T>,
Response = Stream<TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
> + Clone { > + Clone {
ResolverFactory::default().and_then(ConnectorFactory) ResolverFactory::default().and_then(ConnectorFactory::new())
} }

View File

@ -1,4 +1,5 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
@ -9,15 +10,16 @@ use trust_dns_resolver::lookup_ip::LookupIpFuture;
use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::system_conf::read_system_conf;
use trust_dns_resolver::{AsyncResolver, Background}; use trust_dns_resolver::{AsyncResolver, Background};
use crate::connect::Connect; use crate::connect::{Address, Connect};
use crate::error::ConnectError; use crate::error::ConnectError;
/// DNS Resolver Service factory /// DNS Resolver Service factory
pub struct ResolverFactory { pub struct ResolverFactory<T> {
resolver: AsyncResolver, resolver: AsyncResolver,
_t: PhantomData<T>,
} }
impl Default for ResolverFactory { impl<T> Default for ResolverFactory<T> {
fn default() -> Self { fn default() -> Self {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts) (cfg, opts)
@ -29,44 +31,50 @@ impl Default for ResolverFactory {
} }
} }
impl ResolverFactory { impl<T> ResolverFactory<T> {
/// Create new resolver instance with custom configuration and options. /// Create new resolver instance with custom configuration and options.
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_current_thread::spawn(bg);
ResolverFactory { resolver } ResolverFactory {
resolver,
_t: PhantomData,
}
} }
} }
impl Clone for ResolverFactory { impl<T> Clone for ResolverFactory<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ResolverFactory { ResolverFactory {
resolver: self.resolver.clone(), resolver: self.resolver.clone(),
_t: PhantomData,
} }
} }
} }
impl NewService for ResolverFactory { impl<T: Address> NewService for ResolverFactory<T> {
type Request = Connect; type Request = Connect<T>;
type Response = Connect; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Service = Resolver; type Service = Resolver<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(Resolver { ok(Resolver {
resolver: self.resolver.clone(), resolver: self.resolver.clone(),
_t: PhantomData,
}) })
} }
} }
/// DNS Resolver Service /// DNS Resolver Service
pub struct Resolver { pub struct Resolver<T> {
resolver: AsyncResolver, resolver: AsyncResolver,
_t: PhantomData<T>,
} }
impl Default for Resolver { impl<T> Default for Resolver<T> {
fn default() -> Self { fn default() -> Self {
let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
(cfg, opts) (cfg, opts)
@ -78,100 +86,99 @@ impl Default for Resolver {
} }
} }
impl Resolver { impl<T> Resolver<T> {
/// Create new resolver instance with custom configuration and options. /// Create new resolver instance with custom configuration and options.
pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_current_thread::spawn(bg);
Resolver { resolver } Resolver {
resolver,
_t: PhantomData,
}
} }
} }
impl Clone for Resolver { impl<T> Clone for Resolver<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Resolver { Resolver {
resolver: self.resolver.clone(), resolver: self.resolver.clone(),
_t: PhantomData,
} }
} }
} }
impl Service for Resolver { impl<T: Address> Service for Resolver<T> {
type Request = Connect; type Request = Connect<T>;
type Response = Connect; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either<ResolverFuture, FutureResult<Connect, Self::Error>>; type Future = Either<ResolverFuture<T>, FutureResult<Connect<T>, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, req: Connect) -> Self::Future { fn call(&mut self, mut req: Connect<T>) -> Self::Future {
match req { if req.addr.is_some() {
Connect::Host { host, port } => { Either::B(ok(req))
if let Ok(ip) = host.parse() {
Either::B(ok(Connect::Addr {
host: host,
addr: either::Either::Left(SocketAddr::new(ip, port)),
}))
} else { } else {
trace!("DNS resolver: resolving host {:?}", host); if let Ok(ip) = req.host().parse() {
Either::A(ResolverFuture::new(host, port, &self.resolver)) req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
Either::B(ok(req))
} else {
trace!("DNS resolver: resolving host {:?}", req.host());
Either::A(ResolverFuture::new(req, &self.resolver))
} }
} }
other => Either::B(ok(other)),
}
} }
} }
#[doc(hidden)] #[doc(hidden)]
/// Resolver future /// Resolver future
pub struct ResolverFuture { pub struct ResolverFuture<T: Address> {
host: Option<String>, req: Option<Connect<T>>,
port: u16,
lookup: Option<Background<LookupIpFuture>>, lookup: Option<Background<LookupIpFuture>>,
} }
impl ResolverFuture { impl<T: Address> ResolverFuture<T> {
pub fn new(host: String, port: u16, resolver: &AsyncResolver) -> Self { pub fn new(req: Connect<T>, resolver: &AsyncResolver) -> Self {
ResolverFuture { ResolverFuture {
lookup: Some(resolver.lookup_ip(host.as_str())), lookup: Some(resolver.lookup_ip(req.host())),
host: Some(host), req: Some(req),
port,
} }
} }
} }
impl Future for ResolverFuture { impl<T: Address> Future for ResolverFuture<T> {
type Item = Connect; type Item = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.lookup.as_mut().unwrap().poll().map_err(|e| { match self.lookup.as_mut().unwrap().poll().map_err(|e| {
trace!( trace!(
"DNS resolver: failed to resolve host {:?} err: {}", "DNS resolver: failed to resolve host {:?} err: {}",
self.host.as_ref().unwrap(), self.req.as_ref().unwrap().host(),
e e
); );
e e
})? { })? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(ips) => { Async::Ready(ips) => {
let host = self.host.take().unwrap(); let mut req = self.req.take().unwrap();
let mut addrs: VecDeque<_> = ips let mut addrs: VecDeque<_> = ips
.iter() .iter()
.map(|ip| SocketAddr::new(ip, self.port)) .map(|ip| SocketAddr::new(ip, req.port()))
.collect(); .collect();
trace!("DNS resolver: host {:?} resolved to {:?}", host, addrs); trace!(
"DNS resolver: host {:?} resolved to {:?}",
req.host(),
addrs
);
if addrs.len() == 1 { if addrs.len() == 1 {
Ok(Async::Ready(Connect::Addr { req.addr = Some(either::Either::Left(addrs.pop_front().unwrap()));
addr: either::Either::Left(addrs.pop_front().unwrap()), Ok(Async::Ready(req))
host,
}))
} else { } else {
Ok(Async::Ready(Connect::Addr { req.addr = Some(either::Either::Right(addrs));
addr: either::Either::Right(addrs), Ok(Async::Ready(req))
host,
}))
} }
} }
} }

View File

@ -7,15 +7,15 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{HandshakeError, SslConnector}; use openssl::ssl::{HandshakeError, SslConnector};
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use crate::Stream; use crate::{Address, Connection};
/// Openssl connector factory /// Openssl connector factory
pub struct OpensslConnector<T, P, E> { pub struct OpensslConnector<T, U, E> {
connector: SslConnector, connector: SslConnector,
_t: PhantomData<(T, P, E)>, _t: PhantomData<(T, U, E)>,
} }
impl<T, P, E> OpensslConnector<T, P, E> { impl<T, U, E> OpensslConnector<T, U, E> {
pub fn new(connector: SslConnector) -> Self { pub fn new(connector: SslConnector) -> Self {
OpensslConnector { OpensslConnector {
connector, connector,
@ -24,13 +24,17 @@ impl<T, P, E> OpensslConnector<T, P, E> {
} }
} }
impl<T: AsyncRead + AsyncWrite + fmt::Debug, P> OpensslConnector<T, P, ()> { impl<T, U, E> OpensslConnector<T, U, E>
where
T: Address,
U: AsyncRead + AsyncWrite + fmt::Debug,
{
pub fn service( pub fn service(
connector: SslConnector, connector: SslConnector,
) -> impl Service< ) -> impl Service<
Request = Stream<T, P>, Request = Connection<T, U>,
Response = Stream<SslStream<T>, P>, Response = Connection<T, SslStream<U>>,
Error = HandshakeError<T>, Error = HandshakeError<U>,
> { > {
OpensslConnectorService { OpensslConnectorService {
connector: connector, connector: connector,
@ -39,7 +43,7 @@ impl<T: AsyncRead + AsyncWrite + fmt::Debug, P> OpensslConnector<T, P, ()> {
} }
} }
impl<T, P, E> Clone for OpensslConnector<T, P, E> { impl<T, U, E> Clone for OpensslConnector<T, U, E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
connector: self.connector.clone(), connector: self.connector.clone(),
@ -48,14 +52,14 @@ impl<T, P, E> Clone for OpensslConnector<T, P, E> {
} }
} }
impl<T, P, E> NewService<()> for OpensslConnector<T, P, E> impl<T: Address, U, E> NewService<()> for OpensslConnector<T, U, E>
where where
T: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
type Request = Stream<T, P>; type Request = Connection<T, U>;
type Response = Stream<SslStream<T>, P>; type Response = Connection<T, SslStream<U>>;
type Error = HandshakeError<T>; type Error = HandshakeError<U>;
type Service = OpensslConnectorService<T, P>; type Service = OpensslConnectorService<T, U>;
type InitError = E; type InitError = E;
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
@ -67,25 +71,25 @@ where
} }
} }
pub struct OpensslConnectorService<T, P> { pub struct OpensslConnectorService<T, U> {
connector: SslConnector, connector: SslConnector,
_t: PhantomData<(T, P)>, _t: PhantomData<(T, U)>,
} }
impl<T, P> Service for OpensslConnectorService<T, P> impl<T: Address, U> Service for OpensslConnectorService<T, U>
where where
T: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
type Request = Stream<T, P>; type Request = Connection<T, U>;
type Response = Stream<SslStream<T>, P>; type Response = Connection<T, SslStream<U>>;
type Error = HandshakeError<T>; type Error = HandshakeError<U>;
type Future = ConnectAsyncExt<T, P>; type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
fn call(&mut self, stream: Stream<T, P>) -> Self::Future { fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host()); trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(()); let (io, stream) = stream.replace(());
ConnectAsyncExt { ConnectAsyncExt {
@ -95,17 +99,17 @@ where
} }
} }
pub struct ConnectAsyncExt<T, P> { pub struct ConnectAsyncExt<T, U> {
fut: ConnectAsync<T>, fut: ConnectAsync<U>,
stream: Option<Stream<(), P>>, stream: Option<Connection<T, ()>>,
} }
impl<T, P> Future for ConnectAsyncExt<T, P> impl<T: Address, U> Future for ConnectAsyncExt<T, U>
where where
T: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
type Item = Stream<SslStream<T>, P>; type Item = Connection<T, SslStream<U>>;
type Error = HandshakeError<T>; type Error = HandshakeError<U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll().map_err(|e| { match self.fut.poll().map_err(|e| {