1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 02:21:07 +01:00

assure arbiter stop ordering

This commit is contained in:
Rob Ede 2021-02-21 03:03:17 +00:00
parent 8d74cf387d
commit 1c5a0a7c11
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
10 changed files with 564 additions and 394 deletions

View File

@ -26,6 +26,7 @@ macros = ["actix-macros"]
actix-macros = { version = "0.2.0", optional = true } actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
futures-intrusive = "0.4"
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[dev-dependencies] [dev-dependencies]

View File

@ -9,6 +9,9 @@ use std::{
}; };
use futures_core::ready; use futures_core::ready;
use futures_intrusive::channel::shared::{
oneshot_broadcast_channel, OneshotBroadcastReceiver, OneshotBroadcastSender,
};
use tokio::{sync::mpsc, task::LocalSet}; use tokio::{sync::mpsc, task::LocalSet};
use crate::{ use crate::{
@ -40,11 +43,26 @@ impl fmt::Debug for ArbiterCommand {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ArbiterHandle { pub struct ArbiterHandle {
tx: mpsc::UnboundedSender<ArbiterCommand>, tx: mpsc::UnboundedSender<ArbiterCommand>,
/// Is `None` for system arbiter.
stopped_rx: Option<OneshotBroadcastReceiver<()>>,
} }
impl ArbiterHandle { impl ArbiterHandle {
pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self { pub(crate) fn new(
Self { tx } tx: mpsc::UnboundedSender<ArbiterCommand>,
stopped_rx: OneshotBroadcastReceiver<()>,
) -> Self {
Self {
tx,
stopped_rx: Some(stopped_rx),
}
}
pub(crate) fn for_system(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self {
tx,
stopped_rx: None,
}
} }
/// Send a future to the [Arbiter]'s thread and spawn it. /// Send a future to the [Arbiter]'s thread and spawn it.
@ -81,6 +99,25 @@ impl ArbiterHandle {
pub fn stop(&self) -> bool { pub fn stop(&self) -> bool {
self.tx.send(ArbiterCommand::Stop).is_ok() self.tx.send(ArbiterCommand::Stop).is_ok()
} }
/// Will wait for [Arbiter] to complete all commands up until it's Stop command is processed.
///
/// For [Arbiter]s that have already stopped, the future will resolve immediately.
///
/// # Panics
/// Panics if called on the system Arbiter. In this situation the Arbiter's lifetime is
/// implicitly bound by the main thread's lifetime.
pub async fn join(self) {
match self.stopped_rx {
Some(rx) => {
rx.receive().await;
}
None => {
// TODO: decide if this is correct
panic!("cannot wait on the system Arbiter's completion")
}
}
}
} }
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures /// An Arbiter represents a thread that provides an asynchronous execution environment for futures
@ -89,8 +126,10 @@ impl ArbiterHandle {
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)] #[derive(Debug)]
pub struct Arbiter { pub struct Arbiter {
tx: mpsc::UnboundedSender<ArbiterCommand>, id: usize,
thread_handle: thread::JoinHandle<()>, stopped_tx: OneshotBroadcastSender<()>,
cmd_tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: Option<thread::JoinHandle<()>>,
} }
impl Arbiter { impl Arbiter {
@ -99,7 +138,7 @@ impl Arbiter {
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Arbiter { pub fn new() -> ArbiterHandle {
Self::with_tokio_rt(|| { Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
}) })
@ -109,74 +148,109 @@ impl Arbiter {
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)] #[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> ArbiterHandle
where where
F: Fn() -> tokio::runtime::Runtime + Send + 'static, F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{ {
eprintln!("get sys current");
let sys = System::current(); let sys = System::current();
eprintln!("get sys id");
let system_id = sys.id(); let system_id = sys.id();
eprintln!("calc arb id");
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); // let ready_barrier = Arc::new(Barrier::new(2));
let (stopped_tx, stopped_rx) = oneshot_broadcast_channel::<()>();
eprintln!("make arb handle");
let hnd = ArbiterHandle::new(cmd_tx.clone(), stopped_rx);
eprintln!("make thread");
let thread_handle = thread::Builder::new() let thread_handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn({ .spawn({
let tx = tx.clone(); let hnd = hnd.clone();
move || { // let ready_barrier = Arc::clone(&ready_barrier);
let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);
move || {
eprintln!("thread: make rt");
let rt = Runtime::from(runtime_factory());
eprintln!("thread: set sys");
System::set_current(sys); System::set_current(sys);
// // wait until register message is sent
// eprintln!("thread: wait for arb registered");
// ready_barrier.wait();
eprintln!("thread: set arb handle");
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
// run arbiter event processing loop // run arbiter event processing loop
rt.block_on(ArbiterRunner { rx }); eprintln!("thread: block on arbiter loop");
rt.block_on(ArbiterLoop { cmd_rx });
// deregister arbiter // deregister arbiter
let _ = System::current() eprintln!("thread: send deregister arbiter message");
System::current()
.tx() .tx()
.send(SystemCommand::DeregisterArbiter(arb_id)); .send(SystemCommand::DeregisterArbiter(arb_id))
.unwrap();
} }
}) })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
}); });
ready_rx.recv().unwrap(); let arb = Arbiter {
id: arb_id,
cmd_tx,
stopped_tx,
thread_handle: Some(thread_handle),
};
Arbiter { tx, thread_handle } // register arbiter
eprintln!("send register arbiter message");
System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb))
.unwrap();
// eprintln!("inform arbiter that it is registered");
// ready_barrier.wait();
eprintln!("arbiter::new done");
hnd
} }
/// Sets up an Arbiter runner in a new System using the provided runtime local task set. /// Sets up an Arbiter runner in a new System using the provided runtime local task set.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { pub(crate) fn for_system(local: &LocalSet) -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::for_system(cmd_tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
local.spawn_local(ArbiterRunner { rx }); local.spawn_local(ArbiterLoop { cmd_rx });
hnd hnd
} }
/// Return a handle to the this Arbiter's message sender. /// Return `Arbiter`'s numeric ID.
pub fn handle(&self) -> ArbiterHandle { pub(crate) fn id(&self) -> usize {
ArbiterHandle::new(self.tx.clone()) self.id
} }
// /// Return a handle to the this Arbiter's message sender.
// pub fn handle(&self) -> ArbiterHandle {
// ArbiterHandle::new(self.cmd_tx.clone())
// }
/// Return a handle to the current thread's Arbiter's message sender. /// Return a handle to the current thread's Arbiter's message sender.
/// ///
/// # Panics /// # Panics
@ -191,57 +265,68 @@ impl Arbiter {
/// Stop Arbiter from continuing it's event loop. /// Stop Arbiter from continuing it's event loop.
/// ///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
pub fn stop(&self) -> bool { pub(crate) fn stop(&self) -> bool {
self.tx.send(ArbiterCommand::Stop).is_ok() self.cmd_tx.send(ArbiterCommand::Stop).is_ok()
} }
/// Send a future to the Arbiter's thread and spawn it. // /// Send a future to the Arbiter's thread and spawn it.
/// // ///
/// If you require a result, include a response channel in the future. // /// If you require a result, include a response channel in the future.
/// // ///
/// Returns true if future was sent successfully and false if the Arbiter has died. // /// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool // pub fn spawn<Fut>(&self, future: Fut) -> bool
where // where
Fut: Future<Output = ()> + Send + 'static, // Fut: Future<Output = ()> + Send + 'static,
{ // {
self.tx // self.cmd_tx
.send(ArbiterCommand::Execute(Box::pin(future))) // .send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok() // .is_ok()
// }
// /// Send a function to the Arbiter's thread and execute it.
// ///
// /// Any result from the function is discarded. If you require a result, include a response
// /// channel in the function.
// ///
// /// Returns true if function was sent successfully and false if the Arbiter has died.
// pub fn spawn_fn<F>(&self, f: F) -> bool
// where
// F: FnOnce() + Send + 'static,
// {
// self.spawn(async { f() })
// }
} }
/// Send a function to the Arbiter's thread and execute it. impl Drop for Arbiter {
/// fn drop(&mut self) {
/// Any result from the function is discarded. If you require a result, include a response eprintln!("Arb::drop: joining arbiter thread");
/// channel in the function. match self.thread_handle.take().unwrap().join() {
/// Ok(()) => {}
/// Returns true if function was sent successfully and false if the Arbiter has died. Err(err) => {
pub fn spawn_fn<F>(&self, f: F) -> bool eprintln!("arbiter {} thread panicked: {:?}", self.id(), err)
where
F: FnOnce() + Send + 'static,
{
self.spawn(async { f() })
} }
}
eprintln!("Arb::drop: sending stopped tx");
/// Wait for Arbiter's event loop to complete. // could fail if all handles are dropped already so ignore result
/// let _ = self.stopped_tx.send(());
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
pub fn join(self) -> thread::Result<()> { eprintln!("Arb::drop: done");
self.thread_handle.join()
} }
} }
/// A persistent future that processes [Arbiter] commands. /// A persistent future that processes [Arbiter] commands.
struct ArbiterRunner { struct ArbiterLoop {
rx: mpsc::UnboundedReceiver<ArbiterCommand>, cmd_rx: mpsc::UnboundedReceiver<ArbiterCommand>,
} }
impl Future for ArbiterRunner { impl Future for ArbiterLoop {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel // process all items currently buffered in channel
loop { loop {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => return Poll::Ready(()),

View File

@ -16,12 +16,12 @@
//! //!
//! # Examples //! # Examples
//! ``` //! ```
//! use std::sync::mpsc; //! use std::sync::mpsc::channel as std_channel;
//! use actix_rt::{Arbiter, System}; //! use actix_rt::{Arbiter, System};
//! //!
//! let _ = System::new(); //! let sys = System::new();
//! //!
//! let (tx, rx) = mpsc::channel::<u32>(); //! let (tx, rx) = std_channel::<u32>();
//! //!
//! let arbiter = Arbiter::new(); //! let arbiter = Arbiter::new();
//! arbiter.spawn_fn(move || tx.send(42).unwrap()); //! arbiter.spawn_fn(move || tx.send(42).unwrap());
@ -30,7 +30,10 @@
//! assert_eq!(num, 42); //! assert_eq!(num, 42);
//! //!
//! arbiter.stop(); //! arbiter.stop();
//! arbiter.join().unwrap(); //! sys.block_on(arbiter.join());
//!
//! System::current().stop();
//! sys.run().unwrap();
//! ``` //! ```
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]

View File

@ -54,14 +54,9 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory()); let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set()); let sys_arbiter = Arbiter::for_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone()); let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter // init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx); let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl); rt.spawn(sys_ctrl);
@ -150,7 +145,10 @@ impl System {
} }
/// Runner that keeps a [System]'s event loop alive until stop message is received. /// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."] ///
/// Dropping the `SystemRunner` (eg. `let _ = System::new();`) will result in no further events
/// being processed. It is required you bind the runner and call `run` or call `block_on`.
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
rt: Runtime, rt: Runtime,
@ -190,7 +188,7 @@ impl SystemRunner {
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {
Exit(i32), Exit(i32),
RegisterArbiter(usize, ArbiterHandle), RegisterArbiter(Arbiter),
DeregisterArbiter(usize), DeregisterArbiter(usize),
} }
@ -200,7 +198,7 @@ pub(crate) enum SystemCommand {
pub(crate) struct SystemController { pub(crate) struct SystemController {
stop_tx: Option<oneshot::Sender<i32>>, stop_tx: Option<oneshot::Sender<i32>>,
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>, cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
arbiters: HashMap<usize, ArbiterHandle>, arbiters: HashMap<usize, Arbiter>,
} }
impl SystemController { impl SystemController {
@ -221,35 +219,42 @@ impl Future for SystemController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel // process all items currently buffered in channel
loop { let code = loop {
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => break 0,
// process system command // process system command
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop all arbiters // stop all arbiters
for arb in self.arbiters.values() { for arb in self.arbiters.values() {
eprintln!("SystemController: stopping arbiter {}", arb.id());
arb.stop(); arb.stop();
} }
// stop event loop eprintln!("SystemController: dropping arbiters");
// will only fire once // destroy all arbiters
if let Some(stop_tx) = self.stop_tx.take() { // drop waits for threads to complete
let _ = stop_tx.send(code); self.arbiters.clear();
}
break code;
} }
SystemCommand::RegisterArbiter(id, arb) => { SystemCommand::RegisterArbiter(arb) => {
self.arbiters.insert(id, arb); self.arbiters.insert(arb.id(), arb);
} }
SystemCommand::DeregisterArbiter(id) => { SystemCommand::DeregisterArbiter(id) => {
self.arbiters.remove(&id); // implicit arbiter drop
let _ = self.arbiters.remove(&id);
} }
}, },
} }
} };
self.stop_tx.take().unwrap().send(code).unwrap();
Poll::Ready(())
} }
} }

