diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index c38373dd..73bcac5c 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -8,8 +8,10 @@ * Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] * Remove `Arbiter::exec`. [#253] * Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] +* Rename `Arbiter => Worker`. [#254] [#253]: https://github.com/actix/actix-net/pull/253 +[#254]: https://github.com/actix/actix-net/pull/254 ## 2.0.0-beta.2 - 2021-01-09 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs deleted file mode 100644 index fde3cd1c..00000000 --- a/actix-rt/src/arbiter.rs +++ /dev/null @@ -1,347 +0,0 @@ -use std::{ - any::{Any, TypeId}, - cell::RefCell, - collections::HashMap, - fmt, - future::Future, - pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, - task::{Context, Poll}, - thread, -}; - -use futures_core::ready; -use tokio::{ - sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::Sender, - }, - task::LocalSet, -}; - -use crate::{runtime::Runtime, system::System}; - -pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); - -thread_local!( - static ADDR: RefCell> = RefCell::new(None); - static STORAGE: RefCell>> = RefCell::new(HashMap::new()); -); - -pub(crate) enum ArbiterCommand { - Stop, - Execute(Box + Unpin + Send>), - ExecuteFn(Box), -} - -impl fmt::Debug for ArbiterCommand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), - ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), - ArbiterCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"), - } - } -} - -/// Arbiters provide an asynchronous execution environment for actors, functions and futures. When -/// an Arbiter is created, it spawns a new OS thread, and hosts an event loop. Some Arbiter -/// functions execute on the current thread. -#[derive(Debug)] -pub struct Arbiter { - sender: UnboundedSender, - thread_handle: Option>, -} - -impl Clone for Arbiter { - fn clone(&self) -> Self { - Self::with_sender(self.sender.clone()) - } -} - -impl Default for Arbiter { - fn default() -> Self { - Self::new() - } -} - -impl Arbiter { - pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = unbounded_channel(); - - let arb = Arbiter::with_sender(tx); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(ArbiterController { rx }); - - arb - } - - /// Returns the current thread's arbiter's address. If no Arbiter is present, then this - /// function will panic! - pub fn current() -> Arbiter { - ADDR.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Arbiter is not running"), - }) - } - - /// Stop arbiter from continuing it's event loop. - pub fn stop(&self) { - let _ = self.sender.send(ArbiterCommand::Stop); - } - - /// Spawn new thread and run event loop in spawned thread. - /// Returns address of newly created arbiter. - pub fn new() -> Arbiter { - let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt:worker:{}", id); - let sys = System::current(); - let (tx, rx) = unbounded_channel(); - - let handle = thread::Builder::new() - .name(name.clone()) - .spawn({ - let tx = tx.clone(); - move || { - let rt = Runtime::new().expect("Can not create Runtime"); - let arb = Arbiter::with_sender(tx); - - STORAGE.with(|cell| cell.borrow_mut().clear()); - - System::set_current(sys); - - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - - // register arbiter - let _ = System::current() - .tx() - .send(SystemCommand::RegisterArbiter(id, arb)); - - // start arbiter controller - // run loop - rt.block_on(ArbiterController { rx }); - - // deregister arbiter - let _ = System::current() - .tx() - .send(SystemCommand::DeregisterArbiter(id)); - } - }) - .unwrap_or_else(|err| { - panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) - }); - - Arbiter { - sender: tx, - thread_handle: Some(handle), - } - } - - /// Send a future to the Arbiter's thread and spawn it. - /// - /// If you require a result, include a response channel in the future. - /// - /// Returns true if future was sent successfully and false if the Arbiter has died. - pub fn spawn(&self, future: Fut) -> bool - where - Fut: Future + Unpin + Send + 'static, - { - match self.sender.send(ArbiterCommand::Execute(Box::new(future))) { - Ok(_) => true, - Err(_) => false, - } - } - - /// Send a function to the Arbiter's thread and execute it. - /// - /// Any result from the function is discarded. If you require a result, include a response - /// channel in the function. - /// - /// Returns true if function was sent successfully and false if the Arbiter has died. - pub fn spawn_fn(&self, f: F) -> bool - where - F: FnOnce() + Send + 'static, - { - match self.sender.send(ArbiterCommand::ExecuteFn(Box::new(f))) { - Ok(_) => true, - Err(_) => false, - } - } - - /// Set item to arbiter storage - pub fn set_item(item: T) { - STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); - } - - /// Check if arbiter storage contains item - pub fn contains_item() -> bool { - STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::()).is_some()) - } - - /// Get a reference to a type previously inserted on this arbiter's storage. - /// - /// Panics is item is not inserted - pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, - { - STORAGE.with(move |cell| { - let st = cell.borrow(); - let item = st - .get(&TypeId::of::()) - .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref()) - .unwrap(); - f(item) - }) - } - - /// Get a mutable reference to a type previously inserted on this arbiter's storage. - /// - /// Panics is item is not inserted - pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, - { - STORAGE.with(move |cell| { - let mut st = cell.borrow_mut(); - let item = st - .get_mut(&TypeId::of::()) - .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut()) - .unwrap(); - f(item) - }) - } - - fn with_sender(sender: UnboundedSender) -> Self { - Self { - sender, - thread_handle: None, - } - } - - /// Wait for the event loop to stop by joining the underlying thread (if have Some). - pub fn join(&mut self) -> thread::Result<()> { - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join() - } else { - Ok(()) - } - } -} - -struct ArbiterController { - rx: UnboundedReceiver, -} - -impl Drop for ArbiterController { - fn drop(&mut self) { - // panics can only occur with spawn_fn calls - if thread::panicking() { - if System::current().stop_on_panic() { - eprintln!("Panic in Arbiter thread, shutting down system."); - System::current().stop_with_code(1) - } else { - eprintln!("Panic in Arbiter thread."); - } - } - } -} - -impl Future for ArbiterController { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // process all items currently buffered in channel - loop { - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { - // channel closed; no more messages can be received - None => return Poll::Ready(()), - - // process arbiter command - Some(item) => match item { - ArbiterCommand::Stop => return Poll::Ready(()), - ArbiterCommand::Execute(fut) => { - tokio::task::spawn_local(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } - }, - } - } - } -} - -#[derive(Debug)] -pub(crate) enum SystemCommand { - Exit(i32), - RegisterArbiter(usize, Arbiter), - DeregisterArbiter(usize), -} - -#[derive(Debug)] -pub(crate) struct SystemArbiter { - stop: Option>, - commands: UnboundedReceiver, - arbiters: HashMap, -} - -impl SystemArbiter { - pub(crate) fn new(stop: Sender, commands: UnboundedReceiver) -> Self { - SystemArbiter { - commands, - stop: Some(stop), - arbiters: HashMap::new(), - } - } -} - -impl Future for SystemArbiter { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // process all items currently buffered in channel - loop { - match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { - // channel closed; no more messages can be received - None => return Poll::Ready(()), - - // process system command - Some(cmd) => match cmd { - SystemCommand::Exit(code) => { - // stop arbiters - for arb in self.arbiters.values() { - arb.stop(); - } - // stop event loop - if let Some(stop) = self.stop.take() { - let _ = stop.send(code); - } - } - SystemCommand::RegisterArbiter(name, hnd) => { - self.arbiters.insert(name, hnd); - } - SystemCommand::DeregisterArbiter(name) => { - self.arbiters.remove(&name); - } - }, - } - } - } -} - -pub trait FnExec: Send + 'static { - fn call_box(self: Box); -} - -impl FnExec for F -where - F: FnOnce() + Send + 'static, -{ - #[allow(clippy::boxed_local)] - fn call_box(self: Box) { - (*self)() - } -} diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 56cfcb91..01c85512 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -6,9 +6,9 @@ use tokio::sync::{ }; use crate::{ - arbiter::{Arbiter, SystemArbiter}, runtime::Runtime, - system::System, + system::{System, SystemWorker}, + worker::Worker, }; /// Builder an actix runtime. @@ -73,12 +73,11 @@ impl Builder { let system = System::construct( sys_sender, - Arbiter::new_system(rt.local()), + Worker::new_system(rt.local()), self.stop_on_panic, ); - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); + let arb = SystemWorker::new(sys_receiver, stop_tx); rt.spawn(arb); diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index c2222a79..212c0c65 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,17 +15,17 @@ use tokio::task::JoinHandle; #[cfg(all(feature = "macros", not(test)))] pub use actix_macros::{main, test}; -mod arbiter; mod builder; mod runtime; mod system; +mod worker; -pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; pub use self::runtime::Runtime; pub use self::system::System; +pub use self::worker::Worker; -/// Spawns a future on the current arbiter. +/// Spawns a future on the current [Arbiter]. /// /// # Panics /// Panics if Actix system is not running. @@ -37,33 +37,29 @@ where tokio::task::spawn_local(f) } -/// Asynchronous signal handling pub mod signal { + //! Asynchronous signal handling (Tokio re-exports). + #[cfg(unix)] pub mod unix { - //! Unix specific signals. + //! Unix specific signals (Tokio re-exports). pub use tokio::signal::unix::*; } pub use tokio::signal::ctrl_c; } pub mod net { - //! TCP/UDP/Unix bindings + //! TCP/UDP/Unix bindings (Tokio re-exports). pub use tokio::net::UdpSocket; pub use tokio::net::{TcpListener, TcpStream}; #[cfg(unix)] - mod unix { - pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; - } - - #[cfg(unix)] - pub use self::unix::*; + pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; } pub mod time { - //! Utilities for tracking time. + //! Utilities for tracking time (Tokio re-exports). pub use tokio::time::Instant; pub use tokio::time::{interval, interval_at, Interval}; @@ -72,7 +68,7 @@ pub mod time { } pub mod task { - //! Task management. + //! Task management (Tokio re-exports). pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 64080a63..e7830175 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,14 +1,19 @@ use std::{ cell::RefCell, + collections::HashMap, + future::Future, io, + pin::Pin, sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll}, }; -use tokio::sync::mpsc::UnboundedSender; +use futures_core::ready; +use tokio::sync::{mpsc, oneshot}; use crate::{ - arbiter::{Arbiter, SystemCommand}, builder::{Builder, SystemRunner}, + worker::Worker, }; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -17,8 +22,8 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); #[derive(Clone, Debug)] pub struct System { id: usize, - tx: UnboundedSender, - arbiter: Arbiter, + tx: mpsc::UnboundedSender, + worker: Worker, stop_on_panic: bool, } @@ -29,13 +34,13 @@ thread_local!( impl System { /// Constructs new system and sets it as current pub(crate) fn construct( - sys: UnboundedSender, - arbiter: Arbiter, + sys: mpsc::UnboundedSender, + worker: Worker, stop_on_panic: bool, ) -> Self { let sys = System { tx: sys, - arbiter, + worker, stop_on_panic, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; @@ -43,7 +48,7 @@ impl System { sys } - /// Build a new system with a customized tokio runtime. + /// Build a new system with a customized Tokio runtime. /// /// This allows to customize the runtime. See [`Builder`] for more information. pub fn builder() -> Builder { @@ -52,7 +57,7 @@ impl System { /// Create new system. /// - /// This method panics if it can not create tokio runtime + /// This method panics if it can not create Tokio runtime #[allow(clippy::new_ret_no_self)] pub fn new(name: impl Into) -> SystemRunner { Self::builder().name(name).build() @@ -105,7 +110,7 @@ impl System { let _ = self.tx.send(SystemCommand::Exit(code)); } - pub(crate) fn tx(&self) -> &UnboundedSender { + pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { &self.tx } @@ -116,12 +121,12 @@ impl System { } /// Get shared reference to system arbiter. - pub fn arbiter(&self) -> &Arbiter { - &self.arbiter + pub fn arbiter(&self) -> &Worker { + &self.worker } - /// This function will start tokio runtime and will finish once the `System::stop()` message - /// is called. Function `f` is called within tokio runtime context. + /// This function will start Tokio runtime and will finish once the `System::stop()` message + /// is called. Function `f` is called within Tokio runtime context. pub fn run(f: F) -> io::Result<()> where F: FnOnce(), @@ -129,3 +134,64 @@ impl System { Self::builder().run(f) } } + +#[derive(Debug)] +pub(crate) enum SystemCommand { + Exit(i32), + RegisterArbiter(usize, Worker), + DeregisterArbiter(usize), +} + +#[derive(Debug)] +pub(crate) struct SystemWorker { + stop: Option>, + commands: mpsc::UnboundedReceiver, + workers: HashMap, +} + +impl SystemWorker { + pub(crate) fn new( + commands: mpsc::UnboundedReceiver, + stop: oneshot::Sender, + ) -> Self { + SystemWorker { + commands, + stop: Some(stop), + workers: HashMap::new(), + } + } +} + +impl Future for SystemWorker { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel + loop { + match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process system command + Some(cmd) => match cmd { + SystemCommand::Exit(code) => { + // stop arbiters + for arb in self.workers.values() { + arb.stop(); + } + // stop event loop + if let Some(stop) = self.stop.take() { + let _ = stop.send(code); + } + } + SystemCommand::RegisterArbiter(name, hnd) => { + self.workers.insert(name, hnd); + } + SystemCommand::DeregisterArbiter(name) => { + self.workers.remove(&name); + } + }, + } + } + } +} diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs new file mode 100644 index 00000000..6b022eb2 --- /dev/null +++ b/actix-rt/src/worker.rs @@ -0,0 +1,294 @@ +use std::{ + any::{Any, TypeId}, + cell::RefCell, + collections::HashMap, + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll}, + thread, +}; + +use futures_core::ready; +use tokio::{sync::mpsc, task::LocalSet}; + +use crate::{ + runtime::Runtime, + system::{System, SystemCommand}, +}; + +pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); + +thread_local!( + static ADDR: RefCell> = RefCell::new(None); + static STORAGE: RefCell>> = RefCell::new(HashMap::new()); +); + +pub(crate) enum WorkerCommand { + Stop, + Execute(Box + Unpin + Send>), + ExecuteFn(Box), +} + +impl fmt::Debug for WorkerCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), + WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + WorkerCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"), + } + } +} + +/// A worker represent a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. +/// Some Arbiter functions execute on the current thread. +#[derive(Debug)] +pub struct Worker { + sender: mpsc::UnboundedSender, + thread_handle: Option>, +} + +impl Clone for Worker { + fn clone(&self) -> Self { + Self::new_handle(self.sender.clone()) + } +} + +impl Default for Worker { + fn default() -> Self { + Self::new() + } +} + +impl Worker { + pub(crate) fn new_system(local: &LocalSet) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + let arb = Worker::new_handle(tx); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(WorkerRunner { rx }); + + arb + } + + fn new_handle(sender: mpsc::UnboundedSender) -> Self { + Self { + sender, + thread_handle: None, + } + } + + /// Returns the current Worker's handle. + /// + /// # Panics + /// Panics if no Worker is running on the current thread. + pub fn current() -> Worker { + ADDR.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Worker is not running."), + }) + } + + /// Stop worker from continuing it's event loop. + pub fn stop(&self) { + let _ = self.sender.send(WorkerCommand::Stop); + } + + /// Spawn new thread and run event loop in spawned thread. + /// + /// Returns handle of newly created worker. + pub fn new() -> Worker { + let id = COUNT.fetch_add(1, Ordering::Relaxed); + let name = format!("actix-rt:worker:{}", id); + let sys = System::current(); + let (tx, rx) = mpsc::unbounded_channel(); + + let handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let rt = Runtime::new().expect("Can not create Runtime"); + let arb = Worker::new_handle(tx); + + STORAGE.with(|cell| cell.borrow_mut().clear()); + + System::set_current(sys); + + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + + // register worker + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(id, arb)); + + // run worker event processing loop + rt.block_on(WorkerRunner { rx }); + + // deregister worker + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) + }); + + Worker { + sender: tx, + thread_handle: Some(handle), + } + } + + /// Send a future to the Arbiter's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the Arbiter has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Unpin + Send + 'static, + { + self.sender + .send(WorkerCommand::Execute(Box::new(future))) + .is_ok() + } + + /// Send a function to the Arbiter's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.sender + .send(WorkerCommand::ExecuteFn(Box::new(f))) + .is_ok() + } + + /// Insert item into worker's thread-local storage. + /// + /// Overwrites any item of the same type previously inserted. + pub fn set_item(item: T) { + STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); + } + + /// Check if worker's thread-local storage contains an item type. + pub fn contains_item() -> bool { + STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) + } + + /// Call a function with a shared reference to an item in this worker's thread-local storage. + /// + /// # Examples + /// ``` + /// + /// ``` + /// + /// # Panics + /// Panics if item is not in worker's thread-local item storage. + pub fn get_item(mut f: F) -> R + where + F: FnMut(&T) -> R, + { + STORAGE.with(move |cell| { + let st = cell.borrow(); + + let type_id = TypeId::of::(); + let item = st.get(&type_id).and_then(downcast_ref).unwrap(); + + f(item) + }) + } + + /// Call a function with a mutable reference to an item in this worker's thread-local storage. + /// + /// # Panics + /// Panics if item is not in worker's thread-local item storage. + pub fn get_mut_item(mut f: F) -> R + where + F: FnMut(&mut T) -> R, + { + STORAGE.with(move |cell| { + let mut st = cell.borrow_mut(); + + let type_id = TypeId::of::(); + let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); + + f(item) + }) + } + + /// Wait for worker's event loop to complete. + /// + /// Joins the underlying OS thread handle, if contained. + pub fn join(&mut self) -> thread::Result<()> { + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join() + } else { + Ok(()) + } + } +} + +/// A persistent worker future that processes worker commands. +struct WorkerRunner { + rx: mpsc::UnboundedReceiver, +} + +impl Drop for WorkerRunner { + fn drop(&mut self) { + // panics can only occur with spawn_fn calls + if thread::panicking() { + if System::current().stop_on_panic() { + eprintln!("Panic in Worker thread, shutting down system."); + System::current().stop_with_code(1) + } else { + eprintln!("Panic in Worker thread."); + } + } + } +} + +impl Future for WorkerRunner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel + loop { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process arbiter command + Some(item) => match item { + WorkerCommand::Stop => return Poll::Ready(()), + WorkerCommand::Execute(task_fut) => { + tokio::task::spawn_local(task_fut); + } + WorkerCommand::ExecuteFn(task_fn) => { + task_fn(); + } + }, + } + } + } +} + +fn downcast_ref(boxed: &Box) -> Option<&T> { + boxed.downcast_ref() +} + +fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { + boxed.downcast_mut() +} diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs deleted file mode 100644 index abaff1c9..00000000 --- a/actix-rt/tests/integration_tests.rs +++ /dev/null @@ -1,137 +0,0 @@ -use std::{ - thread, - time::{Duration, Instant}, -}; - -use actix_rt::{Arbiter, System}; - -#[test] -fn await_for_timer() { - let time = Duration::from_secs(1); - let instant = Instant::now(); - actix_rt::System::new("test_wait_timer").block_on(async move { - tokio::time::sleep(time).await; - }); - assert!( - instant.elapsed() >= time, - "Block on should poll awaited future to completion" - ); -} - -#[test] -fn join_another_arbiter() { - let time = Duration::from_secs(1); - let instant = Instant::now(); - actix_rt::System::new("test_join_another_arbiter").block_on(async move { - let mut arbiter = actix_rt::Arbiter::new(); - arbiter.spawn(Box::pin(async move { - tokio::time::sleep(time).await; - actix_rt::Arbiter::current().stop(); - })); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() >= time, - "Join on another arbiter should complete only when it calls stop" - ); - - let instant = Instant::now(); - actix_rt::System::new("test_join_another_arbiter").block_on(async move { - let mut arbiter = actix_rt::Arbiter::new(); - arbiter.spawn_fn(move || { - actix_rt::spawn(async move { - tokio::time::sleep(time).await; - actix_rt::Arbiter::current().stop(); - }); - }); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() >= time, - "Join on a arbiter that has used actix_rt::spawn should wait for said future" - ); - - let instant = Instant::now(); - actix_rt::System::new("test_join_another_arbiter").block_on(async move { - let mut arbiter = actix_rt::Arbiter::new(); - arbiter.spawn(Box::pin(async move { - tokio::time::sleep(time).await; - actix_rt::Arbiter::current().stop(); - })); - arbiter.stop(); - arbiter.join().unwrap(); - }); - assert!( - instant.elapsed() < time, - "Premature stop of arbiter should conclude regardless of it's current state" - ); -} - -#[test] -fn non_static_block_on() { - let string = String::from("test_str"); - let str = string.as_str(); - - let sys = actix_rt::System::new("borrow some"); - - sys.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); - }); - - let rt = actix_rt::Runtime::new().unwrap(); - - rt.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); - }); - - actix_rt::System::run(|| { - assert_eq!("test_str", str); - actix_rt::System::current().stop(); - }) - .unwrap(); -} - -#[test] -fn wait_for_spawns() { - let rt = actix_rt::Runtime::new().unwrap(); - - let handle = rt.spawn(async { - println!("running on the runtime"); - // assertion panic is caught at task boundary - assert_eq!(1, 2); - }); - - assert!(rt.block_on(handle).is_err()); -} - -#[test] -#[should_panic] -fn arbiter_drop_panic_fn() { - let _ = System::new("test-system"); - - let mut arbiter = Arbiter::new(); - arbiter.spawn_fn(|| panic!("test")); - - arbiter.join().unwrap(); -} - -#[test] -fn arbiter_drop_no_panic_fut() { - use futures_util::future::lazy; - - let _ = System::new("test-system"); - - let mut arbiter = Arbiter::new(); - arbiter.spawn(lazy(|_| panic!("test"))); - - let arb = arbiter.clone(); - let thread = thread::spawn(move || { - thread::sleep(Duration::from_millis(200)); - arb.stop(); - }); - - arbiter.join().unwrap(); - thread.join().unwrap(); -} diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs new file mode 100644 index 00000000..d3ce4e68 --- /dev/null +++ b/actix-rt/tests/tests.rs @@ -0,0 +1,160 @@ +use std::{ + thread, + time::{Duration, Instant}, +}; + +use actix_rt::{System, Worker}; + +#[test] +fn await_for_timer() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + System::new("test_wait_timer").block_on(async move { + tokio::time::sleep(time).await; + }); + assert!( + instant.elapsed() >= time, + "Block on should poll awaited future to completion" + ); +} + +#[test] +fn join_another_worker() { + let time = Duration::from_secs(1); + let instant = Instant::now(); + System::new("test_join_another_worker").block_on(async move { + let mut worker = Worker::new(); + worker.spawn(Box::pin(async move { + tokio::time::sleep(time).await; + Worker::current().stop(); + })); + worker.join().unwrap(); + }); + assert!( + instant.elapsed() >= time, + "Join on another worker should complete only when it calls stop" + ); + + let instant = Instant::now(); + System::new("test_join_another_worker").block_on(async move { + let mut worker = Worker::new(); + worker.spawn_fn(move || { + actix_rt::spawn(async move { + tokio::time::sleep(time).await; + Worker::current().stop(); + }); + }); + worker.join().unwrap(); + }); + assert!( + instant.elapsed() >= time, + "Join on a worker that has used actix_rt::spawn should wait for said future" + ); + + let instant = Instant::now(); + System::new("test_join_another_worker").block_on(async move { + let mut worker = Worker::new(); + worker.spawn(Box::pin(async move { + tokio::time::sleep(time).await; + Worker::current().stop(); + })); + worker.stop(); + worker.join().unwrap(); + }); + assert!( + instant.elapsed() < time, + "Premature stop of worker should conclude regardless of it's current state" + ); +} + +#[test] +fn non_static_block_on() { + let string = String::from("test_str"); + let str = string.as_str(); + + let sys = System::new("borrow some"); + + sys.block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + assert_eq!("test_str", str); + }); + + let rt = actix_rt::Runtime::new().unwrap(); + + rt.block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + assert_eq!("test_str", str); + }); + + System::run(|| { + assert_eq!("test_str", str); + System::current().stop(); + }) + .unwrap(); +} + +#[test] +fn wait_for_spawns() { + let rt = actix_rt::Runtime::new().unwrap(); + + let handle = rt.spawn(async { + println!("running on the runtime"); + // assertion panic is caught at task boundary + assert_eq!(1, 2); + }); + + assert!(rt.block_on(handle).is_err()); +} + +#[test] +#[should_panic] +fn worker_drop_panic_fn() { + let _ = System::new("test-system"); + + let mut worker = Worker::new(); + worker.spawn_fn(|| panic!("test")); + + worker.join().unwrap(); +} + +#[test] +fn worker_drop_no_panic_fut() { + use futures_util::future::lazy; + + let _ = System::new("test-system"); + + let mut worker = Worker::new(); + worker.spawn(lazy(|_| panic!("test"))); + + worker.stop(); + worker.join().unwrap(); +} + +#[test] +fn worker_item_storage() { + let _ = System::new("test-system"); + + let mut worker = Worker::new(); + + assert!(!Worker::contains_item::()); + Worker::set_item(42u32); + assert!(Worker::contains_item::()); + + Worker::get_item(|&item: &u32| assert_eq!(item, 42)); + Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); + + let thread = thread::spawn(move || { + Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); + }) + .join(); + assert!(thread.is_err()); + + let thread = thread::spawn(move || { + Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); + }) + .join(); + assert!(thread.is_err()); + + worker.stop(); + worker.join().unwrap(); +} diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index a78e4175..7290f9dd 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; +use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle}; use crate::{join_all, Token}; /// Server builder @@ -297,7 +297,7 @@ impl ServerBuilder { let avail = WorkerAvailability::new(waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - Worker::start(idx, services, avail, self.shutdown_timeout) + ServerWorker::start(idx, services, avail, self.shutdown_timeout) } fn handle_cmd(&mut self, item: ServerCommand) { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index d54a0829..ae387cbd 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Arbiter}; +use actix_rt::{spawn, Worker as Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; @@ -122,11 +122,10 @@ impl WorkerAvailability { } } -/// Service worker +/// Service worker. /// -/// Worker accepts Socket objects via unbounded channel and starts stream -/// processing. -pub(crate) struct Worker { +/// Worker accepts Socket objects via unbounded channel and starts stream processing. +pub(crate) struct ServerWorker { rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Vec, @@ -160,7 +159,7 @@ enum WorkerServiceStatus { Stopped, } -impl Worker { +impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, @@ -174,7 +173,7 @@ impl Worker { // every worker runs in it's own arbiter. Arbiter::new().spawn(Box::pin(async move { availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, availability, @@ -304,7 +303,7 @@ enum WorkerState { ), } -impl Future for Worker { +impl Future for ServerWorker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {