From 8b13236d415617e360eed1d10296e8082519d901 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 8 Sep 2018 09:36:38 -0700 Subject: [PATCH] refactor connections counter --- Cargo.toml | 2 + src/lib.rs | 1 + src/server.rs | 6 +- src/server_service.rs | 115 ++---------------------------------- src/ssl/mod.rs | 4 +- src/worker.rs | 131 +++++++++++++++++++++++++++++++++++++----- 6 files changed, 130 insertions(+), 129 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5b06ff1..c31c325a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,9 @@ tokio-io = "0.1" tokio-tcp = "0.1" tokio-timer = "0.2" tokio-reactor = "0.1" +tokio-current-thread = "0.1" tower-service = "0.1" + trust-dns-resolver = "0.10.0-alpha.2" # native-tls diff --git a/src/lib.rs b/src/lib.rs index 1cb81ed0..bf95306a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ extern crate net2; extern crate num_cpus; extern crate slab; extern crate tokio; +extern crate tokio_current_thread; extern crate tokio_io; extern crate tokio_reactor; extern crate tokio_tcp; diff --git a/src/server.rs b/src/server.rs index 0e977459..9bc8774a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,8 +13,8 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::server_service::{self, ServerNewService, ServerServiceFactory}; -use super::worker::{Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; +use super::server_service::{ServerNewService, ServerServiceFactory}; +use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; use super::NewService; use super::{PauseServer, ResumeServer, StopServer, Token}; @@ -73,7 +73,7 @@ impl Server { /// /// By default max connections is set to a 25k per worker. pub fn maxconn(self, num: usize) -> Self { - server_service::max_concurrent_connections(num); + worker::max_concurrent_connections(num); self } diff --git a/src/server_service.rs b/src/server_service.rs index a7272c6f..0bfad8df 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -1,11 +1,7 @@ -use std::cell::Cell; use std::net; -use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use futures::future::{err, ok}; -use futures::task::AtomicTask; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; @@ -26,37 +22,13 @@ pub(crate) type BoxedServerService = Box< >, >; -const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); - -/// Sets the maximum per-worker number of concurrent connections. -/// -/// All socket listeners will stop accepting connections when this limit is -/// reached for each worker. -/// -/// By default max connections is set to a 25k per worker. -pub fn max_concurrent_connections(num: usize) { - MAX_CONNS.store(num, Ordering::Relaxed); -} - -pub(crate) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|counter| counter.total()) -} - -thread_local! { - static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed)); -} - pub(crate) struct ServerService { service: T, - counter: Counter, } impl ServerService { fn new(service: T) -> Self { - MAX_CONNS_COUNTER.with(|counter| ServerService { - service, - counter: counter.clone(), - }) + ServerService { service } } } @@ -72,11 +44,7 @@ where type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.counter.check() { - self.service.poll_ready().map_err(|_| ()) - } else { - Ok(Async::NotReady) - } + self.service.poll_ready().map_err(|_| ()) } fn call(&mut self, req: ServerMessage) -> Self::Future { @@ -87,14 +55,7 @@ where }); if let Ok(stream) = stream { - let guard = self.counter.get(); - - Box::new( - self.service - .call(stream) - .map_err(|_| ()) - .map(move |_| drop(guard)), - ) + Box::new(self.service.call(stream)) } else { Box::new(err(())) } @@ -159,71 +120,3 @@ impl ServerServiceFactory for Box { self.as_ref().create() } } - -#[derive(Clone)] -pub(crate) struct Counter(Rc); - -struct CounterInner { - count: Cell, - maxconn: usize, - task: AtomicTask, -} - -impl Counter { - pub fn new(maxconn: usize) -> Self { - Counter(Rc::new(CounterInner { - maxconn, - count: Cell::new(0), - task: AtomicTask::new(), - })) - } - - pub fn get(&self) -> CounterGuard { - CounterGuard::new(self.0.clone()) - } - - pub fn check(&self) -> bool { - self.0.check() - } - - pub fn total(&self) -> usize { - self.0.count.get() - } -} - -pub(crate) struct CounterGuard(Rc); - -impl CounterGuard { - fn new(inner: Rc) -> Self { - inner.inc(); - CounterGuard(inner) - } -} - -impl Drop for CounterGuard { - fn drop(&mut self) { - self.0.dec(); - } -} - -impl CounterInner { - fn inc(&self) { - let num = self.count.get() + 1; - self.count.set(num); - if num == self.maxconn { - self.task.register(); - } - } - - fn dec(&self) { - let num = self.count.get(); - self.count.set(num - 1); - if num == self.maxconn { - self.task.notify(); - } - } - - fn check(&self) -> bool { - self.count.get() < self.maxconn - } -} diff --git a/src/ssl/mod.rs b/src/ssl/mod.rs index bb440885..0f16cd08 100644 --- a/src/ssl/mod.rs +++ b/src/ssl/mod.rs @@ -1,7 +1,7 @@ //! SSL Services use std::sync::atomic::{AtomicUsize, Ordering}; -use super::server_service::Counter; +use super::worker::Connections; #[cfg(feature = "ssl")] mod openssl; @@ -21,7 +21,7 @@ pub fn max_concurrent_ssl_connect(num: usize) { } thread_local! { - static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); + static MAX_CONN_COUNTER: Connections = Connections::new(MAX_CONN.load(Ordering::Relaxed)); } // #[cfg(feature = "tls")] diff --git a/src/worker.rs b/src/worker.rs index 9cc054f3..8d4bcac0 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,10 +1,14 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::cell::Cell; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::{net, time}; use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::oneshot; +use futures::task::AtomicTask; use futures::{future, Async, Future, Poll}; +use tokio_current_thread::spawn; use actix::msgs::StopArbiter; use actix::{ @@ -13,7 +17,7 @@ use actix::{ }; use super::accept::AcceptNotify; -use super::server_service::{self, BoxedServerService, ServerMessage, ServerServiceFactory}; +use super::server_service::{BoxedServerService, ServerMessage, ServerServiceFactory}; use super::Token; #[derive(Message)] @@ -24,6 +28,27 @@ pub(crate) struct Conn { pub peer: Option, } +const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); + +/// Sets the maximum per-worker number of concurrent connections. +/// +/// All socket listeners will stop accepting connections when this limit is +/// reached for each worker. +/// +/// By default max connections is set to a 25k per worker. +pub fn max_concurrent_connections(num: usize) { + MAX_CONNS.store(num, Ordering::Relaxed); +} + +pub(crate) fn num_connections() -> usize { + MAX_CONNS_COUNTER.with(|conns| conns.total()) +} + +thread_local! { + static MAX_CONNS_COUNTER: Connections = + Connections::new(MAX_CONNS.load(Ordering::Relaxed)); +} + #[derive(Clone)] pub(crate) struct WorkerClient { pub idx: usize, @@ -88,6 +113,7 @@ impl Message for StopWorker { pub(crate) struct Worker { services: Vec, availability: WorkerAvailability, + conns: Connections, } impl Actor for Worker { @@ -99,10 +125,11 @@ impl Worker { ctx: &mut Context, services: Vec>, availability: WorkerAvailability, ) -> Self { - let wrk = Worker { + let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { availability, services: Vec::new(), - }; + conns: conns.clone(), + }); ctx.wait( future::join_all(services.into_iter().map(|s| s.create())) @@ -139,7 +166,7 @@ impl Worker { ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = server_service::num_connections(); + let num = num_connections(); if num == 0 { let _ = tx.send(true); Arbiter::current().do_send(StopArbiter(0)); @@ -159,7 +186,15 @@ impl Handler for Worker { type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) { - Arbiter::spawn(self.services[msg.handler.0].call(ServerMessage::Connect(msg.io))) + let guard = self.conns.get(); + spawn( + self.services[msg.handler.0] + .call(ServerMessage::Connect(msg.io)) + .map(|val| { + drop(guard); + val + }), + ) } } @@ -168,14 +203,14 @@ impl Handler for Worker { type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = server_service::num_connections(); + let num = num_connections(); if num == 0 { info!("Shutting down http worker, 0 connections"); Response::reply(Ok(true)) } else if let Some(dur) = msg.graceful { self.shutdown(false); let (tx, rx) = oneshot::channel(); - let num = server_service::num_connections(); + let num = num_connections(); if num != 0 { info!("Graceful http worker shutdown, {} connections", num); self.shutdown_timeout(ctx, tx, dur); @@ -199,11 +234,13 @@ impl ActorFuture for CheckReadiness { type Actor = Worker; fn poll(&mut self, act: &mut Worker, _: &mut Context) -> Poll<(), ()> { - let mut val = true; - for service in &mut act.services { - if let Ok(Async::NotReady) = service.poll_ready() { - val = false; - break; + let mut val = act.conns.check(); + if val { + for service in &mut act.services { + if let Ok(Async::NotReady) = service.poll_ready() { + val = false; + break; + } } } if self.0 != val { @@ -213,3 +250,71 @@ impl ActorFuture for CheckReadiness { Ok(Async::NotReady) } } + +#[derive(Clone)] +pub(crate) struct Connections(Rc); + +struct ConnectionsInner { + count: Cell, + maxconn: usize, + task: AtomicTask, +} + +impl Connections { + pub fn new(maxconn: usize) -> Self { + Connections(Rc::new(ConnectionsInner { + maxconn, + count: Cell::new(0), + task: AtomicTask::new(), + })) + } + + pub fn get(&self) -> ConnectionsGuard { + ConnectionsGuard::new(self.0.clone()) + } + + pub fn check(&self) -> bool { + self.0.check() + } + + pub fn total(&self) -> usize { + self.0.count.get() + } +} + +pub(crate) struct ConnectionsGuard(Rc); + +impl ConnectionsGuard { + fn new(inner: Rc) -> Self { + inner.inc(); + ConnectionsGuard(inner) + } +} + +impl Drop for ConnectionsGuard { + fn drop(&mut self) { + self.0.dec(); + } +} + +impl ConnectionsInner { + fn inc(&self) { + let num = self.count.get() + 1; + self.count.set(num); + if num == self.maxconn { + self.task.register(); + } + } + + fn dec(&self) { + let num = self.count.get(); + self.count.set(num - 1); + if num == self.maxconn { + self.task.notify(); + } + } + + fn check(&self) -> bool { + self.count.get() < self.maxconn + } +}