1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-07 10:14:22 +01:00
actix-net/actix-rt/src/system.rs

324 lines
9.5 KiB
Rust
Raw Normal View History

2021-01-26 09:46:14 +00:00
use std::{
cell::RefCell,
2021-01-29 04:08:14 +00:00
collections::HashMap,
future::Future,
2021-01-26 09:46:14 +00:00
io,
2021-01-29 04:08:14 +00:00
pin::Pin,
2021-01-26 09:46:14 +00:00
sync::atomic::{AtomicUsize, Ordering},
2021-01-29 04:08:14 +00:00
task::{Context, Poll},
2021-01-26 09:46:14 +00:00
};
2018-12-09 19:55:40 -08:00
2021-01-29 04:08:14 +00:00
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
2018-12-09 19:55:40 -08:00
use crate::{arbiter::ArbiterHandle, Arbiter};
2018-12-09 19:55:40 -08:00
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static CURRENT: RefCell<Option<System>> = RefCell::new(None);
);
/// A manager for a per-thread distributed async runtime.
2018-12-09 19:55:40 -08:00
#[derive(Clone, Debug)]
pub struct System {
id: usize,
2021-01-29 15:16:30 +00:00
sys_tx: mpsc::UnboundedSender<SystemCommand>,
2018-12-09 19:55:40 -08:00
/// Handle to the first [Arbiter] that is created with the System.
arbiter_handle: ArbiterHandle,
}
2018-12-09 19:55:40 -08:00
#[cfg(not(feature = "io-uring"))]
2018-12-09 19:55:40 -08:00
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
2021-02-03 10:25:31 +00:00
Self::with_tokio_rt(|| {
crate::runtime::default_tokio_runtime()
2021-02-03 10:25:31 +00:00
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl);
2021-11-21 23:29:25 +00:00
SystemRunner { rt, stop_rx }
}
}
#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
2021-11-21 23:29:25 +00:00
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
}
}
impl System {
/// Constructs new system and registers it on the current thread.
2021-01-29 15:16:30 +00:00
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
arbiter_handle: ArbiterHandle,
2021-01-29 15:16:30 +00:00
) -> Self {
2018-12-09 19:55:40 -08:00
let sys = System {
2021-01-29 15:16:30 +00:00
sys_tx,
arbiter_handle,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
2018-12-09 19:55:40 -08:00
};
System::set_current(sys.clone());
2018-12-09 19:55:40 -08:00
sys
2018-12-09 19:55:40 -08:00
}
/// Get current running system.
2021-01-29 15:16:30 +00:00
///
/// # Panics
/// Panics if no system is registered on the current thread.
2018-12-09 19:55:40 -08:00
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => sys.clone(),
None => panic!("System is not running"),
})
}
2021-02-06 22:45:03 +00:00
/// Try to get current running system.
///
/// Returns `None` if no System has been started.
///
2021-11-04 20:30:43 +00:00
/// Unlike [`current`](Self::current), this never panics.
2021-02-06 22:45:03 +00:00
pub fn try_current() -> Option<System> {
CURRENT.with(|cell| cell.borrow().clone())
}
/// Get handle to a the System's initial [Arbiter].
pub fn arbiter(&self) -> &ArbiterHandle {
&self.arbiter_handle
2018-12-09 19:55:40 -08:00
}
/// Check if there is a System registered on the current thread.
pub fn is_registered() -> bool {
CURRENT.with(|sys| sys.borrow().is_some())
2018-12-09 19:55:40 -08:00
}
/// Register given system on current thread.
#[doc(hidden)]
pub fn set_current(sys: System) {
CURRENT.with(|cell| {
*cell.borrow_mut() = Some(sys);
2018-12-09 19:55:40 -08:00
})
}
/// Numeric system identifier.
///
/// Useful when using multiple Systems.
pub fn id(&self) -> usize {
self.id
}
2021-01-26 09:46:14 +00:00
/// Stop the system (with code 0).
2018-12-09 19:55:40 -08:00
pub fn stop(&self) {
self.stop_with_code(0)
}
/// Stop the system with a given exit code.
2018-12-09 19:55:40 -08:00
pub fn stop_with_code(&self, code: i32) {
2021-01-29 15:16:30 +00:00
let _ = self.sys_tx.send(SystemCommand::Exit(code));
2018-12-09 19:55:40 -08:00
}
2021-01-29 04:08:14 +00:00
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
2021-01-29 15:16:30 +00:00
&self.sys_tx
2018-12-09 19:55:40 -08:00
}
}
2018-12-09 19:55:40 -08:00
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
}
#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
let exit_code = self.run_with_code()?;
match exit_code {
0 => Ok(()),
nonzero => Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", nonzero),
)),
}
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
rt.block_on(stop_rx)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
2018-12-09 19:55:40 -08:00
}
/// Runs the provided future, blocking the current thread until the future completes.
#[track_caller]
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.rt.block_on(fut)
2018-12-09 19:55:40 -08:00
}
}
2021-01-29 04:08:14 +00:00
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
2023-07-17 03:05:39 +01:00
unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet");
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);
let res = fut.await;
drop(stop_rx);
res
})
}
}
2021-01-29 04:08:14 +00:00
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),
RegisterArbiter(usize, ArbiterHandle),
2021-01-29 04:08:14 +00:00
DeregisterArbiter(usize),
}
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
/// [Arbiter]s and is able to distribute a system-wide stop command.
2021-01-29 04:08:14 +00:00
#[derive(Debug)]
pub(crate) struct SystemController {
stop_tx: Option<oneshot::Sender<i32>>,
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
arbiters: HashMap<usize, ArbiterHandle>,
2021-01-29 04:08:14 +00:00
}
impl SystemController {
2021-01-29 04:08:14 +00:00
pub(crate) fn new(
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
stop_tx: oneshot::Sender<i32>,
2021-01-29 04:08:14 +00:00
) -> Self {
SystemController {
cmd_rx,
stop_tx: Some(stop_tx),
arbiters: HashMap::with_capacity(4),
2021-01-29 04:08:14 +00:00
}
}
}
impl Future for SystemController {
2021-01-29 04:08:14 +00:00
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match ready!(self.cmd_rx.poll_recv(cx)) {
2021-01-29 04:08:14 +00:00
// channel closed; no more messages can be received
None => return Poll::Ready(()),
// process system command
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
// stop all arbiters
for arb in self.arbiters.values() {
arb.stop();
2021-01-29 04:08:14 +00:00
}
2021-01-29 15:16:30 +00:00
2021-01-29 04:08:14 +00:00
// stop event loop
// will only fire once
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(code);
2021-01-29 04:08:14 +00:00
}
}
2021-01-29 15:16:30 +00:00
SystemCommand::RegisterArbiter(id, arb) => {
self.arbiters.insert(id, arb);
2021-01-29 04:08:14 +00:00
}
2021-01-29 15:16:30 +00:00
SystemCommand::DeregisterArbiter(id) => {
self.arbiters.remove(&id);
2021-01-29 04:08:14 +00:00
}
},
}
}
}
}