From 29244199055abaa868c9ec790c5fbd399b7d71df Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 29 Jan 2021 14:16:10 +0000 Subject: [PATCH] prevent spawn_fn panic bubbling (#255) --- actix-rt/src/builder.rs | 41 ++++++++++++----------------------------- actix-rt/src/system.rs | 20 +++++--------------- actix-rt/src/worker.rs | 18 +++--------------- actix-rt/tests/tests.rs | 33 +++++++++++++++++++++++++++++++-- 4 files changed, 51 insertions(+), 61 deletions(-) diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 01c85512..93d2fe28 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -11,38 +11,25 @@ use crate::{ worker::Worker, }; -/// Builder an actix runtime. +/// System builder. /// /// Either use `Builder::build` to create a system and start actors. Alternatively, use /// `Builder::run` to start the Tokio runtime and run a function in its context. pub struct Builder { /// Name of the System. Defaults to "actix-rt" if unset. name: Cow<'static, str>, - - /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. - stop_on_panic: bool, } impl Builder { pub(crate) fn new() -> Self { Builder { name: Cow::Borrowed("actix-rt"), - stop_on_panic: false, } } /// Sets the name of the System. - pub fn name(mut self, name: impl Into) -> Self { - self.name = Cow::Owned(name.into()); - self - } - - /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an - /// uncaught panic is thrown from a worker thread. - /// - /// Defaults to false. - pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self { - self.stop_on_panic = stop_on_panic; + pub fn name(mut self, name: impl Into>) -> Self { + self.name = name.into(); self } @@ -55,14 +42,14 @@ impl Builder { /// This function will start Tokio runtime and will finish once the `System::stop()` message /// is called. Function `f` is called within Tokio runtime context. - pub fn run(self, f: F) -> io::Result<()> + pub fn run(self, init_fn: F) -> io::Result<()> where F: FnOnce(), { - self.create_runtime(f).run() + self.create_runtime(init_fn).run() } - fn create_runtime(self, f: F) -> SystemRunner + fn create_runtime(self, init_fn: F) -> SystemRunner where F: FnOnce(), { @@ -71,18 +58,14 @@ impl Builder { let rt = Runtime::new().unwrap(); - let system = System::construct( - sys_sender, - Worker::new_system(rt.local()), - self.stop_on_panic, - ); + let system = System::construct(sys_sender, Worker::new_system(rt.local())); - let arb = SystemWorker::new(sys_receiver, stop_tx); + // init system worker + let sys_worker = SystemWorker::new(sys_receiver, stop_tx); + rt.spawn(sys_worker); - rt.spawn(arb); - - // init system arbiter and run configuration method - rt.block_on(async { f() }); + // run system init method + rt.block_on(async { init_fn() }); SystemRunner { rt, stop, system } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index e7830175..e3ee720b 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Cow, cell::RefCell, collections::HashMap, future::Future, @@ -24,7 +25,6 @@ pub struct System { id: usize, tx: mpsc::UnboundedSender, worker: Worker, - stop_on_panic: bool, } thread_local!( @@ -33,15 +33,10 @@ thread_local!( impl System { /// Constructs new system and sets it as current - pub(crate) fn construct( - sys: mpsc::UnboundedSender, - worker: Worker, - stop_on_panic: bool, - ) -> Self { + pub(crate) fn construct(sys: mpsc::UnboundedSender, worker: Worker) -> Self { let sys = System { tx: sys, worker, - stop_on_panic, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; System::set_current(sys.clone()); @@ -57,9 +52,10 @@ impl System { /// Create new system. /// - /// This method panics if it can not create Tokio runtime + /// # Panics + /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] - pub fn new(name: impl Into) -> SystemRunner { + pub fn new(name: impl Into>) -> SystemRunner { Self::builder().name(name).build() } @@ -114,12 +110,6 @@ impl System { &self.tx } - /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an - /// uncaught panic is thrown from a worker thread. - pub(crate) fn stop_on_panic(&self) -> bool { - self.stop_on_panic - } - /// Get shared reference to system arbiter. pub fn arbiter(&self) -> &Worker { &self.worker diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index 6b022eb2..f586d15f 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -246,20 +246,6 @@ struct WorkerRunner { rx: mpsc::UnboundedReceiver, } -impl Drop for WorkerRunner { - fn drop(&mut self) { - // panics can only occur with spawn_fn calls - if thread::panicking() { - if System::current().stop_on_panic() { - eprintln!("Panic in Worker thread, shutting down system."); - System::current().stop_with_code(1) - } else { - eprintln!("Panic in Worker thread."); - } - } - } -} - impl Future for WorkerRunner { type Output = (); @@ -277,7 +263,9 @@ impl Future for WorkerRunner { tokio::task::spawn_local(task_fut); } WorkerCommand::ExecuteFn(task_fn) => { - task_fn(); + tokio::task::spawn_local(async { + task_fn(); + }); } }, } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index d3ce4e68..e05e8975 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,4 +1,5 @@ use std::{ + sync::mpsc::sync_channel, thread, time::{Duration, Instant}, }; @@ -107,13 +108,29 @@ fn wait_for_spawns() { } #[test] -#[should_panic] -fn worker_drop_panic_fn() { +fn worker_spawn_fn_runs() { + let _ = System::new("test-system"); + + let (tx, rx) = sync_channel::(1); + + let mut worker = Worker::new(); + worker.spawn_fn(move || tx.send(42).unwrap()); + + let num = rx.recv().unwrap(); + assert_eq!(num, 42); + + worker.stop(); + worker.join().unwrap(); +} + +#[test] +fn worker_drop_no_panic_fn() { let _ = System::new("test-system"); let mut worker = Worker::new(); worker.spawn_fn(|| panic!("test")); + worker.stop(); worker.join().unwrap(); } @@ -158,3 +175,15 @@ fn worker_item_storage() { worker.stop(); worker.join().unwrap(); } + +#[test] +fn system_name_cow_str() { + let _ = System::new("test-system"); + System::current().stop(); +} + +#[test] +fn system_name_cow_string() { + let _ = System::new("test-system".to_owned()); + System::current().stop(); +}