diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index eb9965e0..2c0c1b50 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,4 +21,5 @@ actix-threadpool = "0.3" futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } copyless = "0.1.4" +smallvec = "1" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index cf24ad8e..eff10ca3 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -18,13 +18,14 @@ use crate::system::System; use copyless::BoxHelper; +use smallvec::SmallVec; pub use tokio::task::JoinHandle; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); static Q: RefCell>>>> = RefCell::new(Vec::new()); - static PENDING: RefCell>> = RefCell::new(Vec::new()); + static PENDING: RefCell; 8]>> = RefCell::new(SmallVec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -181,10 +182,15 @@ impl Arbiter { RUNNING.with(move |cell| { if cell.get() { // Spawn the future on running executor - PENDING.with(move |cell| { - cell.borrow_mut().push(tokio::task::spawn_local(future)); + let len = PENDING.with(move |cell| { + let mut p = cell.borrow_mut(); + p.push(tokio::task::spawn_local(future)); + p.len() }); - tokio::task::spawn_local(CleanupPending); + if len > 7 { + // Before reaching the inline size + tokio::task::spawn_local(CleanupPending); + } } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -312,7 +318,7 @@ impl Arbiter { /// have completed. pub fn local_join() -> impl Future { PENDING.with(move |cell| { - let current = cell.replace(Vec::new()); + let current = cell.replace(SmallVec::new()); future::join_all(current).map(|_| ()) }) } @@ -375,9 +381,15 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - PENDING.with(move |cell| { - cell.borrow_mut().push(tokio::task::spawn_local(fut)); + let len = PENDING.with(move |cell| { + let mut p = cell.borrow_mut(); + p.push(tokio::task::spawn_local(fut)); + p.len() }); + if len > 7 { + // Before reaching the inline size + tokio::task::spawn_local(CleanupPending); + } } ArbiterCommand::ExecuteFn(f) => { f.call_box();