1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-30 17:44:34 +01:00

new_async -> run_in_executor and return future directly + builder cleanup

This commit is contained in:
George Hahn 2019-06-05 12:36:04 -05:00
parent c4f05e033f
commit 9e61f62871
2 changed files with 31 additions and 31 deletions

View File

@ -73,7 +73,7 @@ impl Builder {
/// Create new System that can run asynchronously. /// Create new System that can run asynchronously.
/// ///
/// This method panics if it cannot start the system arbiter /// This method panics if it cannot start the system arbiter
pub fn build_async(self, executor: Handle) -> AsyncSystemRunner { pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner {
self.create_async_runtime(executor) self.create_async_runtime(executor)
} }
@ -87,8 +87,7 @@ impl Builder {
self.create_runtime(f).run() self.create_runtime(f).run()
} }
fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner {
{
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded();
@ -152,7 +151,7 @@ impl Builder {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct AsyncSystemRunner { pub(crate) struct AsyncSystemRunner {
stop: Receiver<i32>, stop: Receiver<i32>,
system: System, system: System,
} }
@ -160,18 +159,13 @@ pub struct AsyncSystemRunner {
impl AsyncSystemRunner { impl AsyncSystemRunner {
/// This function will start event loop and returns a future that /// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called. /// resolves once the `System::stop()` function is called.
pub fn run_nonblocking(self) -> Box<Future<Item = (), Error = io::Error> + Send + 'static> { pub(crate) fn run_nonblocking(self) -> Box<Future<Item = (), Error = io::Error> + Send + 'static> {
let AsyncSystemRunner { stop, .. } = self; let AsyncSystemRunner { stop, .. } = self;
// run loop // run loop
Box::new(future::ok(()) Box::new(future::lazy(|| {
.and_then(|_| {
Arbiter::run_system(); Arbiter::run_system();
future::ok(()) stop.then(|res| match res {
}).
and_then(|_| {
stop.then(|res| {
match res {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -183,13 +177,12 @@ impl AsyncSystemRunner {
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}) })
}).then(|result| { .then(|result| {
Arbiter::stop_system(); Arbiter::stop_system();
result result
}) })
) }))
} }
} }

View File

@ -2,11 +2,12 @@ use std::cell::RefCell;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_current_thread::Handle;
use futures::sync::mpsc::UnboundedSender; use futures::sync::mpsc::UnboundedSender;
use futures::Future;
use tokio_current_thread::Handle;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner, AsyncSystemRunner}; use crate::builder::{AsyncSystemRunner, Builder, SystemRunner};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -60,8 +61,14 @@ impl System {
/// Create new system using provided CurrentThread Handle. /// Create new system using provided CurrentThread Handle.
/// ///
/// This method panics if it can not spawn system arbiter /// This method panics if it can not spawn system arbiter
pub fn new_async<T: Into<String>>(name: T, executor: Handle) -> AsyncSystemRunner { pub fn run_in_executor<T: Into<String>>(
Self::builder().name(name).build_async(executor) name: T,
executor: Handle,
) -> Box<Future<Item = (), Error = io::Error> + Send + 'static> {
Self::builder()
.name(name)
.build_async(executor)
.run_nonblocking()
} }
/// Get current running system. /// Get current running system.