View File

@ -0,0 +1,44 @@
//! Derived from this comment:
//! https://github.com/actix/actix/issues/464#issuecomment-779427825
use std::{thread, time::Duration};
use actix_rt::{Arbiter, System};
use tokio::sync::mpsc;
#[test]
fn actix_sample() {
let sys = System::new();
let arb = Arbiter::new();
let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
// create "actor"
arb.spawn_fn(move || {
let a = A;
actix_rt::spawn(async move {
while let Some(_) = rx.recv().await {
println!("{:?}", a);
}
});
});
System::current().stop();
// all arbiters must be dropped when sys.run returns
sys.run().unwrap();
thread::sleep(Duration::from_millis(100));
}
#[derive(Debug)]
struct A;
impl Drop for A {
fn drop(&mut self) {
println!("start drop");
thread::sleep(Duration::from_millis(200));
println!("finish drop");
}
}

View File

@ -0,0 +1,123 @@
use std::{
sync::mpsc::channel as std_channel,
time::{Duration, Instant},
};
use actix_rt::{time, Arbiter, System};
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn join_arbiter_wait_fut() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(async move {
time::sleep(time).await;
Arbiter::current().stop();
});
arbiter.join().await;
});
assert!(
instant.elapsed() >= time,
"Join on another arbiter should complete only when it calls stop"
);
}
#[test]
fn join_arbiter_wait_fn() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
actix_rt::spawn(async move {
time::sleep(time).await;
Arbiter::current().stop();
});
});
arbiter.join().await;
});
assert!(
instant.elapsed() >= time,
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
);
}
#[test]
fn join_arbiter_early_stop_call() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.stop();
arbiter.join().await;
});
assert!(
instant.elapsed() < time,
"Premature stop of Arbiter should conclude regardless of it's current state."
);
}
#[test]
fn arbiter_spawn_fn_runs() {
let sys = System::new();
let (tx, rx) = std_channel::<u32>();
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
tx.send(42).unwrap();
System::current().stop();
});
let num = rx.recv().unwrap();
assert_eq!(num, 42);
sys.run().unwrap();
}
#[test]
fn arbiter_inner_panic() {
let sys = System::new();
let (tx, rx) = std_channel::<u32>();
let arbiter = Arbiter::new();
// spawned panics should not cause arbiter to crash
arbiter.spawn(async { panic!("inner panic; will be caught") });
arbiter.spawn_fn(|| panic!("inner panic; will be caught"));
arbiter.spawn(async move { tx.send(42).unwrap() });
let num = rx.recv().unwrap();
assert_eq!(num, 42);
System::current().stop();
sys.run().unwrap();
}

