From 06bca19524b73a0bd71b12b752acd35c8916964c Mon Sep 17 00:00:00 2001 From: Jonathas-Conceicao Date: Thu, 9 Apr 2020 20:36:35 -0300 Subject: [PATCH] actix-rt: Spawn future to cleanup pending JoinHandles Signed-off-by: Jonathas-Conceicao --- actix-rt/src/arbiter.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 124a5580..cf24ad8e 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -183,7 +183,8 @@ impl Arbiter { // Spawn the future on running executor PENDING.with(move |cell| { cell.borrow_mut().push(tokio::task::spawn_local(future)); - }) + }); + tokio::task::spawn_local(CleanupPending); } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -317,6 +318,30 @@ impl Arbiter { } } +/// Future used for cleaning-up already finished `JoinHandle`s +/// from the `PENDING` list so the vector doesn't grow indefinitely +struct CleanupPending; + +impl Future for CleanupPending { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + PENDING.with(move |cell| { + let mut pending = cell.borrow_mut(); + let mut i = 0; + while i != pending.len() { + if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) { + pending.remove(i); + } else { + i += 1; + } + } + }); + + Poll::Ready(()) + } +} + struct ArbiterController { stop: Option>, rx: UnboundedReceiver,