diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index a97c59c8..07c73b71 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.2.3] - 2019-08-05 + +* Add `ConnectService` and `OpensslConnectService` + ## [0.2.2] - 2019-07-24 * Add `rustls` support diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index d7eab1ca..b2dca77c 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-connect" -version = "0.2.2" +version = "0.2.3" authors = ["Nikolay Kim "] description = "Actix Connector - tcp connector service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index 9bae8a7f..ba9bf50c 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -18,6 +18,17 @@ impl TcpConnectorFactory { pub fn new() -> Self { TcpConnectorFactory(PhantomData) } + + /// Create tcp connector service + pub fn service(&self) -> TcpConnector { + TcpConnector(PhantomData) + } +} + +impl Default for TcpConnectorFactory { + fn default() -> Self { + TcpConnectorFactory(PhantomData) + } } impl Clone for TcpConnectorFactory { @@ -36,7 +47,7 @@ impl NewService for TcpConnectorFactory { type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { - ok(TcpConnector(PhantomData)) + ok(self.service()) } } diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index b538fdfc..bf054f71 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -16,6 +16,7 @@ mod connect; mod connector; mod error; mod resolver; +mod service; pub mod ssl; #[cfg(feature = "uri")] @@ -29,6 +30,7 @@ pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::error::ConnectError; pub use self::resolver::{Resolver, ResolverFactory}; +pub use self::service::{ConnectService, ConnectServiceFactory}; use actix_service::{NewService, Service, ServiceExt}; use tokio_tcp::TcpStream; diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 957bedc1..1a76a618 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -25,6 +25,13 @@ impl ResolverFactory { _t: PhantomData, } } + + pub fn service(&self) -> Resolver { + Resolver { + resolver: self.resolver.clone(), + _t: PhantomData, + } + } } impl Default for ResolverFactory { @@ -55,10 +62,7 @@ impl NewService for ResolverFactory { type Future = FutureResult; fn new_service(&self, _: &()) -> Self::Future { - ok(Resolver { - resolver: self.resolver.clone(), - _t: PhantomData, - }) + ok(self.service()) } } diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs new file mode 100644 index 00000000..55fbc78a --- /dev/null +++ b/actix-connect/src/service.rs @@ -0,0 +1,123 @@ +use actix_service::{NewService, Service}; +use futures::future::{ok, FutureResult}; +use futures::{try_ready, Async, Future, Poll}; +use tokio_tcp::TcpStream; +use trust_dns_resolver::AsyncResolver; + +use crate::connect::{Address, Connect, Connection}; +use crate::connector::{TcpConnector, TcpConnectorFactory}; +use crate::error::ConnectError; +use crate::resolver::{Resolver, ResolverFactory}; + +pub struct ConnectServiceFactory { + tcp: TcpConnectorFactory, + resolver: ResolverFactory, +} + +impl ConnectServiceFactory { + /// Construct new ConnectService factory + pub fn new() -> Self { + ConnectServiceFactory { + tcp: TcpConnectorFactory::default(), + resolver: ResolverFactory::default(), + } + } + + /// Construct new connect service with custom dns resolver + pub fn with_resolver(resolver: AsyncResolver) -> Self { + ConnectServiceFactory { + tcp: TcpConnectorFactory::default(), + resolver: ResolverFactory::new(resolver), + } + } + + /// Construct new service + pub fn service(&self) -> ConnectService { + ConnectService { + tcp: self.tcp.service(), + resolver: self.resolver.service(), + } + } +} + +impl Default for ConnectServiceFactory { + fn default() -> Self { + ConnectServiceFactory { + tcp: TcpConnectorFactory::default(), + resolver: ResolverFactory::default(), + } + } +} + +impl Clone for ConnectServiceFactory { + fn clone(&self) -> Self { + ConnectServiceFactory { + tcp: self.tcp.clone(), + resolver: self.resolver.clone(), + } + } +} + +impl NewService for ConnectServiceFactory { + type Request = Connect; + type Response = Connection; + type Error = ConnectError; + type Config = (); + type Service = ConnectService; + type InitError = (); + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(self.service()) + } +} + +#[derive(Clone)] +pub struct ConnectService { + tcp: TcpConnector, + resolver: Resolver, +} + +impl Service for ConnectService { + type Request = Connect; + type Response = Connection; + type Error = ConnectError; + type Future = ConnectServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Connect) -> Self::Future { + ConnectServiceResponse { + fut1: Some(self.resolver.call(req)), + fut2: None, + tcp: self.tcp.clone(), + } + } +} + +pub struct ConnectServiceResponse { + fut1: Option< as Service>::Future>, + fut2: Option< as Service>::Future>, + tcp: TcpConnector, +} + +impl Future for ConnectServiceResponse { + type Item = Connection; + type Error = ConnectError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut1 { + let res = try_ready!(fut.poll()); + let _ = self.fut1.take(); + self.fut2 = Some(self.tcp.call(res)); + } + + if let Some(ref mut fut) = self.fut2 { + return fut.poll(); + } + + Ok(Async::NotReady) + } +} diff --git a/actix-connect/src/ssl/mod.rs b/actix-connect/src/ssl/mod.rs index 2110d01d..6c02848c 100644 --- a/actix-connect/src/ssl/mod.rs +++ b/actix-connect/src/ssl/mod.rs @@ -3,7 +3,9 @@ #[cfg(feature = "ssl")] mod openssl; #[cfg(feature = "ssl")] -pub use self::openssl::OpensslConnector; +pub use self::openssl::{ + OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector, +}; #[cfg(feature = "rust-tls")] mod rustls; #[cfg(feature = "rust-tls")] diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index 89504db3..ff1f846d 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -1,13 +1,17 @@ -use std::fmt; use std::marker::PhantomData; +use std::{fmt, io}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{NewService, Service}; -use futures::{future::ok, future::FutureResult, Async, Future, Poll}; +use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll}; use openssl::ssl::{HandshakeError, SslConnector}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; +use tokio_tcp::TcpStream; +use trust_dns_resolver::AsyncResolver; -use crate::{Address, Connection}; +use crate::{ + Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, +}; /// Openssl connector factory pub struct OpensslConnector { @@ -77,6 +81,15 @@ pub struct OpensslConnectorService { _t: PhantomData<(T, U)>, } +impl Clone for OpensslConnectorService { + fn clone(&self) -> Self { + Self { + connector: self.connector.clone(), + _t: PhantomData, + } + } +} + impl Service for OpensslConnectorService where U: AsyncRead + AsyncWrite + fmt::Debug, @@ -126,3 +139,113 @@ where } } } + +pub struct OpensslConnectServiceFactory { + tcp: ConnectServiceFactory, + openssl: OpensslConnector, +} + +impl OpensslConnectServiceFactory { + /// Construct new OpensslConnectService factory + pub fn new(connector: SslConnector) -> Self { + OpensslConnectServiceFactory { + tcp: ConnectServiceFactory::default(), + openssl: OpensslConnector::new(connector), + } + } + + /// Construct new connect service with custom dns resolver + pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self { + OpensslConnectServiceFactory { + tcp: ConnectServiceFactory::with_resolver(resolver), + openssl: OpensslConnector::new(connector), + } + } + + /// Construct openssl connect service + pub fn service(&self) -> OpensslConnectService { + OpensslConnectService { + tcp: self.tcp.service(), + openssl: OpensslConnectorService { + connector: self.openssl.connector.clone(), + _t: PhantomData, + }, + } + } +} + +impl Clone for OpensslConnectServiceFactory { + fn clone(&self) -> Self { + OpensslConnectServiceFactory { + tcp: self.tcp.clone(), + openssl: self.openssl.clone(), + } + } +} + +impl NewService for OpensslConnectServiceFactory { + type Request = Connect; + type Response = SslStream; + type Error = ConnectError; + type Config = (); + type Service = OpensslConnectService; + type InitError = (); + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(self.service()) + } +} + +#[derive(Clone)] +pub struct OpensslConnectService { + tcp: ConnectService, + openssl: OpensslConnectorService, +} + +impl Service for OpensslConnectService { + type Request = Connect; + type Response = SslStream; + type Error = ConnectError; + type Future = OpensslConnectServiceResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Connect) -> Self::Future { + OpensslConnectServiceResponse { + fut1: Some(self.tcp.call(req)), + fut2: None, + openssl: self.openssl.clone(), + } + } +} + +pub struct OpensslConnectServiceResponse { + fut1: Option< as Service>::Future>, + fut2: Option< as Service>::Future>, + openssl: OpensslConnectorService, +} + +impl Future for OpensslConnectServiceResponse { + type Item = SslStream; + type Error = ConnectError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut1 { + let res = try_ready!(fut.poll()); + let _ = self.fut1.take(); + self.fut2 = Some(self.openssl.call(res)); + } + + if let Some(ref mut fut) = self.fut2 { + let connect = try_ready!(fut + .poll() + .map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e)))); + Ok(Async::Ready(connect.into_parts().0)) + } else { + Ok(Async::NotReady) + } + } +}