From 9e61f628711cd013aac351d825ecfdc3e60132fa Mon Sep 17 00:00:00 2001 From: George Hahn Date: Wed, 5 Jun 2019 12:36:04 -0500 Subject: [PATCH] `new_async` -> `run_in_executor` and return future directly + builder cleanup --- actix-rt/src/builder.rs | 47 ++++++++++++++++++----------------------- actix-rt/src/system.rs | 15 +++++++++---- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index ef33528f..e9d2c8f1 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -73,7 +73,7 @@ impl Builder { /// Create new System that can run asynchronously. /// /// This method panics if it cannot start the system arbiter - pub fn build_async(self, executor: Handle) -> AsyncSystemRunner { + pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner { self.create_async_runtime(executor) } @@ -87,8 +87,7 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner - { + fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); @@ -152,7 +151,7 @@ impl Builder { } #[derive(Debug)] -pub struct AsyncSystemRunner { +pub(crate) struct AsyncSystemRunner { stop: Receiver, system: System, } @@ -160,36 +159,30 @@ pub struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub fn run_nonblocking(self) -> Box + Send + 'static> { + pub(crate) fn run_nonblocking(self) -> Box + Send + 'static> { let AsyncSystemRunner { stop, .. } = self; // run loop - Box::new(future::ok(()) - .and_then(|_| { - Arbiter::run_system(); - future::ok(()) - }). - and_then(|_| { - stop.then(|res| { - match res { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Box::new(future::lazy(|| { + Arbiter::run_system(); + stop.then(|res| match res { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - }) - }).then(|result| { + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }) + .then(|result| { Arbiter::stop_system(); result }) - ) + })) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 8d8a873f..62179369 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,11 +2,12 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use tokio_current_thread::Handle; use futures::sync::mpsc::UnboundedSender; +use futures::Future; +use tokio_current_thread::Handle; use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner, AsyncSystemRunner}; +use crate::builder::{AsyncSystemRunner, Builder, SystemRunner}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -60,8 +61,14 @@ impl System { /// Create new system using provided CurrentThread Handle. /// /// This method panics if it can not spawn system arbiter - pub fn new_async>(name: T, executor: Handle) -> AsyncSystemRunner { - Self::builder().name(name).build_async(executor) + pub fn run_in_executor>( + name: T, + executor: Handle, + ) -> Box + Send + 'static> { + Self::builder() + .name(name) + .build_async(executor) + .run_nonblocking() } /// Get current running system.