From 1b35ff8ee6df13151aa7e7c7f37a65398e2c2d4e Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 29 Jan 2021 15:16:30 +0000 Subject: [PATCH] express spawn fn as spawn fut (#256) --- actix-rt/CHANGES.md | 13 ++-- actix-rt/Cargo.toml | 1 - actix-rt/src/builder.rs | 24 ++++---- actix-rt/src/lib.rs | 24 ++++---- actix-rt/src/system.rs | 33 ++++++---- actix-rt/src/worker.rs | 119 +++++++++++++++++-------------------- actix-rt/tests/tests.rs | 16 ++++- actix-server/src/accept.rs | 11 ++-- 8 files changed, 130 insertions(+), 111 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 73bcac5c..379afbd7 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -2,16 +2,19 @@ ## Unreleased - 2021-xx-xx +* Rename `Arbiter => Worker`. [#254] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253] -* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] -* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] -* Remove `Arbiter::exec`. [#253] -* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] -* Rename `Arbiter => Worker`. [#254] +* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] +* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] +* Remove `Worker::exec`. [#253] +* Remove `System::arbiter`. [#256] +* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] +* `Worker::spawn` now accepts !Unpin futures. [#256] [#253]: https://github.com/actix/actix-net/pull/253 [#254]: https://github.com/actix/actix-net/pull/254 +[#256]: https://github.com/actix/actix-net/pull/256 ## 2.0.0-beta.2 - 2021-01-09 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f5a6ba6a..68ed8563 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -27,4 +27,3 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync [dev-dependencies] tokio = { version = "1", features = ["full"] } -futures-util = { version = "0.3.7", default-features = true, features = ["alloc"] } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 93d2fe28..f9a3fca2 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -53,7 +53,7 @@ impl Builder { where F: FnOnce(), { - let (stop_tx, stop) = channel(); + let (stop_tx, stop_rx) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); let rt = Runtime::new().unwrap(); @@ -67,27 +67,30 @@ impl Builder { // run system init method rt.block_on(async { init_fn() }); - SystemRunner { rt, stop, system } + SystemRunner { + rt, + stop_rx, + system, + } } } -/// Helper object that runs System's event loop -#[must_use = "SystemRunner must be run"] +/// System runner object that keeps event loop alive and running until stop message is received. +#[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { rt: Runtime, - stop: Receiver, + stop_rx: Receiver, system: System, } impl SystemRunner { - /// This function will start event loop and will finish once the - /// `System::stop()` function is called. + /// Starts event loop and will finish once [`System::stop()`] is called. pub fn run(self) -> io::Result<()> { - let SystemRunner { rt, stop, .. } = self; + let SystemRunner { rt, stop_rx, .. } = self; // run loop - match rt.block_on(stop) { + match rt.block_on(stop_rx) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -98,11 +101,12 @@ impl SystemRunner { Ok(()) } } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } - /// Execute a future and wait for result. + /// Runs the provided future, blocking the current thread until the future completes. #[inline] pub fn block_on(&self, fut: F) -> F::Output { self.rt.block_on(fut) diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 212c0c65..b0303d6c 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -25,18 +25,6 @@ pub use self::runtime::Runtime; pub use self::system::System; pub use self::worker::Worker; -/// Spawns a future on the current [Arbiter]. -/// -/// # Panics -/// Panics if Actix system is not running. -#[inline] -pub fn spawn(f: Fut) -> JoinHandle<()> -where - Fut: Future + 'static, -{ - tokio::task::spawn_local(f) -} - pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -72,3 +60,15 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } + +/// Spawns a future on the current [Worker]. +/// +/// # Panics +/// Panics if Actix system is not running. +#[inline] +pub fn spawn(f: Fut) -> JoinHandle<()> +where + Fut: Future + 'static, +{ + tokio::task::spawn_local(f) +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index e3ee720b..0182136e 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -23,7 +23,8 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); #[derive(Clone, Debug)] pub struct System { id: usize, - tx: mpsc::UnboundedSender, + sys_tx: mpsc::UnboundedSender, + // TODO: which worker is this exactly worker: Worker, } @@ -32,10 +33,13 @@ thread_local!( ); impl System { - /// Constructs new system and sets it as current - pub(crate) fn construct(sys: mpsc::UnboundedSender, worker: Worker) -> Self { + /// Constructs new system and sets it as current. + pub(crate) fn construct( + sys_tx: mpsc::UnboundedSender, + worker: Worker, + ) -> Self { let sys = System { - tx: sys, + sys_tx, worker, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; @@ -60,6 +64,9 @@ impl System { } /// Get current running system. + /// + /// # Panics + /// Panics if no system is registered on the current thread. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() { Some(ref sys) => sys.clone(), @@ -103,15 +110,16 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.tx.send(SystemCommand::Exit(code)); + let _ = self.sys_tx.send(SystemCommand::Exit(code)); } pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { - &self.tx + &self.sys_tx } - /// Get shared reference to system arbiter. - pub fn arbiter(&self) -> &Worker { + // TODO: give clarity on which worker this is; previous documented as returning "system worker" + /// Get shared reference to a worker. + pub fn worker(&self) -> &Worker { &self.worker } @@ -165,18 +173,21 @@ impl Future for SystemWorker { // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { - // stop arbiters - for arb in self.workers.values() { - arb.stop(); + // stop workers + for wkr in self.workers.values() { + wkr.stop(); } + // stop event loop if let Some(stop) = self.stop.take() { let _ = stop.send(code); } } + SystemCommand::RegisterArbiter(name, hnd) => { self.workers.insert(name, hnd); } + SystemCommand::DeregisterArbiter(name) => { self.workers.remove(&name); } diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index f586d15f..adda3cff 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -27,8 +27,7 @@ thread_local!( pub(crate) enum WorkerCommand { Stop, - Execute(Box + Unpin + Send>), - ExecuteFn(Box), + Execute(Pin + Send>>), } impl fmt::Debug for WorkerCommand { @@ -36,7 +35,6 @@ impl fmt::Debug for WorkerCommand { match self { WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), - WorkerCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"), } } } @@ -65,44 +63,12 @@ impl Default for Worker { } impl Worker { - pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = mpsc::unbounded_channel(); - - let arb = Worker::new_handle(tx); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(WorkerRunner { rx }); - - arb - } - - fn new_handle(sender: mpsc::UnboundedSender) -> Self { - Self { - sender, - thread_handle: None, - } - } - - /// Returns the current Worker's handle. - /// - /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn current() -> Worker { - ADDR.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), - }) - } - - /// Stop worker from continuing it's event loop. - pub fn stop(&self) { - let _ = self.sender.send(WorkerCommand::Stop); - } - /// Spawn new thread and run event loop in spawned thread. /// /// Returns handle of newly created worker. + /// + /// # Panics + /// Panics if a [System] not registered on the current thread. pub fn new() -> Worker { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); @@ -147,6 +113,22 @@ impl Worker { } } + /// Returns the current Worker's handle. + /// + /// # Panics + /// Panics if no Worker is running on the current thread. + pub fn current() -> Worker { + ADDR.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Worker is not running."), + }) + } + + /// Stop worker from continuing it's event loop. + pub fn stop(&self) { + let _ = self.sender.send(WorkerCommand::Stop); + } + /// Send a future to the Arbiter's thread and spawn it. /// /// If you require a result, include a response channel in the future. @@ -154,10 +136,10 @@ impl Worker { /// Returns true if future was sent successfully and false if the Arbiter has died. pub fn spawn(&self, future: Fut) -> bool where - Fut: Future + Unpin + Send + 'static, + Fut: Future + Send + 'static, { self.sender - .send(WorkerCommand::Execute(Box::new(future))) + .send(WorkerCommand::Execute(Box::pin(future))) .is_ok() } @@ -171,9 +153,37 @@ impl Worker { where F: FnOnce() + Send + 'static, { - self.sender - .send(WorkerCommand::ExecuteFn(Box::new(f))) - .is_ok() + self.spawn(async { f() }) + } + + /// Wait for worker's event loop to complete. + /// + /// Joins the underlying OS thread handle, if contained. + pub fn join(&mut self) -> thread::Result<()> { + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join() + } else { + Ok(()) + } + } + + pub(crate) fn new_system(local: &LocalSet) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + let arb = Worker::new_handle(tx); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(WorkerRunner { rx }); + + arb + } + + fn new_handle(sender: mpsc::UnboundedSender) -> Self { + Self { + sender, + thread_handle: None, + } } /// Insert item into worker's thread-local storage. @@ -190,11 +200,6 @@ impl Worker { /// Call a function with a shared reference to an item in this worker's thread-local storage. /// - /// # Examples - /// ``` - /// - /// ``` - /// /// # Panics /// Panics if item is not in worker's thread-local item storage. pub fn get_item(mut f: F) -> R @@ -228,17 +233,6 @@ impl Worker { f(item) }) } - - /// Wait for worker's event loop to complete. - /// - /// Joins the underlying OS thread handle, if contained. - pub fn join(&mut self) -> thread::Result<()> { - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join() - } else { - Ok(()) - } - } } /// A persistent worker future that processes worker commands. @@ -256,17 +250,12 @@ impl Future for WorkerRunner { // channel closed; no more messages can be received None => return Poll::Ready(()), - // process arbiter command + // process worker command Some(item) => match item { WorkerCommand::Stop => return Poll::Ready(()), WorkerCommand::Execute(task_fut) => { tokio::task::spawn_local(task_fut); } - WorkerCommand::ExecuteFn(task_fn) => { - tokio::task::spawn_local(async { - task_fn(); - }); - } }, } } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index e05e8975..ec71656c 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -136,12 +136,10 @@ fn worker_drop_no_panic_fn() { #[test] fn worker_drop_no_panic_fut() { - use futures_util::future::lazy; - let _ = System::new("test-system"); let mut worker = Worker::new(); - worker.spawn(lazy(|_| panic!("test"))); + worker.spawn(async { panic!("test") }); worker.stop(); worker.join().unwrap(); @@ -187,3 +185,15 @@ fn system_name_cow_string() { let _ = System::new("test-system".to_owned()); System::current().stop(); } + +#[test] +#[should_panic] +fn no_system_current_panic() { + System::current(); +} + +#[test] +#[should_panic] +fn no_system_worker_new_panic() { + Worker::new(); +} diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5c434709..82c00ef5 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,8 +1,10 @@ use std::time::Duration; use std::{io, thread}; -use actix_rt::time::{sleep_until, Instant}; -use actix_rt::System; +use actix_rt::{ + time::{sleep_until, Instant}, + System, +}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; @@ -401,10 +403,11 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().arbiter().spawn(Box::pin(async move { + System::current().worker().spawn(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); - })); + }); + return; } }