diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index a7792455..a95057c2 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -4,6 +4,8 @@ - Expose `System::is_set` to check if current system is running +- Add `Arbiter::local_join` associated function to get be able to `await` for spawned futures + ## [1.0.0] - 2019-12-11 * Update dependencies diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 16534476..08d3089c 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -15,10 +15,13 @@ use crate::system::System; use copyless::BoxHelper; +pub use tokio::task::JoinHandle; + thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); static Q: RefCell>>>> = RefCell::new(Vec::new()); + static PENDING: RefCell>> = RefCell::new(Vec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -170,7 +173,9 @@ impl Arbiter { RUNNING.with(move |cell| { if cell.get() { // Spawn the future on running executor - tokio::task::spawn_local(future); + PENDING.with(move |cell| { + cell.borrow_mut().push(tokio::task::spawn_local(future)); + }) } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -294,6 +299,15 @@ impl Arbiter { Ok(()) } } + + /// Returns a future that will be completed once all currently spawned futures + /// have completed. + pub fn local_join() -> impl Future { + PENDING.with(move |cell| { + let current = cell.replace(Vec::new()); + future::join_all(current).map(|_| ()) + }) + } } struct ArbiterController { @@ -329,7 +343,9 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - tokio::task::spawn_local(fut); + PENDING.with(move |cell| { + cell.borrow_mut().push(tokio::task::spawn_local(fut)); + }); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/tests/wait_spawned.rs b/actix-rt/tests/wait_spawned.rs index e3296e89..af5d0224 100644 --- a/actix-rt/tests/wait_spawned.rs +++ b/actix-rt/tests/wait_spawned.rs @@ -61,3 +61,40 @@ fn join_another_arbiter() { "Premature stop of arbiter should conclude regardless of it's current state" ); } + +#[test] +fn join_current_arbiter() { + let time = Duration::from_secs(2); + + let instant = Instant::now(); + actix_rt::System::new("test_join_current_arbiter").block_on(async move { + actix_rt::spawn(async move { + tokio::time::delay_for(time).await; + actix_rt::Arbiter::current().stop(); + }); + actix_rt::Arbiter::local_join().await; + }); + assert!( + instant.elapsed() >= time, + "Join on current arbiter should wait for all spawned futures" + ); + + let large_timer = Duration::from_secs(20); + let instant = Instant::now(); + actix_rt::System::new("test_join_current_arbiter").block_on(async move { + actix_rt::spawn(async move { + tokio::time::delay_for(time).await; + actix_rt::Arbiter::current().stop(); + }); + let f = actix_rt::Arbiter::local_join(); + actix_rt::spawn(async move { + tokio::time::delay_for(large_timer).await; + actix_rt::Arbiter::current().stop(); + }); + f.await; + }); + assert!( + instant.elapsed() < large_timer, + "local_join should await only for the already spawned futures" + ); +}