1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-29 06:04:58 +02:00

fix actix-tls tests (#241)

This commit is contained in:
fakeshadow
2020-12-29 19:36:17 +08:00
committed by GitHub
parent 0934078947
commit 03eb96d6d4
12 changed files with 291 additions and 207 deletions

View File

@ -8,7 +8,7 @@ use std::task::{Context, Poll};
use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory};
use futures_util::future::{err, ok, BoxFuture, Either, FutureExt, Ready};
use futures_util::future::{ready, Ready};
use log::{error, trace};
use super::connect::{Address, Connect, Connection};
@ -50,7 +50,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory<T> {
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(self.service())
ready(Ok(self.service()))
}
}
@ -73,8 +73,7 @@ impl<T> Clone for TcpConnector<T> {
impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
type Response = Connection<T, TcpStream>;
type Error = ConnectError;
#[allow(clippy::type_complexity)]
type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
type Future = TcpConnectorResponse<T>;
actix_service::always_ready!();
@ -83,21 +82,26 @@ impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
let Connect { req, addr, .. } = req;
if let Some(addr) = addr {
Either::Left(TcpConnectorResponse::new(req, port, addr))
TcpConnectorResponse::new(req, port, addr)
} else {
error!("TCP connector: got unresolved address");
Either::Right(err(ConnectError::Unresolved))
TcpConnectorResponse::Error(Some(ConnectError::Unresolved))
}
}
}
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
#[doc(hidden)]
/// TCP stream connector response future
pub struct TcpConnectorResponse<T> {
req: Option<T>,
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
stream: Option<BoxFuture<'static, Result<TcpStream, io::Error>>>,
pub enum TcpConnectorResponse<T> {
Response {
req: Option<T>,
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
stream: Option<LocalBoxFuture<'static, Result<TcpStream, io::Error>>>,
},
Error(Option<ConnectError>),
}
impl<T: Address> TcpConnectorResponse<T> {
@ -113,13 +117,13 @@ impl<T: Address> TcpConnectorResponse<T> {
);
match addr {
either::Either::Left(addr) => TcpConnectorResponse {
either::Either::Left(addr) => TcpConnectorResponse::Response {
req: Some(req),
port,
addrs: None,
stream: Some(TcpStream::connect(addr).boxed()),
stream: Some(Box::pin(TcpStream::connect(addr))),
},
either::Either::Right(addrs) => TcpConnectorResponse {
either::Either::Right(addrs) => TcpConnectorResponse::Response {
req: Some(req),
port,
addrs: Some(addrs),
@ -134,36 +138,43 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// connect
loop {
if let Some(new) = this.stream.as_mut() {
match new.as_mut().poll(cx) {
Poll::Ready(Ok(sock)) => {
let req = this.req.take().unwrap();
trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}",
req.host(), sock.peer_addr()
);
return Poll::Ready(Ok(Connection::new(sock, req)));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
trace!(
"TCP connector - failed to connect to connecting to {:?} port: {}",
this.req.as_ref().unwrap().host(),
this.port,
);
if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() {
return Poll::Ready(Err(err.into()));
match this {
TcpConnectorResponse::Error(e) => Poll::Ready(Err(e.take().unwrap())),
// connect
TcpConnectorResponse::Response {
req,
port,
addrs,
stream,
} => loop {
if let Some(new) = stream.as_mut() {
match new.as_mut().poll(cx) {
Poll::Ready(Ok(sock)) => {
let req = req.take().unwrap();
trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}",
req.host(), sock.peer_addr()
);
return Poll::Ready(Ok(Connection::new(sock, req)));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => {
trace!(
"TCP connector - failed to connect to connecting to {:?} port: {}",
req.as_ref().unwrap().host(),
port,
);
if addrs.is_none() || addrs.as_ref().unwrap().is_empty() {
return Poll::Ready(Err(err.into()));
}
}
}
}
}
// try to connect
let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
this.stream = Some(TcpStream::connect(addr).boxed());
// try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
*stream = Some(Box::pin(TcpStream::connect(addr)));
},
}
}
}

View File

@ -11,6 +11,7 @@ mod error;
mod resolve;
mod service;
pub mod ssl;
#[cfg(feature = "uri")]
mod uri;
use actix_rt::{net::TcpStream, Arbiter};
@ -35,7 +36,7 @@ pub async fn start_resolver(
cfg: ResolverConfig,
opts: ResolverOpts,
) -> Result<AsyncResolver, ConnectError> {
Ok(AsyncResolver::tokio(cfg, opts).await?)
Ok(AsyncResolver::tokio(cfg, opts)?)
}
struct DefaultResolver(AsyncResolver);
@ -52,7 +53,7 @@ pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError
}
};
let resolver = AsyncResolver::tokio(cfg, opts).await?;
let resolver = AsyncResolver::tokio(cfg, opts)?;
Arbiter::set_item(DefaultResolver(resolver.clone()));
Ok(resolver)

View File

@ -8,7 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory};
use futures_util::{
future::{err, ok, Either, Ready},
future::{ready, Either, Ready},
ready,
};
use log::trace;
@ -21,43 +21,31 @@ use crate::connect::{
};
/// OpenSSL connector factory
pub struct OpensslConnector<T, U> {
pub struct OpensslConnector {
connector: SslConnector,
_t: PhantomData<(T, U)>,
}
impl<T, U> OpensslConnector<T, U> {
impl OpensslConnector {
pub fn new(connector: SslConnector) -> Self {
OpensslConnector {
connector,
_t: PhantomData,
}
OpensslConnector { connector }
}
}
impl<T, U> OpensslConnector<T, U>
where
T: Address + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
pub fn service(connector: SslConnector) -> OpensslConnectorService<T, U> {
OpensslConnectorService {
connector,
_t: PhantomData,
}
impl OpensslConnector {
pub fn service(connector: SslConnector) -> OpensslConnectorService {
OpensslConnectorService { connector }
}
}
impl<T, U> Clone for OpensslConnector<T, U> {
impl Clone for OpensslConnector {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T, U> ServiceFactory<Connection<T, U>> for OpensslConnector<T, U>
impl<T, U> ServiceFactory<Connection<T, U>> for OpensslConnector
where
T: Address + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
@ -65,33 +53,30 @@ where
type Response = Connection<T, SslStream<U>>;
type Error = io::Error;
type Config = ();
type Service = OpensslConnectorService<T, U>;
type Service = OpensslConnectorService;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(OpensslConnectorService {
ready(Ok(OpensslConnectorService {
connector: self.connector.clone(),
_t: PhantomData,
})
}))
}
}
pub struct OpensslConnectorService<T, U> {
pub struct OpensslConnectorService {
connector: SslConnector,
_t: PhantomData<(T, U)>,
}
impl<T, U> Clone for OpensslConnectorService<T, U> {
impl Clone for OpensslConnectorService {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T, U> Service<Connection<T, U>> for OpensslConnectorService<T, U>
impl<T, U> Service<Connection<T, U>> for OpensslConnectorService
where
T: Address + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
@ -109,7 +94,7 @@ where
let host = stream.host().to_string();
match self.connector.configure() {
Err(e) => Either::Right(err(io::Error::new(io::ErrorKind::Other, e))),
Err(e) => Either::Right(ready(Err(io::Error::new(io::ErrorKind::Other, e)))),
Ok(config) => {
let ssl = config
.into_ssl(&host)
@ -156,7 +141,7 @@ where
pub struct OpensslConnectServiceFactory<T> {
tcp: ConnectServiceFactory<T>,
openssl: OpensslConnector<T, TcpStream>,
openssl: OpensslConnector,
}
impl<T> OpensslConnectServiceFactory<T> {
@ -182,7 +167,6 @@ impl<T> OpensslConnectServiceFactory<T> {
tcp: self.tcp.service(),
openssl: OpensslConnectorService {
connector: self.openssl.connector.clone(),
_t: PhantomData,
},
}
}
@ -206,14 +190,14 @@ impl<T: Address + 'static> ServiceFactory<Connect<T>> for OpensslConnectServiceF
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(self.service())
ready(Ok(self.service()))
}
}
#[derive(Clone)]
pub struct OpensslConnectService<T> {
tcp: ConnectService<T>,
openssl: OpensslConnectorService<T, TcpStream>,
openssl: OpensslConnectorService,
}
impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService<T> {
@ -234,10 +218,8 @@ impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService<T> {
pub struct OpensslConnectServiceResponse<T: Address + 'static> {
fut1: Option<<ConnectService<T> as Service<Connect<T>>>::Future>,
fut2: Option<
<OpensslConnectorService<T, TcpStream> as Service<Connection<T, TcpStream>>>::Future,
>,
openssl: OpensslConnectorService<T, TcpStream>,
fut2: Option<<OpensslConnectorService as Service<Connection<T, TcpStream>>>::Future>,
openssl: OpensslConnectorService,
}
impl<T: Address> Future for OpensslConnectServiceResponse<T> {

View File

@ -1,6 +1,5 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@ -10,7 +9,10 @@ pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, Ready};
use futures_util::{
future::{ready, Ready},
ready,
};
use log::trace;
use tokio_rustls::{Connect, TlsConnector};
use webpki::DNSNameRef;
@ -18,77 +20,63 @@ use webpki::DNSNameRef;
use crate::connect::{Address, Connection};
/// Rustls connector factory
pub struct RustlsConnector<T, U> {
pub struct RustlsConnector {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T, U> RustlsConnector<T, U> {
impl RustlsConnector {
pub fn new(connector: Arc<ClientConfig>) -> Self {
RustlsConnector {
connector,
_t: PhantomData,
}
RustlsConnector { connector }
}
}
impl<T, U> RustlsConnector<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
pub fn service(connector: Arc<ClientConfig>) -> RustlsConnectorService<T, U> {
RustlsConnectorService {
connector,
_t: PhantomData,
}
impl RustlsConnector {
pub fn service(connector: Arc<ClientConfig>) -> RustlsConnectorService {
RustlsConnectorService { connector }
}
}
impl<T, U> Clone for RustlsConnector<T, U> {
impl Clone for RustlsConnector {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> ServiceFactory<Connection<T, U>> for RustlsConnector<T, U>
impl<T: Address, U> ServiceFactory<Connection<T, U>> for RustlsConnector
where
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Config = ();
type Service = RustlsConnectorService<T, U>;
type Service = RustlsConnectorService;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(RustlsConnectorService {
ready(Ok(RustlsConnectorService {
connector: self.connector.clone(),
_t: PhantomData,
})
}))
}
}
pub struct RustlsConnectorService<T, U> {
pub struct RustlsConnectorService {
connector: Arc<ClientConfig>,
_t: PhantomData<(T, U)>,
}
impl<T, U> Clone for RustlsConnectorService<T, U> {
impl Clone for RustlsConnectorService {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_t: PhantomData,
}
}
}
impl<T: Address, U> Service<Connection<T, U>> for RustlsConnectorService<T, U>
impl<T, U> Service<Connection<T, U>> for RustlsConnectorService
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Response = Connection<T, TlsStream<U>>;
@ -114,20 +102,18 @@ pub struct ConnectAsyncExt<T, U> {
stream: Option<Connection<T, ()>>,
}
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
impl<T, U> Future for ConnectAsyncExt<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Output = Result<Connection<T, TlsStream<U>>, std::io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Poll::Ready(
futures_util::ready!(Pin::new(&mut this.fut).poll(cx)).map(|stream| {
let s = this.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
s.replace(stream).1
}),
)
let stream = ready!(Pin::new(&mut this.fut).poll(cx))?;
let s = this.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Poll::Ready(Ok(s.replace(stream).1))
}
}