From b18fbc98d56148fec9aab7f6a55d40a33becf295 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 5 Dec 2019 20:52:37 +0600 Subject: [PATCH] move rustls and nativetls acceptor services to actix-tls --- actix-server/src/ssl/mod.rs | 42 ------ actix-server/src/ssl/rustls.rs | 124 ------------------ actix-tls/CHANGES.md | 2 + actix-tls/Cargo.toml | 9 +- actix-tls/src/lib.rs | 9 +- .../src/ssl => actix-tls/src}/nativetls.rs | 56 ++++---- actix-tls/src/rustls.rs | 116 ++++++++++++++++ 7 files changed, 155 insertions(+), 203 deletions(-) delete mode 100644 actix-server/src/ssl/mod.rs delete mode 100644 actix-server/src/ssl/rustls.rs rename {actix-server/src/ssl => actix-tls/src}/nativetls.rs (60%) create mode 100644 actix-tls/src/rustls.rs diff --git a/actix-server/src/ssl/mod.rs b/actix-server/src/ssl/mod.rs deleted file mode 100644 index e9a8da87..00000000 --- a/actix-server/src/ssl/mod.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! SSL Services -use std::sync::atomic::{AtomicUsize, Ordering}; - -use crate::counter::Counter; - -#[cfg(feature = "openssl")] -mod openssl; -#[cfg(feature = "openssl")] -pub use self::openssl::OpensslAcceptor; - -#[cfg(feature = "nativetls")] -mod nativetls; -#[cfg(feature = "nativetls")] -pub use self::nativetls::NativeTlsAcceptor; - -#[cfg(feature = "rustls")] -mod rustls; -#[cfg(feature = "rustls")] -pub use self::rustls::RustlsAcceptor; - -/// Sets the maximum per-worker concurrent ssl connection establish process. -/// -/// All listeners will stop accepting connections when this limit is -/// reached. It can be used to limit the global SSL CPU usage. -/// -/// By default max connections is set to a 256. -pub fn max_concurrent_ssl_connect(num: usize) { - MAX_CONN.store(num, Ordering::Relaxed); -} - -pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); - -thread_local! { - static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); -} - -/// Ssl error combinded with service error. -#[derive(Debug)] -pub enum SslError { - Ssl(E1), - Service(E2), -} diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs deleted file mode 100644 index 971c19bb..00000000 --- a/actix-server/src/ssl/rustls.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::future::Future; -use std::io; -use std::marker::PhantomData; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite}; -use actix_service::{Service, ServiceFactory}; -use futures::future::{ok, Ready}; -use rust_tls::ServerConfig; -use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor}; - -use crate::counter::{Counter, CounterGuard}; -use crate::ssl::MAX_CONN_COUNTER; -use crate::{Io, Protocol, ServerConfig as SrvConfig}; - -/// Support `SSL` connections via rustls package -/// -/// `rust-tls` feature enables `RustlsAcceptor` type -pub struct RustlsAcceptor { - config: Arc, - io: PhantomData<(T, P)>, -} - -impl RustlsAcceptor { - /// Create `RustlsAcceptor` new service - pub fn new(config: ServerConfig) -> Self { - RustlsAcceptor { - config: Arc::new(config), - io: PhantomData, - } - } -} - -impl Clone for RustlsAcceptor { - fn clone(&self) -> Self { - Self { - config: self.config.clone(), - io: PhantomData, - } - } -} - -impl ServiceFactory for RustlsAcceptor { - type Request = Io; - type Response = Io, P>; - type Error = io::Error; - - type Config = SrvConfig; - type Service = RustlsAcceptorService; - type InitError = (); - type Future = Ready>; - - fn new_service(&self, cfg: &SrvConfig) -> Self::Future { - cfg.set_secure(); - - MAX_CONN_COUNTER.with(|conns| { - ok(RustlsAcceptorService { - acceptor: self.config.clone().into(), - conns: conns.clone(), - io: PhantomData, - }) - }) - } -} - -pub struct RustlsAcceptorService { - acceptor: TlsAcceptor, - io: PhantomData<(T, P)>, - conns: Counter, -} - -impl Service for RustlsAcceptorService { - type Request = Io; - type Response = Io, P>; - type Error = io::Error; - type Future = RustlsAcceptorServiceFut; - - fn poll_ready(&mut self, cx: &mut Context) -> Poll> { - if self.conns.available(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - let (io, params, _) = req.into_parts(); - RustlsAcceptorServiceFut { - _guard: self.conns.get(), - fut: self.acceptor.accept(io), - params: Some(params), - } - } -} - -pub struct RustlsAcceptorServiceFut -where - T: AsyncRead + AsyncWrite + Unpin, -{ - fut: Accept, - params: Option