View File

@ -0,0 +1,82 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel as std_channel,
Arc,
},
time::Duration,
};
use actix_rt::{Arbiter, System};
#[test]
fn wait_for_spawns() {
let rt = actix_rt::Runtime::new().unwrap();
let handle = rt.spawn(async {
println!("running on the runtime");
// assertion panic is caught at task boundary
assert_eq!(1, 2);
});
assert!(rt.block_on(handle).is_err());
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = std_channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let sys = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
System::current().stop();
});
sys.run().unwrap();
assert!(did_spawn);
assert_eq!(false, counter.load(Ordering::SeqCst));
}

View File

@ -0,0 +1,130 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
use actix_rt::{time, Arbiter, System};
use tokio::sync::oneshot;
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
}
#[test]
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}
#[test]
fn non_static_block_on() {
let string = String::from("test_str");
let string = string.as_str();
let sys = System::new();
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
System::current().stop();
sys.run().unwrap();
}
#[test]
fn await_for_timer() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
time::sleep(time).await;
});
assert!(
instant.elapsed() >= time,
"Calling `block_on` should poll awaited future to completion."
);
}
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
let (tx, rx) = oneshot::channel();
let sys = System::current();
thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic
Arbiter::current();
})
.join()
.unwrap_err();
let thread = thread::spawn(|| {
// this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys);
let sys = System::current();
let arb = sys.arbiter();
arb.spawn(async move {
tx.send(42u32).unwrap();
System::current().stop();
});
});
assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap();
}
struct Atom(Arc<AtomicBool>);
impl Drop for Atom {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}
#[test]
fn system_stop_arbiter_join_barrier() {
let sys = System::new();
let arb = Arbiter::new();
let atom = Atom(Arc::new(AtomicBool::new(false)));
// arbiter should be alive to receive spawn msg
assert!(Arbiter::current().spawn_fn(|| {}));
assert!(arb.spawn_fn(move || {
// thread should get dropped during sleep
thread::sleep(Duration::from_secs(2));
// pointless load to move atom into scope
atom.0.load(Ordering::SeqCst);
panic!("spawned fn (thread) should be dropped during sleep");
}));
System::current().stop();
sys.run().unwrap();
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
}

