diff --git a/Cargo.toml b/Cargo.toml index fc7e452c..686103f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "./", "actix-codec", "actix-service", + "actix-rt", ] [package.metadata.docs.rs] diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md new file mode 100644 index 00000000..aefde3cd --- /dev/null +++ b/actix-rt/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0] - 2018-12-09 + +* Move codec to separate crate diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml new file mode 100644 index 00000000..eec0f4a3 --- /dev/null +++ b/actix-rt/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "actix-rt" +version = "0.1.0" +authors = ["Nikolay Kim "] +description = "Actix runtime" +keywords = ["network", "framework", "async", "futures"] +homepage = "https://actix.rs" +repository = "https://github.com/actix/actix-net.git" +documentation = "https://docs.rs/actix-rt/" +categories = ["network-programming", "asynchronous"] +license = "MIT/Apache-2.0" +exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +edition = "2018" +workspace = "../" + +[lib] +name = "actix_rt" +path = "src/lib.rs" + +[dependencies] +log = "0.4" +bytes = "0.4" +futures = "0.1.24" +tokio-current-thread = "0.1" +tokio-executor = "0.1.5" +tokio-reactor = "0.1.7" +tokio-timer = "0.2.8" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs new file mode 100644 index 00000000..e7492578 --- /dev/null +++ b/actix-rt/src/arbiter.rs @@ -0,0 +1,238 @@ +#![allow( + clippy::borrow_interior_mutable_const, + clippy::declare_interior_mutable_const +)] +use std::cell::{Cell, RefCell}; +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; + +use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; +use futures::sync::oneshot::{channel, Sender}; +use futures::{future, Async, Future, IntoFuture, Poll, Stream}; +use tokio_current_thread::spawn; + +use crate::builder::Builder; +use crate::system::System; + +thread_local!( + static ADDR: RefCell> = RefCell::new(None); + static RUNNING: Cell = Cell::new(false); + static Q: RefCell>>> = RefCell::new(Vec::new()); +); + +pub(crate) const COUNT: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug)] +pub(crate) enum ArbiterCommand { + Stop, +} + +#[derive(Debug, Clone)] +pub struct Arbiter(UnboundedSender); + +impl Default for Arbiter { + fn default() -> Self { + Self::new() + } +} + +impl Arbiter { + pub(crate) fn new_system() -> Self { + let (tx, rx) = unbounded(); + + let arb = Arbiter(tx); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + RUNNING.with(|cell| cell.set(false)); + Arbiter::spawn(ArbiterController { stop: None, rx }); + + arb + } + + /// Stop arbiter + pub fn stop(&self) { + let _ = self.0.unbounded_send(ArbiterCommand::Stop); + } + + /// Spawn new thread and run event loop in spawned thread. + /// Returns address of newly created arbiter. + pub fn new() -> Arbiter { + let id = COUNT.fetch_add(1, Ordering::Relaxed); + let name = format!("actix-rt:worker:{}", id); + let sys = System::current(); + let (arb_tx, arb_rx) = unbounded(); + let arb_tx2 = arb_tx.clone(); + + let _ = thread::Builder::new().name(name.clone()).spawn(move || { + let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); + let arb = Arbiter(arb_tx); + + let (stop, stop_rx) = channel(); + RUNNING.with(|cell| cell.set(true)); + + System::set_current(sys); + + // start arbiter controller + rt.spawn(ArbiterController { + stop: Some(stop), + rx: arb_rx, + }); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + + // register arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::RegisterArbiter(id, arb.clone())); + + // run loop + let _ = match rt.block_on(stop_rx) { + Ok(code) => code, + Err(_) => 1, + }; + + // unregister arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::UnregisterArbiter(id)); + }); + + Arbiter(arb_tx2) + } + + pub(crate) fn run_system() { + RUNNING.with(|cell| cell.set(true)); + Q.with(|cell| { + let mut v = cell.borrow_mut(); + for fut in v.drain(..) { + spawn(fut); + } + }); + } + + pub(crate) fn stop_system() { + RUNNING.with(|cell| cell.set(false)); + } + + /// Executes a future on the current thread. + pub fn spawn(future: F) + where + F: Future + 'static, + { + RUNNING.with(move |cell| { + if cell.get() { + spawn(Box::new(future)); + } else { + Q.with(move |cell| cell.borrow_mut().push(Box::new(future))); + } + }); + } + + /// Executes a future on the current thread. + pub fn spawn_fn(f: F) + where + F: FnOnce() -> R + 'static, + R: IntoFuture + 'static, + { + Arbiter::spawn(future::lazy(f)) + } +} + +struct ArbiterController { + stop: Option>, + rx: UnboundedReceiver, +} + +impl Drop for ArbiterController { + fn drop(&mut self) { + if thread::panicking() { + eprintln!("Panic in Arbiter thread, shutting down system."); + if System::current().stop_on_panic() { + System::current().stop_with_code(1) + } + } + } +} + +impl Future for ArbiterController { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(None)) | Err(_) => Ok(Async::Ready(())), + Ok(Async::Ready(Some(item))) => match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + Ok(Async::Ready(())) + } + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + } + } +} + +#[derive(Debug)] +pub(crate) enum SystemCommand { + Exit(i32), + RegisterArbiter(usize, Arbiter), + UnregisterArbiter(usize), +} + +#[derive(Debug)] +pub(crate) struct SystemArbiter { + stop: Option>, + commands: UnboundedReceiver, + arbiters: HashMap, +} + +impl SystemArbiter { + pub(crate) fn new(stop: Sender, commands: UnboundedReceiver) -> Self { + SystemArbiter { + commands, + stop: Some(stop), + arbiters: HashMap::new(), + } + } +} + +impl Future for SystemArbiter { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.commands.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(cmd))) => match cmd { + SystemCommand::Exit(code) => { + // stop arbiters + for arb in self.arbiters.values() { + arb.stop(); + } + // stop event loop + if let Some(stop) = self.stop.take() { + let _ = stop.send(code); + } + } + SystemCommand::RegisterArbiter(name, hnd) => { + self.arbiters.insert(name, hnd); + } + SystemCommand::UnregisterArbiter(name) => { + self.arbiters.remove(&name); + } + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + Ok(Async::NotReady) + } +} + +// /// Execute function in arbiter's thread +// impl Handler> for SystemArbiter { +// type Result = Result; + +// fn handle(&mut self, msg: Execute, _: &mut Context) -> Result { +// msg.exec() +// } +// } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs new file mode 100644 index 00000000..3be9fb77 --- /dev/null +++ b/actix-rt/src/builder.rs @@ -0,0 +1,175 @@ +use std::borrow::Cow; +use std::io; + +use futures::future::{lazy, Future}; +use futures::sync::mpsc::unbounded; +use futures::sync::oneshot::{channel, Receiver}; + +use tokio_current_thread::CurrentThread; +use tokio_reactor::Reactor; +use tokio_timer::clock::Clock; +use tokio_timer::timer::Timer; + +use crate::arbiter::{Arbiter, SystemArbiter}; +use crate::runtime::Runtime; +use crate::system::System; + +/// Builder struct for a actix runtime. +/// +/// Either use `Builder::build` to create a system and start actors. +/// Alternatively, use `Builder::run` to start the tokio runtime and +/// run a function in its context. +pub struct Builder { + /// Name of the System. Defaults to "actix" if unset. + name: Cow<'static, str>, + + /// The clock to use + clock: Clock, + + /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. + stop_on_panic: bool, +} + +impl Builder { + pub(crate) fn new() -> Self { + Builder { + name: Cow::Borrowed("actix"), + clock: Clock::new(), + stop_on_panic: false, + } + } + + /// Sets the name of the System. + pub fn name>(mut self, name: T) -> Self { + self.name = Cow::Owned(name.into()); + self + } + + /// Set the Clock instance that will be used by this System. + /// + /// Defaults to the system clock. + pub fn clock(mut self, clock: Clock) -> Self { + self.clock = clock; + self + } + + /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an + /// uncaught panic is thrown from a worker thread. + /// + /// Defaults to false. + pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self { + self.stop_on_panic = stop_on_panic; + self + } + + /// Create new System. + /// + /// This method panics if it can not create tokio runtime + pub fn build(self) -> SystemRunner { + self.create_runtime(|| {}) + } + + /// This function will start tokio runtime and will finish once the + /// `System::stop()` message get called. + /// Function `f` get called within tokio runtime context. + pub fn run(self, f: F) -> i32 + where + F: FnOnce() + 'static, + { + self.create_runtime(f).run() + } + + fn create_runtime(self, f: F) -> SystemRunner + where + F: FnOnce() + 'static, + { + let (stop_tx, stop) = channel(); + let (sys_sender, sys_receiver) = unbounded(); + + let arbiter = Arbiter::new_system(); + let system = System::construct(sys_sender, arbiter.clone(), self.stop_on_panic); + + // system arbiter + let arb = SystemArbiter::new(stop_tx, sys_receiver); + + let mut rt = self.build_rt().unwrap(); + rt.spawn(arb); + + // init system arbiter and run configuration method + let _ = rt.block_on(lazy(move || { + f(); + Ok::<_, ()>(()) + })); + + SystemRunner { rt, stop, system } + } + + pub(crate) fn build_rt(&self) -> io::Result { + // We need a reactor to receive events about IO objects from kernel + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle(); + + // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the + // reactor pick up some new external events. + let timer = Timer::new_with_now(reactor, self.clock.clone()); + let timer_handle = timer.handle(); + + // And now put a single-threaded executor on top of the timer. When there are no futures ready + // to do something, it'll let the timer or the reactor to generate some new stimuli for the + // futures to continue in their life. + let executor = CurrentThread::new_with_park(timer); + + Ok(Runtime::new2( + reactor_handle, + timer_handle, + self.clock.clone(), + executor, + )) + } +} + +/// Helper object that runs System's event loop +#[must_use = "SystemRunner must be run"] +#[derive(Debug)] +pub struct SystemRunner { + rt: Runtime, + stop: Receiver, + system: System, +} + +impl SystemRunner { + /// This function will start event loop and will finish once the + /// `System::stop()` function is called. + pub fn run(self) -> i32 { + let SystemRunner { mut rt, stop, .. } = self; + + // run loop + let _ = rt.block_on(lazy(move || { + Arbiter::run_system(); + Ok::<_, ()>(()) + })); + let code = match rt.block_on(stop) { + Ok(code) => code, + Err(_) => 1, + }; + Arbiter::stop_system(); + code + } + + /// Execute a future and wait for result. + pub fn block_on(&mut self, fut: F) -> Result + where + F: Future, + { + let _ = self.rt.block_on(lazy(move || { + Arbiter::run_system(); + Ok::<_, ()>(()) + })); + let res = self.rt.block_on(fut); + let _ = self.rt.block_on(lazy(move || { + Arbiter::stop_system(); + Ok::<_, ()>(()) + })); + res + } +} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs new file mode 100644 index 00000000..ef390b97 --- /dev/null +++ b/actix-rt/src/lib.rs @@ -0,0 +1,12 @@ +//! A runtime implementation that runs everything on the current thread. + +mod arbiter; +mod builder; +mod runtime; +mod system; + +pub use self::builder::{Builder, SystemRunner}; +pub use self::runtime::{Handle, Runtime}; +pub use self::system::System; +// pub use tokio_current_thread::spawn; +// pub use tokio_current_thread::TaskExecutor; diff --git a/actix-rt/src/mod.rs b/actix-rt/src/mod.rs new file mode 100644 index 00000000..dca41711 --- /dev/null +++ b/actix-rt/src/mod.rs @@ -0,0 +1,92 @@ +//! A runtime implementation that runs everything on the current thread. +//! +//! [`current_thread::Runtime`][rt] is similar to the primary +//! [`Runtime`][concurrent-rt] except that it runs all components on the current +//! thread instead of using a thread pool. This means that it is able to spawn +//! futures that do not implement `Send`. +//! +//! Same as the default [`Runtime`][concurrent-rt], the +//! [`current_thread::Runtime`][rt] includes: +//! +//! * A [reactor] to drive I/O resources. +//! * An [executor] to execute tasks that use these I/O resources. +//! * A [timer] for scheduling work to run after a set period of time. +//! +//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself +//! and cannot be safely moved to other threads. +//! +//! # Spawning from other threads +//! +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. +//! +//! For example: +//! +//! ``` +//! # extern crate tokio; +//! # extern crate futures; +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! use std::thread; +//! +//! # fn main() { +//! let mut runtime = Runtime::new().unwrap(); +//! let handle = runtime.handle(); +//! +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); +//! +//! # /* +//! runtime.run().unwrap(); +//! # */ +//! # } +//! ``` +//! +//! # Examples +//! +//! Creating a new `Runtime` and running a future `f` until its completion and +//! returning its result. +//! +//! ``` +//! use tokio::runtime::current_thread::Runtime; +//! use tokio::prelude::*; +//! +//! let mut runtime = Runtime::new().unwrap(); +//! +//! // Use the runtime... +//! // runtime.block_on(f); // where f is a future +//! ``` +//! +//! [rt]: struct.Runtime.html +//! [concurrent-rt]: ../struct.Runtime.html +//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html +//! [reactor]: ../../reactor/struct.Reactor.html +//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors +//! [timer]: ../../timer/index.html + +mod builder; +mod runtime; + +pub use self::builder::Builder; +pub use self::runtime::{Runtime, Handle}; +pub use tokio_current_thread::spawn; +pub use tokio_current_thread::TaskExecutor; + +use futures::Future; + +/// Run the provided future to completion using a runtime running on the current thread. +/// +/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future, +/// which blocks the current thread until the provided future completes. It then calls +/// [`Runtime::run`] to wait for any other spawned futures to resolve. +pub fn block_on_all(future: F) -> Result +where + F: Future, +{ + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let v = r.block_on(future)?; + r.run().expect("failed to resolve remaining futures"); + Ok(v) +} diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs new file mode 100644 index 00000000..fac0b9d3 --- /dev/null +++ b/actix-rt/src/runtime.rs @@ -0,0 +1,236 @@ +use std::error::Error; +use std::fmt; +use std::io; + +use futures::{future, Future}; +use tokio_current_thread::Handle as ExecutorHandle; +use tokio_current_thread::{self as current_thread, CurrentThread}; +use tokio_executor; +use tokio_reactor::{self, Reactor}; +use tokio_timer::clock::{self, Clock}; +use tokio_timer::timer::{self, Timer}; + +use crate::builder::Builder; + +/// Single-threaded runtime provides a way to start reactor +/// and executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Debug)] +pub struct Runtime { + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread>, +} + +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where + F: Future + Send + 'static, + { + self.0.spawn(future) + } + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + pub fn status(&self) -> Result<(), tokio_executor::SpawnError> { + self.0.status() + } +} + +impl future::Executor for Handle +where + T: Future + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError> { + if let Err(e) = self.status() { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = self.spawn(future); + Ok(()) + } +} + +/// Error returned by the `run` function. +#[derive(Debug)] +pub struct RunError { + inner: current_thread::RunError, +} + +impl fmt::Display for RunError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.inner) + } +} + +impl Error for RunError { + fn description(&self) -> &str { + self.inner.description() + } + fn cause(&self) -> Option<&Error> { + self.inner.cause() + } +} + +impl Runtime { + #[allow(clippy::new_ret_no_self)] + /// Returns a new runtime initialized with default configuration values. + pub fn new() -> io::Result { + Builder::new().build_rt() + } + + pub(super) fn new2( + reactor_handle: tokio_reactor::Handle, + timer_handle: timer::Handle, + clock: Clock, + executor: CurrentThread>, + ) -> Runtime { + Runtime { + reactor_handle, + timer_handle, + clock, + executor, + } + } + + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + + /// Spawn a future onto the single-threaded Tokio runtime. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # use futures::{future, Future, Stream}; + /// use actix_rt::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("running on the runtime"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn(&mut self, future: F) -> &mut Self + where + F: Future + 'static, + { + self.executor.spawn(future); + self + } + + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + pub fn block_on(&mut self, f: F) -> Result + where + F: Future, + { + self.enter(|executor| { + // Run the provided future + let ret = executor.block_on(f); + ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + }) + } + + /// Run the executor to completion, blocking the thread until **all** + /// spawned futures have completed. + pub fn run(&mut self) -> Result<(), RunError> { + self.enter(|executor| executor.run()) + .map_err(|e| RunError { inner: e }) + } + + fn enter(&mut self, f: F) -> R + where + F: FnOnce(&mut current_thread::Entered>) -> R, + { + let Runtime { + ref reactor_handle, + ref timer_handle, + ref clock, + ref mut executor, + .. + } = *self; + + // Binds an executor to this thread + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + + // This will set the default handle and timer to use inside the closure + // and run the future. + tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { + clock::with_default(clock, enter, |enter| { + timer::with_default(&timer_handle, enter, |enter| { + // The TaskExecutor is a fake executor that looks into the + // current single-threaded executor when used. This is a trick, + // because we need two mutable references to the executor (one + // to run the provided future, another to install as the default + // one). We use the fake one here as the default one. + let mut default_executor = current_thread::TaskExecutor::current(); + tokio_executor::with_default(&mut default_executor, enter, |enter| { + let mut executor = executor.enter(enter); + f(&mut executor) + }) + }) + }) + }) + } +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs new file mode 100644 index 00000000..a1a4e2f2 --- /dev/null +++ b/actix-rt/src/system.rs @@ -0,0 +1,119 @@ +use std::cell::RefCell; + +use futures::sync::mpsc::UnboundedSender; + +use crate::arbiter::{Arbiter, SystemCommand}; +use crate::builder::{Builder, SystemRunner}; + +/// System is a runtime manager. +#[derive(Clone, Debug)] +pub struct System { + sys: UnboundedSender, + arbiter: Arbiter, + stop_on_panic: bool, +} + +thread_local!( + static CURRENT: RefCell> = RefCell::new(None); +); + +impl System { + /// Constructs new system and sets it as current + pub(crate) fn construct( + sys: UnboundedSender, + arbiter: Arbiter, + stop_on_panic: bool, + ) -> Self { + let sys = System { + sys, + arbiter, + stop_on_panic, + }; + System::set_current(sys.clone()); + sys + } + + /// Build a new system with a customized tokio runtime. + /// + /// This allows to customize the runtime. See struct level docs on + /// `Builder` for more information. + pub fn builder() -> Builder { + Builder::new() + } + + #[allow(clippy::new_ret_no_self)] + /// Create new system. + /// + /// This method panics if it can not create tokio runtime + pub fn new>(name: T) -> SystemRunner { + Self::builder().name(name).build() + } + + /// Get current running system. + pub fn current() -> System { + CURRENT.with(|cell| match *cell.borrow() { + Some(ref sys) => sys.clone(), + None => panic!("System is not running"), + }) + } + + /// Set current running system. + #[doc(hidden)] + pub(crate) fn _is_set() -> bool { + CURRENT.with(|cell| cell.borrow().is_some()) + } + + /// Set current running system. + #[doc(hidden)] + pub fn set_current(sys: System) { + CURRENT.with(|s| { + *s.borrow_mut() = Some(sys); + }) + } + + /// Execute function with system reference. + pub fn with_current(f: F) -> R + where + F: FnOnce(&System) -> R, + { + CURRENT.with(|cell| match *cell.borrow() { + Some(ref sys) => f(sys), + None => panic!("System is not running"), + }) + } + + /// Stop the system + pub fn stop(&self) { + self.stop_with_code(0) + } + + /// Stop the system with a particular exit code. + pub fn stop_with_code(&self, code: i32) { + let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); + } + + pub(crate) fn sys(&self) -> &UnboundedSender { + &self.sys + } + + /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an + /// uncaught panic is thrown from a worker thread. + pub fn stop_on_panic(&self) -> bool { + self.stop_on_panic + } + + /// System arbiter + pub fn arbiter(&self) -> &Arbiter { + &self.arbiter + } + + /// This function will start tokio runtime and will finish once the + /// `System::stop()` message get called. + /// Function `f` get called within tokio runtime context. + pub fn run(f: F) -> i32 + where + F: FnOnce() + 'static, + { + Self::builder().run(f) + } +}