1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 13:01:49 +01:00

remove tokio runners (#253)

This commit is contained in:
Rob Ede 2021-01-29 02:21:06 +00:00 committed by GitHub
parent feac376c17
commit ba39c8436d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 103 additions and 337 deletions

View File

@ -2,6 +2,15 @@
## Unreleased - 2021-xx-xx
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253]
* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253]
* Remove `Arbiter::exec`. [#253]
* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253]
[#253]: https://github.com/actix/actix-net/pull/253
## 2.0.0-beta.2 - 2021-01-09
* Add `task` mod with re-export of `tokio::task::{spawn_blocking, yield_now, JoinHandle}` [#245]

View File

@ -22,6 +22,7 @@ macros = ["actix-macros"]
[dependencies]
actix-macros = { version = "0.2.0-beta.1", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[dev-dependencies]

View File

@ -10,10 +10,11 @@ use std::{
thread,
};
use futures_core::ready;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{channel, error::RecvError as Canceled, Sender},
oneshot::Sender,
},
task::LocalSet,
};
@ -86,12 +87,6 @@ impl Arbiter {
})
}
/// Check if current arbiter is running.
#[deprecated(note = "Thread local variables for running state of Arbiter is removed")]
pub fn is_running() -> bool {
false
}
/// Stop arbiter from continuing it's event loop.
pub fn stop(&self) {
let _ = self.sender.send(ArbiterCommand::Stop);
@ -121,16 +116,16 @@ impl Arbiter {
// register arbiter
let _ = System::current()
.sys()
.tx()
.send(SystemCommand::RegisterArbiter(id, arb));
// start arbiter controller
// run loop
rt.block_on(ArbiterController { rx });
// unregister arbiter
// deregister arbiter
let _ = System::current()
.sys()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
}
})
@ -144,67 +139,35 @@ impl Arbiter {
}
}
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
pub fn spawn<F>(future: F)
/// Send a future to the Arbiter's thread and spawn it.
///
/// If you require a result, include a response channel in the future.
///
/// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool
where
F: Future<Output = ()> + 'static,
Fut: Future<Output = ()> + Unpin + Send + 'static,
{
let _ = tokio::task::spawn_local(future);
match self.sender.send(ArbiterCommand::Execute(Box::new(future))) {
Ok(_) => true,
Err(_) => false,
}
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static,
{
Arbiter::spawn(async {
f();
})
}
/// Send a future to the Arbiter's thread, and spawn it.
pub fn send<F>(&self, future: F)
where
F: Future<Output = ()> + Send + Unpin + 'static,
{
let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future)));
}
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
/// is discarded.
pub fn exec_fn<F>(&self, f: F)
/// Send a function to the Arbiter's thread and execute it.
///
/// Any result from the function is discarded. If you require a result, include a response
/// channel in the function.
///
/// Returns true if function was sent successfully and false if the Arbiter has died.
pub fn spawn_fn<F>(&self, f: F) -> bool
where
F: FnOnce() + Send + 'static,
{
let _ = self
.sender
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
f();
})));
}
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread.
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, Canceled>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = channel();
let _ = self
.sender
.send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_closed() {
let _ = tx.send(f());
}
})));
rx
match self.sender.send(ArbiterCommand::ExecuteFn(Box::new(f))) {
Ok(_) => true,
Err(_) => false,
}
}
/// Set item to arbiter storage
@ -266,13 +229,6 @@ impl Arbiter {
Ok(())
}
}
/// Returns a future that will be completed once all currently spawned futures
/// have completed.
#[deprecated(since = "2.0.0", note = "Arbiter::local_join function is removed.")]
pub async fn local_join() {
unimplemented!("Arbiter::local_join function is removed.")
}
}
struct ArbiterController {
@ -281,6 +237,7 @@ struct ArbiterController {
impl Drop for ArbiterController {
fn drop(&mut self) {
// panics can only occur with spawn_fn calls
if thread::panicking() {
if System::current().stop_on_panic() {
eprintln!("Panic in Arbiter thread, shutting down system.");
@ -296,10 +253,14 @@ impl Future for ArbiterController {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match Pin::new(&mut self.rx).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// channel closed; no more messages can be received
None => return Poll::Ready(()),
// process arbiter command
Some(item) => match item {
ArbiterCommand::Stop => return Poll::Ready(()),
ArbiterCommand::Execute(fut) => {
tokio::task::spawn_local(fut);
@ -308,7 +269,6 @@ impl Future for ArbiterController {
f.call_box();
}
},
Poll::Pending => return Poll::Pending,
}
}
}
@ -342,10 +302,14 @@ impl Future for SystemArbiter {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// process all items currently buffered in channel
loop {
match Pin::new(&mut self.commands).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd {
match ready!(Pin::new(&mut self.commands).poll_recv(cx)) {
// channel closed; no more messages can be received
None => return Poll::Ready(()),
// process system command
Some(cmd) => match cmd {
SystemCommand::Exit(code) => {
// stop arbiters
for arb in self.arbiters.values() {
@ -363,7 +327,6 @@ impl Future for SystemArbiter {
self.arbiters.remove(&name);
}
},
Poll::Pending => return Poll::Pending,
}
}
}

View File

@ -1,11 +1,8 @@
use std::{borrow::Cow, future::Future, io};
use tokio::{
sync::{
mpsc::unbounded_channel,
oneshot::{channel, Receiver},
},
task::LocalSet,
use tokio::sync::{
mpsc::unbounded_channel,
oneshot::{channel, Receiver},
};
use crate::{
@ -56,13 +53,6 @@ impl Builder {
self.create_runtime(|| {})
}
/// Create new System that can run asynchronously.
///
/// This method panics if it cannot start the system arbiter
pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
self.create_async_runtime(local)
}
/// This function will start Tokio runtime and will finish once the `System::stop()` message
/// is called. Function `f` is called within Tokio runtime context.
pub fn run<F>(self, f: F) -> io::Result<()>
@ -72,22 +62,6 @@ impl Builder {
self.create_runtime(f).run()
}
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
let (stop_tx, stop_rx) = channel();
let (sys_sender, sys_receiver) = unbounded_channel();
let system =
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
let _ = local.spawn_local(arb);
AsyncSystemRunner { system, stop_rx }
}
fn create_runtime<F>(self, f: F) -> SystemRunner
where
F: FnOnce(),
@ -115,35 +89,6 @@ impl Builder {
}
}
#[derive(Debug)]
pub(crate) struct AsyncSystemRunner {
system: System,
stop_rx: Receiver<i32>,
}
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that resolves once the
/// `System::stop()` function is called.
pub(crate) async fn run(self) -> Result<(), io::Error> {
let AsyncSystemRunner { stop_rx: stop, .. } = self;
// run loop
match stop.await {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
}
/// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"]
#[derive(Debug)]

View File

@ -8,6 +8,8 @@
use std::future::Future;
use tokio::task::JoinHandle;
// Cannot define a main macro when compiled into test harness.
// Workaround for https://github.com/rust-lang/rust/issues/62127.
#[cfg(all(feature = "macros", not(test)))]
@ -26,13 +28,13 @@ pub use self::system::System;
/// Spawns a future on the current arbiter.
///
/// # Panics
/// This function panics if actix system is not running.
/// Panics if Actix system is not running.
#[inline]
pub fn spawn<F>(f: F)
pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
where
F: Future<Output = ()> + 'static,
Fut: Future<Output = ()> + 'static,
{
Arbiter::spawn(f)
tokio::task::spawn_local(f)
}
/// Asynchronous signal handling

View File

@ -1,11 +1,10 @@
use std::{
cell::RefCell,
future::Future,
io,
sync::atomic::{AtomicUsize, Ordering},
};
use tokio::{sync::mpsc::UnboundedSender, task::LocalSet};
use tokio::sync::mpsc::UnboundedSender;
use crate::{
arbiter::{Arbiter, SystemCommand},
@ -18,7 +17,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug)]
pub struct System {
id: usize,
sys: UnboundedSender<SystemCommand>,
tx: UnboundedSender<SystemCommand>,
arbiter: Arbiter,
stop_on_panic: bool,
}
@ -35,7 +34,7 @@ impl System {
stop_on_panic: bool,
) -> Self {
let sys = System {
sys,
tx: sys,
arbiter,
stop_on_panic,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
@ -55,126 +54,10 @@ impl System {
///
/// This method panics if it can not create tokio runtime
#[allow(clippy::new_ret_no_self)]
pub fn new<T: Into<String>>(name: T) -> SystemRunner {
pub fn new(name: impl Into<String>) -> SystemRunner {
Self::builder().name(name).build()
}
/// Create new system using provided tokio `LocalSet`.
///
/// This method panics if it can not spawn system arbiter
///
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative Tokio runtimes such as those provided by `tokio_compat`.
///
/// # Examples
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
/// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .worker_threads(2)
/// .enable_all()
/// .build()
/// .unwrap();
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>(
name: T,
local: &LocalSet,
) -> impl Future<Output = io::Result<()>> {
Self::builder().name(name).build_async(local).run()
}
/// Consume the provided Tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own Tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative Tokio runtimes such as those provided by `tokio_compat`.
///
/// # Arguments
///
/// - `name`: Name of the System
/// - `runtime`: A Tokio Runtime to run the system in.
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new_multi_thread()
/// .worker_threads(2)
/// .enable_all()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
/// ```
pub fn attach_to_tokio<Fut: Future>(
name: impl Into<String>,
runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> Fut::Output {
let actix_system_task = LocalSet::new();
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);
runtime.block_on(actix_system_task.run_until(rest_operations))
}
/// Get current running system.
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {
@ -219,11 +102,11 @@ impl System {
/// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) {
let _ = self.sys.send(SystemCommand::Exit(code));
let _ = self.tx.send(SystemCommand::Exit(code));
}
pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> {
&self.sys
pub(crate) fn tx(&self) -> &UnboundedSender<SystemCommand> {
&self.tx
}
/// Return status of 'stop_on_panic' option which controls whether the System is stopped when an

View File

@ -1,6 +1,9 @@
use std::time::{Duration, Instant};
use std::{
thread,
time::{Duration, Instant},
};
use futures_util::future::try_join_all;
use actix_rt::{Arbiter, System};
#[test]
fn await_for_timer() {
@ -21,7 +24,7 @@ fn join_another_arbiter() {
let instant = Instant::now();
actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move {
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
}));
@ -35,7 +38,7 @@ fn join_another_arbiter() {
let instant = Instant::now();
actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.exec_fn(move || {
arbiter.spawn_fn(move || {
actix_rt::spawn(async move {
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
@ -51,7 +54,7 @@ fn join_another_arbiter() {
let instant = Instant::now();
actix_rt::System::new("test_join_another_arbiter").block_on(async move {
let mut arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move {
arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await;
actix_rt::Arbiter::current().stop();
}));
@ -104,71 +107,31 @@ fn wait_for_spawns() {
}
#[test]
fn run_in_existing_tokio() {
use actix_rt::System;
use futures_util::future::try_join_all;
use tokio::task::LocalSet;
#[should_panic]
fn arbiter_drop_panic_fn() {
let _ = System::new("test-system");
async fn run_application() {
let first_task = tokio::spawn(async {
println!("One task");
Ok::<(), ()>(())
});
let mut arbiter = Arbiter::new();
arbiter.spawn_fn(|| panic!("test"));
let second_task = tokio::spawn(async {
println!("Another task");
Ok::<(), ()>(())
});
try_join_all(vec![first_task, second_task])
.await
.expect("Some of the futures finished unexpectedly");
}
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap();
let actix_local_set = LocalSet::new();
let sys = System::run_in_tokio("actix-main-system", &actix_local_set);
actix_local_set.spawn_local(sys);
let rest_operations = run_application();
runtime.block_on(actix_local_set.run_until(rest_operations));
}
async fn run_application() -> usize {
let first_task = tokio::spawn(async {
println!("One task");
Ok::<(), ()>(())
});
let second_task = tokio::spawn(async {
println!("Another task");
Ok::<(), ()>(())
});
let tasks = try_join_all(vec![first_task, second_task])
.await
.expect("Some of the futures finished unexpectedly");
tasks.len()
arbiter.join().unwrap();
}
#[test]
fn attack_to_tokio() {
use actix_rt::System;
fn arbiter_drop_no_panic_fut() {
use futures_util::future::lazy;
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap();
let _ = System::new("test-system");
let rest_operations = run_application();
let res = System::attach_to_tokio("actix-main-system", runtime, rest_operations);
let mut arbiter = Arbiter::new();
arbiter.spawn(lazy(|_| panic!("test")));
assert_eq!(res, 2);
let arb = arbiter.clone();
let thread = thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
arb.stop();
});
arbiter.join().unwrap();
thread.join().unwrap();
}

View File

@ -401,7 +401,7 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().send(Box::pin(async move {
System::current().arbiter().spawn(Box::pin(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
}));

View File

@ -6,7 +6,7 @@ use std::{io, mem};
use actix_rt::net::TcpStream;
use actix_rt::time::{sleep_until, Instant};
use actix_rt::{spawn, System};
use actix_rt::{self as rt, System};
use log::{error, info};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
@ -288,7 +288,7 @@ impl ServerBuilder {
// start http server actor
let server = self.server.clone();
spawn(self);
rt::spawn(self);
server
}
}
@ -364,7 +364,7 @@ impl ServerBuilder {
let fut = join_all(iter);
spawn(async move {
rt::spawn(async move {
let _ = fut.await;
if let Some(tx) = completion {
let _ = tx.send(());
@ -373,16 +373,16 @@ impl ServerBuilder {
let _ = tx.send(());
}
if exit {
spawn(async {
rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
});
}
})
});
} else {
// we need to stop system if server was spawned
if self.exit {
spawn(async {
rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
});

View File

@ -172,7 +172,7 @@ impl Worker {
let avail = availability.clone();
// every worker runs in it's own arbiter.
Arbiter::new().send(Box::pin(async move {
Arbiter::new().spawn(Box::pin(async move {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
rx,