diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 7990e67d..e7ce06be 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -26,6 +26,7 @@ macros = ["actix-macros"] actix-macros = { version = "0.2.0", optional = true } futures-core = { version = "0.3", default-features = false } +futures-intrusive = "0.4" tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } [dev-dependencies] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 9ff1419d..634fc0ed 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,6 +9,9 @@ use std::{ }; use futures_core::ready; +use futures_intrusive::channel::shared::{ + oneshot_broadcast_channel, OneshotBroadcastReceiver, OneshotBroadcastSender, +}; use tokio::{sync::mpsc, task::LocalSet}; use crate::{ @@ -40,11 +43,26 @@ impl fmt::Debug for ArbiterCommand { #[derive(Debug, Clone)] pub struct ArbiterHandle { tx: mpsc::UnboundedSender, + /// Is `None` for system arbiter. + stopped_rx: Option>, } impl ArbiterHandle { - pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { - Self { tx } + pub(crate) fn new( + tx: mpsc::UnboundedSender, + stopped_rx: OneshotBroadcastReceiver<()>, + ) -> Self { + Self { + tx, + stopped_rx: Some(stopped_rx), + } + } + + pub(crate) fn for_system(tx: mpsc::UnboundedSender) -> Self { + Self { + tx, + stopped_rx: None, + } } /// Send a future to the [Arbiter]'s thread and spawn it. @@ -81,6 +99,25 @@ impl ArbiterHandle { pub fn stop(&self) -> bool { self.tx.send(ArbiterCommand::Stop).is_ok() } + + /// Will wait for [Arbiter] to complete all commands up until it's Stop command is processed. + /// + /// For [Arbiter]s that have already stopped, the future will resolve immediately. + /// + /// # Panics + /// Panics if called on the system Arbiter. In this situation the Arbiter's lifetime is + /// implicitly bound by the main thread's lifetime. + pub async fn join(self) { + match self.stopped_rx { + Some(rx) => { + rx.receive().await; + } + None => { + // TODO: decide if this is correct + panic!("cannot wait on the system Arbiter's completion") + } + } + } } /// An Arbiter represents a thread that provides an asynchronous execution environment for futures @@ -89,8 +126,10 @@ impl ArbiterHandle { /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. #[derive(Debug)] pub struct Arbiter { - tx: mpsc::UnboundedSender, - thread_handle: thread::JoinHandle<()>, + id: usize, + stopped_tx: OneshotBroadcastSender<()>, + cmd_tx: mpsc::UnboundedSender, + thread_handle: Option>, } impl Arbiter { @@ -99,7 +138,7 @@ impl Arbiter { /// # Panics /// Panics if a [System] is not registered on the current thread. #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { + pub fn new() -> ArbiterHandle { Self::with_tokio_rt(|| { default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") }) @@ -109,74 +148,109 @@ impl Arbiter { /// /// [tokio-runtime]: tokio::runtime::Runtime #[doc(hidden)] - pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + pub fn with_tokio_rt(runtime_factory: F) -> ArbiterHandle where F: Fn() -> tokio::runtime::Runtime + Send + 'static, { + eprintln!("get sys current"); + let sys = System::current(); + eprintln!("get sys id"); let system_id = sys.id(); + eprintln!("calc arb id"); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); - let (tx, rx) = mpsc::unbounded_channel(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + // let ready_barrier = Arc::new(Barrier::new(2)); + let (stopped_tx, stopped_rx) = oneshot_broadcast_channel::<()>(); + + eprintln!("make arb handle"); + let hnd = ArbiterHandle::new(cmd_tx.clone(), stopped_rx); + + eprintln!("make thread"); let thread_handle = thread::Builder::new() .name(name.clone()) .spawn({ - let tx = tx.clone(); - move || { - let rt = Runtime::from(runtime_factory()); - let hnd = ArbiterHandle::new(tx); + let hnd = hnd.clone(); + // let ready_barrier = Arc::clone(&ready_barrier); + move || { + eprintln!("thread: make rt"); + let rt = Runtime::from(runtime_factory()); + + eprintln!("thread: set sys"); System::set_current(sys); + // // wait until register message is sent + // eprintln!("thread: wait for arb registered"); + // ready_barrier.wait(); + + eprintln!("thread: set arb handle"); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - // register arbiter - let _ = System::current() - .tx() - .send(SystemCommand::RegisterArbiter(arb_id, hnd)); - - ready_tx.send(()).unwrap(); - // run arbiter event processing loop - rt.block_on(ArbiterRunner { rx }); + eprintln!("thread: block on arbiter loop"); + rt.block_on(ArbiterLoop { cmd_rx }); // deregister arbiter - let _ = System::current() + eprintln!("thread: send deregister arbiter message"); + System::current() .tx() - .send(SystemCommand::DeregisterArbiter(arb_id)); + .send(SystemCommand::DeregisterArbiter(arb_id)) + .unwrap(); } }) .unwrap_or_else(|err| { panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) }); - ready_rx.recv().unwrap(); + let arb = Arbiter { + id: arb_id, + cmd_tx, + stopped_tx, + thread_handle: Some(thread_handle), + }; - Arbiter { tx, thread_handle } + // register arbiter + eprintln!("send register arbiter message"); + System::current() + .tx() + .send(SystemCommand::RegisterArbiter(arb)) + .unwrap(); + + // eprintln!("inform arbiter that it is registered"); + // ready_barrier.wait(); + + eprintln!("arbiter::new done"); + hnd } /// Sets up an Arbiter runner in a new System using the provided runtime local task set. - pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { - let (tx, rx) = mpsc::unbounded_channel(); + pub(crate) fn for_system(local: &LocalSet) -> ArbiterHandle { + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let hnd = ArbiterHandle::new(tx); + let hnd = ArbiterHandle::for_system(cmd_tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - local.spawn_local(ArbiterRunner { rx }); + local.spawn_local(ArbiterLoop { cmd_rx }); hnd } - /// Return a handle to the this Arbiter's message sender. - pub fn handle(&self) -> ArbiterHandle { - ArbiterHandle::new(self.tx.clone()) + /// Return `Arbiter`'s numeric ID. + pub(crate) fn id(&self) -> usize { + self.id } + // /// Return a handle to the this Arbiter's message sender. + // pub fn handle(&self) -> ArbiterHandle { + // ArbiterHandle::new(self.cmd_tx.clone()) + // } + /// Return a handle to the current thread's Arbiter's message sender. /// /// # Panics @@ -191,57 +265,68 @@ impl Arbiter { /// Stop Arbiter from continuing it's event loop. /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. - pub fn stop(&self) -> bool { - self.tx.send(ArbiterCommand::Stop).is_ok() + pub(crate) fn stop(&self) -> bool { + self.cmd_tx.send(ArbiterCommand::Stop).is_ok() } - /// Send a future to the Arbiter's thread and spawn it. - /// - /// If you require a result, include a response channel in the future. - /// - /// Returns true if future was sent successfully and false if the Arbiter has died. - pub fn spawn(&self, future: Fut) -> bool - where - Fut: Future + Send + 'static, - { - self.tx - .send(ArbiterCommand::Execute(Box::pin(future))) - .is_ok() - } + // /// Send a future to the Arbiter's thread and spawn it. + // /// + // /// If you require a result, include a response channel in the future. + // /// + // /// Returns true if future was sent successfully and false if the Arbiter has died. + // pub fn spawn(&self, future: Fut) -> bool + // where + // Fut: Future + Send + 'static, + // { + // self.cmd_tx + // .send(ArbiterCommand::Execute(Box::pin(future))) + // .is_ok() + // } - /// Send a function to the Arbiter's thread and execute it. - /// - /// Any result from the function is discarded. If you require a result, include a response - /// channel in the function. - /// - /// Returns true if function was sent successfully and false if the Arbiter has died. - pub fn spawn_fn(&self, f: F) -> bool - where - F: FnOnce() + Send + 'static, - { - self.spawn(async { f() }) - } + // /// Send a function to the Arbiter's thread and execute it. + // /// + // /// Any result from the function is discarded. If you require a result, include a response + // /// channel in the function. + // /// + // /// Returns true if function was sent successfully and false if the Arbiter has died. + // pub fn spawn_fn(&self, f: F) -> bool + // where + // F: FnOnce() + Send + 'static, + // { + // self.spawn(async { f() }) + // } +} - /// Wait for Arbiter's event loop to complete. - /// - /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). - pub fn join(self) -> thread::Result<()> { - self.thread_handle.join() +impl Drop for Arbiter { + fn drop(&mut self) { + eprintln!("Arb::drop: joining arbiter thread"); + match self.thread_handle.take().unwrap().join() { + Ok(()) => {} + Err(err) => { + eprintln!("arbiter {} thread panicked: {:?}", self.id(), err) + } + } + eprintln!("Arb::drop: sending stopped tx"); + + // could fail if all handles are dropped already so ignore result + let _ = self.stopped_tx.send(()); + + eprintln!("Arb::drop: done"); } } /// A persistent future that processes [Arbiter] commands. -struct ArbiterRunner { - rx: mpsc::UnboundedReceiver, +struct ArbiterLoop { + cmd_rx: mpsc::UnboundedReceiver, } -impl Future for ArbiterRunner { +impl Future for ArbiterLoop { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // process all items currently buffered in channel loop { - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { // channel closed; no more messages can be received None => return Poll::Ready(()), diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index a529bdb0..0bce2114 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -16,12 +16,12 @@ //! //! # Examples //! ``` -//! use std::sync::mpsc; +//! use std::sync::mpsc::channel as std_channel; //! use actix_rt::{Arbiter, System}; //! -//! let _ = System::new(); +//! let sys = System::new(); //! -//! let (tx, rx) = mpsc::channel::(); +//! let (tx, rx) = std_channel::(); //! //! let arbiter = Arbiter::new(); //! arbiter.spawn_fn(move || tx.send(42).unwrap()); @@ -30,7 +30,10 @@ //! assert_eq!(num, 42); //! //! arbiter.stop(); -//! arbiter.join().unwrap(); +//! sys.block_on(arbiter.join()); +//! +//! System::current().stop(); +//! sys.run().unwrap(); //! ``` #![deny(rust_2018_idioms, nonstandard_style)] diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3bc8a6e3..81a75b12 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -54,14 +54,9 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::from(runtime_factory()); - let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + let sys_arbiter = Arbiter::for_system(rt.local_set()); let system = System::construct(sys_tx, sys_arbiter.clone()); - system - .tx() - .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) - .unwrap(); - // init background system arbiter let sys_ctrl = SystemController::new(sys_rx, stop_tx); rt.spawn(sys_ctrl); @@ -150,7 +145,10 @@ impl System { } /// Runner that keeps a [System]'s event loop alive until stop message is received. -#[must_use = "A SystemRunner does nothing unless `run` is called."] +/// +/// Dropping the `SystemRunner` (eg. `let _ = System::new();`) will result in no further events +/// being processed. It is required you bind the runner and call `run` or call `block_on`. +#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."] #[derive(Debug)] pub struct SystemRunner { rt: Runtime, @@ -190,7 +188,7 @@ impl SystemRunner { #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), - RegisterArbiter(usize, ArbiterHandle), + RegisterArbiter(Arbiter), DeregisterArbiter(usize), } @@ -200,7 +198,7 @@ pub(crate) enum SystemCommand { pub(crate) struct SystemController { stop_tx: Option>, cmd_rx: mpsc::UnboundedReceiver, - arbiters: HashMap, + arbiters: HashMap, } impl SystemController { @@ -221,35 +219,42 @@ impl Future for SystemController { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // process all items currently buffered in channel - loop { + let code = loop { match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { // channel closed; no more messages can be received - None => return Poll::Ready(()), + None => break 0, // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { // stop all arbiters for arb in self.arbiters.values() { + eprintln!("SystemController: stopping arbiter {}", arb.id()); arb.stop(); } - // stop event loop - // will only fire once - if let Some(stop_tx) = self.stop_tx.take() { - let _ = stop_tx.send(code); - } + eprintln!("SystemController: dropping arbiters"); + // destroy all arbiters + // drop waits for threads to complete + self.arbiters.clear(); + + break code; } - SystemCommand::RegisterArbiter(id, arb) => { - self.arbiters.insert(id, arb); + SystemCommand::RegisterArbiter(arb) => { + self.arbiters.insert(arb.id(), arb); } SystemCommand::DeregisterArbiter(id) => { - self.arbiters.remove(&id); + // implicit arbiter drop + let _ = self.arbiters.remove(&id); } }, } - } + }; + + self.stop_tx.take().unwrap().send(code).unwrap(); + + Poll::Ready(()) } } diff --git a/actix-rt/tests/multi_arbiter_check.rs b/actix-rt/tests/multi_arbiter_check.rs new file mode 100644 index 00000000..ff2c66e3 --- /dev/null +++ b/actix-rt/tests/multi_arbiter_check.rs @@ -0,0 +1,44 @@ +//! Derived from this comment: +//! https://github.com/actix/actix/issues/464#issuecomment-779427825 + +use std::{thread, time::Duration}; + +use actix_rt::{Arbiter, System}; +use tokio::sync::mpsc; + +#[test] +fn actix_sample() { + let sys = System::new(); + let arb = Arbiter::new(); + + let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + + // create "actor" + arb.spawn_fn(move || { + let a = A; + + actix_rt::spawn(async move { + while let Some(_) = rx.recv().await { + println!("{:?}", a); + } + }); + }); + + System::current().stop(); + + // all arbiters must be dropped when sys.run returns + sys.run().unwrap(); + + thread::sleep(Duration::from_millis(100)); +} + +#[derive(Debug)] +struct A; + +impl Drop for A { + fn drop(&mut self) { + println!("start drop"); + thread::sleep(Duration::from_millis(200)); + println!("finish drop"); + } +} diff --git a/actix-rt/tests/test_arbiter.rs b/actix-rt/tests/test_arbiter.rs new file mode 100644 index 00000000..778ef6eb --- /dev/null +++ b/actix-rt/tests/test_arbiter.rs @@ -0,0 +1,123 @@ +use std::{ + sync::mpsc::channel as std_channel, + time::{Duration, Instant}, +}; + +use actix_rt::{time, Arbiter, System}; + + + +#[test] +#[should_panic] +fn no_system_arbiter_new_panic() { + Arbiter::new(); +} + + +#[test] +fn join_arbiter_wait_fut() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + + System::new().block_on(async move { + let arbiter = Arbiter::new(); + + arbiter.spawn(async move { + time::sleep(time).await; + Arbiter::current().stop(); + }); + + arbiter.join().await; + }); + + assert!( + instant.elapsed() >= time, + "Join on another arbiter should complete only when it calls stop" + ); +} + +#[test] +fn join_arbiter_wait_fn() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + + System::new().block_on(async move { + let arbiter = Arbiter::new(); + + arbiter.spawn_fn(move || { + actix_rt::spawn(async move { + time::sleep(time).await; + Arbiter::current().stop(); + }); + }); + + arbiter.join().await; + }); + + assert!( + instant.elapsed() >= time, + "Join on an arbiter that has used actix_rt::spawn should wait for said future" + ); +} + +#[test] +fn join_arbiter_early_stop_call() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + + System::new().block_on(async move { + let arbiter = Arbiter::new(); + + arbiter.spawn(Box::pin(async move { + time::sleep(time).await; + Arbiter::current().stop(); + })); + + arbiter.stop(); + arbiter.join().await; + }); + + assert!( + instant.elapsed() < time, + "Premature stop of Arbiter should conclude regardless of it's current state." + ); +} + +#[test] +fn arbiter_spawn_fn_runs() { + let sys = System::new(); + + let (tx, rx) = std_channel::(); + + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || { + tx.send(42).unwrap(); + System::current().stop(); + }); + + let num = rx.recv().unwrap(); + assert_eq!(num, 42); + + sys.run().unwrap(); +} + +#[test] +fn arbiter_inner_panic() { + let sys = System::new(); + + let (tx, rx) = std_channel::(); + + let arbiter = Arbiter::new(); + + // spawned panics should not cause arbiter to crash + arbiter.spawn(async { panic!("inner panic; will be caught") }); + arbiter.spawn_fn(|| panic!("inner panic; will be caught")); + + arbiter.spawn(async move { tx.send(42).unwrap() }); + + let num = rx.recv().unwrap(); + assert_eq!(num, 42); + + System::current().stop(); + sys.run().unwrap(); +} diff --git a/actix-rt/tests/test_runtime.rs b/actix-rt/tests/test_runtime.rs new file mode 100644 index 00000000..f51196d2 --- /dev/null +++ b/actix-rt/tests/test_runtime.rs @@ -0,0 +1,82 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel as std_channel, + Arc, + }, + time::Duration, +}; + +use actix_rt::{Arbiter, System}; + +#[test] +fn wait_for_spawns() { + let rt = actix_rt::Runtime::new().unwrap(); + + let handle = rt.spawn(async { + println!("running on the runtime"); + // assertion panic is caught at task boundary + assert_eq!(1, 2); + }); + + assert!(rt.block_on(handle).is_err()); +} + +#[test] +fn new_system_with_tokio() { + let (tx, rx) = std_channel(); + + let res = System::with_tokio_rt(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_keep_alive(Duration::from_millis(1000)) + .worker_threads(2) + .max_blocking_threads(2) + .on_thread_start(|| {}) + .on_thread_stop(|| {}) + .build() + .unwrap() + }) + .block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + + tokio::task::spawn(async move { + tx.send(42).unwrap(); + }) + .await + .unwrap(); + + 123usize + }); + + assert_eq!(res, 123); + assert_eq!(rx.recv().unwrap(), 42); +} + +#[test] +fn new_arbiter_with_tokio() { + let sys = System::new(); + + let arb = Arbiter::with_tokio_rt(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + }); + + let counter = Arc::new(AtomicBool::new(true)); + + let counter1 = counter.clone(); + let did_spawn = arb.spawn(async move { + actix_rt::time::sleep(Duration::from_millis(1)).await; + counter1.store(false, Ordering::SeqCst); + Arbiter::current().stop(); + System::current().stop(); + }); + + sys.run().unwrap(); + + assert!(did_spawn); + assert_eq!(false, counter.load(Ordering::SeqCst)); +} diff --git a/actix-rt/tests/test_system.rs b/actix-rt/tests/test_system.rs new file mode 100644 index 00000000..a3900741 --- /dev/null +++ b/actix-rt/tests/test_system.rs @@ -0,0 +1,130 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread, + time::{Duration, Instant}, +}; + +use actix_rt::{time, Arbiter, System}; +use tokio::sync::oneshot; + +#[test] +#[should_panic] +fn no_system_current_panic() { + System::current(); +} + +#[test] +fn try_current_no_system() { + assert!(System::try_current().is_none()) +} + +#[test] +fn try_current_with_system() { + System::new().block_on(async { assert!(System::try_current().is_some()) }); +} + +#[test] +fn non_static_block_on() { + let string = String::from("test_str"); + let string = string.as_str(); + + let sys = System::new(); + + sys.block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + assert_eq!("test_str", string); + }); + + let rt = actix_rt::Runtime::new().unwrap(); + + rt.block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + assert_eq!("test_str", string); + }); + + System::current().stop(); + sys.run().unwrap(); +} + +#[test] +fn await_for_timer() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + + System::new().block_on(async move { + time::sleep(time).await; + }); + + assert!( + instant.elapsed() >= time, + "Calling `block_on` should poll awaited future to completion." + ); +} + +#[test] +fn system_arbiter_spawn() { + let runner = System::new(); + + let (tx, rx) = oneshot::channel(); + let sys = System::current(); + + thread::spawn(|| { + // this thread will have no arbiter in it's thread local so call will panic + Arbiter::current(); + }) + .join() + .unwrap_err(); + + let thread = thread::spawn(|| { + // this thread will have no arbiter in it's thread local so use the system handle instead + System::set_current(sys); + let sys = System::current(); + + let arb = sys.arbiter(); + arb.spawn(async move { + tx.send(42u32).unwrap(); + System::current().stop(); + }); + }); + + assert_eq!(runner.block_on(rx).unwrap(), 42); + thread.join().unwrap(); +} + +struct Atom(Arc); + +impl Drop for Atom { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } +} + +#[test] +fn system_stop_arbiter_join_barrier() { + let sys = System::new(); + let arb = Arbiter::new(); + + let atom = Atom(Arc::new(AtomicBool::new(false))); + + // arbiter should be alive to receive spawn msg + assert!(Arbiter::current().spawn_fn(|| {})); + assert!(arb.spawn_fn(move || { + // thread should get dropped during sleep + thread::sleep(Duration::from_secs(2)); + + // pointless load to move atom into scope + atom.0.load(Ordering::SeqCst); + + panic!("spawned fn (thread) should be dropped during sleep"); + })); + + System::current().stop(); + sys.run().unwrap(); + + // arbiter should be dead and return false + assert!(!Arbiter::current().spawn_fn(|| {})); + assert!(!arb.spawn_fn(|| {})); +} diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs deleted file mode 100644 index 86fba96d..00000000 --- a/actix-rt/tests/tests.rs +++ /dev/null @@ -1,300 +0,0 @@ -use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::channel, - Arc, - }, - thread, - time::{Duration, Instant}, -}; - -use actix_rt::{Arbiter, System}; -use tokio::sync::oneshot; - -#[test] -fn await_for_timer() { - let time = Duration::from_secs(1); - let instant = Instant::now(); - System::new().block_on(async move { - tokio::time::sleep(time).await; - }); - assert!( - instant.elapsed() >= time, - "Block on should poll awaited future to completion" - ); -} - -#[test] -fn join_another_arbiter() { - let time = Duration::from_secs(1); - let instant = Instant::now(); - System::new().block_on(async move { - let arbiter = Arbiter::new(); - arbiter.spawn(Box::pin(async move { - tokio::time::sleep(time).await; - Arbiter::current().stop(); - })); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() >= time, - "Join on another arbiter should complete only when it calls stop" - ); - - let instant = Instant::now(); - System::new().block_on(async move { - let arbiter = Arbiter::new(); - arbiter.spawn_fn(move || { - actix_rt::spawn(async move { - tokio::time::sleep(time).await; - Arbiter::current().stop(); - }); - }); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() >= time, - "Join on an arbiter that has used actix_rt::spawn should wait for said future" - ); - - let instant = Instant::now(); - System::new().block_on(async move { - let arbiter = Arbiter::new(); - arbiter.spawn(Box::pin(async move { - tokio::time::sleep(time).await; - Arbiter::current().stop(); - })); - arbiter.stop(); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() < time, - "Premature stop of arbiter should conclude regardless of it's current state" - ); -} - -#[test] -fn non_static_block_on() { - let string = String::from("test_str"); - let string = string.as_str(); - - let sys = System::new(); - - sys.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", string); - }); - - let rt = actix_rt::Runtime::new().unwrap(); - - rt.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", string); - }); -} - -#[test] -fn wait_for_spawns() { - let rt = actix_rt::Runtime::new().unwrap(); - - let handle = rt.spawn(async { - println!("running on the runtime"); - // assertion panic is caught at task boundary - assert_eq!(1, 2); - }); - - assert!(rt.block_on(handle).is_err()); -} - -#[test] -fn arbiter_spawn_fn_runs() { - let _ = System::new(); - - let (tx, rx) = channel::(); - - let arbiter = Arbiter::new(); - arbiter.spawn_fn(move || tx.send(42).unwrap()); - - let num = rx.recv().unwrap(); - assert_eq!(num, 42); - - arbiter.stop(); - arbiter.join().unwrap(); -} - -#[test] -fn arbiter_handle_spawn_fn_runs() { - let sys = System::new(); - - let (tx, rx) = channel::(); - - let arbiter = Arbiter::new(); - let handle = arbiter.handle(); - drop(arbiter); - - handle.spawn_fn(move || { - tx.send(42).unwrap(); - System::current().stop() - }); - - let num = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - assert_eq!(num, 42); - - handle.stop(); - sys.run().unwrap(); -} - -#[test] -fn arbiter_drop_no_panic_fn() { - let _ = System::new(); - - let arbiter = Arbiter::new(); - arbiter.spawn_fn(|| panic!("test")); - - arbiter.stop(); - arbiter.join().unwrap(); -} - -#[test] -fn arbiter_drop_no_panic_fut() { - let _ = System::new(); - - let arbiter = Arbiter::new(); - arbiter.spawn(async { panic!("test") }); - - arbiter.stop(); - arbiter.join().unwrap(); -} - -#[test] -#[should_panic] -fn no_system_current_panic() { - System::current(); -} - -#[test] -#[should_panic] -fn no_system_arbiter_new_panic() { - Arbiter::new(); -} - -#[test] -fn system_arbiter_spawn() { - let runner = System::new(); - - let (tx, rx) = oneshot::channel(); - let sys = System::current(); - - thread::spawn(|| { - // this thread will have no arbiter in it's thread local so call will panic - Arbiter::current(); - }) - .join() - .unwrap_err(); - - let thread = thread::spawn(|| { - // this thread will have no arbiter in it's thread local so use the system handle instead - System::set_current(sys); - let sys = System::current(); - - let arb = sys.arbiter(); - arb.spawn(async move { - tx.send(42u32).unwrap(); - System::current().stop(); - }); - }); - - assert_eq!(runner.block_on(rx).unwrap(), 42); - thread.join().unwrap(); -} - -#[test] -fn system_stop_stops_arbiters() { - let sys = System::new(); - let arb = Arbiter::new(); - - // arbiter should be alive to receive spawn msg - assert!(Arbiter::current().spawn_fn(|| {})); - assert!(arb.spawn_fn(|| {})); - - System::current().stop(); - sys.run().unwrap(); - - // account for slightly slow thread de-spawns (only observed on windows) - thread::sleep(Duration::from_millis(100)); - - // arbiter should be dead and return false - assert!(!Arbiter::current().spawn_fn(|| {})); - assert!(!arb.spawn_fn(|| {})); - - arb.join().unwrap(); -} - -#[test] -fn new_system_with_tokio() { - let (tx, rx) = channel(); - - let res = System::with_tokio_rt(move || { - tokio::runtime::Builder::new_multi_thread() - .enable_io() - .enable_time() - .thread_keep_alive(Duration::from_millis(1000)) - .worker_threads(2) - .max_blocking_threads(2) - .on_thread_start(|| {}) - .on_thread_stop(|| {}) - .build() - .unwrap() - }) - .block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - - tokio::task::spawn(async move { - tx.send(42).unwrap(); - }) - .await - .unwrap(); - - 123usize - }); - - assert_eq!(res, 123); - assert_eq!(rx.recv().unwrap(), 42); -} - -#[test] -fn new_arbiter_with_tokio() { - let _ = System::new(); - - let arb = Arbiter::with_tokio_rt(|| { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - }); - - let counter = Arc::new(AtomicBool::new(true)); - - let counter1 = counter.clone(); - let did_spawn = arb.spawn(async move { - actix_rt::time::sleep(Duration::from_millis(1)).await; - counter1.store(false, Ordering::SeqCst); - Arbiter::current().stop(); - }); - - assert!(did_spawn); - - arb.join().unwrap(); - - assert_eq!(false, counter.load(Ordering::SeqCst)); -} - -#[test] -fn try_current_no_system() { - assert!(System::try_current().is_none()) -} - -#[test] -fn try_current_with_system() { - System::new().block_on(async { assert!(System::try_current().is_some()) }); -} diff --git a/rustfmt.toml b/rustfmt.toml index 2e19d167..973e002c 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,5 +1,2 @@ max_width = 96 reorder_imports = true -#wrap_comments = true -#fn_args_density = "Compressed" -#use_small_heuristics = false