mirror of
https://github.com/fafhrd91/actix-net
synced 2025-02-20 07:40:33 +01:00
Add a builder for arbiter
This commit is contained in:
parent
a4b6943ddc
commit
943ddcd37e
@ -2,6 +2,8 @@
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter.
|
||||||
|
|
||||||
## 2.10.0
|
## 2.10.0
|
||||||
|
|
||||||
- Relax `F`'s bound (`Fn => FnOnce`) on `{Arbiter, System}::with_tokio_rt()` functions.
|
- Relax `F`'s bound (`Fn => FnOnce`) on `{Arbiter, System}::with_tokio_rt()` functions.
|
||||||
|
@ -80,42 +80,79 @@ impl ArbiterHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
/// A builder for configuring and spawning a new [Arbiter] thread.
|
||||||
/// and functions.
|
pub struct ArbiterBuilder {
|
||||||
///
|
name_factory: Option<Box<dyn Fn(usize, usize) -> String + 'static>>,
|
||||||
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
#[derive(Debug)]
|
runtime_factory: Option<Box<dyn FnOnce() -> tokio::runtime::Runtime + Send + 'static>>,
|
||||||
pub struct Arbiter {
|
|
||||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
|
||||||
thread_handle: thread::JoinHandle<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Arbiter {
|
impl ArbiterBuilder {
|
||||||
|
/// Create a new [ArbiterBuilder].
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
name_factory: None,
|
||||||
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
|
runtime_factory: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Specify a factory function for generating the name of the Arbiter thread.
|
||||||
|
///
|
||||||
|
/// Defaults to `actix-rt|system:<system_id>|arbiter:<arb_id>`
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// let _ = actix_rt::System::new();
|
||||||
|
/// actix_rt::ArbiterBuilder::new()
|
||||||
|
/// .name(|system_id, arb_id| {
|
||||||
|
/// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id)
|
||||||
|
/// })
|
||||||
|
/// .build();
|
||||||
|
/// ```
|
||||||
|
pub fn name<N>(mut self, name_factory: N) -> Self
|
||||||
|
where
|
||||||
|
N: Fn(usize, usize) -> String + 'static,
|
||||||
|
{
|
||||||
|
self.name_factory = Some(Box::new(name_factory));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter.
|
||||||
|
///
|
||||||
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
|
pub fn runtime<R>(mut self, runtime_factory: R) -> Self
|
||||||
|
where
|
||||||
|
R: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
|
||||||
|
{
|
||||||
|
self.runtime_factory = Some(Box::new(runtime_factory));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn a new Arbiter thread and start its event loop.
|
/// Spawn a new Arbiter thread and start its event loop.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
#[allow(clippy::new_without_default)]
|
pub fn build(self) -> Arbiter {
|
||||||
pub fn new() -> Arbiter {
|
|
||||||
Self::with_tokio_rt(|| {
|
|
||||||
crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
|
||||||
///
|
|
||||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
|
||||||
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
|
||||||
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
|
||||||
where
|
|
||||||
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
|
|
||||||
{
|
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
let system_id = sys.id();
|
let system_id = sys.id();
|
||||||
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
let name = self.name_factory.unwrap_or_else(|| {
|
||||||
|
Box::new(|system_id, arb_id| {
|
||||||
|
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
|
||||||
|
})
|
||||||
|
})(system_id, arb_id);
|
||||||
|
let runtime_factory = self.runtime_factory.unwrap_or_else(|| {
|
||||||
|
Box::new(|| {
|
||||||
|
crate::runtime::default_tokio_runtime()
|
||||||
|
.expect("Cannot create new Arbiter's Runtime.")
|
||||||
|
})
|
||||||
|
});
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
||||||
@ -160,13 +197,16 @@ impl Arbiter {
|
|||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
#[allow(clippy::new_without_default)]
|
pub fn build(self) -> Arbiter {
|
||||||
pub fn new() -> Arbiter {
|
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
let system_id = sys.id();
|
let system_id = sys.id();
|
||||||
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
let name = self.name_factory.unwrap_or_else(|| {
|
||||||
|
Box::new(|system_id, arb_id| {
|
||||||
|
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
|
||||||
|
})
|
||||||
|
})(system_id, arb_id);
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
||||||
@ -204,6 +244,54 @@ impl Arbiter {
|
|||||||
|
|
||||||
Arbiter { tx, thread_handle }
|
Arbiter { tx, thread_handle }
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
||||||
|
/// and functions.
|
||||||
|
///
|
||||||
|
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Arbiter {
|
||||||
|
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
thread_handle: thread::JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Arbiter {
|
||||||
|
/// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread.
|
||||||
|
pub fn builder() -> ArbiterBuilder {
|
||||||
|
ArbiterBuilder::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a new Arbiter thread and start its event loop.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
|
pub fn new() -> Arbiter {
|
||||||
|
ArbiterBuilder::new().build()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
|
///
|
||||||
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
||||||
|
where
|
||||||
|
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
|
||||||
|
{
|
||||||
|
ArbiterBuilder::new().runtime(runtime_factory).build()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
|
pub fn new() -> Arbiter {
|
||||||
|
ArbiterBuilder::new().build()
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets up an Arbiter runner in a new System using the environment's local set.
|
/// Sets up an Arbiter runner in a new System using the environment's local set.
|
||||||
pub(crate) fn in_new_system() -> ArbiterHandle {
|
pub(crate) fn in_new_system() -> ArbiterHandle {
|
||||||
|
@ -67,7 +67,7 @@ pub use tokio::pin;
|
|||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
pub use self::{
|
pub use self::{
|
||||||
arbiter::{Arbiter, ArbiterHandle},
|
arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle},
|
||||||
runtime::Runtime,
|
runtime::Runtime,
|
||||||
system::{System, SystemRunner},
|
system::{System, SystemRunner},
|
||||||
};
|
};
|
||||||
|
@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() {
|
|||||||
assert!(!counter.load(Ordering::SeqCst));
|
assert!(!counter.load(Ordering::SeqCst));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn arbiter_builder_name() {
|
||||||
|
let _ = System::new();
|
||||||
|
|
||||||
|
let arbiter = Arbiter::builder()
|
||||||
|
.name(|_, _| "test_thread".to_string())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let (tx, rx) = std::sync::mpsc::channel();
|
||||||
|
arbiter.spawn(async move {
|
||||||
|
let current_thread = std::thread::current();
|
||||||
|
let thread_name = current_thread.name().unwrap().to_string();
|
||||||
|
tx.send(thread_name).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let name = rx.recv().unwrap();
|
||||||
|
assert_eq!(name, "test_thread");
|
||||||
|
|
||||||
|
arbiter.stop();
|
||||||
|
arbiter.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic]
|
#[should_panic]
|
||||||
fn no_system_current_panic() {
|
fn no_system_current_panic() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user