diff --git a/actix-threadpool/CHANGES.md b/actix-threadpool/CHANGES.md index 3ba82ee9..cce89692 100644 --- a/actix-threadpool/CHANGES.md +++ b/actix-threadpool/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.3.0] - 2019-12-02 + +### Changed + +* Expect `Result` type as a function return type + ## [0.2.0] - 2019-11-21 ### Changed diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index ee431969..90fafda0 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-threadpool" -version = "0.2.0" +version = "0.3.0" authors = ["Nikolay Kim "] description = "Actix thread pool for sync code" keywords = ["actix", "network", "framework", "async", "futures"] @@ -18,7 +18,7 @@ name = "actix_threadpool" path = "src/lib.rs" [dependencies] -derive_more = "0.99" +derive_more = "0.99.2" futures = "0.3.1" parking_lot = "0.9" lazy_static = "1.2" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index 62a34abe..ada9103b 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,15 +1,15 @@ //! Thread pool for blocking operations +use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use derive_more::Display; use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; -pub use futures::channel::oneshot::Canceled; - /// Env variable for default cpu pool size. const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; @@ -39,12 +39,22 @@ thread_local! { }; } +/// Blocking operation execution error +#[derive(Debug, Display)] +pub enum BlockingError { + #[display(fmt = "{:?}", _0)] + Error(E), + #[display(fmt = "Thread pool is gone")] + Canceled, +} + /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub fn run(f: F) -> CpuFuture +pub fn run(f: F) -> CpuFuture where - F: FnOnce() -> I + Send + 'static, + F: FnOnce() -> Result + Send + 'static, I: Send + 'static, + E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); POOL.with(|pool| { @@ -60,16 +70,18 @@ where /// Blocking operation completion future. It resolves with results /// of blocking function execution. -pub struct CpuFuture { - rx: oneshot::Receiver, +pub struct CpuFuture { + rx: oneshot::Receiver>, } -impl Future for CpuFuture { - type Output = Result; +impl Future for CpuFuture { + type Output = Result>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let rx = Pin::new(&mut Pin::get_mut(self).rx); - let res = futures::ready!(rx.poll(cx)); - Poll::Ready(res.map_err(|_| Canceled)) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let rx = Pin::new(&mut self.rx); + let res = futures::ready!(rx.poll(cx)) + .map_err(|_| BlockingError::Canceled) + .and_then(|res| res.map_err(BlockingError::Error)); + Poll::Ready(res) } }