2021-01-26 09:46:14 +00:00
|
|
|
use std::{
|
|
|
|
cell::RefCell,
|
2021-01-29 04:08:14 +00:00
|
|
|
collections::HashMap,
|
|
|
|
future::Future,
|
2021-01-26 09:46:14 +00:00
|
|
|
io,
|
2021-01-29 04:08:14 +00:00
|
|
|
pin::Pin,
|
2021-01-26 09:46:14 +00:00
|
|
|
sync::atomic::{AtomicUsize, Ordering},
|
2021-01-29 04:08:14 +00:00
|
|
|
task::{Context, Poll},
|
2021-01-26 09:46:14 +00:00
|
|
|
};
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-01-29 04:08:14 +00:00
|
|
|
use futures_core::ready;
|
|
|
|
use tokio::sync::{mpsc, oneshot};
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
use crate::{arbiter::ArbiterHandle, Arbiter};
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2019-03-13 08:41:26 +01:00
|
|
|
/// Process-wide counter from which `System` ids are drawn; `System::construct` reads and
/// increments it every time a system is built.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
2019-03-11 22:51:17 -07:00
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
thread_local!(
|
2024-02-19 11:36:15 +00:00
|
|
|
static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
|
2021-01-31 03:34:07 +00:00
|
|
|
);
|
|
|
|
|
|
|
|
/// A manager for a per-thread distributed async runtime.
|
2018-12-09 19:55:40 -08:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct System {
|
2019-03-11 22:51:17 -07:00
|
|
|
id: usize,
|
2021-01-29 15:16:30 +00:00
|
|
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Handle to the first [Arbiter] that is created with the System.
|
|
|
|
arbiter_handle: ArbiterHandle,
|
|
|
|
}
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
#[cfg(not(feature = "io-uring"))]
|
2018-12-09 19:55:40 -08:00
|
|
|
impl System {
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Create a new system.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// Panics if underlying Tokio runtime can not be created.
|
|
|
|
#[allow(clippy::new_ret_no_self)]
|
|
|
|
pub fn new() -> SystemRunner {
|
2021-02-03 10:25:31 +00:00
|
|
|
Self::with_tokio_rt(|| {
|
2021-10-21 18:04:51 +08:00
|
|
|
crate::runtime::default_tokio_runtime()
|
2021-02-03 10:25:31 +00:00
|
|
|
.expect("Default Actix (Tokio) runtime could not be created.")
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
|
|
|
///
|
|
|
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
|
|
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
|
|
|
|
where
|
2024-06-09 07:12:24 +01:00
|
|
|
F: FnOnce() -> tokio::runtime::Runtime,
|
2021-02-03 10:25:31 +00:00
|
|
|
{
|
2021-01-31 03:34:07 +00:00
|
|
|
let (stop_tx, stop_rx) = oneshot::channel();
|
|
|
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
let rt = crate::runtime::Runtime::from(runtime_factory());
|
2021-10-11 09:58:11 +08:00
|
|
|
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
|
2021-01-31 04:41:28 +00:00
|
|
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
|
|
|
|
|
|
|
system
|
|
|
|
.tx()
|
|
|
|
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
|
|
|
.unwrap();
|
2021-01-31 03:34:07 +00:00
|
|
|
|
|
|
|
// init background system arbiter
|
|
|
|
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
|
|
|
rt.spawn(sys_ctrl);
|
|
|
|
|
2021-11-21 23:29:25 +00:00
|
|
|
SystemRunner { rt, stop_rx }
|
2021-01-31 03:34:07 +00:00
|
|
|
}
|
2021-10-21 18:04:51 +08:00
|
|
|
}
|
2021-01-31 03:34:07 +00:00
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
#[cfg(feature = "io-uring")]
impl System {
    /// Create a new system.
    ///
    /// With the `io-uring` feature this only returns the [`SystemRunner`] marker value; the
    /// system itself is constructed when [`SystemRunner::block_on`] is called.
    // `new` intentionally returns the runner that drives the system rather than `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new() -> SystemRunner {
        SystemRunner
    }

    /// Create a new System using the [Tokio Runtime][tokio-runtime] returned from a closure.
    ///
    /// # Panics
    /// Always panics: this constructor is not implemented for the `io-uring` feature yet. The
    /// closure is never called.
    ///
    /// [tokio-runtime]: tokio::runtime::Runtime
    #[doc(hidden)]
    pub fn with_tokio_rt<F>(_: F) -> SystemRunner
    where
        F: FnOnce() -> tokio::runtime::Runtime,
    {
        unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
    }
}
|
|
|
|
|
|
|
|
impl System {
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Constructs new system and registers it on the current thread.
|
2021-01-29 15:16:30 +00:00
|
|
|
pub(crate) fn construct(
|
|
|
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
2021-01-31 03:34:07 +00:00
|
|
|
arbiter_handle: ArbiterHandle,
|
2021-01-29 15:16:30 +00:00
|
|
|
) -> Self {
|
2018-12-09 19:55:40 -08:00
|
|
|
let sys = System {
|
2021-01-29 15:16:30 +00:00
|
|
|
sys_tx,
|
2021-01-31 03:34:07 +00:00
|
|
|
arbiter_handle,
|
2019-03-11 22:51:17 -07:00
|
|
|
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
|
2018-12-09 19:55:40 -08:00
|
|
|
};
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
System::set_current(sys.clone());
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
sys
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Get current running system.
|
2021-01-29 15:16:30 +00:00
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// Panics if no system is registered on the current thread.
|
2018-12-09 19:55:40 -08:00
|
|
|
pub fn current() -> System {
|
|
|
|
CURRENT.with(|cell| match *cell.borrow() {
|
|
|
|
Some(ref sys) => sys.clone(),
|
|
|
|
None => panic!("System is not running"),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-02-06 22:45:03 +00:00
|
|
|
/// Try to get current running system.
|
|
|
|
///
|
|
|
|
/// Returns `None` if no System has been started.
|
|
|
|
///
|
2021-11-04 20:30:43 +00:00
|
|
|
/// Unlike [`current`](Self::current), this never panics.
|
2021-02-06 22:45:03 +00:00
|
|
|
pub fn try_current() -> Option<System> {
|
|
|
|
CURRENT.with(|cell| cell.borrow().clone())
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Get handle to a the System's initial [Arbiter].
|
|
|
|
pub fn arbiter(&self) -> &ArbiterHandle {
|
|
|
|
&self.arbiter_handle
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Check if there is a System registered on the current thread.
|
|
|
|
pub fn is_registered() -> bool {
|
|
|
|
CURRENT.with(|sys| sys.borrow().is_some())
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Register given system on current thread.
|
|
|
|
#[doc(hidden)]
|
|
|
|
pub fn set_current(sys: System) {
|
|
|
|
CURRENT.with(|cell| {
|
|
|
|
*cell.borrow_mut() = Some(sys);
|
2018-12-09 19:55:40 -08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Numeric system identifier.
|
|
|
|
///
|
|
|
|
/// Useful when using multiple Systems.
|
2019-03-11 22:51:17 -07:00
|
|
|
pub fn id(&self) -> usize {
|
|
|
|
self.id
|
|
|
|
}
|
|
|
|
|
2021-01-26 09:46:14 +00:00
|
|
|
/// Stop the system (with code 0).
|
2018-12-09 19:55:40 -08:00
|
|
|
pub fn stop(&self) {
|
|
|
|
self.stop_with_code(0)
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Stop the system with a given exit code.
|
2018-12-09 19:55:40 -08:00
|
|
|
pub fn stop_with_code(&self, code: i32) {
|
2021-01-29 15:16:30 +00:00
|
|
|
let _ = self.sys_tx.send(SystemCommand::Exit(code));
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
|
2021-01-29 04:08:14 +00:00
|
|
|
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
|
2021-01-29 15:16:30 +00:00
|
|
|
&self.sys_tx
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
2021-01-31 03:34:07 +00:00
|
|
|
}
|
2018-12-09 19:55:40 -08:00
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
2021-11-15 18:49:02 +00:00
|
|
|
#[cfg(not(feature = "io-uring"))]
|
2021-01-31 03:34:07 +00:00
|
|
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct SystemRunner {
|
2021-10-21 18:04:51 +08:00
|
|
|
rt: crate::runtime::Runtime,
|
2021-01-31 03:34:07 +00:00
|
|
|
stop_rx: oneshot::Receiver<i32>,
|
|
|
|
}
|
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
#[cfg(not(feature = "io-uring"))]
|
2021-01-31 03:34:07 +00:00
|
|
|
impl SystemRunner {
|
|
|
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
|
|
|
pub fn run(self) -> io::Result<()> {
|
2021-11-15 18:49:02 +00:00
|
|
|
let exit_code = self.run_with_code()?;
|
|
|
|
|
|
|
|
match exit_code {
|
|
|
|
0 => Ok(()),
|
|
|
|
nonzero => Err(io::Error::new(
|
|
|
|
io::ErrorKind::Other,
|
|
|
|
format!("Non-zero exit code: {}", nonzero),
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
|
|
|
|
pub fn run_with_code(self) -> io::Result<i32> {
|
2021-01-31 03:34:07 +00:00
|
|
|
let SystemRunner { rt, stop_rx, .. } = self;
|
|
|
|
|
|
|
|
// run loop
|
2021-11-15 18:49:02 +00:00
|
|
|
rt.block_on(stop_rx)
|
|
|
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
|
2023-10-29 18:45:12 +00:00
|
|
|
/// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this
|
|
|
|
/// `SystemRunner` instance.
|
2023-08-26 19:04:08 +04:00
|
|
|
///
|
2023-10-29 18:45:12 +00:00
|
|
|
/// The Actix runtime is responsible for managing the event loop for an Actix system and
|
|
|
|
/// executing asynchronous tasks. This method provides access to the runtime, allowing direct
|
|
|
|
/// interaction with its features.
|
2023-08-26 19:04:08 +04:00
|
|
|
///
|
|
|
|
/// In a typical use case, you might need to share the same runtime between different
|
2023-10-29 18:45:12 +00:00
|
|
|
/// parts of your project. For example, some components might require a [`Runtime`] to spawn
|
|
|
|
/// tasks on the same runtime.
|
2023-08-26 19:04:08 +04:00
|
|
|
///
|
2023-10-29 18:45:12 +00:00
|
|
|
/// Read more in the documentation for [`Runtime`].
|
|
|
|
///
|
|
|
|
/// # Examples
|
2023-08-26 19:04:08 +04:00
|
|
|
///
|
|
|
|
/// ```
|
|
|
|
/// let system_runner = actix_rt::System::new();
|
|
|
|
/// let actix_runtime = system_runner.runtime();
|
|
|
|
///
|
|
|
|
/// // Use the runtime to spawn an async task or perform other operations
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// # Note
|
|
|
|
///
|
2023-10-29 18:45:12 +00:00
|
|
|
/// While this method provides an immutable reference to the Actix runtime, which is safe to
|
|
|
|
/// share across threads, be aware that spawning blocking tasks on the Actix runtime could
|
|
|
|
/// potentially impact system performance. This is because the Actix runtime is responsible for
|
|
|
|
/// driving the system, and blocking tasks could delay other tasks in the run loop.
|
|
|
|
///
|
|
|
|
/// [`Runtime`]: crate::Runtime
|
2023-08-26 19:04:08 +04:00
|
|
|
pub fn runtime(&self) -> &crate::runtime::Runtime {
|
|
|
|
&self.rt
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// Runs the provided future, blocking the current thread until the future completes.
|
2022-04-25 21:05:48 +01:00
|
|
|
#[track_caller]
|
2021-01-31 03:34:07 +00:00
|
|
|
#[inline]
|
|
|
|
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
|
|
|
self.rt.block_on(fut)
|
2018-12-09 19:55:40 -08:00
|
|
|
}
|
|
|
|
}
|
2021-01-29 04:08:14 +00:00
|
|
|
|
2021-10-21 18:04:51 +08:00
|
|
|
/// Keeps a [System]'s event loop alive until a stop message is received.
///
/// With the `io-uring` feature this is a field-less marker type.
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
|
|
|
|
|
|
|
|
#[cfg(feature = "io-uring")]
impl SystemRunner {
    /// Starts event loop and will return once [System] is [stopped](System::stop).
    ///
    /// # Panics
    /// Always panics: not implemented for the `io-uring` feature yet.
    pub fn run(self) -> io::Result<()> {
        unimplemented!("SystemRunner::run is not implemented for io-uring feature yet")
    }

    /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
    ///
    /// # Panics
    /// Always panics: not implemented for the `io-uring` feature yet.
    pub fn run_with_code(self) -> io::Result<i32> {
        unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet")
    }

    /// Runs the given future to completion, blocking the current thread, and returns its
    /// output.
    ///
    /// A new system is constructed and its controller spawned before the future is awaited.
    #[inline]
    pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
        tokio_uring::start(async move {
            let (stop_tx, stop_rx) = oneshot::channel();
            let (sys_tx, sys_rx) = mpsc::unbounded_channel();

            let arbiter = Arbiter::in_new_system();
            let system = System::construct(sys_tx, arbiter.clone());

            // `sys_rx` is still owned by this block, so the channel is open and the send
            // cannot fail.
            system
                .tx()
                .send(SystemCommand::RegisterArbiter(usize::MAX, arbiter))
                .unwrap();

            // init background system arbiter
            let controller = SystemController::new(sys_rx, stop_tx);
            tokio_uring::spawn(controller);

            let output = fut.await;
            // the stop receiver is held until the future has finished, then released
            drop(stop_rx);
            output
        })
    }
}
|
|
|
|
|
2021-01-29 04:08:14 +00:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub(crate) enum SystemCommand {
|
|
|
|
Exit(i32),
|
2021-01-31 03:34:07 +00:00
|
|
|
RegisterArbiter(usize, ArbiterHandle),
|
2021-01-29 04:08:14 +00:00
|
|
|
DeregisterArbiter(usize),
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
|
|
|
|
/// [Arbiter]s and is able to distribute a system-wide stop command.
|
2021-01-29 04:08:14 +00:00
|
|
|
#[derive(Debug)]
|
2021-01-31 03:34:07 +00:00
|
|
|
pub(crate) struct SystemController {
|
|
|
|
stop_tx: Option<oneshot::Sender<i32>>,
|
|
|
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
|
|
|
arbiters: HashMap<usize, ArbiterHandle>,
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
impl SystemController {
|
2021-01-29 04:08:14 +00:00
|
|
|
pub(crate) fn new(
|
2021-01-31 03:34:07 +00:00
|
|
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
|
|
|
stop_tx: oneshot::Sender<i32>,
|
2021-01-29 04:08:14 +00:00
|
|
|
) -> Self {
|
2021-01-31 03:34:07 +00:00
|
|
|
SystemController {
|
|
|
|
cmd_rx,
|
|
|
|
stop_tx: Some(stop_tx),
|
|
|
|
arbiters: HashMap::with_capacity(4),
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-31 03:34:07 +00:00
|
|
|
impl Future for SystemController {
|
2021-01-29 04:08:14 +00:00
|
|
|
type Output = ();
|
|
|
|
|
|
|
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
|
// process all items currently buffered in channel
|
|
|
|
loop {
|
2023-01-02 14:36:46 +01:00
|
|
|
match ready!(self.cmd_rx.poll_recv(cx)) {
|
2021-01-29 04:08:14 +00:00
|
|
|
// channel closed; no more messages can be received
|
|
|
|
None => return Poll::Ready(()),
|
|
|
|
|
|
|
|
// process system command
|
|
|
|
Some(cmd) => match cmd {
|
|
|
|
SystemCommand::Exit(code) => {
|
2021-01-31 03:34:07 +00:00
|
|
|
// stop all arbiters
|
2021-01-31 04:41:28 +00:00
|
|
|
for arb in self.arbiters.values() {
|
|
|
|
arb.stop();
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
2021-01-29 15:16:30 +00:00
|
|
|
|
2021-01-29 04:08:14 +00:00
|
|
|
// stop event loop
|
2021-01-31 03:34:07 +00:00
|
|
|
// will only fire once
|
|
|
|
if let Some(stop_tx) = self.stop_tx.take() {
|
|
|
|
let _ = stop_tx.send(code);
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
|
|
|
}
|
2021-01-29 15:16:30 +00:00
|
|
|
|
2021-01-31 04:41:28 +00:00
|
|
|
SystemCommand::RegisterArbiter(id, arb) => {
|
|
|
|
self.arbiters.insert(id, arb);
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
2021-01-29 15:16:30 +00:00
|
|
|
|
2021-01-31 04:41:28 +00:00
|
|
|
SystemCommand::DeregisterArbiter(id) => {
|
|
|
|
self.arbiters.remove(&id);
|
2021-01-29 04:08:14 +00:00
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|