diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 124a5580..cf24ad8e 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -183,7 +183,8 @@ impl Arbiter { // Spawn the future on running executor PENDING.with(move |cell| { cell.borrow_mut().push(tokio::task::spawn_local(future)); - }) + }); + tokio::task::spawn_local(CleanupPending); } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -317,6 +318,30 @@ impl Arbiter { } } +/// Future used for cleaning-up already finished `JoinHandle`s +/// from the `PENDING` list so the vector doesn't grow indefinitely +struct CleanupPending; + +impl Future for CleanupPending { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + PENDING.with(move |cell| { + let mut pending = cell.borrow_mut(); + let mut i = 0; + while i != pending.len() { + if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) { + pending.remove(i); + } else { + i += 1; + } + } + }); + + Poll::Ready(()) + } +} + struct ArbiterController { stop: Option>, rx: UnboundedReceiver,