//! Thread pool for blocking operations use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use derive_more::Display; use futures_channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; /// Env variable for default cpu pool size. const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; lazy_static::lazy_static! { pub(crate) static ref DEFAULT_POOL: Mutex = { let num = std::env::var(ENV_CPU_POOL_VAR) .map_err(|_| ()) .and_then(|val| { val.parse().map_err(|_| log::warn!( "Can not parse {} value, using default", ENV_CPU_POOL_VAR, )) }) .unwrap_or_else(|_| num_cpus::get() * 5); Mutex::new( threadpool::Builder::new() .thread_name("actix-web".to_owned()) .num_threads(num) .build(), ) }; } thread_local! { static POOL: ThreadPool = { DEFAULT_POOL.lock().clone() }; } /// Blocking operation execution error #[derive(Debug, Display)] pub enum BlockingError { #[display(fmt = "{:?}", _0)] Error(E), #[display(fmt = "Thread pool is gone")] Canceled, } impl std::error::Error for BlockingError {} /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. pub fn run(f: F) -> CpuFuture where F: FnOnce() -> Result + Send + 'static, I: Send + 'static, E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); POOL.with(|pool| { pool.execute(move || { if !tx.is_canceled() { let _ = tx.send(f()); } }) }); CpuFuture { rx } } /// Blocking operation completion future. It resolves with results /// of blocking function execution. pub struct CpuFuture { rx: oneshot::Receiver>, } impl Future for CpuFuture { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let rx = Pin::new(&mut self.rx); let res = match rx.poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(res) => res .map_err(|_| BlockingError::Canceled) .and_then(|res| res.map_err(BlockingError::Error)), }; Poll::Ready(res) } }