From 5795850bbb6dce3fd6ac0b8a7b10819a1e2500b9 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 28 Mar 2019 11:08:24 -0700 Subject: [PATCH] decompress payload in cpu threadpool --- actix-http/Cargo.toml | 3 +- actix-http/src/encoding/decoder.rs | 74 ++++++++++++++++++++---------- actix-http/src/error.rs | 40 ++++++++++++++-- awc/src/request.rs | 2 +- src/error.rs | 22 --------- src/lib.rs | 6 +-- src/middleware/decompress.rs | 2 +- 7 files changed, 91 insertions(+), 58 deletions(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index fefe05c4c..cdaeb1fc5 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -53,6 +53,7 @@ actix-codec = "0.1.2" actix-connect = "0.1.0" actix-utils = "0.3.4" actix-server-config = "0.1.0" +actix-threadpool = "0.1.0" base64 = "0.10" bitflags = "1.0" @@ -94,7 +95,7 @@ failure = { version = "0.1.5", optional = true } openssl = { version="0.10", optional = true } [dev-dependencies] -actix-rt = "0.2.1" +actix-rt = "0.2.2" actix-server = { version = "0.4.0", features=["ssl"] } actix-connect = { version = "0.1.0", features=["ssl"] } actix-http-test = { path="../test-server", features=["ssl"] } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index 8be6702fc..ae2b4ae6b 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -1,27 +1,31 @@ use std::io::{self, Write}; -use bytes::Bytes; -use futures::{Async, Poll, Stream}; - +use actix_threadpool::{run, CpuFuture}; #[cfg(feature = "brotli")] use brotli2::write::BrotliDecoder; +use bytes::Bytes; #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] use flate2::write::{GzDecoder, ZlibDecoder}; +use futures::{try_ready, Async, Future, Poll, Stream}; use super::Writer; use crate::error::PayloadError; use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}; -pub struct Decoder { - stream: T, +pub struct Decoder { decoder: Option, + stream: S, + eof: bool, + fut: Option, ContentDecoder), io::Error>>, } -impl Decoder +impl Decoder where - T: Stream, + S: Stream, { - pub fn new(stream: T, encoding: ContentEncoding) -> Self { + /// Construct a decoder. + #[inline] + pub fn new(stream: S, encoding: ContentEncoding) -> Decoder { let decoder = match encoding { #[cfg(feature = "brotli")] ContentEncoding::Br => Some(ContentDecoder::Br(Box::new( @@ -37,10 +41,17 @@ where ))), _ => None, }; - Decoder { stream, decoder } + Decoder { + decoder, + stream, + fut: None, + eof: false, + } } - pub fn from_headers(headers: &HeaderMap, stream: T) -> Self { + /// Construct decoder based on headers. + #[inline] + pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder { // check content-encoding let encoding = if let Some(enc) = headers.get(CONTENT_ENCODING) { if let Ok(enc) = enc.to_str() { @@ -56,35 +67,50 @@ where } } -impl Stream for Decoder +impl Stream for Decoder where - T: Stream, + S: Stream, { type Item = Bytes; type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { loop { + if let Some(ref mut fut) = self.fut { + let (chunk, decoder) = try_ready!(fut.poll()); + self.decoder = Some(decoder); + self.fut.take(); + if let Some(chunk) = chunk { + return Ok(Async::Ready(Some(chunk))); + } + } + + if self.eof { + return Ok(Async::Ready(None)); + } + match self.stream.poll()? { Async::Ready(Some(chunk)) => { - if let Some(ref mut decoder) = self.decoder { - match decoder.feed_data(chunk) { - Ok(Some(chunk)) => return Ok(Async::Ready(Some(chunk))), - Ok(None) => continue, - Err(e) => return Err(e.into()), - } + if let Some(mut decoder) = self.decoder.take() { + self.fut = Some(run(move || { + let chunk = decoder.feed_data(chunk)?; + Ok((chunk, decoder)) + })); + continue; } else { return Ok(Async::Ready(Some(chunk))); } } Async::Ready(None) => { - return if let Some(mut decoder) = self.decoder.take() { - match decoder.feed_eof() { - Ok(chunk) => Ok(Async::Ready(chunk)), - Err(e) => Err(e.into()), - } + self.eof = true; + if let Some(mut decoder) = self.decoder.take() { + self.fut = Some(run(move || { + let chunk = decoder.feed_eof()?; + Ok((chunk, decoder)) + })); + continue; } else { - Ok(Async::Ready(None)) + return Ok(Async::Ready(None)); }; } Async::NotReady => break, diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 4329970d4..e6cc0e07f 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -1,11 +1,11 @@ //! Error and Result module use std::cell::RefCell; -use std::io::Error as IoError; use std::str::Utf8Error; use std::string::FromUtf8Error; use std::{fmt, io, result}; // use actix::MailboxError; +pub use actix_threadpool::BlockingError; use actix_utils::timeout::TimeoutError; #[cfg(feature = "cookies")] use cookie; @@ -126,6 +126,9 @@ impl ResponseError for DeError { } } +/// `InternalServerError` for `BlockingError` +impl ResponseError for BlockingError {} + /// Return `BAD_REQUEST` for `Utf8Error` impl ResponseError for Utf8Error { fn error_response(&self) -> Response { @@ -199,7 +202,7 @@ pub enum ParseError { /// An `io::Error` that occurred while trying to read or write to a network /// stream. #[display(fmt = "IO error: {}", _0)] - Io(IoError), + Io(io::Error), /// Parsing a field as string failed #[display(fmt = "UTF8 error: {}", _0)] Utf8(Utf8Error), @@ -212,8 +215,8 @@ impl ResponseError for ParseError { } } -impl From for ParseError { - fn from(err: IoError) -> ParseError { +impl From for ParseError { + fn from(err: io::Error) -> ParseError { ParseError::Io(err) } } @@ -250,7 +253,7 @@ impl From for ParseError { } } -#[derive(Display, Debug, From)] +#[derive(Display, Debug)] /// A set of errors that can occur during payload parsing pub enum PayloadError { /// A payload reached EOF, but is not complete. @@ -271,6 +274,21 @@ pub enum PayloadError { /// Http2 payload error #[display(fmt = "{}", _0)] Http2Payload(h2::Error), + /// Io error + #[display(fmt = "{}", _0)] + Io(io::Error), +} + +impl From for PayloadError { + fn from(err: h2::Error) -> Self { + PayloadError::Http2Payload(err) + } +} + +impl From> for PayloadError { + fn from(err: Option) -> Self { + PayloadError::Incomplete(err) + } } impl From for PayloadError { @@ -279,6 +297,18 @@ impl From for PayloadError { } } +impl From> for PayloadError { + fn from(err: BlockingError) -> Self { + match err { + BlockingError::Error(e) => PayloadError::Io(e), + BlockingError::Canceled => PayloadError::Io(io::Error::new( + io::ErrorKind::Other, + "Thread pool is gone", + )), + } + } +} + /// `PayloadError` returns two possible results: /// /// - `Overflow` returns `PayloadTooLarge` diff --git a/awc/src/request.rs b/awc/src/request.rs index c0962ebf1..dde51a8f5 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -457,7 +457,7 @@ impl ClientRequest { .map(move |res| { res.map_body(|head, payload| { if response_decompress { - Payload::Stream(Decoder::from_headers(&head.headers, payload)) + Payload::Stream(Decoder::from_headers(payload, &head.headers)) } else { Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) } diff --git a/src/error.rs b/src/error.rs index 984b46e08..02e17241f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,4 @@ //! Error and Result module -use std::fmt; - pub use actix_http::error::*; use derive_more::{Display, From}; use serde_json::error::Error as JsonError; @@ -26,26 +24,6 @@ pub enum UrlGenerationError { /// `InternalServerError` for `UrlGeneratorError` impl ResponseError for UrlGenerationError {} -/// Blocking operation execution error -#[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} - -impl ResponseError for BlockingError {} - -impl From> for BlockingError { - fn from(err: actix_threadpool::BlockingError) -> Self { - match err { - actix_threadpool::BlockingError::Error(e) => BlockingError::Error(e), - actix_threadpool::BlockingError::Canceled => BlockingError::Canceled, - } - } -} - /// A set of errors that can occur during parsing urlencoded payloads #[derive(Debug, Display, From)] pub enum UrlencodedError { diff --git a/src/lib.rs b/src/lib.rs index ec5a9e6a9..7a4f4bfbb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,6 +162,7 @@ pub mod dev { pub mod web { //! Various types use actix_http::{http::Method, Response}; + use actix_service::{fn_transform, Service, Transform}; use futures::{Future, IntoFuture}; pub use actix_http::Response as HttpResponse; @@ -174,6 +175,7 @@ pub mod web { use crate::responder::Responder; use crate::route::Route; use crate::scope::Scope; + use crate::service::{ServiceRequest, ServiceResponse}; pub use crate::data::{Data, RouteData}; pub use crate::request::HttpRequest; @@ -341,10 +343,6 @@ pub mod web { actix_threadpool::run(f).from_err() } - use actix_service::{fn_transform, Service, Transform}; - - use crate::service::{ServiceRequest, ServiceResponse}; - /// Create middleare pub fn md( f: F, diff --git a/src/middleware/decompress.rs b/src/middleware/decompress.rs index eaffbbdb4..84d357375 100644 --- a/src/middleware/decompress.rs +++ b/src/middleware/decompress.rs @@ -70,7 +70,7 @@ where fn call(&mut self, req: ServiceRequest

) -> Self::Future { let (req, payload) = req.into_parts(); - let payload = Decoder::from_headers(req.headers(), payload); + let payload = Decoder::from_headers(payload, req.headers()); ok(ServiceRequest::from_parts(req, Payload::Stream(payload))) } }