From fe392abeb470e9bcf9aec5c8146b95426394ba79 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 10 Jan 2021 00:04:19 +0800 Subject: [PATCH] remove actix-threadpool.use actix_rt::task::spawn_blocking (#1878) --- CHANGES.md | 4 ++- Cargo.toml | 5 ++-- actix-files/Cargo.toml | 4 +-- actix-files/src/chunked.rs | 48 +++++++++++++----------------- actix-files/src/lib.rs | 11 +------ actix-http-test/Cargo.toml | 4 +-- actix-http/CHANGES.md | 5 +++- actix-http/Cargo.toml | 5 ++-- actix-http/src/encoding/decoder.rs | 17 +++++++---- actix-http/src/encoding/encoder.rs | 18 +++++++---- actix-http/src/error.rs | 18 ++++++++--- actix-multipart/Cargo.toml | 2 +- actix-web-actors/Cargo.toml | 2 +- actix-web-codegen/Cargo.toml | 2 +- awc/Cargo.toml | 4 +-- src/web.rs | 5 +++- 16 files changed, 83 insertions(+), 71 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 733f28ca7..70f7705c8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -38,13 +38,15 @@ ### Removed * Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now exposed directly by the `middleware` module. +* Remove `actix-threadpool` as dependency. `actix_threadpool::BlockingError` error type can be imported + from `actix_web::error` module. [#1878] [#1812]: https://github.com/actix/actix-web/pull/1812 [#1813]: https://github.com/actix/actix-web/pull/1813 [#1852]: https://github.com/actix/actix-web/pull/1852 [#1865]: https://github.com/actix/actix-web/pull/1865 [#1875]: https://github.com/actix/actix-web/pull/1875 - +[#1878]: https://github.com/actix/actix-web/pull/1878 ## 3.3.2 - 2020-12-01 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index fb0a876b6..bae6cb6cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,11 +76,10 @@ required-features = ["rustls"] actix-codec = "0.4.0-beta.1" actix-macros = "0.1.0" actix-router = "0.2.4" -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" actix-server = "2.0.0-beta.2" -actix-service = "2.0.0-beta.2" +actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" -actix-threadpool = "0.3.1" actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true } actix-web-codegen = "0.4.0" diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index f93450ff8..bde2cb717 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "4.0.0-beta.1", default-features = false } -actix-service = "2.0.0-beta.2" +actix-service = "2.0.0-beta.3" bitflags = "1" bytes = "1" futures-core = { version = "0.3.7", default-features = false } @@ -31,5 +31,5 @@ percent-encoding = "2.1" v_htmlescape = "0.12" [dev-dependencies] -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" actix-web = "4.0.0-beta.1" diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index 580b06787..5b7b17dc4 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -8,17 +8,11 @@ use std::{ }; use actix_web::{ - error::{BlockingError, Error}, - web, + error::{Error, ErrorInternalServerError}, + rt::task::{spawn_blocking, JoinHandle}, }; use bytes::Bytes; use futures_core::{ready, Stream}; -use futures_util::future::{FutureExt, LocalBoxFuture}; - -use crate::handle_error; - -type ChunkedBoxFuture = - LocalBoxFuture<'static, Result<(File, Bytes), BlockingError>>; #[doc(hidden)] /// A helper created from a `std::fs::File` which reads the file @@ -27,7 +21,7 @@ pub struct ChunkedReadFile { pub(crate) size: u64, pub(crate) offset: u64, pub(crate) file: Option, - pub(crate) fut: Option, + pub(crate) fut: Option>>, pub(crate) counter: u64, } @@ -45,18 +39,20 @@ impl Stream for ChunkedReadFile { cx: &mut Context<'_>, ) -> Poll> { if let Some(ref mut fut) = self.fut { - return match ready!(Pin::new(fut).poll(cx)) { - Ok((file, bytes)) => { + let res = match ready!(Pin::new(fut).poll(cx)) { + Ok(Ok((file, bytes))) => { self.fut.take(); self.file = Some(file); self.offset += bytes.len() as u64; self.counter += bytes.len() as u64; - Poll::Ready(Some(Ok(bytes))) + Ok(bytes) } - Err(e) => Poll::Ready(Some(Err(handle_error(e)))), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err(ErrorInternalServerError("Unexpected error")), }; + return Poll::Ready(Some(res)); } let size = self.size; @@ -68,25 +64,21 @@ impl Stream for ChunkedReadFile { } else { let mut file = self.file.take().expect("Use after completion"); - self.fut = Some( - web::block(move || { - let max_bytes = - cmp::min(size.saturating_sub(counter), 65_536) as usize; + self.fut = Some(spawn_blocking(move || { + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; + let mut buf = Vec::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + let n_bytes = + file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } - Ok((file, Bytes::from(buf))) - }) - .boxed_local(), - ); + Ok((file, Bytes::from(buf))) + })); self.poll_next(cx) } diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 662fba0a3..b7225fbc0 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -14,12 +14,10 @@ #![deny(rust_2018_idioms)] #![warn(missing_docs, missing_debug_implementations)] -use std::io; - use actix_service::boxed::{BoxService, BoxServiceFactory}; use actix_web::{ dev::{ServiceRequest, ServiceResponse}, - error::{BlockingError, Error, ErrorInternalServerError}, + error::Error, http::header::DispositionType, }; use mime_guess::from_ext; @@ -56,13 +54,6 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime { from_ext(ext).first_or_octet_stream() } -pub(crate) fn handle_error(err: BlockingError) -> Error { - match err { - BlockingError::Error(err) => err.into(), - BlockingError::Canceled => ErrorInternalServerError("Unexpected error"), - } -} - type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; #[cfg(test)] diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index a056b833e..772b60f76 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -29,11 +29,11 @@ default = [] openssl = ["open-ssl", "awc/openssl"] [dependencies] -actix-service = "2.0.0-beta.2" +actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-tls = "3.0.0-beta.2" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" actix-server = "2.0.0-beta.2" awc = "3.0.0-beta.1" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 622ed55ea..e9a94300b 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -25,11 +25,14 @@ * Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`. due to the removal of this type from `tokio-openssl` crate. openssl handshake error would return as `ConnectError::SslError`. [#1813] +* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`. + Due to this change `actix_threadpool::BlockingError` type is moved into + `actix_http::error` module. [#1878] [#1813]: https://github.com/actix/actix-web/pull/1813 [#1857]: https://github.com/actix/actix-web/pull/1857 [#1864]: https://github.com/actix/actix-web/pull/1864 - +[#1878]: https://github.com/actix/actix-web/pull/1878 ## 2.2.0 - 2020-11-25 ### Added diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index b64c71a8a..0cc8e5cf9 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -40,11 +40,10 @@ secure-cookies = ["cookie/secure"] actors = ["actix"] [dependencies] -actix-service = "2.0.0-beta.2" +actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.1" -actix-threadpool = "0.3.1" +actix-rt = "2.0.0-beta.2" actix-tls = "3.0.0-beta.2" actix = { version = "0.11.0-beta.1", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index b60435859..b26609911 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -3,14 +3,14 @@ use std::io::{self, Write}; use std::pin::Pin; use std::task::{Context, Poll}; -use actix_threadpool::{run, CpuFuture}; +use actix_rt::task::{spawn_blocking, JoinHandle}; use brotli2::write::BrotliDecoder; use bytes::Bytes; use flate2::write::{GzDecoder, ZlibDecoder}; use futures_core::{ready, Stream}; use super::Writer; -use crate::error::PayloadError; +use crate::error::{BlockingError, PayloadError}; use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}; const INPLACE: usize = 2049; @@ -19,7 +19,7 @@ pub struct Decoder { decoder: Option, stream: S, eof: bool, - fut: Option, ContentDecoder), io::Error>>, + fut: Option, ContentDecoder), io::Error>>>, } impl Decoder @@ -80,8 +80,13 @@ where loop { if let Some(ref mut fut) = self.fut { let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { - Ok(item) => item, - Err(e) => return Poll::Ready(Some(Err(e.into()))), + Ok(Ok(item)) => item, + Ok(Err(e)) => { + return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) + } + Err(_) => { + return Poll::Ready(Some(Err(BlockingError::Canceled.into()))) + } }; self.decoder = Some(decoder); self.fut.take(); @@ -105,7 +110,7 @@ where return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(run(move || { + self.fut = Some(spawn_blocking(move || { let chunk = decoder.feed_data(chunk)?; Ok((chunk, decoder)) })); diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index eb1821285..28c757076 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -4,7 +4,7 @@ use std::io::{self, Write}; use std::pin::Pin; use std::task::{Context, Poll}; -use actix_threadpool::{run, CpuFuture}; +use actix_rt::task::{spawn_blocking, JoinHandle}; use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; @@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode}; use crate::{Error, ResponseHead}; use super::Writer; +use crate::error::BlockingError; const INPLACE: usize = 1024; @@ -26,7 +27,7 @@ pub struct Encoder { #[pin] body: EncoderBody, encoder: Option, - fut: Option>, + fut: Option>>, } impl Encoder { @@ -136,8 +137,15 @@ impl MessageBody for Encoder { if let Some(ref mut fut) = this.fut { let mut encoder = match ready!(Pin::new(fut).poll(cx)) { - Ok(item) => item, - Err(e) => return Poll::Ready(Some(Err(e.into()))), + Ok(Ok(item)) => item, + Ok(Err(e)) => { + return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) + } + Err(_) => { + return Poll::Ready(Some(Err( + BlockingError::::Canceled.into(), + ))) + } }; let chunk = encoder.take(); *this.encoder = Some(encoder); @@ -160,7 +168,7 @@ impl MessageBody for Encoder { return Poll::Ready(Some(Ok(chunk))); } } else { - *this.fut = Some(run(move || { + *this.fut = Some(spawn_blocking(move || { encoder.write(&chunk)?; Ok(encoder) })); diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 852cf8e5c..a585962be 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -7,7 +7,6 @@ use std::string::FromUtf8Error; use std::{fmt, io, result}; use actix_codec::{Decoder, Encoder}; -pub use actix_threadpool::BlockingError; use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; @@ -186,9 +185,6 @@ impl ResponseError for DeError { /// `InternalServerError` for `Canceled` impl ResponseError for Canceled {} -/// `InternalServerError` for `BlockingError` -impl ResponseError for BlockingError {} - /// Return `BAD_REQUEST` for `Utf8Error` impl ResponseError for Utf8Error { fn status_code(&self) -> StatusCode { @@ -300,6 +296,20 @@ impl From for ParseError { } } +/// A set of errors that can occur running blocking tasks in thread pool. +#[derive(Debug, Display)] +pub enum BlockingError { + #[display(fmt = "{:?}", _0)] + Error(E), + #[display(fmt = "Thread pool is gone")] + Canceled, +} + +impl std::error::Error for BlockingError {} + +/// `InternalServerError` for `BlockingError` +impl ResponseError for BlockingError {} + #[derive(Display, Debug)] /// A set of errors that can occur during payload parsing pub enum PayloadError { diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index d22cf7ef0..44a7e8d16 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -28,5 +28,5 @@ mime = "0.3" twoway = "0.2" [dev-dependencies] -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" actix-http = "3.0.0-beta.1" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 331363543..0f90edb07 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -28,6 +28,6 @@ pin-project = "1.0.0" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" env_logger = "0.7" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 25e88d9e1..00875cf1b 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] } proc-macro2 = "1" [dev-dependencies] -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" actix-web = "4.0.0-beta.1" futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index b92df8247..0dbf80d33 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -38,9 +38,9 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-service = "2.0.0-beta.2" +actix-service = "2.0.0-beta.3" actix-http = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.1" +actix-rt = "2.0.0-beta.2" base64 = "0.13" bytes = "1" diff --git a/src/web.rs b/src/web.rs index 39dfc450a..88071f551 100644 --- a/src/web.rs +++ b/src/web.rs @@ -280,5 +280,8 @@ where I: Send + 'static, E: Send + std::fmt::Debug + 'static, { - actix_threadpool::run(f).await + match actix_rt::task::spawn_blocking(f).await { + Ok(res) => res.map_err(BlockingError::Error), + Err(_) => Err(BlockingError::Canceled), + } }