1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-07 15:54:24 +01:00

actix-rt: Set threshold size for arbiter's pending futures list

Signed-off-by: Jonathas-Conceicao <jadoliveira@inf.ufpel.edu.br>
This commit is contained in:
Jonathas-Conceicao 2020-04-16 03:12:00 -03:00
parent 06bca19524
commit 6906f25e01
2 changed files with 20 additions and 7 deletions

View File

@ -21,4 +21,5 @@ actix-threadpool = "0.3"
futures-channel = { version = "0.3.4", default-features = false } futures-channel = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
copyless = "0.1.4" copyless = "0.1.4"
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }

View File

@ -18,13 +18,14 @@ use crate::system::System;
use copyless::BoxHelper; use copyless::BoxHelper;
use smallvec::SmallVec;
pub use tokio::task::JoinHandle; pub use tokio::task::JoinHandle;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new()); static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
static PENDING: RefCell<Vec<JoinHandle<()>>> = RefCell::new(Vec::new()); static PENDING: RefCell<SmallVec<[JoinHandle<()>; 8]>> = RefCell::new(SmallVec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
@ -181,10 +182,15 @@ impl Arbiter {
RUNNING.with(move |cell| { RUNNING.with(move |cell| {
if cell.get() { if cell.get() {
// Spawn the future on running executor // Spawn the future on running executor
PENDING.with(move |cell| { let len = PENDING.with(move |cell| {
cell.borrow_mut().push(tokio::task::spawn_local(future)); let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(future));
p.len()
}); });
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending); tokio::task::spawn_local(CleanupPending);
}
} else { } else {
// Box the future and push it to the queue, this results in double boxing // Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now // because the executor boxes the future again, but works for now
@ -312,7 +318,7 @@ impl Arbiter {
/// have completed. /// have completed.
pub fn local_join() -> impl Future<Output = ()> { pub fn local_join() -> impl Future<Output = ()> {
PENDING.with(move |cell| { PENDING.with(move |cell| {
let current = cell.replace(Vec::new()); let current = cell.replace(SmallVec::new());
future::join_all(current).map(|_| ()) future::join_all(current).map(|_| ())
}) })
} }
@ -375,9 +381,15 @@ impl Future for ArbiterController {
return Poll::Ready(()); return Poll::Ready(());
} }
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
PENDING.with(move |cell| { let len = PENDING.with(move |cell| {
cell.borrow_mut().push(tokio::task::spawn_local(fut)); let mut p = cell.borrow_mut();
p.push(tokio::task::spawn_local(fut));
p.len()
}); });
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();