1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-29 20:44:59 +02:00

merge -connect and -tls and upgrade to rt v2 (#238)

This commit is contained in:
Rob Ede
2020-12-29 00:38:41 +00:00
committed by GitHub
parent 3c6de3a81b
commit 5759c9e144
24 changed files with 212 additions and 507 deletions

View File

@ -0,0 +1,42 @@
//! TLS acceptor services for Actix ecosystem.
//!
//! ## Crate Features
//! * `openssl` - TLS acceptor using the `openssl` crate.
//! * `rustls` - TLS acceptor using the `rustls` crate.
//! * `native-tls` - TLS acceptor using the `native-tls` crate.
use std::sync::atomic::{AtomicUsize, Ordering};
use actix_utils::counter::Counter;
#[cfg(feature = "openssl")]
pub mod openssl;
#[cfg(feature = "rustls")]
pub mod rustls;
#[cfg(feature = "native-tls")]
pub mod nativetls;
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}
/// Sets the maximum per-worker concurrent TLS connection limit.
///
/// All listeners will stop accepting connections when this limit is reached.
/// It can be used to regulate the global TLS CPU usage.
///
/// By default, the connection limit is 256.
pub fn max_concurrent_tls_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
/// TLS error combined with service error.
#[derive(Debug)]
pub enum TlsError<E1, E2> {
Tls(E1),
Service(E2),
}

View File

@ -0,0 +1,112 @@
use std::marker::PhantomData;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter;
use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt};
pub use native_tls::Error;
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `native-tls` package.
///
/// `native-tls` feature enables this `Acceptor` type.
pub struct Acceptor<T> {
acceptor: TlsAcceptor,
io: PhantomData<T>,
}
impl<T> Acceptor<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
/// Create `native-tls` based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: TlsAcceptor) -> Self {
Acceptor {
acceptor,
io: PhantomData,
}
}
}
impl<T> Clone for Acceptor<T> {
#[inline]
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<T> ServiceFactory<T> for Acceptor<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = TlsStream<T>;
type Error = Error;
type Service = NativeTlsAcceptorService<T>;
type Config = ();
type InitError = ();
type Future = future::Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
future::ok(NativeTlsAcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
pub struct NativeTlsAcceptorService<T> {
acceptor: TlsAcceptor,
io: PhantomData<T>,
conns: Counter,
}
impl<T> Clone for NativeTlsAcceptorService<T> {
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
conns: self.conns.clone(),
}
}
}
impl<T> Service<T> for NativeTlsAcceptorService<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = TlsStream<T>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, io: T) -> Self::Future {
let guard = self.conns.get();
let this = self.clone();
async move { this.acceptor.accept(io).await }
.map_ok(move |io| {
// Required to preserve `CounterGuard` until `Self::Future` is completely resolved.
let _ = guard;
io
})
.boxed_local()
}
}

View File

@ -0,0 +1,121 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_util::{
future::{ok, Ready},
ready,
};
pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
pub use tokio_openssl::SslStream;
use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `openssl` package.
///
/// `openssl` feature enables this `Acceptor` type.
pub struct Acceptor<T: AsyncRead + AsyncWrite> {
acceptor: SslAcceptor,
io: PhantomData<T>,
}
impl<T: AsyncRead + AsyncWrite> Acceptor<T> {
/// Create OpenSSL based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor {
acceptor,
io: PhantomData,
}
}
}
impl<T: AsyncRead + AsyncWrite> Clone for Acceptor<T> {
#[inline]
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
io: PhantomData,
}
}
}
impl<T> ServiceFactory<T> for Acceptor<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Config = ();
type Service = AcceptorService<T>;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ok(AcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
pub struct AcceptorService<T> {
acceptor: SslAcceptor,
conns: Counter,
io: PhantomData<T>,
}
impl<T> Service<T> for AcceptorService<T>
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Future = AcceptorServiceResponse<T>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, io: T) -> Self::Future {
let acc = self.acceptor.clone();
let ssl_ctx = acc.into_context();
let ssl = Ssl::new(&ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
_guard: self.conns.get(),
stream: Some(SslStream::new(ssl, io).unwrap()),
}
}
}
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead + AsyncWrite,
{
stream: Option<SslStream<T>>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, SslError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
}
}

View File

@ -0,0 +1,129 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_util::future::{ok, Ready};
use tokio_rustls::{Accept, TlsAcceptor};
pub use rustls::{ServerConfig, Session};
pub use tokio_rustls::server::TlsStream;
pub use webpki_roots::TLS_SERVER_ROOTS;
use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `rustls` package.
///
/// `rustls` feature enables this `Acceptor` type.
pub struct Acceptor<T> {
config: Arc<ServerConfig>,
io: PhantomData<T>,
}
impl<T> Acceptor<T>
where
T: AsyncRead + AsyncWrite,
{
/// Create Rustls based `Acceptor` service factory.
#[inline]
pub fn new(config: ServerConfig) -> Self {
Acceptor {
config: Arc::new(config),
io: PhantomData,
}
}
}
impl<T> Clone for Acceptor<T> {
#[inline]
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
io: PhantomData,
}
}
}
impl<T> ServiceFactory<T> for Acceptor<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Response = TlsStream<T>;
type Error = io::Error;
type Service = AcceptorService<T>;
type Config = ();
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ok(AcceptorService {
acceptor: self.config.clone().into(),
conns: conns.clone(),
io: PhantomData,
})
})
}
}
/// Rustls based `Acceptor` service
pub struct AcceptorService<T> {
acceptor: TlsAcceptor,
io: PhantomData<T>,
conns: Counter,
}
impl<T> Service<T> for AcceptorService<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Response = TlsStream<T>;
type Error = io::Error;
type Future = AcceptorServiceFut<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: T) -> Self::Future {
AcceptorServiceFut {
_guard: self.conns.get(),
fut: self.acceptor.accept(req),
}
}
}
pub struct AcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fut: Accept<T>,
_guard: CounterGuard,
}
impl<T> Future for AcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = Result<TlsStream<T>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let res = futures_util::ready!(Pin::new(&mut this.fut).poll(cx));
match res {
Ok(io) => Poll::Ready(Ok(io)),
Err(e) => Poll::Ready(Err(e)),
}
}
}