1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

Enable System to be executed on an external CurrentThread runtime

This commit is contained in:
George Hahn 2019-05-23 13:34:47 -05:00
parent c1b183e1ce
commit 048314913c
2 changed files with 77 additions and 2 deletions

View File

@ -1,11 +1,12 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::io; use std::io;
use futures::future;
use futures::future::{lazy, Future}; use futures::future::{lazy, Future};
use futures::sync::mpsc::unbounded; use futures::sync::mpsc::unbounded;
use futures::sync::oneshot::{channel, Receiver}; use futures::sync::oneshot::{channel, Receiver};
use tokio_current_thread::CurrentThread; use tokio_current_thread::{CurrentThread, Handle};
use tokio_reactor::Reactor; use tokio_reactor::Reactor;
use tokio_timer::clock::Clock; use tokio_timer::clock::Clock;
use tokio_timer::timer::Timer; use tokio_timer::timer::Timer;
@ -69,6 +70,13 @@ impl Builder {
self.create_runtime(|| {}) self.create_runtime(|| {})
} }
/// Create new System that can run asynchronously.
///
/// This method panics if it cannot start the system arbiter
pub fn build_async(self, executor: Handle) -> AsyncSystemRunner {
self.create_async_runtime(executor)
}
/// This function will start tokio runtime and will finish once the /// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called. /// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context. /// Function `f` get called within tokio runtime context.
@ -79,6 +87,22 @@ impl Builder {
self.create_runtime(f).run() self.create_runtime(f).run()
} }
fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner
{
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
executor.spawn(arb).expect("could not start system arbiter");
AsyncSystemRunner { stop, system }
}
fn create_runtime<F>(self, f: F) -> SystemRunner fn create_runtime<F>(self, f: F) -> SystemRunner
where where
F: FnOnce() + 'static, F: FnOnce() + 'static,
@ -127,6 +151,48 @@ impl Builder {
} }
} }
#[derive(Debug)]
pub struct AsyncSystemRunner {
stop: Receiver<i32>,
system: System,
}
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub fn run_nonblocking(self) -> Box<Future<Item = (), Error = io::Error> + Send + 'static> {
let AsyncSystemRunner { stop, .. } = self;
// run loop
Box::new(future::ok(())
.and_then(|_| {
Arbiter::run_system();
future::ok(())
}).
and_then(|_| {
stop.then(|res| {
match res {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
})
}).then(|result| {
Arbiter::stop_system();
result
})
)
}
}
/// Helper object that runs System's event loop /// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"] #[must_use = "SystemRunner must be run"]
#[derive(Debug)] #[derive(Debug)]

View File

@ -2,10 +2,11 @@ use std::cell::RefCell;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_current_thread::Handle;
use futures::sync::mpsc::UnboundedSender; use futures::sync::mpsc::UnboundedSender;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner}; use crate::builder::{Builder, SystemRunner, AsyncSystemRunner};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -55,6 +56,14 @@ impl System {
Self::builder().name(name).build() Self::builder().name(name).build()
} }
#[allow(clippy::new_ret_no_self)]
/// Create new system.
///
/// This method panics if it can not create tokio runtime
pub fn new_async<T: Into<String>>(name: T, executor: Handle) -> AsyncSystemRunner {
Self::builder().name(name).build_async(executor)
}
/// Get current running system. /// Get current running system.
pub fn current() -> System { pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() { CURRENT.with(|cell| match *cell.borrow() {