, - _guard: CounterGuard, -} - -impl Unpin for RustlsAcceptorServiceFut {} - -impl Future for RustlsAcceptorServiceFut { - type Output = Result, P>, io::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = self.get_mut(); - - let res = futures::ready!(Pin::new(&mut this.fut).poll(cx)); - match res { - Ok(io) => { - let params = this.params.take().unwrap(); - Poll::Ready(Ok(Io::from_parts(io, params, Protocol::Unknown))) - } - Err(e) => Poll::Ready(Err(e)), - } - } -} diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index 67fe1a97..6905a3fe 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -8,6 +8,8 @@ * Enable rustls acceptor service +* Enable native-tls acceptor service + ## [1.0.0-alpha.1] - 2019-12-02 * Split openssl accetor from actix-server package diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 4f3dbf57..a5abd3e4 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" workspace = ".." [package.metadata.docs.rs] -features = ["openssl", "rustls"] +features = ["openssl", "rustls", "nativetls"] [lib] name = "actix_tls" @@ -28,6 +28,9 @@ openssl = ["open-ssl", "tokio-openssl"] # rustls rustls = ["rust-tls", "webpki"] +# nativetls +nativetls = ["native-tls", "tokio-tls"] + [dependencies] actix-service = "1.0.0-alpha.3" actix-codec = "0.2.0-alpha.3" @@ -48,6 +51,10 @@ webpki = { version = "0.21", optional = true } webpki-roots = { version = "0.17", optional = true } tokio-rustls = { version = "0.12.0", optional = true } +# native-tls +native-tls = { version="0.2", optional = true } +tokio-tls = { version="0.3", optional = true } + [dev-dependencies] bytes = "0.5" actix-testing = { version="1.0.0-alpha.3" } diff --git a/actix-tls/src/lib.rs b/actix-tls/src/lib.rs index 54b840eb..a6a50fa1 100644 --- a/actix-tls/src/lib.rs +++ b/actix-tls/src/lib.rs @@ -9,10 +9,11 @@ use actix_utils::counter::Counter; #[cfg(feature = "openssl")] pub mod openssl; -//#[cfg(feature = "rustls")] -//mod rustls; -//#[cfg(feature = "rustls")] -//pub use self::rustls::RustlsAcceptor; +#[cfg(feature = "rustls")] +pub mod rustls; + +#[cfg(feature = "nativetls")] +pub mod nativetls; /// Sets the maximum per-worker concurrent ssl connection establish process. /// diff --git a/actix-server/src/ssl/nativetls.rs b/actix-tls/src/nativetls.rs similarity index 60% rename from actix-server/src/ssl/nativetls.rs rename to actix-tls/src/nativetls.rs index a40d77a5..8b2c7cee 100644 --- a/actix-server/src/ssl/nativetls.rs +++ b/actix-tls/src/nativetls.rs @@ -1,26 +1,24 @@ -use std::convert::Infallible; use std::marker::PhantomData; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; -use futures::future::{self, FutureExt as _, LocalBoxFuture, TryFutureExt as _}; -use native_tls::Error; -use tokio_tls::{TlsAcceptor, TlsStream}; +use actix_utils::counter::Counter; +use futures::future::{self, FutureExt, LocalBoxFuture, TryFutureExt}; +pub use native_tls::Error; +pub use tokio_tls::{TlsAcceptor, TlsStream}; -use crate::counter::Counter; -use crate::ssl::MAX_CONN_COUNTER; -use crate::{Io, ServerConfig}; +use crate::MAX_CONN_COUNTER; /// Support `SSL` connections via native-tls package /// /// `tls` feature enables `NativeTlsAcceptor` type -pub struct NativeTlsAcceptor { +pub struct NativeTlsAcceptor { acceptor: TlsAcceptor, - io: PhantomData<(T, P)>, + io: PhantomData, } -impl NativeTlsAcceptor +impl NativeTlsAcceptor where T: AsyncRead + AsyncWrite + Unpin, { @@ -34,7 +32,7 @@ where } } -impl Clone for NativeTlsAcceptor { +impl Clone for NativeTlsAcceptor { #[inline] fn clone(&self) -> Self { Self { @@ -44,23 +42,20 @@ impl Clone for NativeTlsAcceptor { } } -impl ServiceFactory for NativeTlsAcceptor +impl ServiceFactory for NativeTlsAcceptor where T: AsyncRead + AsyncWrite + Unpin + 'static, - P: 'static, { - type Request = Io; - type Response = Io, P>; + type Request = T; + type Response = TlsStream; type Error = Error; + type Service = NativeTlsAcceptorService; - type Config = ServerConfig; - type Service = NativeTlsAcceptorService; - type InitError = Infallible; + type Config = (); + type InitError = (); type Future = future::Ready>; - fn new_service(&self, cfg: &ServerConfig) -> Self::Future { - cfg.set_secure(); - + fn new_service(&self, _: ()) -> Self::Future { MAX_CONN_COUNTER.with(|conns| { future::ok(NativeTlsAcceptorService { acceptor: self.acceptor.clone(), @@ -71,13 +66,13 @@ where } } -pub struct NativeTlsAcceptorService { +pub struct NativeTlsAcceptorService { acceptor: TlsAcceptor, - io: PhantomData<(T, P)>, + io: PhantomData, conns: Counter, } -impl Clone for NativeTlsAcceptorService { +impl Clone for NativeTlsAcceptorService { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), @@ -87,15 +82,14 @@ impl Clone for NativeTlsAcceptorService { } } -impl Service for NativeTlsAcceptorService +impl Service for NativeTlsAcceptorService where T: AsyncRead + AsyncWrite + Unpin + 'static, - P: 'static, { - type Request = Io; - type Response = Io, P>; + type Request = T; + type Response = TlsStream; type Error = Error; - type Future = LocalBoxFuture<'static, Result, P>, Error>>; + type Future = LocalBoxFuture<'static, Result, Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.conns.available(cx) { @@ -108,9 +102,7 @@ where fn call(&mut self, req: Self::Request) -> Self::Future { let guard = self.conns.get(); let this = self.clone(); - let (io, params, proto) = req.into_parts(); - async move { this.acceptor.accept(io).await } - .map_ok(move |stream| Io::from_parts(stream, params, proto)) + async move { this.acceptor.accept(req).await } .map_ok(move |io| { // Required to preserve `CounterGuard` until `Self::Future` // is completely resolved. diff --git a/actix-tls/src/rustls.rs b/actix-tls/src/rustls.rs new file mode 100644 index 00000000..50c5cfff --- /dev/null +++ b/actix-tls/src/rustls.rs @@ -0,0 +1,116 @@ +use std::future::Future; +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use actix_codec::{AsyncRead, AsyncWrite}; +use actix_service::{Service, ServiceFactory}; +use actix_utils::counter::{Counter, CounterGuard}; +use futures::future::{ok, Ready}; +use tokio_rustls::{Accept, TlsAcceptor}; + +pub use rust_tls::ServerConfig; +pub use tokio_rustls::server::TlsStream; + +use crate::MAX_CONN_COUNTER; + +/// Support `SSL` connections via rustls package +/// +/// `rust-tls` feature enables `RustlsAcceptor` type +pub struct Acceptor { + config: Arc, + io: PhantomData, +} + +impl Acceptor { + /// Create rustls based `Acceptor` service factory + pub fn new(config: ServerConfig) -> Self { + Acceptor { + config: Arc::new(config), + io: PhantomData, + } + } +} + +impl Clone for Acceptor { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + io: PhantomData, + } + } +} + +impl ServiceFactory for Acceptor { + type Request = T; + type Response = TlsStream; + type Error = io::Error; + type Service = AcceptorService; + + type Config = (); + type InitError = (); + type Future = Ready>; + + fn new_service(&self, _: ()) -> Self::Future { + MAX_CONN_COUNTER.with(|conns| { + ok(AcceptorService { + acceptor: self.config.clone().into(), + conns: conns.clone(), + io: PhantomData, + }) + }) + } +} + +/// RusTLS based `Acceptor` service +pub struct AcceptorService { + acceptor: TlsAcceptor, + io: PhantomData, + conns: Counter, +} + +impl Service for AcceptorService { + type Request = T; + type Response = TlsStream; + type Error = io::Error; + type Future = AcceptorServiceFut; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.conns.available(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + AcceptorServiceFut { + _guard: self.conns.get(), + fut: self.acceptor.accept(req), + } + } +} + +pub struct AcceptorServiceFut +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fut: Accept, + _guard: CounterGuard, +} + +impl Future for AcceptorServiceFut { + type Output = Result, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + let res = futures::ready!(Pin::new(&mut this.fut).poll(cx)); + match res { + Ok(io) => Poll::Ready(Ok(io)), + Err(e) => Poll::Ready(Err(e)), + } + } +}