diff --git a/Cargo.toml b/Cargo.toml index 533fbc17..342f866a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,7 @@ actix-tracing = { path = "actix-tracing" } actix-utils = { path = "actix-utils" } actix-router = { path = "router" } bytestring = { path = "string" } + +# FIXME: remove override +trust-dns-proto = { git = "https://github.com/bluejekyll/trust-dns.git", branch = "main" } +trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns.git", branch = "main" } \ No newline at end of file diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 5998bae7..3a633d62 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] derive_more = "0.99.2" -futures-channel = "0.3.1" +futures-channel = "0.3.7" parking_lot = "0.11" lazy_static = "1.3" log = "0.4" diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index af0b4238..1c24e0eb 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -12,7 +12,7 @@ license = "MIT OR Apache-2.0" edition = "2018" [package.metadata.docs.rs] -features = ["openssl", "rustls", "native-tls", "accept", "connect", "http"] +features = ["openssl", "rustls", "native-tls", "accept", "connect", "uri"] [lib] name = "actix_tls" @@ -23,13 +23,13 @@ name = "basic" required-features = ["accept", "rustls"] [features] -default = ["accept", "connect", "http"] +default = ["accept", "connect", "uri"] # enable acceptor services accept = [] # enable connector services -connect = [] +connect = ["trust-dns-proto/tokio-runtime", "trust-dns-resolver/tokio-runtime", "trust-dns-resolver/system-config"] # use openssl impls openssl = ["tls-openssl", "tokio-openssl"] @@ -40,6 +40,9 @@ rustls = ["tls-rustls", "webpki", "webpki-roots", "tokio-rustls"] # use native-tls impls native-tls = ["tls-native-tls", "tokio-native-tls"] +# support http::Uri as connect address +uri = ["http"] + [dependencies] actix-codec = "0.4.0-beta.1" actix-rt = "2.0.0-beta.1" @@ -49,10 +52,12 @@ actix-utils = "3.0.0-beta.1" derive_more = "0.99.5" either = "1.6" futures-util = { version = "0.3.7", default-features = false } -http = { version = "0.2.0", optional = true } +http = { version = "0.2.2", optional = true } log = "0.4" -trust-dns-proto = { version = "0.19", default-features = false, features = ["tokio-runtime"] } -trust-dns-resolver = { version = "0.19", default-features = false, features = ["tokio-runtime", "system-config"] } + +# resolver +trust-dns-proto = { version = "0.20.0-alpha.3", default-features = false, optional = true } +trust-dns-resolver = { version = "0.20.0-alpha.3", default-features = false, optional = true } # openssl tls-openssl = { package = "openssl", version = "0.10", optional = true } @@ -74,5 +79,6 @@ tokio-native-tls = { version = "0.3", optional = true } actix-server = "2.0.0-beta.1" actix-testing = "2.0.0-beta.1" bytes = "1" -log = "0.4" env_logger = "0.8" +futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } +log = "0.4" diff --git a/actix-tls/src/accept/nativetls.rs b/actix-tls/src/accept/nativetls.rs index 12d23855..5d80ce8b 100644 --- a/actix-tls/src/accept/nativetls.rs +++ b/actix-tls/src/accept/nativetls.rs @@ -1,10 +1,9 @@ -use std::marker::PhantomData; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::Counter; -use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt}; +use futures_util::future::{ready, LocalBoxFuture, Ready}; pub use native_tls::Error; pub use tokio_native_tls::{TlsAcceptor, TlsStream}; @@ -14,75 +13,64 @@ use super::MAX_CONN_COUNTER; /// Accept TLS connections via `native-tls` package. /// /// `native-tls` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { acceptor: TlsAcceptor, - io: PhantomData, } -impl Acceptor -where - T: AsyncRead + AsyncWrite + Unpin, -{ +impl Acceptor { /// Create `native-tls` based `Acceptor` service factory. #[inline] pub fn new(acceptor: TlsAcceptor) -> Self { - Acceptor { - acceptor, - io: PhantomData, - } + Acceptor { acceptor } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor +impl ServiceFactory for Acceptor where T: AsyncRead + AsyncWrite + Unpin + 'static, { type Response = TlsStream; type Error = Error; - type Service = NativeTlsAcceptorService; - type Config = (); + + type Service = NativeTlsAcceptorService; type InitError = (); - type Future = future::Ready>; + type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { MAX_CONN_COUNTER.with(|conns| { - future::ok(NativeTlsAcceptorService { + ready(Ok(NativeTlsAcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - io: PhantomData, - }) + })) }) } } -pub struct NativeTlsAcceptorService { +pub struct NativeTlsAcceptorService { acceptor: TlsAcceptor, - io: PhantomData, conns: Counter, } -impl Clone for NativeTlsAcceptorService { +impl Clone for NativeTlsAcceptorService { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, conns: self.conns.clone(), } } } -impl Service for NativeTlsAcceptorService +impl Service for NativeTlsAcceptorService where T: AsyncRead + AsyncWrite + Unpin + 'static, { @@ -101,12 +89,10 @@ where fn call(&mut self, io: T) -> Self::Future { let guard = self.conns.get(); let this = self.clone(); - async move { this.acceptor.accept(io).await } - .map_ok(move |io| { - // Required to preserve `CounterGuard` until `Self::Future` is completely resolved. - let _ = guard; - io - }) - .boxed_local() + Box::pin(async move { + let io = this.acceptor.accept(io).await; + drop(guard); + io + }) } } diff --git a/actix-tls/src/accept/openssl.rs b/actix-tls/src/accept/openssl.rs index 44877b24..e51808a3 100644 --- a/actix-tls/src/accept/openssl.rs +++ b/actix-tls/src/accept/openssl.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,7 +6,7 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; use futures_util::{ - future::{ok, Ready}, + future::{ready, Ready}, ready, }; @@ -21,61 +20,54 @@ use super::MAX_CONN_COUNTER; /// Accept TLS connections via `openssl` package. /// /// `openssl` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { acceptor: SslAcceptor, - io: PhantomData, } -impl Acceptor { +impl Acceptor { /// Create OpenSSL based `Acceptor` service factory. #[inline] pub fn new(acceptor: SslAcceptor) -> Self { - Acceptor { - acceptor, - io: PhantomData, - } + Acceptor { acceptor } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor +impl ServiceFactory for Acceptor where T: AsyncRead + AsyncWrite + Unpin + 'static, { type Response = SslStream; type Error = SslError; type Config = (); - type Service = AcceptorService; + type Service = AcceptorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { MAX_CONN_COUNTER.with(|conns| { - ok(AcceptorService { + ready(Ok(AcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - io: PhantomData, - }) + })) }) } } -pub struct AcceptorService { +pub struct AcceptorService { acceptor: SslAcceptor, conns: Counter, - io: PhantomData, } -impl Service for AcceptorService +impl Service for AcceptorService where T: AsyncRead + AsyncWrite + Unpin + 'static, { diff --git a/actix-tls/src/accept/rustls.rs b/actix-tls/src/accept/rustls.rs index 1c40757e..a6686f44 100644 --- a/actix-tls/src/accept/rustls.rs +++ b/actix-tls/src/accept/rustls.rs @@ -1,6 +1,5 @@ use std::future::Future; use std::io; -use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -8,7 +7,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; -use futures_util::future::{ok, Ready}; +use futures_util::future::{ready, Ready}; use tokio_rustls::{Accept, TlsAcceptor}; pub use rustls::{ServerConfig, Session}; @@ -20,66 +19,58 @@ use super::MAX_CONN_COUNTER; /// Accept TLS connections via `rustls` package. /// /// `rustls` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { config: Arc, - io: PhantomData, } -impl Acceptor -where - T: AsyncRead + AsyncWrite, -{ +impl Acceptor { /// Create Rustls based `Acceptor` service factory. #[inline] pub fn new(config: ServerConfig) -> Self { Acceptor { config: Arc::new(config), - io: PhantomData, } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { config: self.config.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor +impl ServiceFactory for Acceptor where T: AsyncRead + AsyncWrite + Unpin, { type Response = TlsStream; type Error = io::Error; - type Service = AcceptorService; - type Config = (); + + type Service = AcceptorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { MAX_CONN_COUNTER.with(|conns| { - ok(AcceptorService { + ready(Ok(AcceptorService { acceptor: self.config.clone().into(), conns: conns.clone(), - io: PhantomData, - }) + })) }) } } /// Rustls based `Acceptor` service -pub struct AcceptorService { +pub struct AcceptorService { acceptor: TlsAcceptor, - io: PhantomData, conns: Counter, } -impl Service for AcceptorService +impl Service for AcceptorService where T: AsyncRead + AsyncWrite + Unpin, { @@ -119,11 +110,6 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - - let res = futures_util::ready!(Pin::new(&mut this.fut).poll(cx)); - match res { - Ok(io) => Poll::Ready(Ok(io)), - Err(e) => Poll::Ready(Err(e)), - } + Pin::new(&mut this.fut).poll(cx) } } diff --git a/actix-tls/src/connect/connector.rs b/actix-tls/src/connect/connector.rs index c7a31e0a..a0a6b8b5 100644 --- a/actix-tls/src/connect/connector.rs +++ b/actix-tls/src/connect/connector.rs @@ -8,7 +8,7 @@ use std::task::{Context, Poll}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{err, ok, BoxFuture, Either, FutureExt, Ready}; +use futures_util::future::{ready, Ready}; use log::{error, trace}; use super::connect::{Address, Connect, Connection}; @@ -50,7 +50,7 @@ impl ServiceFactory> for TcpConnectorFactory { type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ok(self.service()) + ready(Ok(self.service())) } } @@ -73,8 +73,7 @@ impl Clone for TcpConnector { impl Service> for TcpConnector { type Response = Connection; type Error = ConnectError; - #[allow(clippy::type_complexity)] - type Future = Either, Ready>>; + type Future = TcpConnectorResponse; actix_service::always_ready!(); @@ -83,21 +82,26 @@ impl Service> for TcpConnector { let Connect { req, addr, .. } = req; if let Some(addr) = addr { - Either::Left(TcpConnectorResponse::new(req, port, addr)) + TcpConnectorResponse::new(req, port, addr) } else { error!("TCP connector: got unresolved address"); - Either::Right(err(ConnectError::Unresolved)) + TcpConnectorResponse::Error(Some(ConnectError::Unresolved)) } } } +type LocalBoxFuture<'a, T> = Pin + 'a>>; + #[doc(hidden)] /// TCP stream connector response future -pub struct TcpConnectorResponse { - req: Option, - port: u16, - addrs: Option>, - stream: Option>>, +pub enum TcpConnectorResponse { + Response { + req: Option, + port: u16, + addrs: Option>, + stream: Option>>, + }, + Error(Option), } impl TcpConnectorResponse { @@ -113,13 +117,13 @@ impl TcpConnectorResponse { ); match addr { - either::Either::Left(addr) => TcpConnectorResponse { + either::Either::Left(addr) => TcpConnectorResponse::Response { req: Some(req), port, addrs: None, - stream: Some(TcpStream::connect(addr).boxed()), + stream: Some(Box::pin(TcpStream::connect(addr))), }, - either::Either::Right(addrs) => TcpConnectorResponse { + either::Either::Right(addrs) => TcpConnectorResponse::Response { req: Some(req), port, addrs: Some(addrs), @@ -134,36 +138,43 @@ impl Future for TcpConnectorResponse { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - - // connect - loop { - if let Some(new) = this.stream.as_mut() { - match new.as_mut().poll(cx) { - Poll::Ready(Ok(sock)) => { - let req = this.req.take().unwrap(); - trace!( - "TCP connector - successfully connected to connecting to {:?} - {:?}", - req.host(), sock.peer_addr() - ); - return Poll::Ready(Ok(Connection::new(sock, req))); - } - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(err)) => { - trace!( - "TCP connector - failed to connect to connecting to {:?} port: {}", - this.req.as_ref().unwrap().host(), - this.port, - ); - if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() { - return Poll::Ready(Err(err.into())); + match this { + TcpConnectorResponse::Error(e) => Poll::Ready(Err(e.take().unwrap())), + // connect + TcpConnectorResponse::Response { + req, + port, + addrs, + stream, + } => loop { + if let Some(new) = stream.as_mut() { + match new.as_mut().poll(cx) { + Poll::Ready(Ok(sock)) => { + let req = req.take().unwrap(); + trace!( + "TCP connector - successfully connected to connecting to {:?} - {:?}", + req.host(), sock.peer_addr() + ); + return Poll::Ready(Ok(Connection::new(sock, req))); + } + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => { + trace!( + "TCP connector - failed to connect to connecting to {:?} port: {}", + req.as_ref().unwrap().host(), + port, + ); + if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { + return Poll::Ready(Err(err.into())); + } } } } - } - // try to connect - let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); - this.stream = Some(TcpStream::connect(addr).boxed()); + // try to connect + let addr = addrs.as_mut().unwrap().pop_front().unwrap(); + *stream = Some(Box::pin(TcpStream::connect(addr))); + }, } } } diff --git a/actix-tls/src/connect/mod.rs b/actix-tls/src/connect/mod.rs index 753cfc33..75312c59 100644 --- a/actix-tls/src/connect/mod.rs +++ b/actix-tls/src/connect/mod.rs @@ -11,6 +11,7 @@ mod error; mod resolve; mod service; pub mod ssl; +#[cfg(feature = "uri")] mod uri; use actix_rt::{net::TcpStream, Arbiter}; @@ -35,7 +36,7 @@ pub async fn start_resolver( cfg: ResolverConfig, opts: ResolverOpts, ) -> Result { - Ok(AsyncResolver::tokio(cfg, opts).await?) + Ok(AsyncResolver::tokio(cfg, opts)?) } struct DefaultResolver(AsyncResolver); @@ -52,7 +53,7 @@ pub(crate) async fn get_default_resolver() -> Result { +pub struct OpensslConnector { connector: SslConnector, - _t: PhantomData<(T, U)>, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { - OpensslConnector { - connector, - _t: PhantomData, - } + OpensslConnector { connector } } } -impl OpensslConnector -where - T: Address + 'static, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, -{ - pub fn service(connector: SslConnector) -> OpensslConnectorService { - OpensslConnectorService { - connector, - _t: PhantomData, - } +impl OpensslConnector { + pub fn service(connector: SslConnector) -> OpensslConnectorService { + OpensslConnectorService { connector } } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl ServiceFactory> for OpensslConnector +impl ServiceFactory> for OpensslConnector where T: Address + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, @@ -65,33 +53,30 @@ where type Response = Connection>; type Error = io::Error; type Config = (); - type Service = OpensslConnectorService; + type Service = OpensslConnectorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ok(OpensslConnectorService { + ready(Ok(OpensslConnectorService { connector: self.connector.clone(), - _t: PhantomData, - }) + })) } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - _t: PhantomData<(T, U)>, } -impl Clone for OpensslConnectorService { +impl Clone for OpensslConnectorService { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl Service> for OpensslConnectorService +impl Service> for OpensslConnectorService where T: Address + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, @@ -109,7 +94,7 @@ where let host = stream.host().to_string(); match self.connector.configure() { - Err(e) => Either::Right(err(io::Error::new(io::ErrorKind::Other, e))), + Err(e) => Either::Right(ready(Err(io::Error::new(io::ErrorKind::Other, e)))), Ok(config) => { let ssl = config .into_ssl(&host) @@ -156,7 +141,7 @@ where pub struct OpensslConnectServiceFactory { tcp: ConnectServiceFactory, - openssl: OpensslConnector, + openssl: OpensslConnector, } impl OpensslConnectServiceFactory { @@ -182,7 +167,6 @@ impl OpensslConnectServiceFactory { tcp: self.tcp.service(), openssl: OpensslConnectorService { connector: self.openssl.connector.clone(), - _t: PhantomData, }, } } @@ -206,14 +190,14 @@ impl ServiceFactory> for OpensslConnectServiceF type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ok(self.service()) + ready(Ok(self.service())) } } #[derive(Clone)] pub struct OpensslConnectService { tcp: ConnectService, - openssl: OpensslConnectorService, + openssl: OpensslConnectorService, } impl Service> for OpensslConnectService { @@ -234,10 +218,8 @@ impl Service> for OpensslConnectService { pub struct OpensslConnectServiceResponse { fut1: Option< as Service>>::Future>, - fut2: Option< - as Service>>::Future, - >, - openssl: OpensslConnectorService, + fut2: Option<>>::Future>, + openssl: OpensslConnectorService, } impl Future for OpensslConnectServiceResponse { diff --git a/actix-tls/src/connect/ssl/rustls.rs b/actix-tls/src/connect/ssl/rustls.rs index f5259280..390ba413 100644 --- a/actix-tls/src/connect/ssl/rustls.rs +++ b/actix-tls/src/connect/ssl/rustls.rs @@ -1,6 +1,5 @@ use std::fmt; use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -10,7 +9,10 @@ pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, Ready}; +use futures_util::{ + future::{ready, Ready}, + ready, +}; use log::trace; use tokio_rustls::{Connect, TlsConnector}; use webpki::DNSNameRef; @@ -18,77 +20,63 @@ use webpki::DNSNameRef; use crate::connect::{Address, Connection}; /// Rustls connector factory -pub struct RustlsConnector { +pub struct RustlsConnector { connector: Arc, - _t: PhantomData<(T, U)>, } -impl RustlsConnector { +impl RustlsConnector { pub fn new(connector: Arc) -> Self { - RustlsConnector { - connector, - _t: PhantomData, - } + RustlsConnector { connector } } } -impl RustlsConnector -where - T: Address, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, -{ - pub fn service(connector: Arc) -> RustlsConnectorService { - RustlsConnectorService { - connector, - _t: PhantomData, - } +impl RustlsConnector { + pub fn service(connector: Arc) -> RustlsConnectorService { + RustlsConnectorService { connector } } } -impl Clone for RustlsConnector { +impl Clone for RustlsConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl ServiceFactory> for RustlsConnector +impl ServiceFactory> for RustlsConnector where U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Response = Connection>; type Error = std::io::Error; type Config = (); - type Service = RustlsConnectorService; + type Service = RustlsConnectorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ok(RustlsConnectorService { + ready(Ok(RustlsConnectorService { connector: self.connector.clone(), - _t: PhantomData, - }) + })) } } -pub struct RustlsConnectorService { +pub struct RustlsConnectorService { connector: Arc, - _t: PhantomData<(T, U)>, } -impl Clone for RustlsConnectorService { +impl Clone for RustlsConnectorService { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl Service> for RustlsConnectorService +impl Service> for RustlsConnectorService where + T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Response = Connection>; @@ -114,20 +102,18 @@ pub struct ConnectAsyncExt { stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where + T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = Result>, std::io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - Poll::Ready( - futures_util::ready!(Pin::new(&mut this.fut).poll(cx)).map(|stream| { - let s = this.stream.take().unwrap(); - trace!("SSL Handshake success: {:?}", s.host()); - s.replace(stream).1 - }), - ) + let stream = ready!(Pin::new(&mut this.fut).poll(cx))?; + let s = this.stream.take().unwrap(); + trace!("SSL Handshake success: {:?}", s.host()); + Poll::Ready(Ok(s.replace(stream).1)) } } diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs new file mode 100644 index 00000000..fd083893 --- /dev/null +++ b/actix-tls/tests/test_connect.rs @@ -0,0 +1,130 @@ +use std::io; + +use actix_codec::{BytesCodec, Framed}; +use actix_rt::net::TcpStream; +use actix_service::{fn_service, Service, ServiceFactory}; +use actix_testing::TestServer; +use bytes::Bytes; +use futures_util::sink::SinkExt; + +use actix_tls::connect::{ + self as actix_connect, + resolver::{ResolverConfig, ResolverOpts}, + Connect, +}; + +#[cfg(all(feature = "connect", feature = "openssl"))] +#[actix_rt::test] +async fn test_string() { + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let mut conn = actix_connect::default_connector(); + let addr = format!("localhost:{}", srv.port()); + let con = conn.call(addr.into()).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[cfg(feature = "rustls")] +#[actix_rt::test] +async fn test_rustls_string() { + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let mut conn = actix_connect::default_connector(); + let addr = format!("localhost:{}", srv.port()); + let con = conn.call(addr.into()).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[actix_rt::test] +async fn test_static_str() { + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let resolver = actix_connect::start_default_resolver().await.unwrap(); + let mut conn = actix_connect::new_connector(resolver.clone()); + + let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); + + let connect = Connect::new(srv.host().to_owned()); + let mut conn = actix_connect::new_connector(resolver); + let con = conn.call(connect).await; + assert!(con.is_err()); +} + +#[actix_rt::test] +async fn test_new_service() { + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let resolver = + actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default()) + .await + .unwrap(); + + let factory = actix_connect::new_connector_factory(resolver); + + let mut conn = factory.new_service(()).await.unwrap(); + let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[cfg(all(feature = "openssl", feature = "uri"))] +#[actix_rt::test] +async fn test_openssl_uri() { + use std::convert::TryFrom; + + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let mut conn = actix_connect::default_connector(); + let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap(); + let con = conn.call(addr.into()).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} + +#[cfg(all(feature = "rustls", feature = "uri"))] +#[actix_rt::test] +async fn test_rustls_uri() { + use std::convert::TryFrom; + + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let mut conn = actix_connect::default_connector(); + let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap(); + let con = conn.call(addr.into()).await.unwrap(); + assert_eq!(con.peer_addr().unwrap(), srv.addr()); +} diff --git a/string/Cargo.toml b/string/Cargo.toml index 3653fe58..d3c290c8 100644 --- a/string/Cargo.toml +++ b/string/Cargo.toml @@ -15,7 +15,7 @@ name = "bytestring" path = "src/lib.rs" [dependencies] -bytes = "0.5.3" +bytes = "1" serde = { version = "1.0", optional = true } [dev-dependencies]