mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-28 01:32:57 +01:00
decompress payload in cpu threadpool
This commit is contained in:
parent
e84c95968f
commit
5795850bbb
@ -53,6 +53,7 @@ actix-codec = "0.1.2"
|
||||
actix-connect = "0.1.0"
|
||||
actix-utils = "0.3.4"
|
||||
actix-server-config = "0.1.0"
|
||||
actix-threadpool = "0.1.0"
|
||||
|
||||
base64 = "0.10"
|
||||
bitflags = "1.0"
|
||||
@ -94,7 +95,7 @@ failure = { version = "0.1.5", optional = true }
|
||||
openssl = { version="0.10", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "0.2.1"
|
||||
actix-rt = "0.2.2"
|
||||
actix-server = { version = "0.4.0", features=["ssl"] }
|
||||
actix-connect = { version = "0.1.0", features=["ssl"] }
|
||||
actix-http-test = { path="../test-server", features=["ssl"] }
|
||||
|
@ -1,27 +1,31 @@
|
||||
use std::io::{self, Write};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Async, Poll, Stream};
|
||||
|
||||
use actix_threadpool::{run, CpuFuture};
|
||||
#[cfg(feature = "brotli")]
|
||||
use brotli2::write::BrotliDecoder;
|
||||
use bytes::Bytes;
|
||||
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
|
||||
use flate2::write::{GzDecoder, ZlibDecoder};
|
||||
use futures::{try_ready, Async, Future, Poll, Stream};
|
||||
|
||||
use super::Writer;
|
||||
use crate::error::PayloadError;
|
||||
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
|
||||
|
||||
pub struct Decoder<T> {
|
||||
stream: T,
|
||||
pub struct Decoder<S> {
|
||||
decoder: Option<ContentDecoder>,
|
||||
stream: S,
|
||||
eof: bool,
|
||||
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
|
||||
}
|
||||
|
||||
impl<T> Decoder<T>
|
||||
impl<S> Decoder<S>
|
||||
where
|
||||
T: Stream<Item = Bytes, Error = PayloadError>,
|
||||
S: Stream<Item = Bytes, Error = PayloadError>,
|
||||
{
|
||||
pub fn new(stream: T, encoding: ContentEncoding) -> Self {
|
||||
/// Construct a decoder.
|
||||
#[inline]
|
||||
pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
|
||||
let decoder = match encoding {
|
||||
#[cfg(feature = "brotli")]
|
||||
ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
|
||||
@ -37,10 +41,17 @@ where
|
||||
))),
|
||||
_ => None,
|
||||
};
|
||||
Decoder { stream, decoder }
|
||||
Decoder {
|
||||
decoder,
|
||||
stream,
|
||||
fut: None,
|
||||
eof: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_headers(headers: &HeaderMap, stream: T) -> Self {
|
||||
/// Construct decoder based on headers.
|
||||
#[inline]
|
||||
pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
|
||||
// check content-encoding
|
||||
let encoding = if let Some(enc) = headers.get(CONTENT_ENCODING) {
|
||||
if let Ok(enc) = enc.to_str() {
|
||||
@ -56,35 +67,50 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for Decoder<T>
|
||||
impl<S> Stream for Decoder<S>
|
||||
where
|
||||
T: Stream<Item = Bytes, Error = PayloadError>,
|
||||
S: Stream<Item = Bytes, Error = PayloadError>,
|
||||
{
|
||||
type Item = Bytes;
|
||||
type Error = PayloadError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
let (chunk, decoder) = try_ready!(fut.poll());
|
||||
self.decoder = Some(decoder);
|
||||
self.fut.take();
|
||||
if let Some(chunk) = chunk {
|
||||
return Ok(Async::Ready(Some(chunk)));
|
||||
}
|
||||
}
|
||||
|
||||
if self.eof {
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
|
||||
match self.stream.poll()? {
|
||||
Async::Ready(Some(chunk)) => {
|
||||
if let Some(ref mut decoder) = self.decoder {
|
||||
match decoder.feed_data(chunk) {
|
||||
Ok(Some(chunk)) => return Ok(Async::Ready(Some(chunk))),
|
||||
Ok(None) => continue,
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
if let Some(mut decoder) = self.decoder.take() {
|
||||
self.fut = Some(run(move || {
|
||||
let chunk = decoder.feed_data(chunk)?;
|
||||
Ok((chunk, decoder))
|
||||
}));
|
||||
continue;
|
||||
} else {
|
||||
return Ok(Async::Ready(Some(chunk)));
|
||||
}
|
||||
}
|
||||
Async::Ready(None) => {
|
||||
return if let Some(mut decoder) = self.decoder.take() {
|
||||
match decoder.feed_eof() {
|
||||
Ok(chunk) => Ok(Async::Ready(chunk)),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
self.eof = true;
|
||||
if let Some(mut decoder) = self.decoder.take() {
|
||||
self.fut = Some(run(move || {
|
||||
let chunk = decoder.feed_eof()?;
|
||||
Ok((chunk, decoder))
|
||||
}));
|
||||
continue;
|
||||
} else {
|
||||
Ok(Async::Ready(None))
|
||||
return Ok(Async::Ready(None));
|
||||
};
|
||||
}
|
||||
Async::NotReady => break,
|
||||
|
@ -1,11 +1,11 @@
|
||||
//! Error and Result module
|
||||
use std::cell::RefCell;
|
||||
use std::io::Error as IoError;
|
||||
use std::str::Utf8Error;
|
||||
use std::string::FromUtf8Error;
|
||||
use std::{fmt, io, result};
|
||||
|
||||
// use actix::MailboxError;
|
||||
pub use actix_threadpool::BlockingError;
|
||||
use actix_utils::timeout::TimeoutError;
|
||||
#[cfg(feature = "cookies")]
|
||||
use cookie;
|
||||
@ -126,6 +126,9 @@ impl ResponseError for DeError {
|
||||
}
|
||||
}
|
||||
|
||||
/// `InternalServerError` for `BlockingError`
|
||||
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
|
||||
|
||||
/// Return `BAD_REQUEST` for `Utf8Error`
|
||||
impl ResponseError for Utf8Error {
|
||||
fn error_response(&self) -> Response {
|
||||
@ -199,7 +202,7 @@ pub enum ParseError {
|
||||
/// An `io::Error` that occurred while trying to read or write to a network
|
||||
/// stream.
|
||||
#[display(fmt = "IO error: {}", _0)]
|
||||
Io(IoError),
|
||||
Io(io::Error),
|
||||
/// Parsing a field as string failed
|
||||
#[display(fmt = "UTF8 error: {}", _0)]
|
||||
Utf8(Utf8Error),
|
||||
@ -212,8 +215,8 @@ impl ResponseError for ParseError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<IoError> for ParseError {
|
||||
fn from(err: IoError) -> ParseError {
|
||||
impl From<io::Error> for ParseError {
|
||||
fn from(err: io::Error) -> ParseError {
|
||||
ParseError::Io(err)
|
||||
}
|
||||
}
|
||||
@ -250,7 +253,7 @@ impl From<httparse::Error> for ParseError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Display, Debug, From)]
|
||||
#[derive(Display, Debug)]
|
||||
/// A set of errors that can occur during payload parsing
|
||||
pub enum PayloadError {
|
||||
/// A payload reached EOF, but is not complete.
|
||||
@ -271,6 +274,21 @@ pub enum PayloadError {
|
||||
/// Http2 payload error
|
||||
#[display(fmt = "{}", _0)]
|
||||
Http2Payload(h2::Error),
|
||||
/// Io error
|
||||
#[display(fmt = "{}", _0)]
|
||||
Io(io::Error),
|
||||
}
|
||||
|
||||
impl From<h2::Error> for PayloadError {
|
||||
fn from(err: h2::Error) -> Self {
|
||||
PayloadError::Http2Payload(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<io::Error>> for PayloadError {
|
||||
fn from(err: Option<io::Error>) -> Self {
|
||||
PayloadError::Incomplete(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for PayloadError {
|
||||
@ -279,6 +297,18 @@ impl From<io::Error> for PayloadError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BlockingError<io::Error>> for PayloadError {
|
||||
fn from(err: BlockingError<io::Error>) -> Self {
|
||||
match err {
|
||||
BlockingError::Error(e) => PayloadError::Io(e),
|
||||
BlockingError::Canceled => PayloadError::Io(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Thread pool is gone",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `PayloadError` returns two possible results:
|
||||
///
|
||||
/// - `Overflow` returns `PayloadTooLarge`
|
||||
|
@ -457,7 +457,7 @@ impl ClientRequest {
|
||||
.map(move |res| {
|
||||
res.map_body(|head, payload| {
|
||||
if response_decompress {
|
||||
Payload::Stream(Decoder::from_headers(&head.headers, payload))
|
||||
Payload::Stream(Decoder::from_headers(payload, &head.headers))
|
||||
} else {
|
||||
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
|
||||
}
|
||||
|
22
src/error.rs
22
src/error.rs
@ -1,6 +1,4 @@
|
||||
//! Error and Result module
|
||||
use std::fmt;
|
||||
|
||||
pub use actix_http::error::*;
|
||||
use derive_more::{Display, From};
|
||||
use serde_json::error::Error as JsonError;
|
||||
@ -26,26 +24,6 @@ pub enum UrlGenerationError {
|
||||
/// `InternalServerError` for `UrlGeneratorError`
|
||||
impl ResponseError for UrlGenerationError {}
|
||||
|
||||
/// Blocking operation execution error
|
||||
#[derive(Debug, Display)]
|
||||
pub enum BlockingError<E: fmt::Debug> {
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
Error(E),
|
||||
#[display(fmt = "Thread pool is gone")]
|
||||
Canceled,
|
||||
}
|
||||
|
||||
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
|
||||
|
||||
impl<E: fmt::Debug> From<actix_threadpool::BlockingError<E>> for BlockingError<E> {
|
||||
fn from(err: actix_threadpool::BlockingError<E>) -> Self {
|
||||
match err {
|
||||
actix_threadpool::BlockingError::Error(e) => BlockingError::Error(e),
|
||||
actix_threadpool::BlockingError::Canceled => BlockingError::Canceled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A set of errors that can occur during parsing urlencoded payloads
|
||||
#[derive(Debug, Display, From)]
|
||||
pub enum UrlencodedError {
|
||||
|
@ -162,6 +162,7 @@ pub mod dev {
|
||||
pub mod web {
|
||||
//! Various types
|
||||
use actix_http::{http::Method, Response};
|
||||
use actix_service::{fn_transform, Service, Transform};
|
||||
use futures::{Future, IntoFuture};
|
||||
|
||||
pub use actix_http::Response as HttpResponse;
|
||||
@ -174,6 +175,7 @@ pub mod web {
|
||||
use crate::responder::Responder;
|
||||
use crate::route::Route;
|
||||
use crate::scope::Scope;
|
||||
use crate::service::{ServiceRequest, ServiceResponse};
|
||||
|
||||
pub use crate::data::{Data, RouteData};
|
||||
pub use crate::request::HttpRequest;
|
||||
@ -341,10 +343,6 @@ pub mod web {
|
||||
actix_threadpool::run(f).from_err()
|
||||
}
|
||||
|
||||
use actix_service::{fn_transform, Service, Transform};
|
||||
|
||||
use crate::service::{ServiceRequest, ServiceResponse};
|
||||
|
||||
/// Create middleare
|
||||
pub fn md<F, R, S, P, B>(
|
||||
f: F,
|
||||
|
@ -70,7 +70,7 @@ where
|
||||
|
||||
fn call(&mut self, req: ServiceRequest<P>) -> Self::Future {
|
||||
let (req, payload) = req.into_parts();
|
||||
let payload = Decoder::from_headers(req.headers(), payload);
|
||||
let payload = Decoder::from_headers(payload, req.headers());
|
||||
ok(ServiceRequest::from_parts(req, Payload::Stream(payload)))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user