From ad38973767f38eba50cd52bb37dc1a0919185045 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 22 Feb 2022 08:45:28 +0000 Subject: [PATCH] move blocking error to web (#2660) --- actix-files/src/chunked.rs | 5 ++--- actix-http/CHANGES.md | 4 ++++ actix-http/src/encoding/decoder.rs | 16 +++++++++++++--- actix-http/src/encoding/encoder.rs | 14 ++++++++------ actix-http/src/error.rs | 23 ++++------------------- actix-http/tests/test_server.rs | 3 ++- actix-web/CHANGES.md | 1 + actix-web/src/error/error.rs | 1 - actix-web/src/error/mod.rs | 10 +++++++++- actix-web/src/error/response_error.rs | 20 +++++++++----------- actix-web/src/http/mod.rs | 1 - 11 files changed, 52 insertions(+), 46 deletions(-) diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index 3ee2ee072..241b4dccb 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -81,7 +81,7 @@ async fn chunked_read_file_callback( ) -> Result<(File, Bytes), Error> { use io::{Read as _, Seek as _}; - let res = actix_web::rt::task::spawn_blocking(move || { + let res = actix_web::web::block(move || { let mut buf = Vec::with_capacity(max_bytes); file.seek(io::SeekFrom::Start(offset))?; @@ -94,8 +94,7 @@ async fn chunked_read_file_callback( Ok((file, Bytes::from(buf))) } }) - .await - .map_err(|_| actix_web::error::BlockingError)??; + .await??; Ok(res) } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 97ea7dd94..0561e82fc 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx +### Removed +- `error::BlockingError` [#2660] + +[#2660]: https://github.com/actix/actix-web/pull/2660 ## 3.0.0-rc.4 - 2022-02-22 diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index 2ed7be899..06b672fd8 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -19,7 +19,7 @@ use zstd::stream::write::Decoder as ZstdDecoder; use crate::{ encoding::Writer, - error::{BlockingError, PayloadError}, + error::PayloadError, header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}, }; @@ -47,14 +47,17 @@ where ContentEncoding::Brotli => Some(ContentDecoder::Brotli(Box::new( brotli::DecompressorWriter::new(Writer::new(), 8_096), ))), + #[cfg(feature = "compress-gzip")] ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new( ZlibDecoder::new(Writer::new()), ))), + #[cfg(feature = "compress-gzip")] ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(GzDecoder::new( Writer::new(), )))), + #[cfg(feature = "compress-zstd")] ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new( ZstdDecoder::new(Writer::new()).expect( @@ -98,8 +101,12 @@ where loop { if let Some(ref mut fut) = this.fut { - let (chunk, decoder) = - ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; + let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| { + PayloadError::Io(io::Error::new( + io::ErrorKind::Other, + "Blocking task was cancelled unexpectedly", + )) + })??; *this.decoder = Some(decoder); this.fut.take(); @@ -159,10 +166,13 @@ where enum ContentDecoder { #[cfg(feature = "compress-gzip")] Deflate(Box>), + #[cfg(feature = "compress-gzip")] Gzip(Box>), + #[cfg(feature = "compress-brotli")] Brotli(Box>), + // We need explicit 'static lifetime here because ZstdDecoder need lifetime // argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static` #[cfg(feature = "compress-zstd")] diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 2f104ee8f..0c81ffe1b 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -23,7 +23,6 @@ use zstd::stream::write::Encoder as ZstdEncoder; use super::Writer; use crate::{ body::{self, BodySize, MessageBody}, - error::BlockingError, header::{self, ContentEncoding, HeaderValue, CONTENT_ENCODING}, ResponseHead, StatusCode, }; @@ -173,7 +172,12 @@ where if let Some(ref mut fut) = this.fut { let mut encoder = ready!(Pin::new(fut).poll(cx)) - .map_err(|_| EncoderError::Blocking(BlockingError))? + .map_err(|_| { + EncoderError::Io(io::Error::new( + io::ErrorKind::Other, + "Blocking task was cancelled unexpectedly", + )) + })? .map_err(EncoderError::Io)?; let chunk = encoder.take(); @@ -400,12 +404,11 @@ fn new_brotli_compressor() -> Box> { #[derive(Debug, Display)] #[non_exhaustive] pub enum EncoderError { + /// Wrapped body stream error. #[display(fmt = "body")] Body(Box), - #[display(fmt = "blocking")] - Blocking(BlockingError), - + /// Generic I/O error. #[display(fmt = "io")] Io(io::Error), } @@ -414,7 +417,6 @@ impl StdError for EncoderError { fn source(&self) -> Option<&(dyn StdError + 'static)> { match self { EncoderError::Body(err) => Some(&**err), - EncoderError::Blocking(err) => Some(err), EncoderError::Io(err) => Some(err), } } diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 2802d57a4..3fce0a60b 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -51,7 +51,7 @@ impl Error { Self::new(Kind::SendResponse) } - #[allow(unused)] // reserved for future use (TODO: remove allow when being used) + #[allow(unused)] // available for future use pub(crate) fn new_io() -> Self { Self::new(Kind::Io) } @@ -252,12 +252,6 @@ impl From for Response { } } -/// A set of errors that can occur running blocking tasks in thread pool. -#[derive(Debug, Display, Error)] -#[display(fmt = "Blocking thread pool is gone")] -// TODO: non-exhaustive -pub struct BlockingError; - /// A set of errors that can occur during payload parsing. #[derive(Debug, Display)] #[non_exhaustive] @@ -295,13 +289,13 @@ impl std::error::Error for PayloadError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { PayloadError::Incomplete(None) => None, - PayloadError::Incomplete(Some(err)) => Some(err as &dyn std::error::Error), + PayloadError::Incomplete(Some(err)) => Some(err), PayloadError::EncodingCorrupted => None, PayloadError::Overflow => None, PayloadError::UnknownLength => None, #[cfg(feature = "http2")] - PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error), - PayloadError::Io(err) => Some(err as &dyn std::error::Error), + PayloadError::Http2Payload(err) => Some(err), + PayloadError::Io(err) => Some(err), } } } @@ -325,15 +319,6 @@ impl From for PayloadError { } } -impl From for PayloadError { - fn from(_: BlockingError) -> Self { - PayloadError::Io(io::Error::new( - io::ErrorKind::Other, - "Operation is canceled", - )) - } -} - impl From for Error { fn from(err: PayloadError) -> Self { Self::new_payload().with_cause(err) diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 1b5de3425..e8d103c96 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -850,7 +850,8 @@ async fn not_modified_spec_h1() { Some(&header::HeaderValue::from_static("4")), ); // server does not prevent payload from being sent but clients may choose not to read it - // TODO: this is probably a bug, especially since CL header can differ in length from the body + // TODO: this is probably a bug in the client, especially since CL header can differ in length + // from the body assert!(!srv.load_body(res).await.unwrap().is_empty()); // TODO: add stream response tests diff --git a/actix-web/CHANGES.md b/actix-web/CHANGES.md index afdc28b6c..ff4823149 100644 --- a/actix-web/CHANGES.md +++ b/actix-web/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed - Rename `test::{simple_service => status_service}`. [#2659] [#2659]: https://github.com/actix/actix-web/pull/2659 diff --git a/actix-web/src/error/error.rs b/actix-web/src/error/error.rs index 8450bed35..3d3978dde 100644 --- a/actix-web/src/error/error.rs +++ b/actix-web/src/error/error.rs @@ -47,7 +47,6 @@ impl fmt::Debug for Error { impl StdError for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - // TODO: populate if replacement for Box is found None } } diff --git a/actix-web/src/error/mod.rs b/actix-web/src/error/mod.rs index 64df9f553..6095cd5d2 100644 --- a/actix-web/src/error/mod.rs +++ b/actix-web/src/error/mod.rs @@ -6,7 +6,7 @@ // // See pub use actix_http::error::{ - BlockingError, ContentTypeError, DispatchError, HttpError, ParseError, PayloadError, + ContentTypeError, DispatchError, HttpError, ParseError, PayloadError, }; use derive_more::{Display, Error, From}; @@ -33,6 +33,14 @@ pub(crate) use macros::{downcast_dyn, downcast_get_type_id}; /// This type alias is generally used to avoid writing out `actix_http::Error` directly. pub type Result = std::result::Result; +/// An error representing a problem running a blocking task on a thread pool. +#[derive(Debug, Display, Error)] +#[display(fmt = "Blocking thread pool is shut down unexpectedly")] +#[non_exhaustive] +pub struct BlockingError; + +impl ResponseError for crate::error::BlockingError {} + /// Errors which can occur when attempting to generate resource uri. #[derive(Debug, PartialEq, Display, Error, From)] #[non_exhaustive] diff --git a/actix-web/src/error/response_error.rs b/actix-web/src/error/response_error.rs index e0b4af44c..0b8a82ce8 100644 --- a/actix-web/src/error/response_error.rs +++ b/actix-web/src/error/response_error.rs @@ -6,20 +6,22 @@ use std::{ io::{self, Write as _}, }; -use actix_http::{ - body::BoxBody, - header::{self, TryIntoHeaderValue}, - Response, StatusCode, -}; +use actix_http::Response; use bytes::BytesMut; use crate::{ + body::BoxBody, error::{downcast_dyn, downcast_get_type_id}, - helpers, HttpResponse, + helpers, + http::{ + header::{self, TryIntoHeaderValue}, + StatusCode, + }, + HttpResponse, }; /// Errors that can generate responses. -// TODO: add std::error::Error bound when replacement for Box is found +// TODO: flesh out documentation pub trait ResponseError: fmt::Debug + fmt::Display { /// Returns appropriate status code for error. /// @@ -73,7 +75,6 @@ impl ResponseError for std::str::Utf8Error { impl ResponseError for std::io::Error { fn status_code(&self) -> StatusCode { - // TODO: decide if these errors should consider not found or permission errors match self.kind() { io::ErrorKind::NotFound => StatusCode::NOT_FOUND, io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN, @@ -86,7 +87,6 @@ impl ResponseError for actix_http::error::HttpError {} impl ResponseError for actix_http::Error { fn status_code(&self) -> StatusCode { - // TODO: map error kinds to status code better StatusCode::INTERNAL_SERVER_ERROR } @@ -107,8 +107,6 @@ impl ResponseError for actix_http::error::ParseError { } } -impl ResponseError for actix_http::error::BlockingError {} - impl ResponseError for actix_http::error::PayloadError { fn status_code(&self) -> StatusCode { match *self { diff --git a/actix-web/src/http/mod.rs b/actix-web/src/http/mod.rs index 91c0ca377..2866e1a2c 100644 --- a/actix-web/src/http/mod.rs +++ b/actix-web/src/http/mod.rs @@ -2,5 +2,4 @@ pub mod header; -// TODO: figure out how best to expose http::Error vs actix_http::Error pub use actix_http::{uri, ConnectionType, Error, KeepAlive, Method, StatusCode, Uri, Version};