diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f2fe8435..5bc79416 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -23,3 +23,7 @@ macros = ["actix-macros"] actix-macros = { version = "0.2.0-beta.1", optional = true } tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } +futures-util = { version = "0.3.7", default-features = true, features = ["alloc"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 95b40b25..48d11b36 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,26 +1,32 @@ -use std::any::{Any, TypeId}; -use std::cell::RefCell; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll}; -use std::{fmt, thread}; +use std::{ + any::{Any, TypeId}, + cell::RefCell, + collections::HashMap, + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll}, + thread, +}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender}; -use tokio::task::LocalSet; +use tokio::{ + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot::{channel, error::RecvError as Canceled, Sender}, + }, + task::LocalSet, +}; -use crate::runtime::Runtime; -use crate::system::System; +use crate::{runtime::Runtime, system::System}; + +pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( static ADDR: RefCell> = RefCell::new(None); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); -pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); - pub(crate) enum ArbiterCommand { Stop, Execute(Box + Unpin + Send>), @@ -37,10 +43,10 @@ impl fmt::Debug for ArbiterCommand { } } +/// Arbiters provide an asynchronous execution environment for actors, functions and futures. When +/// an Arbiter is created, it spawns a new OS thread, and hosts an event loop. Some Arbiter +/// functions execute on the current thread. #[derive(Debug)] -/// Arbiters provide an asynchronous execution environment for actors, functions -/// and futures. When an Arbiter is created, it spawns a new OS thread, and -/// hosts an event loop. Some Arbiter functions execute on the current thread. pub struct Arbiter { sender: UnboundedSender, thread_handle: Option>, @@ -125,7 +131,7 @@ impl Arbiter { // unregister arbiter let _ = System::current() .sys() - .send(SystemCommand::UnregisterArbiter(id)); + .send(SystemCommand::DeregisterArbiter(id)); } }) .unwrap_or_else(|err| { @@ -312,7 +318,7 @@ impl Future for ArbiterController { pub(crate) enum SystemCommand { Exit(i32), RegisterArbiter(usize, Arbiter), - UnregisterArbiter(usize), + DeregisterArbiter(usize), } #[derive(Debug)] @@ -353,7 +359,7 @@ impl Future for SystemArbiter { SystemCommand::RegisterArbiter(name, hnd) => { self.arbiters.insert(name, hnd); } - SystemCommand::UnregisterArbiter(name) => { + SystemCommand::DeregisterArbiter(name) => { self.arbiters.remove(&name); } }, diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index ff7b0e06..c43af7c5 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,22 +1,25 @@ -use std::borrow::Cow; -use std::future::Future; -use std::io; +use std::{borrow::Cow, future::Future, io}; -use tokio::sync::mpsc::unbounded_channel; -use tokio::sync::oneshot::{channel, Receiver}; -use tokio::task::LocalSet; +use tokio::{ + sync::{ + mpsc::unbounded_channel, + oneshot::{channel, Receiver}, + }, + task::LocalSet, +}; -use crate::arbiter::{Arbiter, SystemArbiter}; -use crate::runtime::Runtime; -use crate::system::System; +use crate::{ + arbiter::{Arbiter, SystemArbiter}, + runtime::Runtime, + system::System, +}; -/// Builder struct for a actix runtime. +/// Builder an actix runtime. /// -/// Either use `Builder::build` to create a system and start actors. -/// Alternatively, use `Builder::run` to start the tokio runtime and -/// run a function in its context. +/// Either use `Builder::build` to create a system and start actors. Alternatively, use +/// `Builder::run` to start the Tokio runtime and run a function in its context. pub struct Builder { - /// Name of the System. Defaults to "actix" if unset. + /// Name of the System. Defaults to "actix-rt" if unset. name: Cow<'static, str>, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. @@ -26,13 +29,13 @@ pub struct Builder { impl Builder { pub(crate) fn new() -> Self { Builder { - name: Cow::Borrowed("actix"), + name: Cow::Borrowed("actix-rt"), stop_on_panic: false, } } /// Sets the name of the System. - pub fn name>(mut self, name: T) -> Self { + pub fn name(mut self, name: impl Into) -> Self { self.name = Cow::Owned(name.into()); self } @@ -48,7 +51,7 @@ impl Builder { /// Create new System. /// - /// This method panics if it can not create tokio runtime + /// This method panics if it can not create Tokio runtime pub fn build(self) -> SystemRunner { self.create_runtime(|| {}) } @@ -60,9 +63,8 @@ impl Builder { self.create_async_runtime(local) } - /// This function will start tokio runtime and will finish once the - /// `System::stop()` message get called. - /// Function `f` get called within tokio runtime context. + /// This function will start Tokio runtime and will finish once the `System::stop()` message + /// is called. Function `f` is called within Tokio runtime context. pub fn run(self, f: F) -> io::Result<()> where F: FnOnce(), @@ -71,7 +73,7 @@ impl Builder { } fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { - let (stop_tx, stop) = channel(); + let (stop_tx, stop_rx) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); let system = @@ -83,7 +85,7 @@ impl Builder { // start the system arbiter let _ = local.spawn_local(arb); - AsyncSystemRunner { stop, system } + AsyncSystemRunner { system, stop_rx } } fn create_runtime(self, f: F) -> SystemRunner @@ -115,31 +117,29 @@ impl Builder { #[derive(Debug)] pub(crate) struct AsyncSystemRunner { - stop: Receiver, system: System, + stop_rx: Receiver, } impl AsyncSystemRunner { - /// This function will start event loop and returns a future that - /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future> + Send { - let AsyncSystemRunner { stop, .. } = self; + /// This function will start event loop and returns a future that resolves once the + /// `System::stop()` function is called. + pub(crate) async fn run(self) -> Result<(), io::Error> { + let AsyncSystemRunner { stop_rx: stop, .. } = self; // run loop - async { - match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } + match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 6e9d0464..2151952e 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -2,6 +2,7 @@ #![deny(rust_2018_idioms, nonstandard_style)] #![allow(clippy::type_complexity)] +#![warn(missing_docs)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] @@ -25,7 +26,6 @@ pub use self::system::System; /// Spawns a future on the current arbiter. /// /// # Panics -/// /// This function panics if actix system is not running. #[inline] pub fn spawn(f: F) @@ -39,13 +39,15 @@ where pub mod signal { #[cfg(unix)] pub mod unix { + //! Unix specific signals. pub use tokio::signal::unix::*; } pub use tokio::signal::ctrl_c; } -/// TCP/UDP/Unix bindings pub mod net { + //! TCP/UDP/Unix bindings + pub use tokio::net::UdpSocket; pub use tokio::net::{TcpListener, TcpStream}; @@ -58,15 +60,17 @@ pub mod net { pub use self::unix::*; } -/// Utilities for tracking time. pub mod time { + //! Utilities for tracking time. + pub use tokio::time::Instant; pub use tokio::time::{interval, interval_at, Interval}; pub use tokio::time::{sleep, sleep_until, Sleep}; pub use tokio::time::{timeout, Timeout}; } -/// Task management. pub mod task { + //! Task management. + pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 63653e13..c7f611ed 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,24 +1,21 @@ -use std::future::Future; -use std::io; -use tokio::{runtime, task::LocalSet}; +use std::{future::Future, io}; -/// Single-threaded runtime provides a way to start reactor -/// and runtime on the current thread. +use tokio::task::{JoinHandle, LocalSet}; + +/// Single-threaded runtime provides a way to start reactor and runtime on the current thread. /// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: crate +/// See [crate root][crate] documentation for more details. #[derive(Debug)] pub struct Runtime { local: LocalSet, - rt: runtime::Runtime, + rt: tokio::runtime::Runtime, } impl Runtime { - #[allow(clippy::new_ret_no_self)] /// Returns a new runtime initialized with default configuration values. + #[allow(clippy::new_ret_no_self)] pub fn new() -> io::Result { - let rt = runtime::Builder::new_current_thread() + let rt = tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() .build()?; @@ -29,62 +26,53 @@ impl Runtime { }) } - pub(super) fn local(&self) -> &LocalSet { + /// Reference to local task set. + pub(crate) fn local(&self) -> &LocalSet { &self.local } - /// Spawn a future onto the single-threaded runtime. + /// Offload a future onto the single-threaded runtime. /// - /// See [module level][mod] documentation for more details. + /// The returned join handle can be used to await the future's result. /// - /// [mod]: crate + /// See [crate root][crate] documentation for more details. /// /// # Examples - /// - /// ```ignore - /// # use futures::{future, Future, Stream}; - /// use actix_rt::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); + /// ``` + /// let rt = actix_rt::Runtime::new().unwrap(); /// /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|_| { + /// let handle = rt.spawn(async { /// println!("running on the runtime"); - /// })); - /// # } - /// # pub fn main() {} + /// 42 + /// }); + /// + /// assert_eq!(rt.block_on(handle).unwrap(), 42); /// ``` /// /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the executor - /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&self, future: F) -> &Self + /// This function panics if the spawn fails. Failure occurs if the executor is currently at + /// capacity and is unable to spawn a new future. + pub fn spawn(&self, future: F) -> JoinHandle where - F: Future + 'static, + F: Future + 'static, { - self.local.spawn_local(future); - self + self.local.spawn_local(future) } - /// Runs the provided future, blocking the current thread until the future - /// completes. + /// Runs the provided future, blocking the current thread until the future completes. /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. + /// This function can be used to synchronously block the current thread until the provided + /// `future` has resolved either successfully or with an error. The result of the future is + /// then returned from this function call. /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. Once the function returns, any uncompleted futures - /// remain pending in the `Runtime` instance. These futures will not run + /// Note that this function will also execute any spawned futures on the current thread, but + /// will not block until these other spawned futures have completed. Once the function returns, + /// any uncompleted futures remain pending in the `Runtime` instance. These futures will not run /// until `block_on` or `run` is called again. /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution by calling `block_on` or `run`. + /// The caller is responsible for ensuring that other spawned futures complete execution by + /// calling `block_on` or `run`. pub fn block_on(&self, f: F) -> F::Output where F: Future, diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3a199da7..262f60a6 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,13 +1,16 @@ -use std::cell::RefCell; -use std::future::Future; -use std::io; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + cell::RefCell, + future::Future, + io, + sync::atomic::{AtomicUsize, Ordering}, +}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::task::LocalSet; +use tokio::{sync::mpsc::UnboundedSender, task::LocalSet}; -use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner}; +use crate::{ + arbiter::{Arbiter, SystemCommand}, + builder::{Builder, SystemRunner}, +}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -43,16 +46,15 @@ impl System { /// Build a new system with a customized tokio runtime. /// - /// This allows to customize the runtime. See struct level docs on - /// `Builder` for more information. + /// This allows to customize the runtime. See [`Builder`] for more information. pub fn builder() -> Builder { Builder::new() } - #[allow(clippy::new_ret_no_self)] /// Create new system. /// /// This method panics if it can not create tokio runtime + #[allow(clippy::new_ret_no_self)] pub fn new>(name: T) -> SystemRunner { Self::builder().name(name).build() } @@ -64,13 +66,10 @@ impl System { /// Note: This method uses provided `LocalSet` to create a `System` future only. /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]). - /// - /// [`tokio_compat`]: https://crates.io/crates/tokio-compat + /// alternative Tokio runtimes such as those provided by `tokio_compat`. /// /// # Examples - /// - /// ```ignore + /// ``` /// use tokio::{runtime::Runtime, task::LocalSet}; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -78,14 +77,14 @@ impl System { /// async fn run_application() { /// let first_task = tokio::spawn(async { /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) + /// # println!("One task"); + /// # Ok::<(),()>(()) /// }); /// /// let second_task = tokio::spawn(async { /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) + /// # println!("Another task"); + /// # Ok::<(),()>(()) /// }); /// /// try_join_all(vec![first_task, second_task]) @@ -93,14 +92,12 @@ impl System { /// .expect("Some of the futures finished unexpectedly"); /// } /// - /// /// let runtime = tokio::runtime::Builder::new_multi_thread() /// .worker_threads(2) /// .enable_all() /// .build() /// .unwrap(); /// - /// /// let actix_system_task = LocalSet::new(); /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); /// actix_system_task.spawn_local(sys); @@ -112,34 +109,28 @@ impl System { name: T, local: &LocalSet, ) -> impl Future> { - Self::builder() - .name(name) - .build_async(local) - .run_nonblocking() + Self::builder().name(name).build_async(local).run() } - /// Consume the provided tokio Runtime and start the `System` in it. + /// Consume the provided Tokio Runtime and start the `System` in it. /// This method will create a `LocalSet` object and occupy the current thread /// for the created `System` exclusively. All the other asynchronous tasks that /// should be executed as well must be aggregated into one future, provided as the last /// argument to this method. /// /// Note: This method uses provided `Runtime` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. + /// All the [`Arbiter`]s will be started in separate threads using their own Tokio `Runtime`s. /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`). - /// - /// [`tokio_compat`]: https://crates.io/crates/tokio-compat + /// alternative Tokio runtimes such as those provided by `tokio_compat`. /// /// # Arguments /// /// - `name`: Name of the System - /// - `runtime`: A tokio Runtime to run the system in. + /// - `runtime`: A Tokio Runtime to run the system in. /// - `rest_operations`: A future to be executed in the runtime along with the System. /// /// # Examples - /// - /// ```ignore + /// ``` /// use tokio::runtime::Runtime; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -172,14 +163,11 @@ impl System { /// let rest_operations = run_application(); /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); /// ``` - pub fn attach_to_tokio( + pub fn attach_to_tokio( name: impl Into, runtime: tokio::runtime::Runtime, rest_operations: Fut, - ) -> R - where - Fut: std::future::Future, - { + ) -> Fut::Output { let actix_system_task = LocalSet::new(); let sys = System::run_in_tokio(name.into(), &actix_system_task); actix_system_task.spawn_local(sys); @@ -195,7 +183,7 @@ impl System { }) } - /// Check if current system is set, i.e., as already been started. + /// Check if current system has started. pub fn is_set() -> bool { CURRENT.with(|cell| cell.borrow().is_some()) } @@ -219,12 +207,12 @@ impl System { }) } - /// System id + /// Numeric system ID. pub fn id(&self) -> usize { self.id } - /// Stop the system + /// Stop the system (with code 0). pub fn stop(&self) { self.stop_with_code(0) } @@ -240,18 +228,17 @@ impl System { /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an /// uncaught panic is thrown from a worker thread. - pub fn stop_on_panic(&self) -> bool { + pub(crate) fn stop_on_panic(&self) -> bool { self.stop_on_panic } - /// System arbiter + /// Get shared reference to system arbiter. pub fn arbiter(&self) -> &Arbiter { &self.arbiter } - /// This function will start tokio runtime and will finish once the - /// `System::stop()` message get called. - /// Function `f` get called within tokio runtime context. + /// This function will start tokio runtime and will finish once the `System::stop()` message + /// is called. Function `f` is called within tokio runtime context. pub fn run(f: F) -> io::Result<()> where F: FnOnce(), diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index f338602d..fd579827 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,8 +1,10 @@ use std::time::{Duration, Instant}; +use futures_util::future::try_join_all; + #[test] fn await_for_timer() { - let time = Duration::from_secs(2); + let time = Duration::from_secs(1); let instant = Instant::now(); actix_rt::System::new("test_wait_timer").block_on(async move { tokio::time::sleep(time).await; @@ -15,7 +17,7 @@ fn await_for_timer() { #[test] fn join_another_arbiter() { - let time = Duration::from_secs(2); + let time = Duration::from_secs(1); let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); @@ -87,3 +89,100 @@ fn non_static_block_on() { }) .unwrap(); } + +#[test] +fn wait_for_spawns() { + let rt = actix_rt::Runtime::new().unwrap(); + + let handle = rt.spawn(async { + println!("running on the runtime"); + // assertion panic is caught at task boundary + assert_eq!(1, 2); + }); + + assert!(rt.block_on(handle).is_err()); +} + +#[test] +fn run_in_existing_tokio() { + use actix_rt::System; + use futures_util::future::try_join_all; + use tokio::task::LocalSet; + + async fn run_application() { + let first_task = tokio::spawn(async { + println!("One task"); + Ok::<(), ()>(()) + }); + + let second_task = tokio::spawn(async { + println!("Another task"); + Ok::<(), ()>(()) + }); + + try_join_all(vec![first_task, second_task]) + .await + .expect("Some of the futures finished unexpectedly"); + } + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + + let actix_local_set = LocalSet::new(); + let sys = System::run_in_tokio("actix-main-system", &actix_local_set); + actix_local_set.spawn_local(sys); + + let rest_operations = run_application(); + runtime.block_on(actix_local_set.run_until(rest_operations)); +} + +async fn run_application() -> usize { + let first_task = tokio::spawn(async { + println!("One task"); + Ok::<(), ()>(()) + }); + + let second_task = tokio::spawn(async { + println!("Another task"); + Ok::<(), ()>(()) + }); + + let tasks = try_join_all(vec![first_task, second_task]) + .await + .expect("Some of the futures finished unexpectedly"); + + tasks.len() +} + +#[test] +fn attack_to_tokio() { + use actix_rt::System; + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap(); + + let rest_operations = run_application(); + let res = System::attach_to_tokio("actix-main-system", runtime, rest_operations); + + assert_eq!(res, 2); +} + +#[tokio::test] +async fn attack_to_tokio_macro() { + use actix_rt::System; + + let rest_operations = run_application(); + let res = System::attach_to_tokio( + "actix-main-system", + tokio::runtime::Runtime::handle(&self), + rest_operations, + ); + + assert_eq!(res, 2); +}