diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 049f5328..106c57f3 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -8,10 +8,13 @@ * Render error and return as response body +* Use thread pool for response body comression + ### Deleted * Removed PayloadBuffer + ## [0.1.0-alpha.3] - 2019-04-02 ### Added diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index fcac3a42..0e02bc89 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -1,13 +1,13 @@ //! Stream encoder use std::io::{self, Write}; -use bytes::Bytes; -use futures::{Async, Poll}; - +use actix_threadpool::{run, CpuFuture}; #[cfg(feature = "brotli")] use brotli2::write::BrotliEncoder; +use bytes::Bytes; #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] use flate2::write::{GzEncoder, ZlibEncoder}; +use futures::{Async, Future, Poll}; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; @@ -16,9 +16,12 @@ use crate::{Error, ResponseHead}; use super::Writer; +const INPLACE: usize = 2049; + pub struct Encoder { body: EncoderBody, encoder: Option, + fut: Option>, } impl Encoder { @@ -27,73 +30,58 @@ impl Encoder { head: &mut ResponseHead, body: ResponseBody, ) -> ResponseBody> { - let has_ce = head.headers().contains_key(CONTENT_ENCODING); - match body { - ResponseBody::Other(b) => match b { - Body::None => ResponseBody::Other(Body::None), - Body::Empty => ResponseBody::Other(Body::Empty), - Body::Bytes(buf) => { - if !(has_ce - || encoding == ContentEncoding::Identity - || encoding == ContentEncoding::Auto) - { - let mut enc = ContentEncoder::encoder(encoding).unwrap(); + let can_encode = !(head.headers().contains_key(CONTENT_ENCODING) + || head.status == StatusCode::SWITCHING_PROTOCOLS + || encoding == ContentEncoding::Identity + || encoding == ContentEncoding::Auto); - // TODO return error! - let _ = enc.write(buf.as_ref()); - let body = enc.finish().unwrap(); - update_head(encoding, head); - ResponseBody::Other(Body::Bytes(body)) + let body = match body { + ResponseBody::Other(b) => match b { + Body::None => return ResponseBody::Other(Body::None), + Body::Empty => return ResponseBody::Other(Body::Empty), + Body::Bytes(buf) => { + if can_encode { + EncoderBody::Bytes(buf) } else { - ResponseBody::Other(Body::Bytes(buf)) - } - } - Body::Message(stream) => { - if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { - ResponseBody::Body(Encoder { - body: EncoderBody::Other(stream), - encoder: None, - }) - } else { - update_head(encoding, head); - head.no_chunking(false); - ResponseBody::Body(Encoder { - body: EncoderBody::Other(stream), - encoder: ContentEncoder::encoder(encoding), - }) + return ResponseBody::Other(Body::Bytes(buf)); } } + Body::Message(stream) => EncoderBody::BoxedStream(stream), }, - ResponseBody::Body(stream) => { - if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { - ResponseBody::Body(Encoder { - body: EncoderBody::Body(stream), - encoder: None, - }) - } else { - update_head(encoding, head); - head.no_chunking(false); - ResponseBody::Body(Encoder { - body: EncoderBody::Body(stream), - encoder: ContentEncoder::encoder(encoding), - }) - } - } + ResponseBody::Body(stream) => EncoderBody::Stream(stream), + }; + + if can_encode { + update_head(encoding, head); + head.no_chunking(false); + ResponseBody::Body(Encoder { + body, + fut: None, + encoder: ContentEncoder::encoder(encoding), + }) + } else { + ResponseBody::Body(Encoder { + body, + fut: None, + encoder: None, + }) } } } enum EncoderBody { - Body(B), - Other(Box), + Bytes(Bytes), + Stream(B), + BoxedStream(Box), } impl MessageBody for Encoder { fn length(&self) -> BodySize { if self.encoder.is_none() { match self.body { - EncoderBody::Body(ref b) => b.length(), - EncoderBody::Other(ref b) => b.length(), + EncoderBody::Bytes(ref b) => b.length(), + EncoderBody::Stream(ref b) => b.length(), + EncoderBody::BoxedStream(ref b) => b.length(), } } else { BodySize::Stream @@ -102,20 +90,47 @@ impl MessageBody for Encoder { fn poll_next(&mut self) -> Poll, Error> { loop { + if let Some(ref mut fut) = self.fut { + let mut encoder = futures::try_ready!(fut.poll()); + let chunk = encoder.take(); + self.encoder = Some(encoder); + self.fut.take(); + if !chunk.is_empty() { + return Ok(Async::Ready(Some(chunk))); + } + } + let result = match self.body { - EncoderBody::Body(ref mut b) => b.poll_next()?, - EncoderBody::Other(ref mut b) => b.poll_next()?, + EncoderBody::Bytes(ref mut b) => { + if b.is_empty() { + Async::Ready(None) + } else { + Async::Ready(Some(std::mem::replace(b, Bytes::new()))) + } + } + EncoderBody::Stream(ref mut b) => b.poll_next()?, + EncoderBody::BoxedStream(ref mut b) => b.poll_next()?, }; match result { Async::NotReady => return Ok(Async::NotReady), Async::Ready(Some(chunk)) => { - if let Some(ref mut encoder) = self.encoder { - if encoder.write(&chunk)? { - return Ok(Async::Ready(Some(encoder.take()))); + if let Some(mut encoder) = self.encoder.take() { + if chunk.len() < INPLACE { + encoder.write(&chunk)?; + let chunk = encoder.take(); + self.encoder = Some(encoder); + if !chunk.is_empty() { + return Ok(Async::Ready(Some(chunk))); + } + } else { + self.fut = Some(run(move || { + encoder.write(&chunk)?; + Ok(encoder) + })); + continue; } - } else { - return Ok(Async::Ready(Some(chunk))); } + return Ok(Async::Ready(Some(chunk))); } Async::Ready(None) => { if let Some(encoder) = self.encoder.take() { @@ -203,11 +218,11 @@ impl ContentEncoder { } } - fn write(&mut self, data: &[u8]) -> Result { + fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { match *self { #[cfg(feature = "brotli")] ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding br encoding: {}", err); Err(err) @@ -215,7 +230,7 @@ impl ContentEncoder { }, #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding gzip encoding: {}", err); Err(err) @@ -223,7 +238,7 @@ impl ContentEncoder { }, #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding deflate encoding: {}", err); Err(err) diff --git a/src/handler.rs b/src/handler.rs index 4ff3193c..a11a5d0b 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -22,8 +22,8 @@ where impl Factory<(), R> for F where - F: Fn() -> R + Clone + 'static, - R: Responder + 'static, + F: Fn() -> R + Clone, + R: Responder, { fn call(&self, _: ()) -> R { (self)() @@ -55,7 +55,7 @@ where impl NewService for Handler where F: Factory, - R: Responder + 'static, + R: Responder, { type Request = (T, HttpRequest); type Response = ServiceResponse; @@ -76,7 +76,7 @@ where pub struct HandlerService where F: Factory, - R: Responder + 'static, + R: Responder, { hnd: F, _t: PhantomData<(T, R)>, @@ -85,7 +85,7 @@ where impl Service for HandlerService where F: Factory, - R: Responder + 'static, + R: Responder, { type Request = (T, HttpRequest); type Response = ServiceResponse; @@ -355,8 +355,8 @@ impl> Future for ExtractResponse { /// FromRequest trait impl for tuples macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => { impl Factory<($($T,)+), Res> for Func - where Func: Fn($($T,)+) -> Res + Clone + 'static, - Res: Responder + 'static, + where Func: Fn($($T,)+) -> Res + Clone, + Res: Responder, { fn call(&self, param: ($($T,)+)) -> Res { (self)($(param.$n,)+) @@ -365,7 +365,7 @@ macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => { impl AsyncFactory<($($T,)+), Res> for Func where Func: Fn($($T,)+) -> Res + Clone + 'static, - Res: IntoFuture + 'static, + Res: IntoFuture, Res::Item: Into, Res::Error: Into, {