1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

map BlockingError

This commit is contained in:
Nikolay Kim 2019-03-11 23:19:05 -07:00
parent a2c4639074
commit 7242d96701
3 changed files with 31 additions and 11 deletions

View File

@ -17,8 +17,8 @@ use v_htmlescape::escape as escape_html_entity;
use actix_service::{boxed::BoxedNewService, NewService, Service}; use actix_service::{boxed::BoxedNewService, NewService, Service};
use actix_web::dev::{ use actix_web::dev::{
CpuFuture, HttpServiceFactory, ResourceDef, ServiceConfig, ServiceFromRequest, HttpServiceFactory, ResourceDef, ServiceConfig, ServiceFromRequest, ServiceRequest,
ServiceRequest, ServiceResponse, ServiceResponse,
}; };
use actix_web::error::{BlockingError, Error, ErrorInternalServerError}; use actix_web::error::{BlockingError, Error, ErrorInternalServerError};
use actix_web::{web, FromRequest, HttpRequest, HttpResponse, Responder}; use actix_web::{web, FromRequest, HttpRequest, HttpResponse, Responder};
@ -52,7 +52,7 @@ pub struct ChunkedReadFile {
size: u64, size: u64,
offset: u64, offset: u64,
file: Option<File>, file: Option<File>,
fut: Option<CpuFuture<(File, Bytes), io::Error>>, fut: Option<Box<Future<Item = (File, Bytes), Error = BlockingError<io::Error>>>>,
counter: u64, counter: u64,
} }
@ -89,7 +89,7 @@ impl Stream for ChunkedReadFile {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
let mut file = self.file.take().expect("Use after completion"); let mut file = self.file.take().expect("Use after completion");
self.fut = Some(web::block(move || { self.fut = Some(Box::new(web::block(move || {
let max_bytes: usize; let max_bytes: usize;
max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes); let mut buf = Vec::with_capacity(max_bytes);
@ -100,7 +100,7 @@ impl Stream for ChunkedReadFile {
return Err(io::ErrorKind::UnexpectedEof.into()); return Err(io::ErrorKind::UnexpectedEof.into());
} }
Ok((file, Bytes::from(buf))) Ok((file, Bytes::from(buf)))
})); })));
self.poll() self.poll()
} }
} }

View File

@ -1,4 +1,5 @@
//! Error and Result module //! Error and Result module
use std::fmt;
pub use actix_http::error::*; pub use actix_http::error::*;
use derive_more::{Display, From}; use derive_more::{Display, From};
@ -20,3 +21,23 @@ pub enum UrlGenerationError {
/// `InternalServerError` for `UrlGeneratorError` /// `InternalServerError` for `UrlGeneratorError`
impl ResponseError for UrlGenerationError {} impl ResponseError for UrlGenerationError {}
/// Blocking operation execution error
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
impl<E: fmt::Debug> From<actix_rt::blocking::BlockingError<E>> for BlockingError<E> {
fn from(err: actix_rt::blocking::BlockingError<E>) -> Self {
match err {
actix_rt::blocking::BlockingError::Error(e) => BlockingError::Error(e),
actix_rt::blocking::BlockingError::Canceled => BlockingError::Canceled,
}
}
}

View File

@ -65,7 +65,6 @@ pub mod dev {
Extensions, Payload, PayloadStream, RequestHead, ResponseHead, Extensions, Payload, PayloadStream, RequestHead, ResponseHead,
}; };
pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; pub use actix_router::{Path, ResourceDef, ResourcePath, Url};
pub use actix_rt::blocking::CpuFuture;
pub use actix_server::Server; pub use actix_server::Server;
pub(crate) fn insert_slash(path: &str) -> String { pub(crate) fn insert_slash(path: &str) -> String {
@ -79,8 +78,8 @@ pub mod dev {
pub mod web { pub mod web {
use actix_http::{http::Method, Response}; use actix_http::{http::Method, Response};
use actix_rt::blocking::{self, CpuFuture}; use actix_rt::blocking;
use futures::IntoFuture; use futures::{Future, IntoFuture};
pub use actix_http::Response as HttpResponse; pub use actix_http::Response as HttpResponse;
pub use bytes::{Bytes, BytesMut}; pub use bytes::{Bytes, BytesMut};
@ -92,7 +91,7 @@ pub mod web {
use crate::route::Route; use crate::route::Route;
use crate::scope::Scope; use crate::scope::Scope;
pub use crate::error::Error; pub use crate::error::{BlockingError, Error};
pub use crate::extract::{Form, Json, Path, Payload, Query}; pub use crate::extract::{Form, Json, Path, Payload, Query};
pub use crate::extract::{FormConfig, JsonConfig, PayloadConfig}; pub use crate::extract::{FormConfig, JsonConfig, PayloadConfig};
pub use crate::request::HttpRequest; pub use crate::request::HttpRequest;
@ -251,12 +250,12 @@ pub mod web {
/// Execute blocking function on a thread pool, returns future that resolves /// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution. /// to result of the function execution.
pub fn block<F, I, E>(f: F) -> CpuFuture<I, E> pub fn block<F, I, E>(f: F) -> impl Future<Item = I, Error = BlockingError<E>>
where where
F: FnOnce() -> Result<I, E> + Send + 'static, F: FnOnce() -> Result<I, E> + Send + 'static,
I: Send + 'static, I: Send + 'static,
E: Send + std::fmt::Debug + 'static, E: Send + std::fmt::Debug + 'static,
{ {
blocking::run(f) blocking::run(f).from_err()
} }
} }