diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 7b89b2fa..c38373dd 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -2,6 +2,15 @@ ## Unreleased - 2021-xx-xx +* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] +* Return `JoinHandle` from `actix_rt::spawn`. [#253] +* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] +* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] +* Remove `Arbiter::exec`. [#253] +* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] + +[#253]: https://github.com/actix/actix-net/pull/253 + ## 2.0.0-beta.2 - 2021-01-09 * Add `task` mod with re-export of `tokio::task::{spawn_blocking, yield_now, JoinHandle}` [#245] diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 5bc79416..f5a6ba6a 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,6 +22,7 @@ macros = ["actix-macros"] [dependencies] actix-macros = { version = "0.2.0-beta.1", optional = true } +futures-core = { version = "0.3", default-features = false } tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } [dev-dependencies] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 48d11b36..fde3cd1c 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -10,10 +10,11 @@ use std::{ thread, }; +use futures_core::ready; use tokio::{ sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{channel, error::RecvError as Canceled, Sender}, + oneshot::Sender, }, task::LocalSet, }; @@ -86,12 +87,6 @@ impl Arbiter { }) } - /// Check if current arbiter is running. - #[deprecated(note = "Thread local variables for running state of Arbiter is removed")] - pub fn is_running() -> bool { - false - } - /// Stop arbiter from continuing it's event loop. pub fn stop(&self) { let _ = self.sender.send(ArbiterCommand::Stop); @@ -121,16 +116,16 @@ impl Arbiter { // register arbiter let _ = System::current() - .sys() + .tx() .send(SystemCommand::RegisterArbiter(id, arb)); // start arbiter controller // run loop rt.block_on(ArbiterController { rx }); - // unregister arbiter + // deregister arbiter let _ = System::current() - .sys() + .tx() .send(SystemCommand::DeregisterArbiter(id)); } }) @@ -144,67 +139,35 @@ impl Arbiter { } } - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - pub fn spawn(future: F) + /// Send a future to the Arbiter's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the Arbiter has died. + pub fn spawn(&self, future: Fut) -> bool where - F: Future + 'static, + Fut: Future + Unpin + Send + 'static, { - let _ = tokio::task::spawn_local(future); + match self.sender.send(ArbiterCommand::Execute(Box::new(future))) { + Ok(_) => true, + Err(_) => false, + } } - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - Arbiter::spawn(async { - f(); - }) - } - - /// Send a future to the Arbiter's thread, and spawn it. - pub fn send(&self, future: F) - where - F: Future + Send + Unpin + 'static, - { - let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future))); - } - - /// Send a function to the Arbiter's thread, and execute it. Any result from the function - /// is discarded. - pub fn exec_fn(&self, f: F) + /// Send a function to the Arbiter's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn_fn(&self, f: F) -> bool where F: FnOnce() + Send + 'static, { - let _ = self - .sender - .send(ArbiterCommand::ExecuteFn(Box::new(move || { - f(); - }))); - } - - /// Send a function to the Arbiter's thread. This function will be executed asynchronously. - /// A future is created, and when resolved will contain the result of the function sent - /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = channel(); - let _ = self - .sender - .send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_closed() { - let _ = tx.send(f()); - } - }))); - rx + match self.sender.send(ArbiterCommand::ExecuteFn(Box::new(f))) { + Ok(_) => true, + Err(_) => false, + } } /// Set item to arbiter storage @@ -266,13 +229,6 @@ impl Arbiter { Ok(()) } } - - /// Returns a future that will be completed once all currently spawned futures - /// have completed. - #[deprecated(since = "2.0.0", note = "Arbiter::local_join function is removed.")] - pub async fn local_join() { - unimplemented!("Arbiter::local_join function is removed.") - } } struct ArbiterController { @@ -281,6 +237,7 @@ struct ArbiterController { impl Drop for ArbiterController { fn drop(&mut self) { + // panics can only occur with spawn_fn calls if thread::panicking() { if System::current().stop_on_panic() { eprintln!("Panic in Arbiter thread, shutting down system."); @@ -296,10 +253,14 @@ impl Future for ArbiterController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel loop { - match Pin::new(&mut self.rx).poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(item)) => match item { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process arbiter command + Some(item) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { tokio::task::spawn_local(fut); @@ -308,7 +269,6 @@ impl Future for ArbiterController { f.call_box(); } }, - Poll::Pending => return Poll::Pending, } } } @@ -342,10 +302,14 @@ impl Future for SystemArbiter { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel loop { - match Pin::new(&mut self.commands).poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(cmd)) => match cmd { + match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process system command + Some(cmd) => match cmd { SystemCommand::Exit(code) => { // stop arbiters for arb in self.arbiters.values() { @@ -363,7 +327,6 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Poll::Pending => return Poll::Pending, } } } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index c43af7c5..56cfcb91 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,11 +1,8 @@ use std::{borrow::Cow, future::Future, io}; -use tokio::{ - sync::{ - mpsc::unbounded_channel, - oneshot::{channel, Receiver}, - }, - task::LocalSet, +use tokio::sync::{ + mpsc::unbounded_channel, + oneshot::{channel, Receiver}, }; use crate::{ @@ -56,13 +53,6 @@ impl Builder { self.create_runtime(|| {}) } - /// Create new System that can run asynchronously. - /// - /// This method panics if it cannot start the system arbiter - pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner { - self.create_async_runtime(local) - } - /// This function will start Tokio runtime and will finish once the `System::stop()` message /// is called. Function `f` is called within Tokio runtime context. pub fn run(self, f: F) -> io::Result<()> @@ -72,22 +62,6 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { - let (stop_tx, stop_rx) = channel(); - let (sys_sender, sys_receiver) = unbounded_channel(); - - let system = - System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - // start the system arbiter - let _ = local.spawn_local(arb); - - AsyncSystemRunner { system, stop_rx } - } - fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce(), @@ -115,35 +89,6 @@ impl Builder { } } -#[derive(Debug)] -pub(crate) struct AsyncSystemRunner { - system: System, - stop_rx: Receiver, -} - -impl AsyncSystemRunner { - /// This function will start event loop and returns a future that resolves once the - /// `System::stop()` function is called. - pub(crate) async fn run(self) -> Result<(), io::Error> { - let AsyncSystemRunner { stop_rx: stop, .. } = self; - - // run loop - match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } -} - /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 2151952e..c2222a79 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -8,6 +8,8 @@ use std::future::Future; +use tokio::task::JoinHandle; + // Cannot define a main macro when compiled into test harness. // Workaround for https://github.com/rust-lang/rust/issues/62127. #[cfg(all(feature = "macros", not(test)))] @@ -26,13 +28,13 @@ pub use self::system::System; /// Spawns a future on the current arbiter. /// /// # Panics -/// This function panics if actix system is not running. +/// Panics if Actix system is not running. #[inline] -pub fn spawn(f: F) +pub fn spawn(f: Fut) -> JoinHandle<()> where - F: Future + 'static, + Fut: Future + 'static, { - Arbiter::spawn(f) + tokio::task::spawn_local(f) } /// Asynchronous signal handling diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 262f60a6..64080a63 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,11 +1,10 @@ use std::{ cell::RefCell, - future::Future, io, sync::atomic::{AtomicUsize, Ordering}, }; -use tokio::{sync::mpsc::UnboundedSender, task::LocalSet}; +use tokio::sync::mpsc::UnboundedSender; use crate::{ arbiter::{Arbiter, SystemCommand}, @@ -18,7 +17,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); #[derive(Clone, Debug)] pub struct System { id: usize, - sys: UnboundedSender, + tx: UnboundedSender, arbiter: Arbiter, stop_on_panic: bool, } @@ -35,7 +34,7 @@ impl System { stop_on_panic: bool, ) -> Self { let sys = System { - sys, + tx: sys, arbiter, stop_on_panic, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), @@ -55,126 +54,10 @@ impl System { /// /// This method panics if it can not create tokio runtime #[allow(clippy::new_ret_no_self)] - pub fn new>(name: T) -> SystemRunner { + pub fn new(name: impl Into) -> SystemRunner { Self::builder().name(name).build() } - /// Create new system using provided tokio `LocalSet`. - /// - /// This method panics if it can not spawn system arbiter - /// - /// Note: This method uses provided `LocalSet` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative Tokio runtimes such as those provided by `tokio_compat`. - /// - /// # Examples - /// ``` - /// use tokio::{runtime::Runtime, task::LocalSet}; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// let runtime = tokio::runtime::Builder::new_multi_thread() - /// .worker_threads(2) - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let actix_system_task = LocalSet::new(); - /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); - /// actix_system_task.spawn_local(sys); - /// - /// let rest_operations = run_application(); - /// runtime.block_on(actix_system_task.run_until(rest_operations)); - /// ``` - pub fn run_in_tokio>( - name: T, - local: &LocalSet, - ) -> impl Future> { - Self::builder().name(name).build_async(local).run() - } - - /// Consume the provided Tokio Runtime and start the `System` in it. - /// This method will create a `LocalSet` object and occupy the current thread - /// for the created `System` exclusively. All the other asynchronous tasks that - /// should be executed as well must be aggregated into one future, provided as the last - /// argument to this method. - /// - /// Note: This method uses provided `Runtime` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own Tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative Tokio runtimes such as those provided by `tokio_compat`. - /// - /// # Arguments - /// - /// - `name`: Name of the System - /// - `runtime`: A Tokio Runtime to run the system in. - /// - `rest_operations`: A future to be executed in the runtime along with the System. - /// - /// # Examples - /// ``` - /// use tokio::runtime::Runtime; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// - /// let runtime = tokio::runtime::Builder::new_multi_thread() - /// .worker_threads(2) - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let rest_operations = run_application(); - /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); - /// ``` - pub fn attach_to_tokio( - name: impl Into, - runtime: tokio::runtime::Runtime, - rest_operations: Fut, - ) -> Fut::Output { - let actix_system_task = LocalSet::new(); - let sys = System::run_in_tokio(name.into(), &actix_system_task); - actix_system_task.spawn_local(sys); - - runtime.block_on(actix_system_task.run_until(rest_operations)) - } - /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() { @@ -219,11 +102,11 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.sys.send(SystemCommand::Exit(code)); + let _ = self.tx.send(SystemCommand::Exit(code)); } - pub(crate) fn sys(&self) -> &UnboundedSender { - &self.sys + pub(crate) fn tx(&self) -> &UnboundedSender { + &self.tx } /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 225fd53b..abaff1c9 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,6 +1,9 @@ -use std::time::{Duration, Instant}; +use std::{ + thread, + time::{Duration, Instant}, +}; -use futures_util::future::try_join_all; +use actix_rt::{Arbiter, System}; #[test] fn await_for_timer() { @@ -21,7 +24,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.send(Box::pin(async move { + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); @@ -35,7 +38,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.exec_fn(move || { + arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); @@ -51,7 +54,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.send(Box::pin(async move { + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); @@ -104,71 +107,31 @@ fn wait_for_spawns() { } #[test] -fn run_in_existing_tokio() { - use actix_rt::System; - use futures_util::future::try_join_all; - use tokio::task::LocalSet; +#[should_panic] +fn arbiter_drop_panic_fn() { + let _ = System::new("test-system"); - async fn run_application() { - let first_task = tokio::spawn(async { - println!("One task"); - Ok::<(), ()>(()) - }); + let mut arbiter = Arbiter::new(); + arbiter.spawn_fn(|| panic!("test")); - let second_task = tokio::spawn(async { - println!("Another task"); - Ok::<(), ()>(()) - }); - - try_join_all(vec![first_task, second_task]) - .await - .expect("Some of the futures finished unexpectedly"); - } - - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap(); - - let actix_local_set = LocalSet::new(); - let sys = System::run_in_tokio("actix-main-system", &actix_local_set); - actix_local_set.spawn_local(sys); - - let rest_operations = run_application(); - runtime.block_on(actix_local_set.run_until(rest_operations)); -} - -async fn run_application() -> usize { - let first_task = tokio::spawn(async { - println!("One task"); - Ok::<(), ()>(()) - }); - - let second_task = tokio::spawn(async { - println!("Another task"); - Ok::<(), ()>(()) - }); - - let tasks = try_join_all(vec![first_task, second_task]) - .await - .expect("Some of the futures finished unexpectedly"); - - tasks.len() + arbiter.join().unwrap(); } #[test] -fn attack_to_tokio() { - use actix_rt::System; +fn arbiter_drop_no_panic_fut() { + use futures_util::future::lazy; - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap(); + let _ = System::new("test-system"); - let rest_operations = run_application(); - let res = System::attach_to_tokio("actix-main-system", runtime, rest_operations); + let mut arbiter = Arbiter::new(); + arbiter.spawn(lazy(|_| panic!("test"))); - assert_eq!(res, 2); + let arb = arbiter.clone(); + let thread = thread::spawn(move || { + thread::sleep(Duration::from_millis(200)); + arb.stop(); + }); + + arbiter.join().unwrap(); + thread.join().unwrap(); } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bf895f06..5c434709 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -401,7 +401,7 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().arbiter().send(Box::pin(async move { + System::current().arbiter().spawn(Box::pin(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); })); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 08ab2d09..a78e4175 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -6,7 +6,7 @@ use std::{io, mem}; use actix_rt::net::TcpStream; use actix_rt::time::{sleep_until, Instant}; -use actix_rt::{spawn, System}; +use actix_rt::{self as rt, System}; use log::{error, info}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot; @@ -288,7 +288,7 @@ impl ServerBuilder { // start http server actor let server = self.server.clone(); - spawn(self); + rt::spawn(self); server } } @@ -364,7 +364,7 @@ impl ServerBuilder { let fut = join_all(iter); - spawn(async move { + rt::spawn(async move { let _ = fut.await; if let Some(tx) = completion { let _ = tx.send(()); @@ -373,16 +373,16 @@ impl ServerBuilder { let _ = tx.send(()); } if exit { - spawn(async { + rt::spawn(async { sleep_until(Instant::now() + Duration::from_millis(300)).await; System::current().stop(); }); } - }) + }); } else { // we need to stop system if server was spawned if self.exit { - spawn(async { + rt::spawn(async { sleep_until(Instant::now() + Duration::from_millis(300)).await; System::current().stop(); }); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 91e98fc2..d54a0829 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -172,7 +172,7 @@ impl Worker { let avail = availability.clone(); // every worker runs in it's own arbiter. - Arbiter::new().send(Box::pin(async move { + Arbiter::new().spawn(Box::pin(async move { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { rx,