From 467350c9fcba2ca879f3a97c1778962dc58ef360 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 7 Sep 2018 10:38:39 -0700 Subject: [PATCH] add readiness support for OpensslAcceptor --- src/ssl/mod.rs | 9 +++++ src/ssl/openssl.rs | 91 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/src/ssl/mod.rs b/src/ssl/mod.rs index 5b900273..5504f15e 100644 --- a/src/ssl/mod.rs +++ b/src/ssl/mod.rs @@ -1,9 +1,18 @@ //! SSL Services +use std::sync::atomic::{AtomicUsize, Ordering}; + #[cfg(feature = "ssl")] mod openssl; #[cfg(feature = "ssl")] pub use self::openssl::{OpensslAcceptor, OpensslConnector}; +pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(0); + +/// Set max concurrent ssl connect operation per thread +pub fn max_concurrent_ssl_connect(num: usize) { + MAX_CONN.store(num, Ordering::Relaxed); +} + // #[cfg(feature = "tls")] // mod nativetls; // #[cfg(feature = "tls")] diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index ac25de66..b51fadbe 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -1,11 +1,16 @@ +use std::cell::Cell; use std::io; use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::atomic::Ordering; -use futures::{future, future::FutureResult, Async, Future, Poll}; +use futures::task::AtomicTask; +use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnector}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; +use super::MAX_CONN; use connector::ConnectionInfo; use {NewService, Service}; @@ -46,6 +51,7 @@ impl OpensslAcceptor { }) } } + impl Clone for OpensslAcceptor { fn clone(&self) -> Self { Self { @@ -64,9 +70,10 @@ impl NewService for OpensslAcceptor { type Future = FutureResult; fn new_service(&self) -> Self::Future { - future::ok(OpensslAcceptorService { + ok(OpensslAcceptorService { acceptor: self.acceptor.clone(), io: PhantomData, + inner: Rc::new(Inner::default()), }) } } @@ -74,20 +81,92 @@ impl NewService for OpensslAcceptor { pub struct OpensslAcceptorService { acceptor: SslAcceptor, io: PhantomData, + inner: Rc, } impl Service for OpensslAcceptorService { type Request = T; type Response = SslStream; type Error = Error; - type Future = AcceptAsync; + type Future = OpensslAcceptorServiceFut; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + if self.inner.check() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } } fn call(&mut self, req: Self::Request) -> Self::Future { - SslAcceptorExt::accept_async(&self.acceptor, req) + self.inner.inc(); + + OpensslAcceptorServiceFut { + inner: self.inner.clone(), + fut: SslAcceptorExt::accept_async(&self.acceptor, req), + } + } +} + +struct Inner { + maxconn: usize, + count: Cell, + task: AtomicTask, +} + +impl Default for Inner { + fn default() -> Inner { + Inner { + maxconn: MAX_CONN.load(Ordering::Relaxed), + count: Cell::new(0), + task: AtomicTask::new(), + } + } +} + +impl Inner { + fn inc(&self) { + let num = self.count.get() + 1; + self.count.set(num); + if num == self.maxconn { + self.task.register(); + } + } + + fn dec(&self) { + let num = self.count.get(); + self.count.set(num - 1); + if num == self.maxconn { + self.task.notify(); + } + } + + fn check(&self) -> bool { + self.count.get() < self.maxconn + } +} + +pub struct OpensslAcceptorServiceFut +where + T: AsyncRead + AsyncWrite, +{ + fut: AcceptAsync, + inner: Rc, +} + +impl Future for OpensslAcceptorServiceFut { + type Item = SslStream; + type Error = Error; + + fn poll(&mut self) -> Poll { + let res = self.fut.poll(); + + if let Ok(Async::NotReady) = res { + Ok(Async::NotReady) + } else { + self.inner.dec(); + res + } } } @@ -146,7 +225,7 @@ impl NewService for OpensslConnector type Future = FutureResult; fn new_service(&self) -> Self::Future { - future::ok(OpensslConnectorService { + ok(OpensslConnectorService { connector: self.connector.clone(), t: PhantomData, io: PhantomData,