From 048314913cfd9bd22d40bddd2f84e6971fa7db5b Mon Sep 17 00:00:00 2001 From: George Hahn Date: Thu, 23 May 2019 13:34:47 -0500 Subject: [PATCH 1/3] Enable System to be executed on an external CurrentThread runtime --- actix-rt/src/builder.rs | 68 ++++++++++++++++++++++++++++++++++++++++- actix-rt/src/system.rs | 11 ++++++- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 49e8c1f5..ef33528f 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,11 +1,12 @@ use std::borrow::Cow; use std::io; +use futures::future; use futures::future::{lazy, Future}; use futures::sync::mpsc::unbounded; use futures::sync::oneshot::{channel, Receiver}; -use tokio_current_thread::CurrentThread; +use tokio_current_thread::{CurrentThread, Handle}; use tokio_reactor::Reactor; use tokio_timer::clock::Clock; use tokio_timer::timer::Timer; @@ -69,6 +70,13 @@ impl Builder { self.create_runtime(|| {}) } + /// Create new System that can run asynchronously. + /// + /// This method panics if it cannot start the system arbiter + pub fn build_async(self, executor: Handle) -> AsyncSystemRunner { + self.create_async_runtime(executor) + } + /// This function will start tokio runtime and will finish once the /// `System::stop()` message get called. /// Function `f` get called within tokio runtime context. @@ -79,6 +87,22 @@ impl Builder { self.create_runtime(f).run() } + fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner + { + let (stop_tx, stop) = channel(); + let (sys_sender, sys_receiver) = unbounded(); + + let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + + // system arbiter + let arb = SystemArbiter::new(stop_tx, sys_receiver); + + // start the system arbiter + executor.spawn(arb).expect("could not start system arbiter"); + + AsyncSystemRunner { stop, system } + } + fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce() + 'static, @@ -127,6 +151,48 @@ impl Builder { } } +#[derive(Debug)] +pub struct AsyncSystemRunner { + stop: Receiver, + system: System, +} + +impl AsyncSystemRunner { + /// This function will start event loop and returns a future that + /// resolves once the `System::stop()` function is called. + pub fn run_nonblocking(self) -> Box + Send + 'static> { + let AsyncSystemRunner { stop, .. } = self; + + // run loop + Box::new(future::ok(()) + .and_then(|_| { + Arbiter::run_system(); + future::ok(()) + }). + and_then(|_| { + stop.then(|res| { + match res { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } + }) + }).then(|result| { + Arbiter::stop_system(); + result + }) + ) + } +} + /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index aaf15c7c..7bead2a3 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,10 +2,11 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio_current_thread::Handle; use futures::sync::mpsc::UnboundedSender; use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner}; +use crate::builder::{Builder, SystemRunner, AsyncSystemRunner}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -55,6 +56,14 @@ impl System { Self::builder().name(name).build() } + #[allow(clippy::new_ret_no_self)] + /// Create new system. + /// + /// This method panics if it can not create tokio runtime + pub fn new_async>(name: T, executor: Handle) -> AsyncSystemRunner { + Self::builder().name(name).build_async(executor) + } + /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() { From c4f05e033f9b15a0774dc66160e750677dcc7dd4 Mon Sep 17 00:00:00 2001 From: George Hahn Date: Fri, 24 May 2019 10:29:52 -0500 Subject: [PATCH 2/3] fixup: fix `new_async` doc comment --- actix-rt/src/system.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 7bead2a3..8d8a873f 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -57,9 +57,9 @@ impl System { } #[allow(clippy::new_ret_no_self)] - /// Create new system. + /// Create new system using provided CurrentThread Handle. /// - /// This method panics if it can not create tokio runtime + /// This method panics if it can not spawn system arbiter pub fn new_async>(name: T, executor: Handle) -> AsyncSystemRunner { Self::builder().name(name).build_async(executor) } From 9e61f628711cd013aac351d825ecfdc3e60132fa Mon Sep 17 00:00:00 2001 From: George Hahn Date: Wed, 5 Jun 2019 12:36:04 -0500 Subject: [PATCH 3/3] `new_async` -> `run_in_executor` and return future directly + builder cleanup --- actix-rt/src/builder.rs | 47 ++++++++++++++++++----------------------- actix-rt/src/system.rs | 15 +++++++++---- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index ef33528f..e9d2c8f1 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -73,7 +73,7 @@ impl Builder { /// Create new System that can run asynchronously. /// /// This method panics if it cannot start the system arbiter - pub fn build_async(self, executor: Handle) -> AsyncSystemRunner { + pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner { self.create_async_runtime(executor) } @@ -87,8 +87,7 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner - { + fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); @@ -152,7 +151,7 @@ impl Builder { } #[derive(Debug)] -pub struct AsyncSystemRunner { +pub(crate) struct AsyncSystemRunner { stop: Receiver, system: System, } @@ -160,36 +159,30 @@ pub struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub fn run_nonblocking(self) -> Box + Send + 'static> { + pub(crate) fn run_nonblocking(self) -> Box + Send + 'static> { let AsyncSystemRunner { stop, .. } = self; // run loop - Box::new(future::ok(()) - .and_then(|_| { - Arbiter::run_system(); - future::ok(()) - }). - and_then(|_| { - stop.then(|res| { - match res { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Box::new(future::lazy(|| { + Arbiter::run_system(); + stop.then(|res| match res { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - }) - }).then(|result| { + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }) + .then(|result| { Arbiter::stop_system(); result }) - ) + })) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 8d8a873f..62179369 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,11 +2,12 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio_current_thread::Handle; use futures::sync::mpsc::UnboundedSender; +use futures::Future; +use tokio_current_thread::Handle; use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner, AsyncSystemRunner}; +use crate::builder::{AsyncSystemRunner, Builder, SystemRunner}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -60,8 +61,14 @@ impl System { /// Create new system using provided CurrentThread Handle. /// /// This method panics if it can not spawn system arbiter - pub fn new_async>(name: T, executor: Handle) -> AsyncSystemRunner { - Self::builder().name(name).build_async(executor) + pub fn run_in_executor>( + name: T, + executor: Handle, + ) -> Box + Send + 'static> { + Self::builder() + .name(name) + .build_async(executor) + .run_nonblocking() } /// Get current running system.