use std::borrow::Cow; use std::future::Future; use std::io; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::oneshot::{channel, Receiver}; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; use crate::system::System; /// Builder struct for a actix runtime. /// /// Either use `Builder::build` to create a system and start actors. /// Alternatively, use `Builder::run` to start the tokio runtime and /// run a function in its context. pub struct Builder { /// Name of the System. Defaults to "actix" if unset. name: Cow<'static, str>, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, } impl Builder { pub(crate) fn new() -> Self { Builder { name: Cow::Borrowed("actix"), stop_on_panic: false, } } /// Sets the name of the System. pub fn name>(mut self, name: T) -> Self { self.name = Cow::Owned(name.into()); self } /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an /// uncaught panic is thrown from a worker thread. /// /// Defaults to false. pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self { self.stop_on_panic = stop_on_panic; self } /// Create new System. /// /// This method panics if it can not create tokio runtime pub fn build(self) -> SystemRunner { self.create_runtime(|| {}) } /// Create new System that can run asynchronously. /// /// This method panics if it cannot start the system arbiter pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner { self.create_async_runtime(local) } /// This function will start tokio runtime and will finish once the /// `System::stop()` message get called. /// Function `f` get called within tokio runtime context. pub fn run(self, f: F) -> io::Result<()> where F: FnOnce(), { self.create_runtime(f).run() } fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); let system = System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); // start the system arbiter let _ = local.spawn_local(arb); AsyncSystemRunner { stop, system } } fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce(), { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); let rt = Runtime::new().unwrap(); let system = System::construct( sys_sender, Arbiter::new_system(rt.local()), self.stop_on_panic, ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); rt.spawn(arb); // init system arbiter and run configuration method rt.block_on(async { f() }); SystemRunner { rt, stop, system } } } #[derive(Debug)] pub(crate) struct AsyncSystemRunner { stop: Receiver, system: System, } impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. pub(crate) fn run_nonblocking(self) -> impl Future> + Send { let AsyncSystemRunner { stop, .. } = self; // run loop async { match stop.await { Ok(code) => { if code != 0 { Err(io::Error::new( io::ErrorKind::Other, format!("Non-zero exit code: {}", code), )) } else { Ok(()) } } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } } } /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] pub struct SystemRunner { rt: Runtime, stop: Receiver, system: System, } impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. pub fn run(self) -> io::Result<()> { let SystemRunner { rt, stop, .. } = self; // run loop match rt.block_on(stop) { Ok(code) => { if code != 0 { Err(io::Error::new( io::ErrorKind::Other, format!("Non-zero exit code: {}", code), )) } else { Ok(()) } } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } /// Execute a future and wait for result. #[inline] pub fn block_on(&self, fut: F) -> F::Output { self.rt.block_on(fut) } }