1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

express spawn fn as spawn fut (#256)

This commit is contained in:
Rob Ede 2021-01-29 15:16:30 +00:00 committed by GitHub
parent 2924419905
commit 1b35ff8ee6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 130 additions and 111 deletions

View File

@ -2,16 +2,19 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Arbiter => Worker`. [#254]
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253]
* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] * Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253]
* Remove `Arbiter::exec`. [#253] * Remove `Worker::exec`. [#253]
* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] * Remove `System::arbiter`. [#256]
* Rename `Arbiter => Worker`. [#254] * Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253]
* `Worker::spawn` now accepts !Unpin futures. [#256]
[#253]: https://github.com/actix/actix-net/pull/253 [#253]: https://github.com/actix/actix-net/pull/253
[#254]: https://github.com/actix/actix-net/pull/254 [#254]: https://github.com/actix/actix-net/pull/254
[#256]: https://github.com/actix/actix-net/pull/256
## 2.0.0-beta.2 - 2021-01-09 ## 2.0.0-beta.2 - 2021-01-09

View File

@ -27,4 +27,3 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
futures-util = { version = "0.3.7", default-features = true, features = ["alloc"] }

View File

@ -53,7 +53,7 @@ impl Builder {
where where
F: FnOnce(), F: FnOnce(),
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop_rx) = channel();
let (sys_sender, sys_receiver) = unbounded_channel(); let (sys_sender, sys_receiver) = unbounded_channel();
let rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
@ -67,27 +67,30 @@ impl Builder {
// run system init method // run system init method
rt.block_on(async { init_fn() }); rt.block_on(async { init_fn() });
SystemRunner { rt, stop, system } SystemRunner {
rt,
stop_rx,
system,
}
} }
} }
/// Helper object that runs System's event loop /// System runner object that keeps event loop alive and running until stop message is received.
#[must_use = "SystemRunner must be run"] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
rt: Runtime, rt: Runtime,
stop: Receiver<i32>, stop_rx: Receiver<i32>,
system: System, system: System,
} }
impl SystemRunner { impl SystemRunner {
/// This function will start event loop and will finish once the /// Starts event loop and will finish once [`System::stop()`] is called.
/// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
let SystemRunner { rt, stop, .. } = self; let SystemRunner { rt, stop_rx, .. } = self;
// run loop // run loop
match rt.block_on(stop) { match rt.block_on(stop_rx) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -98,11 +101,12 @@ impl SystemRunner {
Ok(()) Ok(())
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
} }
} }
/// Execute a future and wait for result. /// Runs the provided future, blocking the current thread until the future completes.
#[inline] #[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output { pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.rt.block_on(fut) self.rt.block_on(fut)

View File

@ -25,18 +25,6 @@ pub use self::runtime::Runtime;
pub use self::system::System; pub use self::system::System;
pub use self::worker::Worker; pub use self::worker::Worker;
/// Spawns a future on the current [Arbiter].
///
/// # Panics
/// Panics if Actix system is not running.
#[inline]
pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
where
Fut: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(f)
}
pub mod signal { pub mod signal {
//! Asynchronous signal handling (Tokio re-exports). //! Asynchronous signal handling (Tokio re-exports).
@ -72,3 +60,15 @@ pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
} }
/// Spawns a future on the current [Worker].
///
/// # Panics
/// Panics if Actix system is not running.
#[inline]
pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
where
Fut: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(f)
}

View File

@ -23,7 +23,8 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct System { pub struct System {
id: usize, id: usize,
tx: mpsc::UnboundedSender<SystemCommand>, sys_tx: mpsc::UnboundedSender<SystemCommand>,
// TODO: which worker is this exactly
worker: Worker, worker: Worker,
} }
@ -32,10 +33,13 @@ thread_local!(
); );
impl System { impl System {
/// Constructs new system and sets it as current /// Constructs new system and sets it as current.
pub(crate) fn construct(sys: mpsc::UnboundedSender<SystemCommand>, worker: Worker) -> Self { pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
worker: Worker,
) -> Self {
let sys = System { let sys = System {
tx: sys, sys_tx,
worker, worker,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
}; };
@ -60,6 +64,9 @@ impl System {
} }
/// Get current running system. /// Get current running system.
///
/// # Panics
/// Panics if no system is registered on the current thread.
pub fn current() -> System { pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() { CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => sys.clone(), Some(ref sys) => sys.clone(),
@ -103,15 +110,16 @@ impl System {
/// Stop the system with a particular exit code. /// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) { pub fn stop_with_code(&self, code: i32) {
let _ = self.tx.send(SystemCommand::Exit(code)); let _ = self.sys_tx.send(SystemCommand::Exit(code));
} }
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> { pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
&self.tx &self.sys_tx
} }
/// Get shared reference to system arbiter. // TODO: give clarity on which worker this is; previous documented as returning "system worker"
pub fn arbiter(&self) -> &Worker { /// Get shared reference to a worker.
pub fn worker(&self) -> &Worker {
&self.worker &self.worker
} }
@ -165,18 +173,21 @@ impl Future for SystemWorker {
// process system command // process system command
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop arbiters // stop workers
for arb in self.workers.values() { for wkr in self.workers.values() {
arb.stop(); wkr.stop();
} }
// stop event loop // stop event loop
if let Some(stop) = self.stop.take() { if let Some(stop) = self.stop.take() {
let _ = stop.send(code); let _ = stop.send(code);
} }
} }
SystemCommand::RegisterArbiter(name, hnd) => { SystemCommand::RegisterArbiter(name, hnd) => {
self.workers.insert(name, hnd); self.workers.insert(name, hnd);
} }
SystemCommand::DeregisterArbiter(name) => { SystemCommand::DeregisterArbiter(name) => {
self.workers.remove(&name); self.workers.remove(&name);
} }

View File

@ -27,8 +27,7 @@ thread_local!(
pub(crate) enum WorkerCommand { pub(crate) enum WorkerCommand {
Stop, Stop,
Execute(Box<dyn Future<Output = ()> + Unpin + Send>), Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
ExecuteFn(Box<dyn FnOnce() + Send + 'static>),
} }
impl fmt::Debug for WorkerCommand { impl fmt::Debug for WorkerCommand {
@ -36,7 +35,6 @@ impl fmt::Debug for WorkerCommand {
match self { match self {
WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"),
WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
WorkerCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"),
} }
} }
} }
@ -65,44 +63,12 @@ impl Default for Worker {
} }
impl Worker { impl Worker {
pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let arb = Worker::new_handle(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(WorkerRunner { rx });
arb
}
fn new_handle(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self {
Self {
sender,
thread_handle: None,
}
}
/// Returns the current Worker's handle.
///
/// # Panics
/// Panics if no Worker is running on the current thread.
pub fn current() -> Worker {
ADDR.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Worker is not running."),
})
}
/// Stop worker from continuing it's event loop.
pub fn stop(&self) {
let _ = self.sender.send(WorkerCommand::Stop);
}
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
/// ///
/// Returns handle of newly created worker. /// Returns handle of newly created worker.
///
/// # Panics
/// Panics if a [System] not registered on the current thread.
pub fn new() -> Worker { pub fn new() -> Worker {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);
@ -147,6 +113,22 @@ impl Worker {
} }
} }
/// Returns the current Worker's handle.
///
/// # Panics
/// Panics if no Worker is running on the current thread.
pub fn current() -> Worker {
ADDR.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
None => panic!("Worker is not running."),
})
}
/// Stop worker from continuing it's event loop.
pub fn stop(&self) {
let _ = self.sender.send(WorkerCommand::Stop);
}
/// Send a future to the Arbiter's thread and spawn it. /// Send a future to the Arbiter's thread and spawn it.
/// ///
/// If you require a result, include a response channel in the future. /// If you require a result, include a response channel in the future.
@ -154,10 +136,10 @@ impl Worker {
/// Returns true if future was sent successfully and false if the Arbiter has died. /// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool pub fn spawn<Fut>(&self, future: Fut) -> bool
where where
Fut: Future<Output = ()> + Unpin + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.sender self.sender
.send(WorkerCommand::Execute(Box::new(future))) .send(WorkerCommand::Execute(Box::pin(future)))
.is_ok() .is_ok()
} }
@ -171,9 +153,37 @@ impl Worker {
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
self.sender self.spawn(async { f() })
.send(WorkerCommand::ExecuteFn(Box::new(f))) }
.is_ok()
/// Wait for worker's event loop to complete.
///
/// Joins the underlying OS thread handle, if contained.
pub fn join(&mut self) -> thread::Result<()> {
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
} else {
Ok(())
}
}
pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = mpsc::unbounded_channel();
let arb = Worker::new_handle(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(WorkerRunner { rx });
arb
}
fn new_handle(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self {
Self {
sender,
thread_handle: None,
}
} }
/// Insert item into worker's thread-local storage. /// Insert item into worker's thread-local storage.
@ -190,11 +200,6 @@ impl Worker {
/// Call a function with a shared reference to an item in this worker's thread-local storage. /// Call a function with a shared reference to an item in this worker's thread-local storage.
/// ///
/// # Examples
/// ```
///
/// ```
///
/// # Panics /// # Panics
/// Panics if item is not in worker's thread-local item storage. /// Panics if item is not in worker's thread-local item storage.
pub fn get_item<T: 'static, F, R>(mut f: F) -> R pub fn get_item<T: 'static, F, R>(mut f: F) -> R
@ -228,17 +233,6 @@ impl Worker {
f(item) f(item)
}) })
} }
/// Wait for worker's event loop to complete.
///
/// Joins the underlying OS thread handle, if contained.
pub fn join(&mut self) -> thread::Result<()> {
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join()
} else {
Ok(())
}
}
} }
/// A persistent worker future that processes worker commands. /// A persistent worker future that processes worker commands.
@ -256,17 +250,12 @@ impl Future for WorkerRunner {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => return Poll::Ready(()),
// process arbiter command // process worker command
Some(item) => match item { Some(item) => match item {
WorkerCommand::Stop => return Poll::Ready(()), WorkerCommand::Stop => return Poll::Ready(()),
WorkerCommand::Execute(task_fut) => { WorkerCommand::Execute(task_fut) => {
tokio::task::spawn_local(task_fut); tokio::task::spawn_local(task_fut);
} }
WorkerCommand::ExecuteFn(task_fn) => {
tokio::task::spawn_local(async {
task_fn();
});
}
}, },
} }
} }

View File

@ -136,12 +136,10 @@ fn worker_drop_no_panic_fn() {
#[test] #[test]
fn worker_drop_no_panic_fut() { fn worker_drop_no_panic_fut() {
use futures_util::future::lazy;
let _ = System::new("test-system"); let _ = System::new("test-system");
let mut worker = Worker::new(); let mut worker = Worker::new();
worker.spawn(lazy(|_| panic!("test"))); worker.spawn(async { panic!("test") });
worker.stop(); worker.stop();
worker.join().unwrap(); worker.join().unwrap();
@ -187,3 +185,15 @@ fn system_name_cow_string() {
let _ = System::new("test-system".to_owned()); let _ = System::new("test-system".to_owned());
System::current().stop(); System::current().stop();
} }
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_worker_new_panic() {
Worker::new();
}

View File

@ -1,8 +1,10 @@
use std::time::Duration; use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
use actix_rt::time::{sleep_until, Instant}; use actix_rt::{
use actix_rt::System; time::{sleep_until, Instant},
System,
};
use log::{error, info}; use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use slab::Slab; use slab::Slab;
@ -401,10 +403,11 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
System::current().arbiter().spawn(Box::pin(async move { System::current().worker().spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
})); });
return; return;
} }
} }