diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 49e8c1f5..ef33528f 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,11 +1,12 @@ use std::borrow::Cow; use std::io; +use futures::future; use futures::future::{lazy, Future}; use futures::sync::mpsc::unbounded; use futures::sync::oneshot::{channel, Receiver}; -use tokio_current_thread::CurrentThread; +use tokio_current_thread::{CurrentThread, Handle}; use tokio_reactor::Reactor; use tokio_timer::clock::Clock; use tokio_timer::timer::Timer; @@ -69,6 +70,13 @@ impl Builder { self.create_runtime(|| {}) } + /// Create new System that can run asynchronously. + /// + /// This method panics if it cannot start the system arbiter + pub fn build_async(self, executor: Handle) -> AsyncSystemRunner { + self.create_async_runtime(executor) + } + /// This function will start tokio runtime and will finish once the /// `System::stop()` message get called. /// Function `f` get called within tokio runtime context. @@ -79,6 +87,22 @@ impl Builder { self.create_runtime(f).run() } + fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner + { + let (stop_tx, stop) = channel(); + let (sys_sender, sys_receiver) = unbounded(); + + let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + + // system arbiter + let arb = SystemArbiter::new(stop_tx, sys_receiver); + + // start the system arbiter + executor.spawn(arb).expect("could not start system arbiter"); + + AsyncSystemRunner { stop, system } + } + fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce() + 'static, @@ -127,6 +151,48 @@ impl Builder { } } +#[derive(Debug)] +pub struct AsyncSystemRunner { + stop: Receiver, + system: System, +} + +impl AsyncSystemRunner { + /// This function will start event loop and returns a future that + /// resolves once the `System::stop()` function is called. + pub fn run_nonblocking(self) -> Box + Send + 'static> { + let AsyncSystemRunner { stop, .. } = self; + + // run loop + Box::new(future::ok(()) + .and_then(|_| { + Arbiter::run_system(); + future::ok(()) + }). + and_then(|_| { + stop.then(|res| { + match res { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + }) + }).then(|result| { + Arbiter::stop_system(); + result + }) + ) + } +} + /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index aaf15c7c..7bead2a3 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,10 +2,11 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_current_thread::Handle; use futures::sync::mpsc::UnboundedSender; use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner}; +use crate::builder::{Builder, SystemRunner, AsyncSystemRunner}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -55,6 +56,14 @@ impl System { Self::builder().name(name).build() } + #[allow(clippy::new_ret_no_self)] + /// Create new system. + /// + /// This method panics if it can not create tokio runtime + pub fn new_async>(name: T, executor: Handle) -> AsyncSystemRunner { + Self::builder().name(name).build_async(executor) + } + /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() {