diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 3f2db63f..a18c57d8 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -14,8 +14,8 @@ * Add `System::with_init` as replacement for `Builder::run`. [#257] * Rename `System::{is_set => is_registered}`. [#257] * Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. -* `System::arbiter` now returns a `&ArbiterHandle`. [#257] -* Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257] +* `System::arbiter` now returns an `&ArbiterHandle`. [#257] +* `Arbiter::current` now returns an `ArbiterHandle` instead. [#257] * `Arbiter::join` now takes self by value. [#257] [#253]: https://github.com/actix/actix-net/pull/253 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 6f723111..44e34b5d 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -42,12 +42,12 @@ impl fmt::Debug for ArbiterCommand { /// A handle for sending spawn and stop messages to an [Arbiter]. #[derive(Debug, Clone)] pub struct ArbiterHandle { - sender: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender, } impl ArbiterHandle { - pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { - Self { sender } + pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } } /// Send a future to the [Arbiter]'s thread and spawn it. @@ -59,7 +59,7 @@ impl ArbiterHandle { where Fut: Future + Send + 'static, { - self.sender + self.tx .send(ArbiterCommand::Execute(Box::pin(future))) .is_ok() } @@ -82,7 +82,7 @@ impl ArbiterHandle { /// Returns true if stop message was sent successfully and false if the [Arbiter] has /// been dropped. pub fn stop(&self) -> bool { - self.sender.send(ArbiterCommand::Stop).is_ok() + self.tx.send(ArbiterCommand::Stop).is_ok() } } @@ -92,7 +92,7 @@ impl ArbiterHandle { /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. #[derive(Debug)] pub struct Arbiter { - sender: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender, thread_handle: thread::JoinHandle<()>, } @@ -109,12 +109,14 @@ impl Arbiter { let sys = System::current(); let (tx, rx) = mpsc::unbounded_channel(); + let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + let thread_handle = thread::Builder::new() .name(name.clone()) .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::new().expect("Can not create Runtime"); + let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime."); let hnd = ArbiterHandle::new(tx); System::set_current(sys); @@ -127,6 +129,8 @@ impl Arbiter { .tx() .send(SystemCommand::RegisterArbiter(id, hnd)); + ready_tx.send(()).unwrap(); + // run arbiter event processing loop rt.block_on(ArbiterRunner { rx }); @@ -140,13 +144,12 @@ impl Arbiter { panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) }); - Arbiter { - sender: tx, - thread_handle, - } + ready_rx.recv().unwrap(); + + Arbiter { tx, thread_handle } } - /// Sets up an Arbiter runner on the current thread using the provided runtime local task set. + /// Sets up an Arbiter runner in a new System using the provided runtime local task set. pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); @@ -160,11 +163,11 @@ impl Arbiter { hnd } - /// Return a handle to the Arbiter's message sender. + /// Return a handle to the current thread's Arbiter's message sender. /// /// # Panics /// Panics if no Arbiter is running on the current thread. - pub fn handle() -> ArbiterHandle { + pub fn current() -> ArbiterHandle { HANDLE.with(|cell| match *cell.borrow() { Some(ref addr) => addr.clone(), None => panic!("Arbiter is not running."), @@ -175,7 +178,7 @@ impl Arbiter { /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. pub fn stop(&self) -> bool { - self.sender.send(ArbiterCommand::Stop).is_ok() + self.tx.send(ArbiterCommand::Stop).is_ok() } /// Send a future to the Arbiter's thread and spawn it. @@ -187,7 +190,7 @@ impl Arbiter { where Fut: Future + Send + 'static, { - self.sender + self.tx .send(ArbiterCommand::Execute(Box::pin(future))) .is_ok() } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index d2f38ca8..58fe3cab 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -40,7 +40,13 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); - let system = System::construct(sys_tx, Arbiter::in_new_system(rt.local_set())); + let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + let system = System::construct(sys_tx, sys_arbiter.clone()); + + system + .tx() + .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) + .unwrap(); // init background system arbiter let sys_ctrl = SystemController::new(sys_rx, stop_tx); @@ -201,8 +207,8 @@ impl Future for SystemController { Some(cmd) => match cmd { SystemCommand::Exit(code) => { // stop all arbiters - for wkr in self.arbiters.values() { - wkr.stop(); + for arb in self.arbiters.values() { + arb.stop(); } // stop event loop @@ -212,12 +218,12 @@ impl Future for SystemController { } } - SystemCommand::RegisterArbiter(name, hnd) => { - self.arbiters.insert(name, hnd); + SystemCommand::RegisterArbiter(id, arb) => { + self.arbiters.insert(id, arb); } - SystemCommand::DeregisterArbiter(name) => { - self.arbiters.remove(&name); + SystemCommand::DeregisterArbiter(id) => { + self.arbiters.remove(&id); } }, } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 7749ad0a..b2607a91 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -28,7 +28,7 @@ fn join_another_arbiter() { let arbiter = Arbiter::new(); arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Arbiter::handle().stop(); + Arbiter::current().stop(); })); arbiter.join().unwrap(); }); @@ -43,7 +43,7 @@ fn join_another_arbiter() { arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; - Arbiter::handle().stop(); + Arbiter::current().stop(); }); }); arbiter.join().unwrap(); @@ -58,7 +58,7 @@ fn join_another_arbiter() { let arbiter = Arbiter::new(); arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Arbiter::handle().stop(); + Arbiter::current().stop(); })); arbiter.stop(); arbiter.join().unwrap(); @@ -190,7 +190,7 @@ fn system_arbiter_spawn() { thread::spawn(|| { // this thread will have no arbiter in it's thread local so call will panic - Arbiter::handle(); + Arbiter::current(); }) .join() .unwrap_err(); @@ -200,8 +200,8 @@ fn system_arbiter_spawn() { System::set_current(sys); let sys = System::current(); - let wrk = sys.arbiter(); - wrk.spawn(async move { + let arb = sys.arbiter(); + arb.spawn(async move { tx.send(42u32).unwrap(); System::current().stop(); }); @@ -210,3 +210,22 @@ fn system_arbiter_spawn() { assert_eq!(runner.block_on(rx).unwrap(), 42); thread.join().unwrap(); } + +#[test] +fn system_stop_stops_arbiters() { + let sys = System::new(); + let arb = Arbiter::new(); + + // arbiter should be alive to receive spawn msg + assert!(Arbiter::current().spawn_fn(|| {})); + assert!(arb.spawn_fn(|| {})); + + System::current().stop(); + sys.run().unwrap(); + + // arbiter should be dead and return false + assert!(!Arbiter::current().spawn_fn(|| {})); + assert!(!arb.spawn_fn(|| {})); + + arb.join().unwrap(); +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 9c61e4ad..25a0429c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -215,7 +215,7 @@ impl ServerWorker { } Err(e) => { error!("Can not start worker: {:?}", e); - Arbiter::handle().stop(); + Arbiter::current().stop(); } } wrk.await @@ -386,7 +386,7 @@ impl Future for ServerWorker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::handle().stop(); + Arbiter::current().stop(); return Poll::Ready(()); } @@ -394,7 +394,7 @@ impl Future for ServerWorker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::handle().stop(); + Arbiter::current().stop(); return Poll::Ready(()); }