From 943ddcd37e581242abd1b0eaeb9be4208ea65f8b Mon Sep 17 00:00:00 2001 From: Emile Fugulin Date: Thu, 9 May 2024 14:22:17 -0400 Subject: [PATCH] Add a builder for arbiter --- actix-rt/CHANGES.md | 2 + actix-rt/src/arbiter.rs | 144 ++++++++++++++++++++++++++++++++-------- actix-rt/src/lib.rs | 2 +- actix-rt/tests/tests.rs | 22 ++++++ 4 files changed, 141 insertions(+), 29 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 18466fa6..4aadc419 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter. + ## 2.10.0 - Relax `F`'s bound (`Fn => FnOnce`) on `{Arbiter, System}::with_tokio_rt()` functions. diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 1da76c52..574fd6f1 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -80,42 +80,79 @@ impl ArbiterHandle { } } -/// An Arbiter represents a thread that provides an asynchronous execution environment for futures -/// and functions. -/// -/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. -#[derive(Debug)] -pub struct Arbiter { - tx: mpsc::UnboundedSender, - thread_handle: thread::JoinHandle<()>, +/// A builder for configuring and spawning a new [Arbiter] thread. +pub struct ArbiterBuilder { + name_factory: Option String + 'static>>, + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + runtime_factory: Option tokio::runtime::Runtime + Send + 'static>>, } -impl Arbiter { +impl ArbiterBuilder { + /// Create a new [ArbiterBuilder]. + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + name_factory: None, + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + runtime_factory: None, + } + } + + /// Specify a factory function for generating the name of the Arbiter thread. + /// + /// Defaults to `actix-rt|system:|arbiter:` + /// + /// # Example + /// + /// ```no_run + /// let _ = actix_rt::System::new(); + /// actix_rt::ArbiterBuilder::new() + /// .name(|system_id, arb_id| { + /// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id) + /// }) + /// .build(); + /// ``` + pub fn name(mut self, name_factory: N) -> Self + where + N: Fn(usize, usize) -> String + 'static, + { + self.name_factory = Some(Box::new(name_factory)); + self + } + + /// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + pub fn runtime(mut self, runtime_factory: R) -> Self + where + R: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + { + self.runtime_factory = Some(Box::new(runtime_factory)); + self + } + /// Spawn a new Arbiter thread and start its event loop. /// /// # Panics /// Panics if a [System] is not registered on the current thread. #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { - Self::with_tokio_rt(|| { - crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") - }) - } - - /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. - /// - /// [tokio-runtime]: tokio::runtime::Runtime - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - pub fn with_tokio_rt(runtime_factory: F) -> Arbiter - where - F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, - { + pub fn build(self) -> Arbiter { let sys = System::current(); let system_id = sys.id(); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let name = self.name_factory.unwrap_or_else(|| { + Box::new(|system_id, arb_id| { + format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id) + }) + })(system_id, arb_id); + let runtime_factory = self.runtime_factory.unwrap_or_else(|| { + Box::new(|| { + crate::runtime::default_tokio_runtime() + .expect("Cannot create new Arbiter's Runtime.") + }) + }); let (tx, rx) = mpsc::unbounded_channel(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); @@ -160,13 +197,16 @@ impl Arbiter { /// # Panics /// Panics if a [System] is not registered on the current thread. #[cfg(all(target_os = "linux", feature = "io-uring"))] - #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { + pub fn build(self) -> Arbiter { let sys = System::current(); let system_id = sys.id(); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let name = self.name_factory.unwrap_or_else(|| { + Box::new(|system_id, arb_id| { + format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id) + }) + })(system_id, arb_id); let (tx, rx) = mpsc::unbounded_channel(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); @@ -204,6 +244,54 @@ impl Arbiter { Arbiter { tx, thread_handle } } +} + +/// An Arbiter represents a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. +#[derive(Debug)] +pub struct Arbiter { + tx: mpsc::UnboundedSender, + thread_handle: thread::JoinHandle<()>, +} + +impl Arbiter { + /// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread. + pub fn builder() -> ArbiterBuilder { + ArbiterBuilder::new() + } + + /// Spawn a new Arbiter thread and start its event loop. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + ArbiterBuilder::new().build() + } + + /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + where + F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + { + ArbiterBuilder::new().runtime(runtime_factory).build() + } + + /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + ArbiterBuilder::new().build() + } /// Sets up an Arbiter runner in a new System using the environment's local set. pub(crate) fn in_new_system() -> ArbiterHandle { diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index fc2a56ba..987d4ca7 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -67,7 +67,7 @@ pub use tokio::pin; use tokio::task::JoinHandle; pub use self::{ - arbiter::{Arbiter, ArbiterHandle}, + arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle}, runtime::Runtime, system::{System, SystemRunner}, }; diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 330e27ff..bd6e1a1e 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() { assert!(!counter.load(Ordering::SeqCst)); } +#[test] +fn arbiter_builder_name() { + let _ = System::new(); + + let arbiter = Arbiter::builder() + .name(|_, _| "test_thread".to_string()) + .build(); + + let (tx, rx) = std::sync::mpsc::channel(); + arbiter.spawn(async move { + let current_thread = std::thread::current(); + let thread_name = current_thread.name().unwrap().to_string(); + tx.send(thread_name).unwrap(); + }); + + let name = rx.recv().unwrap(); + assert_eq!(name, "test_thread"); + + arbiter.stop(); + arbiter.join().unwrap(); +} + #[test] #[should_panic] fn no_system_current_panic() {