1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 03:42:59 +01:00

remove builder and introduce worker handle (#257)

This commit is contained in:
Rob Ede 2021-01-31 03:34:07 +00:00 committed by GitHub
parent 1b35ff8ee6
commit b75254403a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 545 additions and 585 deletions

View File

@ -7,7 +7,7 @@
use proc_macro::TokenStream; use proc_macro::TokenStream;
use quote::quote; use quote::quote;
/// Marks async function to be executed by actix system. /// Marks async function to be executed by Actix system.
/// ///
/// ## Usage /// ## Usage
/// ///
@ -26,7 +26,6 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
let vis = &input.vis; let vis = &input.vis;
let sig = &mut input.sig; let sig = &mut input.sig;
let body = &input.block; let body = &input.block;
let name = &sig.ident;
if sig.asyncness.is_none() { if sig.asyncness.is_none() {
return syn::Error::new_spanned(sig.fn_token, "only async fn is supported") return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
@ -39,14 +38,14 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
(quote! { (quote! {
#(#attrs)* #(#attrs)*
#vis #sig { #vis #sig {
actix_rt::System::new(stringify!(#name)) actix_rt::System::new()
.block_on(async move { #body }) .block_on(async move { #body })
} }
}) })
.into() .into()
} }
/// Marks async test function to be executed by actix runtime. /// Marks async test function to be executed by Actix system.
/// ///
/// ## Usage /// ## Usage
/// ///
@ -86,7 +85,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
quote! { quote! {
#(#attrs)* #(#attrs)*
#vis #sig { #vis #sig {
actix_rt::System::new("test") actix_rt::System::new()
.block_on(async { #body }) .block_on(async { #body })
} }
} }
@ -95,7 +94,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
#[test] #[test]
#(#attrs)* #(#attrs)*
#vis #sig { #vis #sig {
actix_rt::System::new("test") actix_rt::System::new()
.block_on(async { #body }) .block_on(async { #body })
} }
} }

View File

@ -1,20 +1,27 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Arbiter => Worker`. [#254]
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253]
* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] * Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253]
* Remove `Worker::exec`. [#253] * Remove `Arbiter::exec`. [#253]
* Remove `System::arbiter`. [#256] * Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253]
* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] * `Arbiter::spawn` now accepts !Unpin futures. [#256]
* `Worker::spawn` now accepts !Unpin futures. [#256] * `System::new` no longer takes arguments. [#257]
* Remove `System::with_current`. [#257]
* Remove `Builder`. [#257]
* Add `System::with_init` as replacement for `Builder::run`. [#257]
* Rename `System::{is_set => is_registered}`. [#257]
* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257].
* `System::arbiter` now returns a `&ArbiterHandle`. [#257]
* Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257]
* `Arbiter::join` now takes self by value. [#257]
[#253]: https://github.com/actix/actix-net/pull/253 [#253]: https://github.com/actix/actix-net/pull/253
[#254]: https://github.com/actix/actix-net/pull/254 [#254]: https://github.com/actix/actix-net/pull/254
[#256]: https://github.com/actix/actix-net/pull/256 [#256]: https://github.com/actix/actix-net/pull/256
[#257]: https://github.com/actix/actix-net/pull/257
## 2.0.0-beta.2 - 2021-01-09 ## 2.0.0-beta.2 - 2021-01-09

299
actix-rt/src/arbiter.rs Normal file
View File

@ -0,0 +1,299 @@
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
fmt,
future::Future,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
thread,
};
use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
runtime::Runtime,
system::{System, SystemCommand},
};
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) enum ArbiterCommand {
Stop,
Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
}
impl fmt::Debug for ArbiterCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
}
}
}
/// A handle for sending spawn and stop messages to an [Arbiter].
#[derive(Debug, Clone)]
pub struct ArbiterHandle {
sender: mpsc::UnboundedSender<ArbiterCommand>,
}
impl ArbiterHandle {
pub(crate) fn new(sender: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self { sender }
}
/// Send a future to the [Arbiter]'s thread and spawn it.
///
/// If you require a result, include a response channel in the future.
///
/// Returns true if future was sent successfully and false if the [Arbiter] has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool
where
Fut: Future<Output = ()> + Send + 'static,
{
self.sender
.send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok()
}
/// Send a function to the [Arbiter]'s thread and execute it.
///
/// Any result from the function is discarded. If you require a result, include a response
/// channel in the function.
///
/// Returns true if function was sent successfully and false if the [Arbiter] has died.
pub fn spawn_fn<F>(&self, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
self.spawn(async { f() })
}
/// Instruct [Arbiter] to stop processing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the [Arbiter] has
/// been dropped.
pub fn stop(&self) -> bool {
self.sender.send(ArbiterCommand::Stop).is_ok()
}
}
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
sender: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
}
impl Arbiter {
/// Spawn new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
let sys = System::current();
let (tx, rx) = mpsc::unbounded_channel();
let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::new().expect("Can not create Runtime");
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
STORAGE.with(|cell| cell.borrow_mut().clear());
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(id, hnd));
// run arbiter event processing loop
rt.block_on(ArbiterRunner { rx });
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
});
Arbiter {
sender: tx,
thread_handle,
}
}
/// Sets up an Arbiter runner on the current thread using the provided runtime local task set.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel();
let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(ArbiterRunner { rx });
hnd
}
/// Return a handle to the Arbiter's message sender.
///
/// # Panics
/// Panics if no Arbiter is running on the current thread.
pub fn handle() -> ArbiterHandle {
HANDLE.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Arbiter is not running."),
})
}
/// Stop Arbiter from continuing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
pub fn stop(&self) -> bool {
self.sender.send(ArbiterCommand::Stop).is_ok()
}
/// Send a future to the Arbiter's thread and spawn it.
///
/// If you require a result, include a response channel in the future.
///
/// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool
where
Fut: Future<Output = ()> + Send + 'static,
{
self.sender
.send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok()
}
/// Send a function to the Arbiter's thread and execute it.
///
/// Any result from the function is discarded. If you require a result, include a response
/// channel in the function.
///
/// Returns true if function was sent successfully and false if the Arbiter has died.
pub fn spawn_fn<F>(&self, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
self.spawn(async { f() })
}
/// Wait for Arbiter's event loop to complete.
///
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
pub fn join(self) -> thread::Result<()> {
self.thread_handle.join()
}
/// Insert item into Arbiter's thread-local storage.
///
/// Overwrites any item of the same type previously inserted.
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if Arbiter's thread-local storage contains an item type.
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
}
/// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let type_id = TypeId::of::<T>();
let item = st.get(&type_id).and_then(downcast_ref).unwrap();
f(item)
})
}
/// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let type_id = TypeId::of::<T>();
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();
f(item)
})
}
}
/// A persistent future that processes [Arbiter] commands.
struct ArbiterRunner {
rx: mpsc::UnboundedReceiver<ArbiterCommand>,
}
impl Future for ArbiterRunner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// channel closed; no more messages can be received
None => return Poll::Ready(()),
// process arbiter command
Some(item) => match item {
ArbiterCommand::Stop => {
return Poll::Ready(());
}
ArbiterCommand::Execute(task_fut) => {
tokio::task::spawn_local(task_fut);
}
},
}
}
}
}
fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
boxed.downcast_ref()
}
fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
boxed.downcast_mut()
}

View File

@ -1,114 +0,0 @@
use std::{borrow::Cow, future::Future, io};
use tokio::sync::{
mpsc::unbounded_channel,
oneshot::{channel, Receiver},
};
use crate::{
runtime::Runtime,
system::{System, SystemWorker},
worker::Worker,
};
/// System builder.
///
/// Either use `Builder::build` to create a system and start actors. Alternatively, use
/// `Builder::run` to start the Tokio runtime and run a function in its context.
pub struct Builder {
/// Name of the System. Defaults to "actix-rt" if unset.
name: Cow<'static, str>,
}
impl Builder {
pub(crate) fn new() -> Self {
Builder {
name: Cow::Borrowed("actix-rt"),
}
}
/// Sets the name of the System.
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
self.name = name.into();
self
}
/// Create new System.
///
/// This method panics if it can not create Tokio runtime
pub fn build(self) -> SystemRunner {
self.create_runtime(|| {})
}
/// This function will start Tokio runtime and will finish once the `System::stop()` message
/// is called. Function `f` is called within Tokio runtime context.
pub fn run<F>(self, init_fn: F) -> io::Result<()>
where
F: FnOnce(),
{
self.create_runtime(init_fn).run()
}
fn create_runtime<F>(self, init_fn: F) -> SystemRunner
where
F: FnOnce(),
{
let (stop_tx, stop_rx) = channel();
let (sys_sender, sys_receiver) = unbounded_channel();
let rt = Runtime::new().unwrap();
let system = System::construct(sys_sender, Worker::new_system(rt.local()));
// init system worker
let sys_worker = SystemWorker::new(sys_receiver, stop_tx);
rt.spawn(sys_worker);
// run system init method
rt.block_on(async { init_fn() });
SystemRunner {
rt,
stop_rx,
system,
}
}
}
/// System runner object that keeps event loop alive and running until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
stop_rx: Receiver<i32>,
system: System,
}
impl SystemRunner {
/// Starts event loop and will finish once [`System::stop()`] is called.
pub fn run(self) -> io::Result<()> {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
match rt.block_on(stop_rx) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.rt.block_on(fut)
}
}

View File

@ -15,15 +15,13 @@ use tokio::task::JoinHandle;
#[cfg(all(feature = "macros", not(test)))] #[cfg(all(feature = "macros", not(test)))]
pub use actix_macros::{main, test}; pub use actix_macros::{main, test};
mod builder; mod arbiter;
mod runtime; mod runtime;
mod system; mod system;
mod worker;
pub use self::builder::{Builder, SystemRunner}; pub use self::arbiter::{Arbiter, ArbiterHandle};
pub use self::runtime::Runtime; pub use self::runtime::Runtime;
pub use self::system::System; pub use self::system::{System, SystemRunner};
pub use self::worker::Worker;
pub mod signal { pub mod signal {
//! Asynchronous signal handling (Tokio re-exports). //! Asynchronous signal handling (Tokio re-exports).
@ -61,7 +59,7 @@ pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
} }
/// Spawns a future on the current [Worker]. /// Spawns a future on the current thread.
/// ///
/// # Panics /// # Panics
/// Panics if Actix system is not running. /// Panics if Actix system is not running.

View File

@ -2,9 +2,10 @@ use std::{future::Future, io};
use tokio::task::{JoinHandle, LocalSet}; use tokio::task::{JoinHandle, LocalSet};
/// Single-threaded runtime provides a way to start reactor and runtime on the current thread. /// A single-threaded runtime based on Tokio's "current thread" runtime.
/// ///
/// See [crate root][crate] documentation for more details. /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
#[derive(Debug)] #[derive(Debug)]
pub struct Runtime { pub struct Runtime {
local: LocalSet, local: LocalSet,
@ -27,7 +28,7 @@ impl Runtime {
} }
/// Reference to local task set. /// Reference to local task set.
pub(crate) fn local(&self) -> &LocalSet { pub(crate) fn local_set(&self) -> &LocalSet {
&self.local &self.local
} }

View File

@ -1,5 +1,4 @@
use std::{ use std::{
borrow::Cow,
cell::RefCell, cell::RefCell,
collections::HashMap, collections::HashMap,
future::Future, future::Future,
@ -12,55 +11,62 @@ use std::{
use futures_core::ready; use futures_core::ready;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::{ use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
builder::{Builder, SystemRunner},
worker::Worker,
};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
/// System is a runtime manager.
#[derive(Clone, Debug)]
pub struct System {
id: usize,
sys_tx: mpsc::UnboundedSender<SystemCommand>,
// TODO: which worker is this exactly
worker: Worker,
}
thread_local!( thread_local!(
static CURRENT: RefCell<Option<System>> = RefCell::new(None); static CURRENT: RefCell<Option<System>> = RefCell::new(None);
); );
/// A manager for a per-thread distributed async runtime.
#[derive(Clone, Debug)]
pub struct System {
id: usize,
sys_tx: mpsc::UnboundedSender<SystemCommand>,
/// Handle to the first [Arbiter] that is created with the System.
arbiter_handle: ArbiterHandle,
}
impl System { impl System {
/// Constructs new system and sets it as current. /// Create a new system.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
worker: Worker,
) -> Self {
let sys = System {
sys_tx,
worker,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
};
System::set_current(sys.clone());
sys
}
/// Build a new system with a customized Tokio runtime.
///
/// This allows to customize the runtime. See [`Builder`] for more information.
pub fn builder() -> Builder {
Builder::new()
}
/// Create new system.
/// ///
/// # Panics /// # Panics
/// Panics if underlying Tokio runtime can not be created. /// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new(name: impl Into<Cow<'static, str>>) -> SystemRunner { pub fn new() -> SystemRunner {
Self::builder().name(name).build() let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let system = System::construct(sys_tx, Arbiter::in_new_system(rt.local_set()));
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl);
SystemRunner {
rt,
stop_rx,
system,
}
}
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
arbiter_handle: ArbiterHandle,
) -> Self {
let sys = System {
sys_tx,
arbiter_handle,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
};
System::set_current(sys.clone());
sys
} }
/// Get current running system. /// Get current running system.
@ -74,31 +80,27 @@ impl System {
}) })
} }
/// Check if current system has started. /// Get handle to a the System's initial [Arbiter].
pub fn is_set() -> bool { pub fn arbiter(&self) -> &ArbiterHandle {
CURRENT.with(|cell| cell.borrow().is_some()) &self.arbiter_handle
} }
/// Set current running system. /// Check if there is a System registered on the current thread.
pub fn is_registered() -> bool {
CURRENT.with(|sys| sys.borrow().is_some())
}
/// Register given system on current thread.
#[doc(hidden)] #[doc(hidden)]
pub fn set_current(sys: System) { pub fn set_current(sys: System) {
CURRENT.with(|s| { CURRENT.with(|cell| {
*s.borrow_mut() = Some(sys); *cell.borrow_mut() = Some(sys);
}) })
} }
/// Execute function with system reference. /// Numeric system identifier.
pub fn with_current<F, R>(f: F) -> R ///
where /// Useful when using multiple Systems.
F: FnOnce(&System) -> R,
{
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => f(sys),
None => panic!("System is not running"),
})
}
/// Numeric system ID.
pub fn id(&self) -> usize { pub fn id(&self) -> usize {
self.id self.id
} }
@ -108,7 +110,7 @@ impl System {
self.stop_with_code(0) self.stop_with_code(0)
} }
/// Stop the system with a particular exit code. /// Stop the system with a given exit code.
pub fn stop_with_code(&self, code: i32) { pub fn stop_with_code(&self, code: i32) {
let _ = self.sys_tx.send(SystemCommand::Exit(code)); let _ = self.sys_tx.send(SystemCommand::Exit(code));
} }
@ -116,80 +118,106 @@ impl System {
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> { pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
&self.sys_tx &self.sys_tx
} }
}
// TODO: give clarity on which worker this is; previous documented as returning "system worker" /// Runner that keeps a [System]'s event loop alive until stop message is received.
/// Get shared reference to a worker. #[must_use = "A SystemRunner does nothing unless `run` is called."]
pub fn worker(&self) -> &Worker { #[derive(Debug)]
&self.worker pub struct SystemRunner {
rt: Runtime,
stop_rx: oneshot::Receiver<i32>,
system: System,
}
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
match rt.block_on(stop_rx) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
} }
/// This function will start Tokio runtime and will finish once the `System::stop()` message /// Runs the provided future, blocking the current thread until the future completes.
/// is called. Function `f` is called within Tokio runtime context. #[inline]
pub fn run<F>(f: F) -> io::Result<()> pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
where self.rt.block_on(fut)
F: FnOnce(),
{
Self::builder().run(f)
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {
Exit(i32), Exit(i32),
RegisterArbiter(usize, Worker), RegisterArbiter(usize, ArbiterHandle),
DeregisterArbiter(usize), DeregisterArbiter(usize),
} }
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
/// [Arbiter]s and is able to distribute a system-wide stop command.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct SystemWorker { pub(crate) struct SystemController {
stop: Option<oneshot::Sender<i32>>, stop_tx: Option<oneshot::Sender<i32>>,
commands: mpsc::UnboundedReceiver<SystemCommand>, cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
workers: HashMap<usize, Worker>, arbiters: HashMap<usize, ArbiterHandle>,
} }
impl SystemWorker { impl SystemController {
pub(crate) fn new( pub(crate) fn new(
commands: mpsc::UnboundedReceiver<SystemCommand>, cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
stop: oneshot::Sender<i32>, stop_tx: oneshot::Sender<i32>,
) -> Self { ) -> Self {
SystemWorker { SystemController {
commands, cmd_rx,
stop: Some(stop), stop_tx: Some(stop_tx),
workers: HashMap::new(), arbiters: HashMap::with_capacity(4),
} }
} }
} }
impl Future for SystemWorker { impl Future for SystemController {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel // process all items currently buffered in channel
loop { loop {
match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => return Poll::Ready(()),
// process system command // process system command
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop workers // stop all arbiters
for wkr in self.workers.values() { for wkr in self.arbiters.values() {
wkr.stop(); wkr.stop();
} }
// stop event loop // stop event loop
if let Some(stop) = self.stop.take() { // will only fire once
let _ = stop.send(code); if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(code);
} }
} }
SystemCommand::RegisterArbiter(name, hnd) => { SystemCommand::RegisterArbiter(name, hnd) => {
self.workers.insert(name, hnd); self.arbiters.insert(name, hnd);
} }
SystemCommand::DeregisterArbiter(name) => { SystemCommand::DeregisterArbiter(name) => {
self.workers.remove(&name); self.arbiters.remove(&name);
} }
}, },
} }

View File

@ -1,271 +0,0 @@
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
fmt,
future::Future,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
thread,
};
use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
runtime::Runtime,
system::{System, SystemCommand},
};
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static ADDR: RefCell<Option<Worker>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) enum WorkerCommand {
Stop,
Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
}
impl fmt::Debug for WorkerCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"),
WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
}
}
}
/// A worker represent a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop.
/// Some Arbiter functions execute on the current thread.
#[derive(Debug)]
pub struct Worker {
sender: mpsc::UnboundedSender<WorkerCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl Clone for Worker {
fn clone(&self) -> Self {
Self::new_handle(self.sender.clone())
}
}
impl Default for Worker {
fn default() -> Self {
Self::new()
}
}
impl Worker {
/// Spawn new thread and run event loop in spawned thread.
///
/// Returns handle of newly created worker.
///
/// # Panics
/// Panics if a [System] not registered on the current thread.
pub fn new() -> Worker {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id);
let sys = System::current();
let (tx, rx) = mpsc::unbounded_channel();
let handle = thread::Builder::new()
.name(name.clone())
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::new().expect("Can not create Runtime");
let arb = Worker::new_handle(tx);
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register worker
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(id, arb));
// run worker event processing loop
rt.block_on(WorkerRunner { rx });
// deregister worker
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err)
});
Worker {
sender: tx,
thread_handle: Some(handle),
}
}
/// Returns the current Worker's handle.
///
/// # Panics
/// Panics if no Worker is running on the current thread.
pub fn current() -> Worker {
ADDR.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Worker is not running."),
})
}
/// Stop worker from continuing it's event loop.
pub fn stop(&self) {
let _ = self.sender.send(WorkerCommand::Stop);
}
/// Send a future to the Arbiter's thread and spawn it.
///
/// If you require a result, include a response channel in the future.
///
/// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool
where
Fut: Future<Output = ()> + Send + 'static,
{
self.sender
.send(WorkerCommand::Execute(Box::pin(future)))
.is_ok()
}
/// Send a function to the Arbiter's thread and execute it.
///
/// Any result from the function is discarded. If you require a result, include a response
/// channel in the function.
///
/// Returns true if function was sent successfully and false if the Arbiter has died.
pub fn spawn_fn<F>(&self, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
self.spawn(async { f() })
}
/// Wait for worker's event loop to complete.
///
/// Joins the underlying OS thread handle, if contained.
pub fn join(&mut self) -> thread::Result<()> {
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
} else {
Ok(())
}
}
pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let arb = Worker::new_handle(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(WorkerRunner { rx });
arb
}
fn new_handle(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self {
Self {
sender,
thread_handle: None,
}
}
/// Insert item into worker's thread-local storage.
///
/// Overwrites any item of the same type previously inserted.
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if worker's thread-local storage contains an item type.
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
}
/// Call a function with a shared reference to an item in this worker's thread-local storage.
///
/// # Panics
/// Panics if item is not in worker's thread-local item storage.
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let type_id = TypeId::of::<T>();
let item = st.get(&type_id).and_then(downcast_ref).unwrap();
f(item)
})
}
/// Call a function with a mutable reference to an item in this worker's thread-local storage.
///
/// # Panics
/// Panics if item is not in worker's thread-local item storage.
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let type_id = TypeId::of::<T>();
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();
f(item)
})
}
}
/// A persistent worker future that processes worker commands.
struct WorkerRunner {
rx: mpsc::UnboundedReceiver<WorkerCommand>,
}
impl Future for WorkerRunner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// channel closed; no more messages can be received
None => return Poll::Ready(()),
// process worker command
Some(item) => match item {
WorkerCommand::Stop => return Poll::Ready(()),
WorkerCommand::Execute(task_fut) => {
tokio::task::spawn_local(task_fut);
}
},
}
}
}
}
fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
boxed.downcast_ref()
}
fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
boxed.downcast_mut()
}

View File

@ -1,16 +1,17 @@
use std::{ use std::{
sync::mpsc::sync_channel, sync::mpsc::channel,
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use actix_rt::{System, Worker}; use actix_rt::{Arbiter, System};
use tokio::sync::oneshot;
#[test] #[test]
fn await_for_timer() { fn await_for_timer() {
let time = Duration::from_secs(1); let time = Duration::from_secs(1);
let instant = Instant::now(); let instant = Instant::now();
System::new("test_wait_timer").block_on(async move { System::new().block_on(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
}); });
assert!( assert!(
@ -20,78 +21,72 @@ fn await_for_timer() {
} }
#[test] #[test]
fn join_another_worker() { fn join_another_arbiter() {
let time = Duration::from_secs(1); let time = Duration::from_secs(1);
let instant = Instant::now(); let instant = Instant::now();
System::new("test_join_another_worker").block_on(async move { System::new().block_on(async move {
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::current().stop(); Arbiter::handle().stop();
})); }));
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() >= time, instant.elapsed() >= time,
"Join on another worker should complete only when it calls stop" "Join on another arbiter should complete only when it calls stop"
); );
let instant = Instant::now(); let instant = Instant::now();
System::new("test_join_another_worker").block_on(async move { System::new().block_on(async move {
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(move || { arbiter.spawn_fn(move || {
actix_rt::spawn(async move { actix_rt::spawn(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::current().stop(); Arbiter::handle().stop();
}); });
}); });
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() >= time, instant.elapsed() >= time,
"Join on a worker that has used actix_rt::spawn should wait for said future" "Join on an arbiter that has used actix_rt::spawn should wait for said future"
); );
let instant = Instant::now(); let instant = Instant::now();
System::new("test_join_another_worker").block_on(async move { System::new().block_on(async move {
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::current().stop(); Arbiter::handle().stop();
})); }));
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() < time, instant.elapsed() < time,
"Premature stop of worker should conclude regardless of it's current state" "Premature stop of arbiter should conclude regardless of it's current state"
); );
} }
#[test] #[test]
fn non_static_block_on() { fn non_static_block_on() {
let string = String::from("test_str"); let string = String::from("test_str");
let str = string.as_str(); let string = string.as_str();
let sys = System::new("borrow some"); let sys = System::new();
sys.block_on(async { sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await; actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str); assert_eq!("test_str", string);
}); });
let rt = actix_rt::Runtime::new().unwrap(); let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async { rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await; actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str); assert_eq!("test_str", string);
}); });
System::run(|| {
assert_eq!("test_str", str);
System::current().stop();
})
.unwrap();
} }
#[test] #[test]
@ -108,82 +103,70 @@ fn wait_for_spawns() {
} }
#[test] #[test]
fn worker_spawn_fn_runs() { fn arbiter_spawn_fn_runs() {
let _ = System::new("test-system"); let _ = System::new();
let (tx, rx) = sync_channel::<u32>(1); let (tx, rx) = channel::<u32>();
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(move || tx.send(42).unwrap()); arbiter.spawn_fn(move || tx.send(42).unwrap());
let num = rx.recv().unwrap(); let num = rx.recv().unwrap();
assert_eq!(num, 42); assert_eq!(num, 42);
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_drop_no_panic_fn() { fn arbiter_drop_no_panic_fn() {
let _ = System::new("test-system"); let _ = System::new();
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(|| panic!("test")); arbiter.spawn_fn(|| panic!("test"));
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_drop_no_panic_fut() { fn arbiter_drop_no_panic_fut() {
let _ = System::new("test-system"); let _ = System::new();
let mut worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(async { panic!("test") }); arbiter.spawn(async { panic!("test") });
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_item_storage() { fn arbiter_item_storage() {
let _ = System::new("test-system"); let _ = System::new();
let mut worker = Worker::new(); let arbiter = Arbiter::new();
assert!(!Worker::contains_item::<u32>()); assert!(!Arbiter::contains_item::<u32>());
Worker::set_item(42u32); Arbiter::set_item(42u32);
assert!(Worker::contains_item::<u32>()); assert!(Arbiter::contains_item::<u32>());
Worker::get_item(|&item: &u32| assert_eq!(item, 42)); Arbiter::get_item(|&item: &u32| assert_eq!(item, 42));
Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
}) })
.join(); .join();
assert!(thread.is_err()); assert!(thread.is_err());
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
}) })
.join(); .join();
assert!(thread.is_err()); assert!(thread.is_err());
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
}
#[test]
fn system_name_cow_str() {
let _ = System::new("test-system");
System::current().stop();
}
#[test]
fn system_name_cow_string() {
let _ = System::new("test-system".to_owned());
System::current().stop();
} }
#[test] #[test]
@ -194,6 +177,36 @@ fn no_system_current_panic() {
#[test] #[test]
#[should_panic] #[should_panic]
fn no_system_worker_new_panic() { fn no_system_arbiter_new_panic() {
Worker::new(); Arbiter::new();
}
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
let (tx, rx) = oneshot::channel();
let sys = System::current();
thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic
Arbiter::handle();
})
.join()
.unwrap_err();
let thread = thread::spawn(|| {
// this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys);
let sys = System::current();
let wrk = sys.arbiter();
wrk.spawn(async move {
tx.send(42u32).unwrap();
System::current().stop();
});
});
assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap();
} }

View File

@ -403,7 +403,7 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
System::current().worker().spawn(async move { System::current().arbiter().spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
}); });

View File

@ -48,7 +48,7 @@ impl TestServer {
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let sys = System::new("actix-test-server"); let sys = System::new();
factory(Server::build()).workers(1).disable_signals().run(); factory(Server::build()).workers(1).disable_signals().run();
tx.send(System::current()).unwrap(); tx.send(System::current()).unwrap();
@ -70,7 +70,7 @@ impl TestServer {
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let sys = System::new("actix-test-server"); let sys = System::new();
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();

View File

@ -6,7 +6,7 @@ use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_rt::{spawn, Worker as Arbiter}; use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::{error, info, trace}; use log::{error, info, trace};
@ -215,7 +215,7 @@ impl ServerWorker {
} }
Err(e) => { Err(e) => {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().stop(); Arbiter::handle().stop();
} }
} }
wrk.await wrk.await
@ -386,7 +386,7 @@ impl Future for ServerWorker {
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
let _ = tx.take().unwrap().send(true); let _ = tx.take().unwrap().send(true);
Arbiter::current().stop(); Arbiter::handle().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
@ -394,7 +394,7 @@ impl Future for ServerWorker {
if Pin::new(t2).poll(cx).is_ready() { if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false); let _ = tx.take().unwrap().send(false);
self.shutdown(true); self.shutdown(true);
Arbiter::current().stop(); Arbiter::handle().stop();
return Poll::Ready(()); return Poll::Ready(());
} }

View File

@ -21,7 +21,7 @@ fn test_bind() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| { let srv = sys.block_on(lazy(|_| {
Server::build() Server::build()
.workers(1) .workers(1)
@ -47,7 +47,7 @@ fn test_listen() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new();
let lst = net::TcpListener::bind(addr).unwrap(); let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(async { sys.block_on(async {
Server::build() Server::build()
@ -81,7 +81,7 @@ fn test_start() {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| { let srv = sys.block_on(lazy(|_| {
Server::build() Server::build()
.backlog(100) .backlog(100)
@ -150,7 +150,7 @@ fn test_configure() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let num = num2.clone(); let num = num2.clone();
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| { let srv = sys.block_on(lazy(|_| {
Server::build() Server::build()
.disable_signals() .disable_signals()