1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 18:02:58 +01:00

prevent arbiter leaks by waiting for registration

This commit is contained in:
Rob Ede 2021-01-31 04:41:28 +00:00
parent b75254403a
commit 2fa60b07ae
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
5 changed files with 62 additions and 34 deletions

View File

@ -14,8 +14,8 @@
* Add `System::with_init` as replacement for `Builder::run`. [#257] * Add `System::with_init` as replacement for `Builder::run`. [#257]
* Rename `System::{is_set => is_registered}`. [#257] * Rename `System::{is_set => is_registered}`. [#257]
* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. * Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257].
* `System::arbiter` now returns a `&ArbiterHandle`. [#257] * `System::arbiter` now returns an `&ArbiterHandle`. [#257]
* Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257] * `Arbiter::current` now returns an `ArbiterHandle` instead. [#257]
* `Arbiter::join` now takes self by value. [#257] * `Arbiter::join` now takes self by value. [#257]
[#253]: https://github.com/actix/actix-net/pull/253 [#253]: https://github.com/actix/actix-net/pull/253

View File

@ -42,12 +42,12 @@ impl fmt::Debug for ArbiterCommand {
/// A handle for sending spawn and stop messages to an [Arbiter]. /// A handle for sending spawn and stop messages to an [Arbiter].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ArbiterHandle { pub struct ArbiterHandle {
sender: mpsc::UnboundedSender<ArbiterCommand>, tx: mpsc::UnboundedSender<ArbiterCommand>,
} }
impl ArbiterHandle { impl ArbiterHandle {
pub(crate) fn new(sender: mpsc::UnboundedSender<ArbiterCommand>) -> Self { pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self { sender } Self { tx }
} }
/// Send a future to the [Arbiter]'s thread and spawn it. /// Send a future to the [Arbiter]'s thread and spawn it.
@ -59,7 +59,7 @@ impl ArbiterHandle {
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.sender self.tx
.send(ArbiterCommand::Execute(Box::pin(future))) .send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok() .is_ok()
} }
@ -82,7 +82,7 @@ impl ArbiterHandle {
/// Returns true if stop message was sent successfully and false if the [Arbiter] has /// Returns true if stop message was sent successfully and false if the [Arbiter] has
/// been dropped. /// been dropped.
pub fn stop(&self) -> bool { pub fn stop(&self) -> bool {
self.sender.send(ArbiterCommand::Stop).is_ok() self.tx.send(ArbiterCommand::Stop).is_ok()
} }
} }
@ -92,7 +92,7 @@ impl ArbiterHandle {
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)] #[derive(Debug)]
pub struct Arbiter { pub struct Arbiter {
sender: mpsc::UnboundedSender<ArbiterCommand>, tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>, thread_handle: thread::JoinHandle<()>,
} }
@ -109,12 +109,14 @@ impl Arbiter {
let sys = System::current(); let sys = System::current();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
let thread_handle = thread::Builder::new() let thread_handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn({ .spawn({
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let rt = Runtime::new().expect("Can not create Runtime"); let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::new(tx);
System::set_current(sys); System::set_current(sys);
@ -127,6 +129,8 @@ impl Arbiter {
.tx() .tx()
.send(SystemCommand::RegisterArbiter(id, hnd)); .send(SystemCommand::RegisterArbiter(id, hnd));
ready_tx.send(()).unwrap();
// run arbiter event processing loop // run arbiter event processing loop
rt.block_on(ArbiterRunner { rx }); rt.block_on(ArbiterRunner { rx });
@ -140,13 +144,12 @@ impl Arbiter {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
}); });
Arbiter { ready_rx.recv().unwrap();
sender: tx,
thread_handle, Arbiter { tx, thread_handle }
}
} }
/// Sets up an Arbiter runner on the current thread using the provided runtime local task set. /// Sets up an Arbiter runner in a new System using the provided runtime local task set.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
@ -160,11 +163,11 @@ impl Arbiter {
hnd hnd
} }
/// Return a handle to the Arbiter's message sender. /// Return a handle to the current thread's Arbiter's message sender.
/// ///
/// # Panics /// # Panics
/// Panics if no Arbiter is running on the current thread. /// Panics if no Arbiter is running on the current thread.
pub fn handle() -> ArbiterHandle { pub fn current() -> ArbiterHandle {
HANDLE.with(|cell| match *cell.borrow() { HANDLE.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(), Some(ref addr) => addr.clone(),
None => panic!("Arbiter is not running."), None => panic!("Arbiter is not running."),
@ -175,7 +178,7 @@ impl Arbiter {
/// ///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
pub fn stop(&self) -> bool { pub fn stop(&self) -> bool {
self.sender.send(ArbiterCommand::Stop).is_ok() self.tx.send(ArbiterCommand::Stop).is_ok()
} }
/// Send a future to the Arbiter's thread and spawn it. /// Send a future to the Arbiter's thread and spawn it.
@ -187,7 +190,7 @@ impl Arbiter {
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.sender self.tx
.send(ArbiterCommand::Execute(Box::pin(future))) .send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok() .is_ok()
} }

View File

@ -40,7 +40,13 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let system = System::construct(sys_tx, Arbiter::in_new_system(rt.local_set())); let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter // init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx); let sys_ctrl = SystemController::new(sys_rx, stop_tx);
@ -201,8 +207,8 @@ impl Future for SystemController {
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop all arbiters // stop all arbiters
for wkr in self.arbiters.values() { for arb in self.arbiters.values() {
wkr.stop(); arb.stop();
} }
// stop event loop // stop event loop
@ -212,12 +218,12 @@ impl Future for SystemController {
} }
} }
SystemCommand::RegisterArbiter(name, hnd) => { SystemCommand::RegisterArbiter(id, arb) => {
self.arbiters.insert(name, hnd); self.arbiters.insert(id, arb);
} }
SystemCommand::DeregisterArbiter(name) => { SystemCommand::DeregisterArbiter(id) => {
self.arbiters.remove(&name); self.arbiters.remove(&id);
} }
}, },
} }

