diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 07fc0063..b9240009 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -17,8 +17,8 @@ use v_htmlescape::escape as escape_html_entity; use actix_service::{boxed::BoxedNewService, NewService, Service}; use actix_web::dev::{ - CpuFuture, HttpServiceFactory, ResourceDef, ServiceConfig, ServiceFromRequest, - ServiceRequest, ServiceResponse, + HttpServiceFactory, ResourceDef, ServiceConfig, ServiceFromRequest, ServiceRequest, + ServiceResponse, }; use actix_web::error::{BlockingError, Error, ErrorInternalServerError}; use actix_web::{web, FromRequest, HttpRequest, HttpResponse, Responder}; @@ -52,7 +52,7 @@ pub struct ChunkedReadFile { size: u64, offset: u64, file: Option, - fut: Option>, + fut: Option>>>, counter: u64, } @@ -89,7 +89,7 @@ impl Stream for ChunkedReadFile { Ok(Async::Ready(None)) } else { let mut file = self.file.take().expect("Use after completion"); - self.fut = Some(web::block(move || { + self.fut = Some(Box::new(web::block(move || { let max_bytes: usize; max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; let mut buf = Vec::with_capacity(max_bytes); @@ -100,7 +100,7 @@ impl Stream for ChunkedReadFile { return Err(io::ErrorKind::UnexpectedEof.into()); } Ok((file, Bytes::from(buf))) - })); + }))); self.poll() } } diff --git a/src/error.rs b/src/error.rs index 54ca74dc..06840708 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ //! Error and Result module +use std::fmt; pub use actix_http::error::*; use derive_more::{Display, From}; @@ -20,3 +21,23 @@ pub enum UrlGenerationError { /// `InternalServerError` for `UrlGeneratorError` impl ResponseError for UrlGenerationError {} + +/// Blocking operation execution error +#[derive(Debug, Display)] +pub enum BlockingError { + #[display(fmt = "{:?}", _0)] + Error(E), + #[display(fmt = "Thread pool is gone")] + Canceled, +} + +impl ResponseError for BlockingError {} + +impl From> for BlockingError { + fn from(err: actix_rt::blocking::BlockingError) -> Self { + match err { + actix_rt::blocking::BlockingError::Error(e) => BlockingError::Error(e), + actix_rt::blocking::BlockingError::Canceled => BlockingError::Canceled, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 0094818d..d653fd1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,6 @@ pub mod dev { Extensions, Payload, PayloadStream, RequestHead, ResponseHead, }; pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; - pub use actix_rt::blocking::CpuFuture; pub use actix_server::Server; pub(crate) fn insert_slash(path: &str) -> String { @@ -79,8 +78,8 @@ pub mod dev { pub mod web { use actix_http::{http::Method, Response}; - use actix_rt::blocking::{self, CpuFuture}; - use futures::IntoFuture; + use actix_rt::blocking; + use futures::{Future, IntoFuture}; pub use actix_http::Response as HttpResponse; pub use bytes::{Bytes, BytesMut}; @@ -92,7 +91,7 @@ pub mod web { use crate::route::Route; use crate::scope::Scope; - pub use crate::error::Error; + pub use crate::error::{BlockingError, Error}; pub use crate::extract::{Form, Json, Path, Payload, Query}; pub use crate::extract::{FormConfig, JsonConfig, PayloadConfig}; pub use crate::request::HttpRequest; @@ -251,12 +250,12 @@ pub mod web { /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. - pub fn block(f: F) -> CpuFuture + pub fn block(f: F) -> impl Future> where F: FnOnce() -> Result + Send + 'static, I: Send + 'static, E: Send + std::fmt::Debug + 'static, { - blocking::run(f) + blocking::run(f).from_err() } }