From f1d4bcef4b3788d5f98c86b1f3714d919dc498d7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 11 Mar 2019 22:51:17 -0700 Subject: [PATCH] add Arbiter::exec_fn and exec functions --- actix-rt/CHANGES.md | 9 +++++++ actix-rt/Cargo.toml | 2 +- actix-rt/src/arbiter.rs | 55 +++++++++++++++++++++++++++++++++++------ actix-rt/src/system.rs | 10 ++++++++ 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 83d2b73f..105bbfb8 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,5 +1,14 @@ # Changes +## [0.2.1] - 2019-03-11 + +### Added + +* Arbiter::exec_fn - execute fn on the arbiter's thread + +* Arbiter::exec - execute fn on the arbiter's thread and wait result + + ## [0.2.0] - 2019-03-06 * `run` method returns `io::Result<()>` diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f89eda05..b60935ef 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-rt" -version = "0.2.0" +version = "0.2.1" authors = ["Nikolay Kim "] description = "Actix runtime" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index fab71c64..66143e15 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::sync::oneshot::{channel, Sender}; +use futures::sync::oneshot::{channel, Canceled, Sender}; use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use tokio_current_thread::spawn; @@ -22,6 +22,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ArbiterCommand { Stop, Execute(Box + Send>), + ExecuteFn(Box), } impl fmt::Debug for ArbiterCommand { @@ -29,6 +30,7 @@ impl fmt::Debug for ArbiterCommand { match self { ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + ArbiterCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"), } } } @@ -158,6 +160,35 @@ impl Arbiter { .0 .unbounded_send(ArbiterCommand::Execute(Box::new(future))); } + + /// Send a function to the arbiter's thread and exeute. + pub fn exec_fn(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let _ = self + .0 + .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { + let _ = f(); + }))); + } + + /// Send a function to the arbiter's thread, exeute and return result. + pub fn exec(&self, f: F) -> impl Future + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = channel(); + let _ = self + .0 + .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { + if !tx.is_canceled() { + let _ = tx.send(f()); + } + }))); + rx + } } struct ArbiterController { @@ -194,6 +225,9 @@ impl Future for ArbiterController { ArbiterCommand::Execute(fut) => { spawn(fut); } + ArbiterCommand::ExecuteFn(f) => { + f.call_box(); + } }, Ok(Async::NotReady) => return Ok(Async::NotReady), } @@ -257,11 +291,16 @@ impl Future for SystemArbiter { } } -// /// Execute function in arbiter's thread -// impl Handler> for SystemArbiter { -// type Result = Result; +pub trait FnExec: Send + 'static { + fn call_box(self: Box); +} -// fn handle(&mut self, msg: Execute, _: &mut Context) -> Result { -// msg.exec() -// } -// } +impl FnExec for F +where + F: FnOnce() + Send + 'static, +{ + #[cfg_attr(feature = "cargo-clippy", allow(boxed_local))] + fn call_box(self: Box) { + (*self)() + } +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 63091f73..7fe31448 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,14 +1,18 @@ use std::cell::RefCell; use std::io; +use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use futures::sync::mpsc::UnboundedSender; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; +static SYSTEM_COUNT: AtomicUsize = ATOMIC_USIZE_INIT; + /// System is a runtime manager. #[derive(Clone, Debug)] pub struct System { + id: usize, sys: UnboundedSender, arbiter: Arbiter, stop_on_panic: bool, @@ -29,6 +33,7 @@ impl System { sys, arbiter, stop_on_panic, + id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; System::set_current(sys.clone()); sys @@ -82,6 +87,11 @@ impl System { }) } + /// System id + pub fn id(&self) -> usize { + self.id + } + /// Stop the system pub fn stop(&self) { self.stop_with_code(0)