View File

@ -28,7 +28,7 @@ fn join_another_arbiter() {
let arbiter = Arbiter::new(); let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Arbiter::handle().stop(); Arbiter::current().stop();
})); }));
arbiter.join().unwrap(); arbiter.join().unwrap();
}); });
@ -43,7 +43,7 @@ fn join_another_arbiter() {
arbiter.spawn_fn(move || { arbiter.spawn_fn(move || {
actix_rt::spawn(async move { actix_rt::spawn(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Arbiter::handle().stop(); Arbiter::current().stop();
}); });
}); });
arbiter.join().unwrap(); arbiter.join().unwrap();
@ -58,7 +58,7 @@ fn join_another_arbiter() {
let arbiter = Arbiter::new(); let arbiter = Arbiter::new();
arbiter.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Arbiter::handle().stop(); Arbiter::current().stop();
})); }));
arbiter.stop(); arbiter.stop();
arbiter.join().unwrap(); arbiter.join().unwrap();
@ -190,7 +190,7 @@ fn system_arbiter_spawn() {
thread::spawn(|| { thread::spawn(|| {
// this thread will have no arbiter in it's thread local so call will panic // this thread will have no arbiter in it's thread local so call will panic
Arbiter::handle(); Arbiter::current();
}) })
.join() .join()
.unwrap_err(); .unwrap_err();
@ -200,8 +200,8 @@ fn system_arbiter_spawn() {
System::set_current(sys); System::set_current(sys);
let sys = System::current(); let sys = System::current();
let wrk = sys.arbiter(); let arb = sys.arbiter();
wrk.spawn(async move { arb.spawn(async move {
tx.send(42u32).unwrap(); tx.send(42u32).unwrap();
System::current().stop(); System::current().stop();
}); });
@ -210,3 +210,22 @@ fn system_arbiter_spawn() {
assert_eq!(runner.block_on(rx).unwrap(), 42); assert_eq!(runner.block_on(rx).unwrap(), 42);
thread.join().unwrap(); thread.join().unwrap();
} }
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
let arb = Arbiter::new();
// arbiter should be alive to receive spawn msg
assert!(Arbiter::current().spawn_fn(|| {}));
assert!(arb.spawn_fn(|| {}));
System::current().stop();
sys.run().unwrap();
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
arb.join().unwrap();
}

View File

@ -215,7 +215,7 @@ impl ServerWorker {
} }
Err(e) => { Err(e) => {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::handle().stop(); Arbiter::current().stop();
} }
} }
wrk.await wrk.await
@ -386,7 +386,7 @@ impl Future for ServerWorker {
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
let _ = tx.take().unwrap().send(true); let _ = tx.take().unwrap().send(true);
Arbiter::handle().stop(); Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
@ -394,7 +394,7 @@ impl Future for ServerWorker {
if Pin::new(t2).poll(cx).is_ready() { if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false); let _ = tx.take().unwrap().send(false);
self.shutdown(true); self.shutdown(true);
Arbiter::handle().stop(); Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }