// Mirror of https://github.com/fafhrd91/actix-net (synced 2025-02-08 03:04:23 +01:00).

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_core::{future::LocalBoxFuture, ready};
pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
pub use tokio_openssl::SslStream;
2020-09-08 18:00:07 +01:00
use super::MAX_CONN_COUNTER;
2020-09-08 18:00:07 +01:00
/// Accept TLS connections via the `openssl` package.
///
/// The `openssl` feature enables this `Acceptor` type.
pub struct Acceptor {
// OpenSSL acceptor handed to `Acceptor::new`; a clone of it is placed in every
// `AcceptorService` built by `new_service`.
acceptor: SslAcceptor,
}
2020-12-29 19:36:17 +08:00
impl Acceptor {
2020-09-08 18:00:07 +01:00
/// Create OpenSSL based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: SslAcceptor) -> Self {
2020-12-29 19:36:17 +08:00
Acceptor { acceptor }
}
}
2020-12-29 19:36:17 +08:00
impl Clone for Acceptor {
2020-09-08 18:00:07 +01:00
#[inline]
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
}
}
}
2020-12-29 19:36:17 +08:00
impl<T> ServiceFactory<T> for Acceptor
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Config = ();
2020-12-29 19:36:17 +08:00
type Service = AcceptorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let res = MAX_CONN_COUNTER.with(|conns| {
Ok(AcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
})
});
Box::pin(async { res })
}
}
2020-12-29 19:36:17 +08:00
/// Service created by `Acceptor::new_service`; its `call` starts an OpenSSL
/// accept on the I/O object it is given.
pub struct AcceptorService {
// Clone of the factory's `SslAcceptor`; its context is used to create one `Ssl` per call.
acceptor: SslAcceptor,
// Clone of the counter taken from `MAX_CONN_COUNTER`; queried in `poll_ready` and
// used in `call` to obtain the guard stored in each response future.
conns: Counter,
}
2020-12-29 19:36:17 +08:00
impl<T> Service<T> for AcceptorService
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Future = AcceptorServiceResponse<T>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&self, io: T) -> Self::Future {
2020-12-31 06:11:50 +08:00
let ssl_ctx = self.acceptor.context();
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
_guard: self.conns.get(),
stream: Some(SslStream::new(ssl, io).unwrap()),
}
}
}
/// Future returned by `AcceptorService::call`; resolves to the `SslStream`
/// once `poll_accept` completes, or to the `SslError` it reports.
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead + AsyncWrite,
{
// `Some` until the accept completes; `poll` takes the stream out to return it.
stream: Option<SslStream<T>>,
// Counter guard obtained in `call`; never read, only kept alive with the future.
// NOTE(review): presumably dropping it frees a connection slot — confirm in `actix_utils::counter`.
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
    type Output = Result<SslStream<T>, SslError>;

    /// Drives the TLS accept on the wrapped stream.
    ///
    /// Returns `Pending` while the accept is in progress, the error if it
    /// fails, and the stream itself (moved out of the future) on success.
    ///
    /// # Panics
    ///
    /// Panics if polled again after the stream has already been handed out.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let pending_stream = self.stream.as_mut().unwrap();
        let outcome = Pin::new(pending_stream).poll_accept(cx);

        match outcome {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Ready(Ok(())) => {
                let accepted = self.stream.take().expect("SSL connect has resolved.");
                Poll::Ready(Ok(accepted))
            }
        }
    }
}