diff --git a/actix-macros/src/lib.rs b/actix-macros/src/lib.rs index 60f177fa..54b89565 100644 --- a/actix-macros/src/lib.rs +++ b/actix-macros/src/lib.rs @@ -7,7 +7,7 @@ use proc_macro::TokenStream; use quote::quote; -/// Marks async function to be executed by actix system. +/// Marks async function to be executed by Actix system. /// /// ## Usage /// @@ -26,7 +26,6 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream { let vis = &input.vis; let sig = &mut input.sig; let body = &input.block; - let name = &sig.ident; if sig.asyncness.is_none() { return syn::Error::new_spanned(sig.fn_token, "only async fn is supported") @@ -39,14 +38,14 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream { (quote! { #(#attrs)* #vis #sig { - actix_rt::System::new(stringify!(#name)) + actix_rt::System::new() .block_on(async move { #body }) } }) .into() } -/// Marks async test function to be executed by actix runtime. +/// Marks async test function to be executed by Actix system. /// /// ## Usage /// @@ -86,7 +85,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream { quote! { #(#attrs)* #vis #sig { - actix_rt::System::new("test") + actix_rt::System::new() .block_on(async { #body }) } } @@ -95,7 +94,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream { #[test] #(#attrs)* #vis #sig { - actix_rt::System::new("test") + actix_rt::System::new() .block_on(async { #body }) } } diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 379afbd7..3f2db63f 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,20 +1,27 @@ # Changes ## Unreleased - 2021-xx-xx - -* Rename `Arbiter => Worker`. [#254] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253] -* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] -* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] -* Remove `Worker::exec`. [#253] -* Remove `System::arbiter`. [#256] -* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] -* `Worker::spawn` now accepts !Unpin futures. [#256] +* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] +* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] +* Remove `Arbiter::exec`. [#253] +* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] +* `Arbiter::spawn` now accepts !Unpin futures. [#256] +* `System::new` no longer takes arguments. [#257] +* Remove `System::with_current`. [#257] +* Remove `Builder`. [#257] +* Add `System::with_init` as replacement for `Builder::run`. [#257] +* Rename `System::{is_set => is_registered}`. [#257] +* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. +* `System::arbiter` now returns a `&ArbiterHandle`. [#257] +* Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257] +* `Arbiter::join` now takes self by value. [#257] [#253]: https://github.com/actix/actix-net/pull/253 [#254]: https://github.com/actix/actix-net/pull/254 [#256]: https://github.com/actix/actix-net/pull/256 +[#257]: https://github.com/actix/actix-net/pull/257 ## 2.0.0-beta.2 - 2021-01-09 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs new file mode 100644 index 00000000..6f723111 --- /dev/null +++ b/actix-rt/src/arbiter.rs @@ -0,0 +1,299 @@ +use std::{ + any::{Any, TypeId}, + cell::RefCell, + collections::HashMap, + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll}, + thread, +}; + +use futures_core::ready; +use tokio::{sync::mpsc, task::LocalSet}; + +use crate::{ + runtime::Runtime, + system::{System, SystemCommand}, +}; + +pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); + +thread_local!( + static HANDLE: RefCell> = RefCell::new(None); + static STORAGE: RefCell>> = RefCell::new(HashMap::new()); +); + +pub(crate) enum ArbiterCommand { + Stop, + Execute(Pin + Send>>), +} + +impl fmt::Debug for ArbiterCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), + ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + } + } +} + +/// A handle for sending spawn and stop messages to an [Arbiter]. +#[derive(Debug, Clone)] +pub struct ArbiterHandle { + sender: mpsc::UnboundedSender, +} + +impl ArbiterHandle { + pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { + Self { sender } + } + + /// Send a future to the [Arbiter]'s thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the [Arbiter] has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Send + 'static, + { + self.sender + .send(ArbiterCommand::Execute(Box::pin(future))) + .is_ok() + } + + /// Send a function to the [Arbiter]'s thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the [Arbiter] has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.spawn(async { f() }) + } + + /// Instruct [Arbiter] to stop processing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the [Arbiter] has + /// been dropped. + pub fn stop(&self) -> bool { + self.sender.send(ArbiterCommand::Stop).is_ok() + } +} + +/// An Arbiter represents a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. +#[derive(Debug)] +pub struct Arbiter { + sender: mpsc::UnboundedSender, + thread_handle: thread::JoinHandle<()>, +} + +impl Arbiter { + /// Spawn new Arbiter thread and start its event loop. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + let id = COUNT.fetch_add(1, Ordering::Relaxed); + let system_id = System::current().id(); + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id); + let sys = System::current(); + let (tx, rx) = mpsc::unbounded_channel(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let rt = Runtime::new().expect("Can not create Runtime"); + let hnd = ArbiterHandle::new(tx); + + System::set_current(sys); + + STORAGE.with(|cell| cell.borrow_mut().clear()); + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register arbiter + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(id, hnd)); + + // run arbiter event processing loop + rt.block_on(ArbiterRunner { rx }); + + // deregister arbiter + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) + }); + + Arbiter { + sender: tx, + thread_handle, + } + } + + /// Sets up an Arbiter runner on the current thread using the provided runtime local task set. + pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let hnd = ArbiterHandle::new(tx); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(ArbiterRunner { rx }); + + hnd + } + + /// Return a handle to the Arbiter's message sender. + /// + /// # Panics + /// Panics if no Arbiter is running on the current thread. + pub fn handle() -> ArbiterHandle { + HANDLE.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Arbiter is not running."), + }) + } + + /// Stop Arbiter from continuing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. + pub fn stop(&self) -> bool { + self.sender.send(ArbiterCommand::Stop).is_ok() + } + + /// Send a future to the Arbiter's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the Arbiter has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Send + 'static, + { + self.sender + .send(ArbiterCommand::Execute(Box::pin(future))) + .is_ok() + } + + /// Send a function to the Arbiter's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.spawn(async { f() }) + } + + /// Wait for Arbiter's event loop to complete. + /// + /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). + pub fn join(self) -> thread::Result<()> { + self.thread_handle.join() + } + + /// Insert item into Arbiter's thread-local storage. + /// + /// Overwrites any item of the same type previously inserted. + pub fn set_item(item: T) { + STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); + } + + /// Check if Arbiter's thread-local storage contains an item type. + pub fn contains_item() -> bool { + STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) + } + + /// Call a function with a shared reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + pub fn get_item(mut f: F) -> R + where + F: FnMut(&T) -> R, + { + STORAGE.with(move |cell| { + let st = cell.borrow(); + + let type_id = TypeId::of::(); + let item = st.get(&type_id).and_then(downcast_ref).unwrap(); + + f(item) + }) + } + + /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + pub fn get_mut_item(mut f: F) -> R + where + F: FnMut(&mut T) -> R, + { + STORAGE.with(move |cell| { + let mut st = cell.borrow_mut(); + + let type_id = TypeId::of::(); + let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); + + f(item) + }) + } +} + +/// A persistent future that processes [Arbiter] commands. +struct ArbiterRunner { + rx: mpsc::UnboundedReceiver, +} + +impl Future for ArbiterRunner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel + loop { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process arbiter command + Some(item) => match item { + ArbiterCommand::Stop => { + return Poll::Ready(()); + } + ArbiterCommand::Execute(task_fut) => { + tokio::task::spawn_local(task_fut); + } + }, + } + } + } +} + +fn downcast_ref(boxed: &Box) -> Option<&T> { + boxed.downcast_ref() +} + +fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { + boxed.downcast_mut() +} diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs deleted file mode 100644 index f9a3fca2..00000000 --- a/actix-rt/src/builder.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::{borrow::Cow, future::Future, io}; - -use tokio::sync::{ - mpsc::unbounded_channel, - oneshot::{channel, Receiver}, -}; - -use crate::{ - runtime::Runtime, - system::{System, SystemWorker}, - worker::Worker, -}; - -/// System builder. -/// -/// Either use `Builder::build` to create a system and start actors. Alternatively, use -/// `Builder::run` to start the Tokio runtime and run a function in its context. -pub struct Builder { - /// Name of the System. Defaults to "actix-rt" if unset. - name: Cow<'static, str>, -} - -impl Builder { - pub(crate) fn new() -> Self { - Builder { - name: Cow::Borrowed("actix-rt"), - } - } - - /// Sets the name of the System. - pub fn name(mut self, name: impl Into>) -> Self { - self.name = name.into(); - self - } - - /// Create new System. - /// - /// This method panics if it can not create Tokio runtime - pub fn build(self) -> SystemRunner { - self.create_runtime(|| {}) - } - - /// This function will start Tokio runtime and will finish once the `System::stop()` message - /// is called. Function `f` is called within Tokio runtime context. - pub fn run(self, init_fn: F) -> io::Result<()> - where - F: FnOnce(), - { - self.create_runtime(init_fn).run() - } - - fn create_runtime(self, init_fn: F) -> SystemRunner - where - F: FnOnce(), - { - let (stop_tx, stop_rx) = channel(); - let (sys_sender, sys_receiver) = unbounded_channel(); - - let rt = Runtime::new().unwrap(); - - let system = System::construct(sys_sender, Worker::new_system(rt.local())); - - // init system worker - let sys_worker = SystemWorker::new(sys_receiver, stop_tx); - rt.spawn(sys_worker); - - // run system init method - rt.block_on(async { init_fn() }); - - SystemRunner { - rt, - stop_rx, - system, - } - } -} - -/// System runner object that keeps event loop alive and running until stop message is received. -#[must_use = "A SystemRunner does nothing unless `run` is called."] -#[derive(Debug)] -pub struct SystemRunner { - rt: Runtime, - stop_rx: Receiver, - system: System, -} - -impl SystemRunner { - /// Starts event loop and will finish once [`System::stop()`] is called. - pub fn run(self) -> io::Result<()> { - let SystemRunner { rt, stop_rx, .. } = self; - - // run loop - match rt.block_on(stop_rx) { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } - - /// Runs the provided future, blocking the current thread until the future completes. - #[inline] - pub fn block_on(&self, fut: F) -> F::Output { - self.rt.block_on(fut) - } -} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index b0303d6c..bf8a4796 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,15 +15,13 @@ use tokio::task::JoinHandle; #[cfg(all(feature = "macros", not(test)))] pub use actix_macros::{main, test}; -mod builder; +mod arbiter; mod runtime; mod system; -mod worker; -pub use self::builder::{Builder, SystemRunner}; +pub use self::arbiter::{Arbiter, ArbiterHandle}; pub use self::runtime::Runtime; -pub use self::system::System; -pub use self::worker::Worker; +pub use self::system::{System, SystemRunner}; pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -61,7 +59,7 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } -/// Spawns a future on the current [Worker]. +/// Spawns a future on the current thread. /// /// # Panics /// Panics if Actix system is not running. diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c7f611ed..a20dfe7e 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,9 +2,10 @@ use std::{future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; -/// Single-threaded runtime provides a way to start reactor and runtime on the current thread. +/// A single-threaded runtime based on Tokio's "current thread" runtime. /// -/// See [crate root][crate] documentation for more details. +/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound +/// on submitted futures. #[derive(Debug)] pub struct Runtime { local: LocalSet, @@ -27,7 +28,7 @@ impl Runtime { } /// Reference to local task set. - pub(crate) fn local(&self) -> &LocalSet { + pub(crate) fn local_set(&self) -> &LocalSet { &self.local } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 0182136e..d2f38ca8 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, cell::RefCell, collections::HashMap, future::Future, @@ -12,55 +11,62 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{ - builder::{Builder, SystemRunner}, - worker::Worker, -}; +use crate::{arbiter::ArbiterHandle, Arbiter, Runtime}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); -/// System is a runtime manager. -#[derive(Clone, Debug)] -pub struct System { - id: usize, - sys_tx: mpsc::UnboundedSender, - // TODO: which worker is this exactly - worker: Worker, -} - thread_local!( static CURRENT: RefCell> = RefCell::new(None); ); +/// A manager for a per-thread distributed async runtime. +#[derive(Clone, Debug)] +pub struct System { + id: usize, + sys_tx: mpsc::UnboundedSender, + + /// Handle to the first [Arbiter] that is created with the System. + arbiter_handle: ArbiterHandle, +} + impl System { - /// Constructs new system and sets it as current. - pub(crate) fn construct( - sys_tx: mpsc::UnboundedSender, - worker: Worker, - ) -> Self { - let sys = System { - sys_tx, - worker, - id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), - }; - System::set_current(sys.clone()); - sys - } - - /// Build a new system with a customized Tokio runtime. - /// - /// This allows to customize the runtime. See [`Builder`] for more information. - pub fn builder() -> Builder { - Builder::new() - } - - /// Create new system. + /// Create a new system. /// /// # Panics /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] - pub fn new(name: impl Into>) -> SystemRunner { - Self::builder().name(name).build() + pub fn new() -> SystemRunner { + let (stop_tx, stop_rx) = oneshot::channel(); + let (sys_tx, sys_rx) = mpsc::unbounded_channel(); + + let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); + let system = System::construct(sys_tx, Arbiter::in_new_system(rt.local_set())); + + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + rt.spawn(sys_ctrl); + + SystemRunner { + rt, + stop_rx, + system, + } + } + + /// Constructs new system and registers it on the current thread. + pub(crate) fn construct( + sys_tx: mpsc::UnboundedSender, + arbiter_handle: ArbiterHandle, + ) -> Self { + let sys = System { + sys_tx, + arbiter_handle, + id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), + }; + + System::set_current(sys.clone()); + + sys } /// Get current running system. @@ -74,31 +80,27 @@ impl System { }) } - /// Check if current system has started. - pub fn is_set() -> bool { - CURRENT.with(|cell| cell.borrow().is_some()) + /// Get handle to a the System's initial [Arbiter]. + pub fn arbiter(&self) -> &ArbiterHandle { + &self.arbiter_handle } - /// Set current running system. + /// Check if there is a System registered on the current thread. + pub fn is_registered() -> bool { + CURRENT.with(|sys| sys.borrow().is_some()) + } + + /// Register given system on current thread. #[doc(hidden)] pub fn set_current(sys: System) { - CURRENT.with(|s| { - *s.borrow_mut() = Some(sys); + CURRENT.with(|cell| { + *cell.borrow_mut() = Some(sys); }) } - /// Execute function with system reference. - pub fn with_current(f: F) -> R - where - F: FnOnce(&System) -> R, - { - CURRENT.with(|cell| match *cell.borrow() { - Some(ref sys) => f(sys), - None => panic!("System is not running"), - }) - } - - /// Numeric system ID. + /// Numeric system identifier. + /// + /// Useful when using multiple Systems. pub fn id(&self) -> usize { self.id } @@ -108,7 +110,7 @@ impl System { self.stop_with_code(0) } - /// Stop the system with a particular exit code. + /// Stop the system with a given exit code. pub fn stop_with_code(&self, code: i32) { let _ = self.sys_tx.send(SystemCommand::Exit(code)); } @@ -116,80 +118,106 @@ impl System { pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { &self.sys_tx } +} - // TODO: give clarity on which worker this is; previous documented as returning "system worker" - /// Get shared reference to a worker. - pub fn worker(&self) -> &Worker { - &self.worker +/// Runner that keeps a [System]'s event loop alive until stop message is received. +#[must_use = "A SystemRunner does nothing unless `run` is called."] +#[derive(Debug)] +pub struct SystemRunner { + rt: Runtime, + stop_rx: oneshot::Receiver, + system: System, +} + +impl SystemRunner { + /// Starts event loop and will return once [System] is [stopped](System::stop). + pub fn run(self) -> io::Result<()> { + let SystemRunner { rt, stop_rx, .. } = self; + + // run loop + match rt.block_on(stop_rx) { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } + } + + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } } - /// This function will start Tokio runtime and will finish once the `System::stop()` message - /// is called. Function `f` is called within Tokio runtime context. - pub fn run(f: F) -> io::Result<()> - where - F: FnOnce(), - { - Self::builder().run(f) + /// Runs the provided future, blocking the current thread until the future completes. + #[inline] + pub fn block_on(&self, fut: F) -> F::Output { + self.rt.block_on(fut) } } #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), - RegisterArbiter(usize, Worker), + RegisterArbiter(usize, ArbiterHandle), DeregisterArbiter(usize), } +/// There is one `SystemController` per [System]. It runs in the background, keeping track of +/// [Arbiter]s and is able to distribute a system-wide stop command. #[derive(Debug)] -pub(crate) struct SystemWorker { - stop: Option>, - commands: mpsc::UnboundedReceiver, - workers: HashMap, +pub(crate) struct SystemController { + stop_tx: Option>, + cmd_rx: mpsc::UnboundedReceiver, + arbiters: HashMap, } -impl SystemWorker { +impl SystemController { pub(crate) fn new( - commands: mpsc::UnboundedReceiver, - stop: oneshot::Sender, + cmd_rx: mpsc::UnboundedReceiver, + stop_tx: oneshot::Sender, ) -> Self { - SystemWorker { - commands, - stop: Some(stop), - workers: HashMap::new(), + SystemController { + cmd_rx, + stop_tx: Some(stop_tx), + arbiters: HashMap::with_capacity(4), } } } -impl Future for SystemWorker { +impl Future for SystemController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // process all items currently buffered in channel loop { - match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { + match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { // channel closed; no more messages can be received None => return Poll::Ready(()), // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { - // stop workers - for wkr in self.workers.values() { + // stop all arbiters + for wkr in self.arbiters.values() { wkr.stop(); } // stop event loop - if let Some(stop) = self.stop.take() { - let _ = stop.send(code); + // will only fire once + if let Some(stop_tx) = self.stop_tx.take() { + let _ = stop_tx.send(code); } } SystemCommand::RegisterArbiter(name, hnd) => { - self.workers.insert(name, hnd); + self.arbiters.insert(name, hnd); } SystemCommand::DeregisterArbiter(name) => { - self.workers.remove(&name); + self.arbiters.remove(&name); } }, } diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs deleted file mode 100644 index adda3cff..00000000 --- a/actix-rt/src/worker.rs +++ /dev/null @@ -1,271 +0,0 @@ -use std::{ - any::{Any, TypeId}, - cell::RefCell, - collections::HashMap, - fmt, - future::Future, - pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, - task::{Context, Poll}, - thread, -}; - -use futures_core::ready; -use tokio::{sync::mpsc, task::LocalSet}; - -use crate::{ - runtime::Runtime, - system::{System, SystemCommand}, -}; - -pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); - -thread_local!( - static ADDR: RefCell> = RefCell::new(None); - static STORAGE: RefCell>> = RefCell::new(HashMap::new()); -); - -pub(crate) enum WorkerCommand { - Stop, - Execute(Pin + Send>>), -} - -impl fmt::Debug for WorkerCommand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), - WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), - } - } -} - -/// A worker represent a thread that provides an asynchronous execution environment for futures -/// and functions. -/// -/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. -/// Some Arbiter functions execute on the current thread. -#[derive(Debug)] -pub struct Worker { - sender: mpsc::UnboundedSender, - thread_handle: Option>, -} - -impl Clone for Worker { - fn clone(&self) -> Self { - Self::new_handle(self.sender.clone()) - } -} - -impl Default for Worker { - fn default() -> Self { - Self::new() - } -} - -impl Worker { - /// Spawn new thread and run event loop in spawned thread. - /// - /// Returns handle of newly created worker. - /// - /// # Panics - /// Panics if a [System] not registered on the current thread. - pub fn new() -> Worker { - let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt:worker:{}", id); - let sys = System::current(); - let (tx, rx) = mpsc::unbounded_channel(); - - let handle = thread::Builder::new() - .name(name.clone()) - .spawn({ - let tx = tx.clone(); - move || { - let rt = Runtime::new().expect("Can not create Runtime"); - let arb = Worker::new_handle(tx); - - STORAGE.with(|cell| cell.borrow_mut().clear()); - - System::set_current(sys); - - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - - // register worker - let _ = System::current() - .tx() - .send(SystemCommand::RegisterArbiter(id, arb)); - - // run worker event processing loop - rt.block_on(WorkerRunner { rx }); - - // deregister worker - let _ = System::current() - .tx() - .send(SystemCommand::DeregisterArbiter(id)); - } - }) - .unwrap_or_else(|err| { - panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) - }); - - Worker { - sender: tx, - thread_handle: Some(handle), - } - } - - /// Returns the current Worker's handle. - /// - /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn current() -> Worker { - ADDR.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), - }) - } - - /// Stop worker from continuing it's event loop. - pub fn stop(&self) { - let _ = self.sender.send(WorkerCommand::Stop); - } - - /// Send a future to the Arbiter's thread and spawn it. - /// - /// If you require a result, include a response channel in the future. - /// - /// Returns true if future was sent successfully and false if the Arbiter has died. - pub fn spawn(&self, future: Fut) -> bool - where - Fut: Future + Send + 'static, - { - self.sender - .send(WorkerCommand::Execute(Box::pin(future))) - .is_ok() - } - - /// Send a function to the Arbiter's thread and execute it. - /// - /// Any result from the function is discarded. If you require a result, include a response - /// channel in the function. - /// - /// Returns true if function was sent successfully and false if the Arbiter has died. - pub fn spawn_fn(&self, f: F) -> bool - where - F: FnOnce() + Send + 'static, - { - self.spawn(async { f() }) - } - - /// Wait for worker's event loop to complete. - /// - /// Joins the underlying OS thread handle, if contained. - pub fn join(&mut self) -> thread::Result<()> { - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join() - } else { - Ok(()) - } - } - - pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = mpsc::unbounded_channel(); - - let arb = Worker::new_handle(tx); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(WorkerRunner { rx }); - - arb - } - - fn new_handle(sender: mpsc::UnboundedSender) -> Self { - Self { - sender, - thread_handle: None, - } - } - - /// Insert item into worker's thread-local storage. - /// - /// Overwrites any item of the same type previously inserted. - pub fn set_item(item: T) { - STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); - } - - /// Check if worker's thread-local storage contains an item type. - pub fn contains_item() -> bool { - STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) - } - - /// Call a function with a shared reference to an item in this worker's thread-local storage. - /// - /// # Panics - /// Panics if item is not in worker's thread-local item storage. - pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, - { - STORAGE.with(move |cell| { - let st = cell.borrow(); - - let type_id = TypeId::of::(); - let item = st.get(&type_id).and_then(downcast_ref).unwrap(); - - f(item) - }) - } - - /// Call a function with a mutable reference to an item in this worker's thread-local storage. - /// - /// # Panics - /// Panics if item is not in worker's thread-local item storage. - pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, - { - STORAGE.with(move |cell| { - let mut st = cell.borrow_mut(); - - let type_id = TypeId::of::(); - let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); - - f(item) - }) - } -} - -/// A persistent worker future that processes worker commands. -struct WorkerRunner { - rx: mpsc::UnboundedReceiver, -} - -impl Future for WorkerRunner { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // process all items currently buffered in channel - loop { - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { - // channel closed; no more messages can be received - None => return Poll::Ready(()), - - // process worker command - Some(item) => match item { - WorkerCommand::Stop => return Poll::Ready(()), - WorkerCommand::Execute(task_fut) => { - tokio::task::spawn_local(task_fut); - } - }, - } - } - } -} - -fn downcast_ref(boxed: &Box) -> Option<&T> { - boxed.downcast_ref() -} - -fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { - boxed.downcast_mut() -} diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index ec71656c..7749ad0a 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,16 +1,17 @@ use std::{ - sync::mpsc::sync_channel, + sync::mpsc::channel, thread, time::{Duration, Instant}, }; -use actix_rt::{System, Worker}; +use actix_rt::{Arbiter, System}; +use tokio::sync::oneshot; #[test] fn await_for_timer() { let time = Duration::from_secs(1); let instant = Instant::now(); - System::new("test_wait_timer").block_on(async move { + System::new().block_on(async move { tokio::time::sleep(time).await; }); assert!( @@ -20,78 +21,72 @@ fn await_for_timer() { } #[test] -fn join_another_worker() { +fn join_another_arbiter() { let time = Duration::from_secs(1); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); - worker.spawn(Box::pin(async move { + System::new().block_on(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Arbiter::handle().stop(); })); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on another worker should complete only when it calls stop" + "Join on another arbiter should complete only when it calls stop" ); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); - worker.spawn_fn(move || { + System::new().block_on(async move { + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Arbiter::handle().stop(); }); }); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on a worker that has used actix_rt::spawn should wait for said future" + "Join on an arbiter that has used actix_rt::spawn should wait for said future" ); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); - worker.spawn(Box::pin(async move { + System::new().block_on(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Arbiter::handle().stop(); })); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() < time, - "Premature stop of worker should conclude regardless of it's current state" + "Premature stop of arbiter should conclude regardless of it's current state" ); } #[test] fn non_static_block_on() { let string = String::from("test_str"); - let str = string.as_str(); + let string = string.as_str(); - let sys = System::new("borrow some"); + let sys = System::new(); sys.block_on(async { actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); + assert_eq!("test_str", string); }); let rt = actix_rt::Runtime::new().unwrap(); rt.block_on(async { actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); + assert_eq!("test_str", string); }); - - System::run(|| { - assert_eq!("test_str", str); - System::current().stop(); - }) - .unwrap(); } #[test] @@ -108,82 +103,70 @@ fn wait_for_spawns() { } #[test] -fn worker_spawn_fn_runs() { - let _ = System::new("test-system"); +fn arbiter_spawn_fn_runs() { + let _ = System::new(); - let (tx, rx) = sync_channel::(1); + let (tx, rx) = channel::(); - let mut worker = Worker::new(); - worker.spawn_fn(move || tx.send(42).unwrap()); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || tx.send(42).unwrap()); let num = rx.recv().unwrap(); assert_eq!(num, 42); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fn() { - let _ = System::new("test-system"); +fn arbiter_drop_no_panic_fn() { + let _ = System::new(); - let mut worker = Worker::new(); - worker.spawn_fn(|| panic!("test")); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(|| panic!("test")); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fut() { - let _ = System::new("test-system"); +fn arbiter_drop_no_panic_fut() { + let _ = System::new(); - let mut worker = Worker::new(); - worker.spawn(async { panic!("test") }); + let arbiter = Arbiter::new(); + arbiter.spawn(async { panic!("test") }); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_item_storage() { - let _ = System::new("test-system"); +fn arbiter_item_storage() { + let _ = System::new(); - let mut worker = Worker::new(); + let arbiter = Arbiter::new(); - assert!(!Worker::contains_item::()); - Worker::set_item(42u32); - assert!(Worker::contains_item::()); + assert!(!Arbiter::contains_item::()); + Arbiter::set_item(42u32); + assert!(Arbiter::contains_item::()); - Worker::get_item(|&item: &u32| assert_eq!(item, 42)); - Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); + Arbiter::get_item(|&item: &u32| assert_eq!(item, 42)); + Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); let thread = thread::spawn(move || { - Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); + Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); }) .join(); assert!(thread.is_err()); let thread = thread::spawn(move || { - Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); + Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); }) .join(); assert!(thread.is_err()); - worker.stop(); - worker.join().unwrap(); -} - -#[test] -fn system_name_cow_str() { - let _ = System::new("test-system"); - System::current().stop(); -} - -#[test] -fn system_name_cow_string() { - let _ = System::new("test-system".to_owned()); - System::current().stop(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] @@ -194,6 +177,36 @@ fn no_system_current_panic() { #[test] #[should_panic] -fn no_system_worker_new_panic() { - Worker::new(); +fn no_system_arbiter_new_panic() { + Arbiter::new(); +} + +#[test] +fn system_arbiter_spawn() { + let runner = System::new(); + + let (tx, rx) = oneshot::channel(); + let sys = System::current(); + + thread::spawn(|| { + // this thread will have no arbiter in it's thread local so call will panic + Arbiter::handle(); + }) + .join() + .unwrap_err(); + + let thread = thread::spawn(|| { + // this thread will have no arbiter in it's thread local so use the system handle instead + System::set_current(sys); + let sys = System::current(); + + let wrk = sys.arbiter(); + wrk.spawn(async move { + tx.send(42u32).unwrap(); + System::current().stop(); + }); + }); + + assert_eq!(runner.block_on(rx).unwrap(), 42); + thread.join().unwrap(); } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 82c00ef5..a52184d9 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -403,7 +403,7 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().worker().spawn(async move { + System::current().arbiter().spawn(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); }); diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 4b7f7873..864f391c 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -48,7 +48,7 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let sys = System::new(); factory(Server::build()).workers(1).disable_signals().run(); tx.send(System::current()).unwrap(); @@ -70,7 +70,7 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index ae387cbd..9c61e4ad 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Worker as Arbiter}; +use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; @@ -215,7 +215,7 @@ impl ServerWorker { } Err(e) => { error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); + Arbiter::handle().stop(); } } wrk.await @@ -386,7 +386,7 @@ impl Future for ServerWorker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::current().stop(); + Arbiter::handle().stop(); return Poll::Ready(()); } @@ -394,7 +394,7 @@ impl Future for ServerWorker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::current().stop(); + Arbiter::handle().stop(); return Poll::Ready(()); } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 8d93ba0b..86ec25e6 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -21,7 +21,7 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new(); let srv = sys.block_on(lazy(|_| { Server::build() .workers(1) @@ -47,7 +47,7 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new(); let lst = net::TcpListener::bind(addr).unwrap(); sys.block_on(async { Server::build() @@ -81,7 +81,7 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new(); let srv = sys.block_on(lazy(|_| { Server::build() .backlog(100) @@ -150,7 +150,7 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); - let sys = actix_rt::System::new("test"); + let sys = actix_rt::System::new(); let srv = sys.block_on(lazy(|_| { Server::build() .disable_signals()