From 06bca19524b73a0bd71b12b752acd35c8916964c Mon Sep 17 00:00:00 2001 From: Jonathas-Conceicao Date: Thu, 9 Apr 2020 20:36:35 -0300 Subject: [PATCH 1/2] actix-rt: Spawn future to cleanup pending JoinHandles Signed-off-by: Jonathas-Conceicao --- actix-rt/src/arbiter.rs | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 124a5580..cf24ad8e 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -183,7 +183,8 @@ impl Arbiter { // Spawn the future on running executor PENDING.with(move |cell| { cell.borrow_mut().push(tokio::task::spawn_local(future)); - }) + }); + tokio::task::spawn_local(CleanupPending); } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -317,6 +318,30 @@ impl Arbiter { } } +/// Future used for cleaning-up already finished `JoinHandle`s +/// from the `PENDING` list so the vector doesn't grow indefinitely +struct CleanupPending; + +impl Future for CleanupPending { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + PENDING.with(move |cell| { + let mut pending = cell.borrow_mut(); + let mut i = 0; + while i != pending.len() { + if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) { + pending.remove(i); + } else { + i += 1; + } + } + }); + + Poll::Ready(()) + } +} + struct ArbiterController { stop: Option>, rx: UnboundedReceiver, From 6906f25e014ec76869b8e7ebffb1b38590edda3d Mon Sep 17 00:00:00 2001 From: Jonathas-Conceicao Date: Thu, 16 Apr 2020 03:12:00 -0300 Subject: [PATCH 2/2] actix-rt: Set threshold size for arbiter's pending futures list Signed-off-by: Jonathas-Conceicao --- actix-rt/Cargo.toml | 1 + actix-rt/src/arbiter.rs | 26 +++++++++++++++++++------- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index eb9965e0..2c0c1b50 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,4 +21,5 @@ actix-threadpool = "0.3" futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } copyless = "0.1.4" +smallvec = "1" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index cf24ad8e..eff10ca3 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -18,13 +18,14 @@ use crate::system::System; use copyless::BoxHelper; +use smallvec::SmallVec; pub use tokio::task::JoinHandle; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); static Q: RefCell>>>> = RefCell::new(Vec::new()); - static PENDING: RefCell>> = RefCell::new(Vec::new()); + static PENDING: RefCell; 8]>> = RefCell::new(SmallVec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -181,10 +182,15 @@ impl Arbiter { RUNNING.with(move |cell| { if cell.get() { // Spawn the future on running executor - PENDING.with(move |cell| { - cell.borrow_mut().push(tokio::task::spawn_local(future)); + let len = PENDING.with(move |cell| { + let mut p = cell.borrow_mut(); + p.push(tokio::task::spawn_local(future)); + p.len() }); - tokio::task::spawn_local(CleanupPending); + if len > 7 { + // Before reaching the inline size + tokio::task::spawn_local(CleanupPending); + } } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -312,7 +318,7 @@ impl Arbiter { /// have completed. pub fn local_join() -> impl Future { PENDING.with(move |cell| { - let current = cell.replace(Vec::new()); + let current = cell.replace(SmallVec::new()); future::join_all(current).map(|_| ()) }) } @@ -375,9 +381,15 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - PENDING.with(move |cell| { - cell.borrow_mut().push(tokio::task::spawn_local(fut)); + let len = PENDING.with(move |cell| { + let mut p = cell.borrow_mut(); + p.push(tokio::task::spawn_local(fut)); + p.len() }); + if len > 7 { + // Before reaching the inline size + tokio::task::spawn_local(CleanupPending); + } } ArbiterCommand::ExecuteFn(f) => { f.call_box();