1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 23:21:50 +01:00

actix-rt: Make the process of running System in existing Runtime more clear (#173)

This commit is contained in:
Igor Aleksanov 2020-09-06 13:01:24 +03:00 committed by GitHub
parent 88d99ac89c
commit b7a9cb7bb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 132 additions and 3 deletions

View File

@ -1,5 +1,11 @@
# Changes # Changes
## Unreleased - 2020-xx-xx
### Added
* Add `System::attach_to_tokio` method. [#173]
## [1.1.1] - 2020-04-30 ## [1.1.1] - 2020-04-30
### Fixed ### Fixed

View File

@ -23,3 +23,6 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc
copyless = "0.1.4" copyless = "0.1.4"
smallvec = "1" smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies]
tokio = { version = "0.2.6", features = ["full"] }

View File

@ -137,7 +137,7 @@ impl AsyncSystemRunner {
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}; };
Arbiter::stop_system(); Arbiter::stop_system();
return res; res
} }
}) })
.flatten() .flatten()

View File

@ -57,10 +57,59 @@ impl System {
Self::builder().name(name).build() Self::builder().name(name).build()
} }
#[allow(clippy::new_ret_no_self)] /// Create new system using provided tokio `LocalSet`.
/// Create new system using provided tokio Handle.
/// ///
/// This method panics if it can not spawn system arbiter /// This method panics if it can not spawn system arbiter
///
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Examples
///
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>( pub fn run_in_tokio<T: Into<String>>(
name: T, name: T,
local: &LocalSet, local: &LocalSet,
@ -71,6 +120,77 @@ impl System {
.run_nonblocking() .run_nonblocking()
} }
/// Consume the provided tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Arguments
///
/// - `name`: Name of the System
/// - `runtime`: A tokio Runtime to run the system in.
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
/// ```
pub fn attach_to_tokio<Fut, R>(
name: impl Into<String>,
mut runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> R
where
Fut: std::future::Future<Output = R>,
{
let actix_system_task = LocalSet::new();
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);
runtime.block_on(actix_system_task.run_until(rest_operations))
}
/// Get current running system. /// Get current running system.
pub fn current() -> System { pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() { CURRENT.with(|cell| match *cell.borrow() {