diff --git a/actix-connect/src/connect.rs b/actix-connect/src/connect.rs index e6c8be28..92be3fa6 100644 --- a/actix-connect/src/connect.rs +++ b/actix-connect/src/connect.rs @@ -7,154 +7,161 @@ use either::Either; use crate::error::ConnectError; /// Connect request -#[derive(Eq, PartialEq, Debug, Hash)] -pub enum Connect { - /// Host name - Host { host: String, port: u16 }, - /// Host name with address of this host - Addr { - host: String, - addr: Either<SocketAddr, VecDeque<SocketAddr>>, - }, +pub trait Address { + /// Host name of the request + fn host(&self) -> &str; + + /// Port of the request + fn port(&self) -> u16; } -impl Connect { +impl Address for (String, u16) { + fn host(&self) -> &str { + &self.0 + } + + fn port(&self) -> u16 { + self.1 + } +} + +impl Address for (&'static str, u16) { + fn host(&self) -> &str { + self.0 + } + + fn port(&self) -> u16 { + self.1 + } +} + +/// Connect request +#[derive(Eq, PartialEq, Debug, Hash)] +pub struct Connect<T> { + pub(crate) req: T, + pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>, +} + +impl Connect<(&'static str, u16)> { /// Create new `Connect` instance. - pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect { - Connect::Host { - host: host.as_ref().to_owned(), - port, + pub fn new(host: &'static str, port: u16) -> Connect<(&'static str, u16)> { + Connect { + req: (host, port), + addr: None, + } + } +} + +impl Connect<(String, u16)> { + /// Create new `Connect` instance. + pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect<(String, u16)> { + Connect { + req: (host.as_ref().to_owned(), port), + addr: None, } } /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16 - pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectError> { + pub fn with<T: AsRef<str>>(host: T) -> Result<Connect<(String, u16)>, ConnectError> { let mut parts_iter = host.as_ref().splitn(2, ':'); let host = parts_iter.next().ok_or(ConnectError::InvalidInput)?; let port_str = parts_iter.next().unwrap_or(""); let port = port_str .parse::<u16>() .map_err(|_| ConnectError::InvalidInput)?; - Ok(Connect::Host { - host: host.to_owned(), - port, + Ok(Connect { + req: (host.to_owned(), port), + addr: None, }) } +} +impl<T: Address> Connect<T> { /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages. - pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect { - Connect::Addr { - addr: Either::Left(addr), - host: host.into(), + pub fn with_address(req: T, addr: SocketAddr) -> Connect<T> { + Connect { + req, + addr: Some(Either::Left(addr)), } } /// Host name - fn host(&self) -> &str { - match self { - Connect::Host { ref host, .. } => host, - Connect::Addr { ref host, .. } => host, - } + pub fn host(&self) -> &str { + self.req.host() + } + + /// Port of the request + pub fn port(&self) -> u16 { + self.req.port() } } -impl fmt::Display for Connect { +impl<T: Address> fmt::Display for Connect<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", self.host(), 0) + write!(f, "{}:{}", self.host(), self.port()) } } -pub struct Stream<T, P = ()> { - io: T, - host: String, - params: P, +pub struct Connection<T, U> { + io: U, + req: T, } -impl<T> Stream<T, ()> { - pub fn new(io: T, host: String) -> Self { - Self { - io, - host, - params: (), - } +impl<T, U> Connection<T, U> { + pub fn new(io: U, req: T) -> Self { + Self { io, req } } } -impl<T, P> Stream<T, P> { +impl<T, U> Connection<T, U> { /// Reconstruct from a parts. - pub fn from_parts(io: T, host: String, params: P) -> Self { - Self { io, params, host } + pub fn from_parts(io: U, req: T) -> Self { + Self { io, req } } /// Deconstruct into a parts. - pub fn into_parts(self) -> (T, String, P) { - (self.io, self.host, self.params) + pub fn into_parts(self) -> (U, T) { + (self.io, self.req) } /// Replace inclosed object, return new Stream and old object - pub fn replace<U>(self, io: U) -> (T, Stream<U, P>) { - ( - self.io, - Stream { - io, - host: self.host, - params: self.params, - }, - ) + pub fn replace<Y>(self, io: Y) -> (U, Connection<T, Y>) { + (self.io, Connection { io, req: self.req }) } /// Returns a shared reference to the underlying stream. - pub fn get_ref(&self) -> &T { + pub fn get_ref(&self) -> &U { &self.io } /// Returns a mutable reference to the underlying stream. - pub fn get_mut(&mut self) -> &mut T { + pub fn get_mut(&mut self) -> &mut U { &mut self.io } - - /// Get host name - pub fn host(&self) -> &str { - &self.host - } - - /// Return new Io object with new parameter. - pub fn set<U>(self, params: U) -> Stream<T, U> { - Stream { - io: self.io, - host: self.host, - params: params, - } - } - - /// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value. - pub fn map<U, F>(self, op: F) -> Stream<T, U> - where - F: FnOnce(P) -> U, - { - Stream { - io: self.io, - host: self.host, - params: op(self.params), - } - } } -impl<T, P> std::ops::Deref for Stream<T, P> { - type Target = T; +impl<T: Address, U> Connection<T, U> { + /// Get request + pub fn host(&self) -> &str { + &self.req.host() + } +} - fn deref(&self) -> &T { +impl<T, U> std::ops::Deref for Connection<T, U> { + type Target = U; + + fn deref(&self) -> &U { &self.io } } -impl<T, P> std::ops::DerefMut for Stream<T, P> { - fn deref_mut(&mut self) -> &mut T { +impl<T, U> std::ops::DerefMut for Connection<T, U> { + fn deref_mut(&mut self) -> &mut U { &mut self.io } } -impl<T: fmt::Debug, P> fmt::Debug for Stream<T, P> { +impl<T, U: fmt::Debug> fmt::Debug for Connection<T, U> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Stream {{{:?}}}", self.io) } diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index ae8be957..f4e8837f 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::marker::PhantomData; use std::net::SocketAddr; use actix_service::{NewService, Service}; @@ -6,74 +7,99 @@ use futures::future::{err, ok, Either, FutureResult}; use futures::{Async, Future, Poll}; use tokio_tcp::{ConnectFuture, TcpStream}; -use super::connect::{Connect, Stream}; +use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; /// Tcp connector service factory -#[derive(Copy, Clone, Debug)] -pub struct ConnectorFactory; +#[derive(Debug)] +pub struct ConnectorFactory<T>(PhantomData<T>); -impl NewService for ConnectorFactory { - type Request = Connect; - type Response = Stream<TcpStream>; +impl<T> ConnectorFactory<T> { + pub fn new() -> Self { + ConnectorFactory(PhantomData) + } +} + +impl<T> Clone for ConnectorFactory<T> { + fn clone(&self) -> Self { + ConnectorFactory(PhantomData) + } +} + +impl<T: Address> NewService for ConnectorFactory<T> { + type Request = Connect<T>; + type Response = Connection<T, TcpStream>; type Error = ConnectError; - type Service = Connector; + type Service = Connector<T>; type InitError = (); type Future = FutureResult<Self::Service, Self::InitError>; fn new_service(&self, _: &()) -> Self::Future { - ok(Connector) + ok(Connector(PhantomData)) } } /// Tcp connector service -#[derive(Copy, Clone, Debug)] -pub struct Connector; +#[derive(Debug)] +pub struct Connector<T>(PhantomData<T>); -impl Service for Connector { - type Request = Connect; - type Response = Stream<TcpStream>; +impl<T> Connector<T> { + pub fn new() -> Self { + Connector(PhantomData) + } +} + +impl<T> Clone for Connector<T> { + fn clone(&self) -> Self { + Connector(PhantomData) + } +} + +impl<T: Address> Service for Connector<T> { + type Request = Connect<T>; + type Response = Connection<T, TcpStream>; type Error = ConnectError; - type Future = Either<ConnectorResponse, FutureResult<Self::Response, Self::Error>>; + type Future = Either<ConnectorResponse<T>, FutureResult<Self::Response, Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, req: Connect) -> Self::Future { - match req { - Connect::Host { .. } => { - error!("TCP connector: got unresolved address"); - Either::B(err(ConnectError::Unresolverd)) - } - Connect::Addr { host, addr } => Either::A(ConnectorResponse::new(host, addr)), + fn call(&mut self, req: Connect<T>) -> Self::Future { + let Connect { req, addr } = req; + + if let Some(addr) = addr { + Either::A(ConnectorResponse::new(req, addr)) + } else { + error!("TCP connector: got unresolved address"); + Either::B(err(ConnectError::Unresolverd)) } } } #[doc(hidden)] /// Tcp stream connector response future -pub struct ConnectorResponse { - host: Option<String>, +pub struct ConnectorResponse<T> { + req: Option<T>, addrs: Option<VecDeque<SocketAddr>>, stream: Option<ConnectFuture>, } -impl ConnectorResponse { +impl<T: Address> ConnectorResponse<T> { pub fn new( - host: String, + req: T, addr: either::Either<SocketAddr, VecDeque<SocketAddr>>, - ) -> ConnectorResponse { - trace!("TCP connector - connecting to {:?}", host); + ) -> ConnectorResponse<T> { + trace!("TCP connector - connecting to {:?}", req.host()); match addr { either::Either::Left(addr) => ConnectorResponse { - host: Some(host), + req: Some(req), addrs: None, stream: Some(TcpStream::connect(&addr)), }, either::Either::Right(addrs) => ConnectorResponse { - host: Some(host), + req: Some(req), addrs: Some(addrs), stream: None, }, @@ -81,8 +107,8 @@ impl ConnectorResponse { } } -impl Future for ConnectorResponse { - type Item = Stream<TcpStream>; +impl<T: Address> Future for ConnectorResponse<T> { + type Item = Connection<T, TcpStream>; type Error = ConnectError; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { @@ -91,18 +117,19 @@ impl Future for ConnectorResponse { if let Some(new) = self.stream.as_mut() { match new.poll() { Ok(Async::Ready(sock)) => { - let host = self.host.take().unwrap(); + let req = self.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", - host, sock.peer_addr() + req.host(), sock.peer_addr() ); - return Ok(Async::Ready(Stream::new(sock, host))); + return Ok(Async::Ready(Connection::new(sock, req))); } Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { trace!( - "TCP connector - failed to connect to connecting to {:?}", - self.host.as_ref().unwrap() + "TCP connector - failed to connect to connecting to {:?} port: {}", + self.req.as_ref().unwrap().host(), + self.req.as_ref().unwrap().port(), ); if self.addrs.as_ref().unwrap().is_empty() { return Err(err.into()); diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index 257f5994..542c68e6 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -16,7 +16,7 @@ pub mod ssl; pub use trust_dns_resolver::error::ResolveError; -pub use self::connect::{Connect, Stream}; +pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{Connector, ConnectorFactory}; pub use self::error::ConnectError; pub use self::resolver::{Resolver, ResolverFactory}; @@ -26,40 +26,40 @@ use tokio_tcp::TcpStream; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; /// Create tcp connector service -pub fn new_connector( +pub fn new_connector<T: Address>( cfg: ResolverConfig, opts: ResolverOpts, -) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone -{ - Resolver::new(cfg, opts).and_then(Connector) +) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + + Clone { + Resolver::new(cfg, opts).and_then(Connector::new()) } /// Create tcp connector service -pub fn new_connector_factory( +pub fn new_connector_factory<T: Address>( cfg: ResolverConfig, opts: ResolverOpts, ) -> impl NewService< - Request = Connect, - Response = Stream<TcpStream>, + Request = Connect<T>, + Response = Connection<T, TcpStream>, Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::new(cfg, opts).and_then(ConnectorFactory) + ResolverFactory::new(cfg, opts).and_then(ConnectorFactory::new()) } /// Create connector service with default parameters -pub fn default_connector( -) -> impl Service<Request = Connect, Response = Stream<TcpStream>, Error = ConnectError> + Clone -{ - Resolver::default().and_then(Connector) +pub fn default_connector<T: Address>( +) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + + Clone { + Resolver::default().and_then(Connector::new()) } /// Create connector service factory with default parameters -pub fn default_connector_factory() -> impl NewService< - Request = Connect, - Response = Stream<TcpStream>, +pub fn default_connector_factory<T: Address>() -> impl NewService< + Request = Connect<T>, + Response = Connection<T, TcpStream>, Error = ConnectError, InitError = (), > + Clone { - ResolverFactory::default().and_then(ConnectorFactory) + ResolverFactory::default().and_then(ConnectorFactory::new()) } diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 5dc750b9..0be30ed1 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::marker::PhantomData; use std::net::SocketAddr; use actix_service::{NewService, Service}; @@ -9,15 +10,16 @@ use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::system_conf::read_system_conf; use trust_dns_resolver::{AsyncResolver, Background}; -use crate::connect::Connect; +use crate::connect::{Address, Connect}; use crate::error::ConnectError; /// DNS Resolver Service factory -pub struct ResolverFactory { +pub struct ResolverFactory<T> { resolver: AsyncResolver, + _t: PhantomData<T>, } -impl Default for ResolverFactory { +impl<T> Default for ResolverFactory<T> { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -29,44 +31,50 @@ impl Default for ResolverFactory { } } -impl ResolverFactory { +impl<T> ResolverFactory<T> { /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_current_thread::spawn(bg); - ResolverFactory { resolver } - } -} - -impl Clone for ResolverFactory { - fn clone(&self) -> Self { ResolverFactory { - resolver: self.resolver.clone(), + resolver, + _t: PhantomData, } } } -impl NewService for ResolverFactory { - type Request = Connect; - type Response = Connect; +impl<T> Clone for ResolverFactory<T> { + fn clone(&self) -> Self { + ResolverFactory { + resolver: self.resolver.clone(), + _t: PhantomData, + } + } +} + +impl<T: Address> NewService for ResolverFactory<T> { + type Request = Connect<T>; + type Response = Connect<T>; type Error = ConnectError; - type Service = Resolver; + type Service = Resolver<T>; type InitError = (); type Future = FutureResult<Self::Service, Self::InitError>; fn new_service(&self, _: &()) -> Self::Future { ok(Resolver { resolver: self.resolver.clone(), + _t: PhantomData, }) } } /// DNS Resolver Service -pub struct Resolver { +pub struct Resolver<T> { resolver: AsyncResolver, + _t: PhantomData<T>, } -impl Default for Resolver { +impl<T> Default for Resolver<T> { fn default() -> Self { let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() { (cfg, opts) @@ -78,100 +86,99 @@ impl Default for Resolver { } } -impl Resolver { +impl<T> Resolver<T> { /// Create new resolver instance with custom configuration and options. pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self { let (resolver, bg) = AsyncResolver::new(cfg, opts); tokio_current_thread::spawn(bg); - Resolver { resolver } - } -} - -impl Clone for Resolver { - fn clone(&self) -> Self { Resolver { - resolver: self.resolver.clone(), + resolver, + _t: PhantomData, } } } -impl Service for Resolver { - type Request = Connect; - type Response = Connect; +impl<T> Clone for Resolver<T> { + fn clone(&self) -> Self { + Resolver { + resolver: self.resolver.clone(), + _t: PhantomData, + } + } +} + +impl<T: Address> Service for Resolver<T> { + type Request = Connect<T>; + type Response = Connect<T>; type Error = ConnectError; - type Future = Either<ResolverFuture, FutureResult<Connect, Self::Error>>; + type Future = Either<ResolverFuture<T>, FutureResult<Connect<T>, Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, req: Connect) -> Self::Future { - match req { - Connect::Host { host, port } => { - if let Ok(ip) = host.parse() { - Either::B(ok(Connect::Addr { - host: host, - addr: either::Either::Left(SocketAddr::new(ip, port)), - })) - } else { - trace!("DNS resolver: resolving host {:?}", host); - Either::A(ResolverFuture::new(host, port, &self.resolver)) - } + fn call(&mut self, mut req: Connect<T>) -> Self::Future { + if req.addr.is_some() { + Either::B(ok(req)) + } else { + if let Ok(ip) = req.host().parse() { + req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); + Either::B(ok(req)) + } else { + trace!("DNS resolver: resolving host {:?}", req.host()); + Either::A(ResolverFuture::new(req, &self.resolver)) } - other => Either::B(ok(other)), } } } #[doc(hidden)] /// Resolver future -pub struct ResolverFuture { - host: Option<String>, - port: u16, +pub struct ResolverFuture<T: Address> { + req: Option<Connect<T>>, lookup: Option<Background<LookupIpFuture>>, } -impl ResolverFuture { - pub fn new(host: String, port: u16, resolver: &AsyncResolver) -> Self { +impl<T: Address> ResolverFuture<T> { + pub fn new(req: Connect<T>, resolver: &AsyncResolver) -> Self { ResolverFuture { - lookup: Some(resolver.lookup_ip(host.as_str())), - host: Some(host), - port, + lookup: Some(resolver.lookup_ip(req.host())), + req: Some(req), } } } -impl Future for ResolverFuture { - type Item = Connect; +impl<T: Address> Future for ResolverFuture<T> { + type Item = Connect<T>; type Error = ConnectError; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { match self.lookup.as_mut().unwrap().poll().map_err(|e| { trace!( "DNS resolver: failed to resolve host {:?} err: {}", - self.host.as_ref().unwrap(), + self.req.as_ref().unwrap().host(), e ); e })? { Async::NotReady => Ok(Async::NotReady), Async::Ready(ips) => { - let host = self.host.take().unwrap(); + let mut req = self.req.take().unwrap(); let mut addrs: VecDeque<_> = ips .iter() - .map(|ip| SocketAddr::new(ip, self.port)) + .map(|ip| SocketAddr::new(ip, req.port())) .collect(); - trace!("DNS resolver: host {:?} resolved to {:?}", host, addrs); + trace!( + "DNS resolver: host {:?} resolved to {:?}", + req.host(), + addrs + ); if addrs.len() == 1 { - Ok(Async::Ready(Connect::Addr { - addr: either::Either::Left(addrs.pop_front().unwrap()), - host, - })) + req.addr = Some(either::Either::Left(addrs.pop_front().unwrap())); + Ok(Async::Ready(req)) } else { - Ok(Async::Ready(Connect::Addr { - addr: either::Either::Right(addrs), - host, - })) + req.addr = Some(either::Either::Right(addrs)); + Ok(Async::Ready(req)) } } } diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index 59ccdb0f..daaf05a7 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -7,15 +7,15 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{HandshakeError, SslConnector}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; -use crate::Stream; +use crate::{Address, Connection}; /// Openssl connector factory -pub struct OpensslConnector<T, P, E> { +pub struct OpensslConnector<T, U, E> { connector: SslConnector, - _t: PhantomData<(T, P, E)>, + _t: PhantomData<(T, U, E)>, } -impl<T, P, E> OpensslConnector<T, P, E> { +impl<T, U, E> OpensslConnector<T, U, E> { pub fn new(connector: SslConnector) -> Self { OpensslConnector { connector, @@ -24,13 +24,17 @@ impl<T, P, E> OpensslConnector<T, P, E> { } } -impl<T: AsyncRead + AsyncWrite + fmt::Debug, P> OpensslConnector<T, P, ()> { +impl<T, U, E> OpensslConnector<T, U, E> +where + T: Address, + U: AsyncRead + AsyncWrite + fmt::Debug, +{ pub fn service( connector: SslConnector, ) -> impl Service< - Request = Stream<T, P>, - Response = Stream<SslStream<T>, P>, - Error = HandshakeError<T>, + Request = Connection<T, U>, + Response = Connection<T, SslStream<U>>, + Error = HandshakeError<U>, > { OpensslConnectorService { connector: connector, @@ -39,7 +43,7 @@ impl<T: AsyncRead + AsyncWrite + fmt::Debug, P> OpensslConnector<T, P, ()> { } } -impl<T, P, E> Clone for OpensslConnector<T, P, E> { +impl<T, U, E> Clone for OpensslConnector<T, U, E> { fn clone(&self) -> Self { Self { connector: self.connector.clone(), @@ -48,14 +52,14 @@ impl<T, P, E> Clone for OpensslConnector<T, P, E> { } } -impl<T, P, E> NewService<()> for OpensslConnector<T, P, E> +impl<T: Address, U, E> NewService<()> for OpensslConnector<T, U, E> where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Request = Stream<T, P>; - type Response = Stream<SslStream<T>, P>; - type Error = HandshakeError<T>; - type Service = OpensslConnectorService<T, P>; + type Request = Connection<T, U>; + type Response = Connection<T, SslStream<U>>; + type Error = HandshakeError<U>; + type Service = OpensslConnectorService<T, U>; type InitError = E; type Future = FutureResult<Self::Service, Self::InitError>; @@ -67,25 +71,25 @@ where } } -pub struct OpensslConnectorService<T, P> { +pub struct OpensslConnectorService<T, U> { connector: SslConnector, - _t: PhantomData<(T, P)>, + _t: PhantomData<(T, U)>, } -impl<T, P> Service for OpensslConnectorService<T, P> +impl<T: Address, U> Service for OpensslConnectorService<T, U> where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Request = Stream<T, P>; - type Response = Stream<SslStream<T>, P>; - type Error = HandshakeError<T>; - type Future = ConnectAsyncExt<T, P>; + type Request = Connection<T, U>; + type Response = Connection<T, SslStream<U>>; + type Error = HandshakeError<U>; + type Future = ConnectAsyncExt<T, U>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } - fn call(&mut self, stream: Stream<T, P>) -> Self::Future { + fn call(&mut self, stream: Connection<T, U>) -> Self::Future { trace!("SSL Handshake start for: {:?}", stream.host()); let (io, stream) = stream.replace(()); ConnectAsyncExt { @@ -95,17 +99,17 @@ where } } -pub struct ConnectAsyncExt<T, P> { - fut: ConnectAsync<T>, - stream: Option<Stream<(), P>>, +pub struct ConnectAsyncExt<T, U> { + fut: ConnectAsync<U>, + stream: Option<Connection<T, ()>>, } -impl<T, P> Future for ConnectAsyncExt<T, P> +impl<T: Address, U> Future for ConnectAsyncExt<T, U> where - T: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + fmt::Debug, { - type Item = Stream<SslStream<T>, P>; - type Error = HandshakeError<T>; + type Item = Connection<T, SslStream<U>>; + type Error = HandshakeError<U>; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { match self.fut.poll().map_err(|e| {