diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 105bbfb8..a0736672 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -4,11 +4,12 @@ ### Added +* Added `blocking` module + * Arbiter::exec_fn - execute fn on the arbiter's thread * Arbiter::exec - execute fn on the arbiter's thread and wait result - ## [0.2.0] - 2019-03-06 * `run` method returns `io::Result<()>` diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index b60935ef..9ab4db6d 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -18,9 +18,14 @@ name = "actix_rt" path = "src/lib.rs" [dependencies] -log = "0.4" bytes = "0.4" +derive_more = "0.14" futures = "0.1.25" +parking_lot = "0.7" +lazy_static = "1.2" +log = "0.4" +num_cpus = "1.10" +threadpool = "1.7" tokio-current-thread = "0.1" tokio-executor = "0.1.5" tokio-reactor = "0.1.7" diff --git a/actix-rt/src/blocking.rs b/actix-rt/src/blocking.rs new file mode 100644 index 00000000..e209a3e2 --- /dev/null +++ b/actix-rt/src/blocking.rs @@ -0,0 +1,88 @@ +//! Thread pool for blocking operations + +use std::fmt; + +use derive_more::Display; +use futures::sync::oneshot; +use futures::{Async, Future, Poll}; +use parking_lot::Mutex; +use threadpool::ThreadPool; + +/// Env variable for default cpu pool size +const ENV_CPU_POOL_VAR: &str = "ACTIX_CPU_POOL"; + +lazy_static::lazy_static! { + pub(crate) static ref DEFAULT_POOL: Mutex = { + let default = match std::env::var(ENV_CPU_POOL_VAR) { + Ok(val) => { + if let Ok(val) = val.parse() { + val + } else { + log::error!("Can not parse ACTIX_CPU_POOL value"); + num_cpus::get() * 5 + } + } + Err(_) => num_cpus::get() * 5, + }; + Mutex::new( + threadpool::Builder::new() + .thread_name("actix-web".to_owned()) + .num_threads(default) + .build(), + ) + }; +} + +thread_local! { + static POOL: ThreadPool = { + DEFAULT_POOL.lock().clone() + }; +} + +/// Blocking operation execution error +#[derive(Debug, Display)] +pub enum BlockingError { + #[display(fmt = "{:?}", _0)] + Error(E), + #[display(fmt = "Thread pool is gone")] + Canceled, +} + +/// Execute blocking function on a thread pool, returns future that resolves +/// to result of the function execution. +pub fn run(f: F) -> CpuFuture +where + F: FnOnce() -> Result + Send + 'static, + I: Send + 'static, + E: Send + fmt::Debug + 'static, +{ + let (tx, rx) = oneshot::channel(); + POOL.with(|pool| { + pool.execute(move || { + if !tx.is_canceled() { + let _ = tx.send(f()); + } + }) + }); + + CpuFuture { rx } +} + +/// Blocking operation completion future. It resolves with results +/// of blocking function execution. +pub struct CpuFuture { + rx: oneshot::Receiver>, +} + +impl Future for CpuFuture { + type Item = I; + type Error = BlockingError; + + fn poll(&mut self) -> Poll { + let res = futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled)); + match res { + Ok(val) => Ok(Async::Ready(val)), + Err(err) => Err(BlockingError::Error(err)), + } + } +} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 3bfbb7b2..2c585fb5 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -1,6 +1,7 @@ //! A runtime implementation that runs everything on the current thread. mod arbiter; +pub mod blocking; mod builder; mod runtime; mod system;