View File

@ -1,300 +0,0 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread,
time::{Duration, Instant},
};
use actix_rt::{Arbiter, System};
use tokio::sync::oneshot;
#[test]
fn await_for_timer() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
tokio::time::sleep(time).await;
});
assert!(
instant.elapsed() >= time,
"Block on should poll awaited future to completion"
);
}
#[test]
fn join_another_arbiter() {
let time = Duration::from_secs(1);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.join().unwrap();
});
assert!(
instant.elapsed() >= time,
"Join on another arbiter should complete only when it calls stop"
);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || {
actix_rt::spawn(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
});
});
arbiter.join().unwrap();
});
assert!(
instant.elapsed() >= time,
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
);
let instant = Instant::now();
System::new().block_on(async move {
let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
Arbiter::current().stop();
}));
arbiter.stop();
arbiter.join().unwrap();
});
assert!(
instant.elapsed() < time,
"Premature stop of arbiter should conclude regardless of it's current state"
);
}
#[test]
fn non_static_block_on() {
let string = String::from("test_str");
let string = string.as_str();
let sys = System::new();
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", string);
});
}
#[test]
fn wait_for_spawns() {
let rt = actix_rt::Runtime::new().unwrap();
let handle = rt.spawn(async {
println!("running on the runtime");
// assertion panic is caught at task boundary
assert_eq!(1, 2);
});
assert!(rt.block_on(handle).is_err());
}
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
let (tx, rx) = channel::<u32>();
let arbiter = Arbiter::new();
arbiter.spawn_fn(move || tx.send(42).unwrap());
let num = rx.recv().unwrap();
assert_eq!(num, 42);
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
let (tx, rx) = channel::<u32>();
let arbiter = Arbiter::new();
let handle = arbiter.handle();
drop(arbiter);
handle.spawn_fn(move || {
tx.send(42).unwrap();
System::current().stop()
});
let num = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert_eq!(num, 42);
handle.stop();
sys.run().unwrap();
}
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
let arbiter = Arbiter::new();
arbiter.spawn_fn(|| panic!("test"));
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
let arbiter = Arbiter::new();
arbiter.spawn(async { panic!("test") });
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
let (tx, rx) = oneshot::channel();
let sys = System::current();
thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic
Arbiter::current();
})
.join()
.unwrap_err();
let thread = thread::spawn(|| {
// this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys);
let sys = System::current();
let arb = sys.arbiter();
arb.spawn(async move {
tx.send(42u32).unwrap();
System::current().stop();
});
});
assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap();
}
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
let arb = Arbiter::new();
// arbiter should be alive to receive spawn msg
assert!(Arbiter::current().spawn_fn(|| {}));
assert!(arb.spawn_fn(|| {}));
System::current().stop();
sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows)
thread::sleep(Duration::from_millis(100));
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
arb.join().unwrap();
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
});
assert!(did_spawn);
arb.join().unwrap();
assert_eq!(false, counter.load(Ordering::SeqCst));
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
}
#[test]
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}

View File

@ -1,5 +1,2 @@
max_width = 96 max_width = 96
reorder_imports = true reorder_imports = true
#wrap_comments = true
#fn_args_density = "Compressed"
#use_small_heuristics = false