From b7a9cb7bb44c74006bbba17dbd26cdfbbb171701 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Sun, 6 Sep 2020 13:01:24 +0300 Subject: [PATCH] actix-rt: Make the process of running System in existing Runtime more clear (#173) --- actix-rt/CHANGES.md | 6 ++ actix-rt/Cargo.toml | 3 + actix-rt/src/builder.rs | 2 +- actix-rt/src/system.rs | 124 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 132 insertions(+), 3 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 518a68ac..9b5fb636 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## Unreleased - 2020-xx-xx + +### Added + +* Add `System::attach_to_tokio` method. [#173] + ## [1.1.1] - 2020-04-30 ### Fixed diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index c0bc750b..b7d272cd 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -23,3 +23,6 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc copyless = "0.1.4" smallvec = "1" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } + +[dev-dependencies] +tokio = { version = "0.2.6", features = ["full"] } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 29963703..f4d9b1bf 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -137,7 +137,7 @@ impl AsyncSystemRunner { Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), }; Arbiter::stop_system(); - return res; + res } }) .flatten() diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 21264669..f33854ba 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -57,10 +57,59 @@ impl System { Self::builder().name(name).build() } - #[allow(clippy::new_ret_no_self)] - /// Create new system using provided tokio Handle. + /// Create new system using provided tokio `LocalSet`. /// /// This method panics if it can not spawn system arbiter + /// + /// Note: This method uses provided `LocalSet` to create a `System` future only. + /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. + /// It means that using this method currently it is impossible to make `actix-rt` work in the + /// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]). + /// + /// [`Arbiter`]: struct.Arbiter.html + /// [`tokio_compat`]: https://crates.io/crates/tokio-compat + /// + /// # Examples + /// + /// ``` + /// use tokio::{runtime::Runtime, task::LocalSet}; + /// use actix_rt::System; + /// use futures_util::future::try_join_all; + /// + /// async fn run_application() { + /// let first_task = tokio::spawn(async { + /// // ... + /// # println!("One task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// let second_task = tokio::spawn(async { + /// // ... + /// # println!("Another task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// try_join_all(vec![first_task, second_task]) + /// .await + /// .expect("Some of the futures finished unexpectedly"); + /// } + /// + /// + /// let mut runtime = tokio::runtime::Builder::new() + /// .core_threads(2) + /// .enable_all() + /// .threaded_scheduler() + /// .build() + /// .unwrap(); + /// + /// + /// let actix_system_task = LocalSet::new(); + /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); + /// actix_system_task.spawn_local(sys); + /// + /// let rest_operations = run_application(); + /// runtime.block_on(actix_system_task.run_until(rest_operations)); + /// ``` pub fn run_in_tokio>( name: T, local: &LocalSet, @@ -71,6 +120,77 @@ impl System { .run_nonblocking() } + /// Consume the provided tokio Runtime and start the `System` in it. + /// This method will create a `LocalSet` object and occupy the current thread + /// for the created `System` exclusively. All the other asynchronous tasks that + /// should be executed as well must be aggregated into one future, provided as the last + /// argument to this method. + /// + /// Note: This method uses provided `Runtime` to create a `System` future only. + /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. + /// It means that using this method currently it is impossible to make `actix-rt` work in the + /// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`). + /// + /// [`Arbiter`]: struct.Arbiter.html + /// [`tokio_compat`]: https://crates.io/crates/tokio-compat + /// + /// # Arguments + /// + /// - `name`: Name of the System + /// - `runtime`: A tokio Runtime to run the system in. + /// - `rest_operations`: A future to be executed in the runtime along with the System. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use actix_rt::System; + /// use futures_util::future::try_join_all; + /// + /// async fn run_application() { + /// let first_task = tokio::spawn(async { + /// // ... + /// # println!("One task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// let second_task = tokio::spawn(async { + /// // ... + /// # println!("Another task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// try_join_all(vec![first_task, second_task]) + /// .await + /// .expect("Some of the futures finished unexpectedly"); + /// } + /// + /// + /// let runtime = tokio::runtime::Builder::new() + /// .core_threads(2) + /// .enable_all() + /// .threaded_scheduler() + /// .build() + /// .unwrap(); + /// + /// let rest_operations = run_application(); + /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); + /// ``` + pub fn attach_to_tokio( + name: impl Into, + mut runtime: tokio::runtime::Runtime, + rest_operations: Fut, + ) -> R + where + Fut: std::future::Future, + { + let actix_system_task = LocalSet::new(); + let sys = System::run_in_tokio(name.into(), &actix_system_task); + actix_system_task.spawn_local(sys); + + runtime.block_on(actix_system_task.run_until(rest_operations)) + } + /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() {