From 7f0eddd794a9d8efac2ea5fa42034993961e8c59 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Feb 2021 07:01:51 -0800 Subject: [PATCH] add blocking thread customize (#265) --- actix-server/CHANGES.md | 2 ++ actix-server/Cargo.toml | 2 +- actix-server/src/builder.rs | 29 +++++++++++++++++---- actix-server/src/worker.rs | 50 ++++++++++++++++++++++++++++++++----- 4 files changed, 71 insertions(+), 12 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index d99287ef..be59f125 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,9 +3,11 @@ ## Unreleased - 2021-xx-xx * Hidden `ServerBuilder::start` method has been removed. Use `ServerBuilder::run`. [#246] * Add retry for EINTR(`io::Interrupted`) in `Accept`'s poll loop. [#264] +* Add `ServerBuilder::worker_max_blocking_threads` for customize blocking thread pool. [#265] [#246]: https://github.com/actix/actix-net/pull/246 [#264]: https://github.com/actix/actix-net/pull/264 +[#265]: https://github.com/actix/actix-net/pull/265 ## 2.0.0-beta.2 - 2021-01-03 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 845dc03e..db9d4d8b 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -36,7 +36,7 @@ slab = "0.4" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "2.0.0" bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7290f9dd..78a1323d 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle}; +use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; use crate::{join_all, Token}; /// Server builder @@ -32,11 +32,11 @@ pub struct ServerBuilder { sockets: Vec<(Token, String, MioListener)>, accept: AcceptLoop, exit: bool, - shutdown_timeout: Duration, no_signals: bool, cmd: UnboundedReceiver, server: Server, notify: Vec>, + worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -60,11 +60,11 @@ impl ServerBuilder { accept: AcceptLoop::new(server.clone()), backlog: 2048, exit: false, - shutdown_timeout: Duration::from_secs(30), no_signals: false, cmd: rx, notify: Vec::new(), server, + worker_config: ServerWorkerConfig::default(), } } @@ -78,6 +78,24 @@ impl ServerBuilder { self } + /// Set max number of threads for each worker's blocking task thread pool. + /// + /// One thread pool is set up **per worker**; not shared across workers. + /// + /// # Examples: + /// ``` + /// # use actix_server::ServerBuilder; + /// let builder = ServerBuilder::new() + /// .workers(4) // server has 4 worker thread. + /// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads. + /// ``` + /// + /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference. + pub fn worker_max_blocking_threads(mut self, num: usize) -> Self { + self.worker_config.max_blocking_threads(num); + self + } + /// Set the maximum number of pending connections. /// /// This refers to the number of clients that can be waiting to be served. @@ -124,7 +142,8 @@ impl ServerBuilder { /// /// By default shutdown timeout sets to 30 seconds. pub fn shutdown_timeout(mut self, sec: u64) -> Self { - self.shutdown_timeout = Duration::from_secs(sec); + self.worker_config + .shutdown_timeout(Duration::from_secs(sec)); self } @@ -297,7 +316,7 @@ impl ServerBuilder { let avail = WorkerAvailability::new(waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.shutdown_timeout) + ServerWorker::start(idx, services, avail, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 25a0429c..defc7306 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -133,7 +133,7 @@ pub(crate) struct ServerWorker { conns: Counter, factories: Vec>, state: WorkerState, - shutdown_timeout: Duration, + config: ServerWorkerConfig, } struct WorkerService { @@ -159,26 +159,62 @@ enum WorkerServiceStatus { Stopped, } +/// Config for worker behavior passed down from server builder. +#[derive(Copy, Clone)] +pub(crate) struct ServerWorkerConfig { + shutdown_timeout: Duration, + max_blocking_threads: usize, +} + +impl Default for ServerWorkerConfig { + fn default() -> Self { + // 512 is the default max blocking thread count of tokio runtime. + let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); + Self { + shutdown_timeout: Duration::from_secs(30), + max_blocking_threads, + } + } +} + +impl ServerWorkerConfig { + pub(crate) fn max_blocking_threads(&mut self, num: usize) { + self.max_blocking_threads = num; + } + + pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { + self.shutdown_timeout = dur; + } +} + impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, availability: WorkerAvailability, - shutdown_timeout: Duration, + config: ServerWorkerConfig, ) -> WorkerHandle { let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); // every worker runs in it's own arbiter. - Arbiter::new().spawn(Box::pin(async move { + // use a custom tokio runtime builder to change the settings of runtime. + Arbiter::with_tokio_rt(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + }) + .spawn(async move { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, availability, factories, - shutdown_timeout, + config, services: Vec::new(), conns: conns.clone(), state: WorkerState::Unavailable, @@ -198,6 +234,8 @@ impl ServerWorker { }) .collect::>(); + // a second spawn to make sure worker future runs as non boxed future. + // As Arbiter::spawn would box the future before send it to arbiter. spawn(async move { let res: Result, _> = join_all(fut).await.into_iter().collect(); match res { @@ -220,7 +258,7 @@ impl ServerWorker { } wrk.await }); - })); + }); WorkerHandle::new(idx, tx1, tx2, avail) } @@ -324,7 +362,7 @@ impl Future for ServerWorker { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))), - Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)), + Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)), Some(result), ); } else {