diff --git a/Cargo.toml b/Cargo.toml index 40f8c7c4a..22c2efe99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ members = [ ] [package.metadata.docs.rs] -features = ["ssl", "tls", "rust-tls", "brotli", "flate2-c", "cookies"] +features = ["ssl", "tls", "rust-tls", "brotli", "flate2-c", "cookies", "client"] [features] default = ["brotli", "flate2-c", "cookies", "client"] @@ -45,13 +45,13 @@ default = ["brotli", "flate2-c", "cookies", "client"] client = ["awc"] # brotli encoding, requires c compiler -brotli = ["brotli2"] +brotli = ["actix-http/brotli2"] # miniz-sys backend for flate2 crate -flate2-c = ["flate2/miniz-sys"] +flate2-c = ["actix-http/flate2-c"] # rust backend for flate2 crate -flate2-rust = ["flate2/rust_backend"] +flate2-rust = ["actix-http/flate2-rust"] # sessions feature, session require "ring" crate and c compiler cookies = ["cookie", "actix-http/cookies"] @@ -96,22 +96,20 @@ url = { version="1.7", features=["query_encoding"] } # cookies support cookie = { version="0.11", features=["secure", "percent-encode"], optional = true } -# compression -brotli2 = { version="^0.3.2", optional = true } -flate2 = { version="^1.0.2", optional = true, default-features = false } - # ssl support native-tls = { version="0.2", optional = true } openssl = { version="0.10", optional = true } # rustls = { version = "^0.15", optional = true } [dev-dependencies] -actix-http = { path = "actix-http", features=["ssl"] } +actix-http = { path = "actix-http", features=["ssl", "brotli", "flate2-c"] } actix-http-test = { path = "test-server", features=["ssl"] } rand = "0.6" env_logger = "0.6" serde_derive = "1.0" tokio-timer = "0.2.8" +brotli2 = { version="^0.3.2" } +flate2 = { version="^1.0.2" } [profile.release] lto = true diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 3b403ac2a..7b73e7e26 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -36,6 +36,15 @@ ssl = ["openssl", "actix-connect/ssl"] # cookies integration cookies = ["cookie"] +# brotli encoding, requires c compiler +brotli = ["brotli2"] + +# miniz-sys backend for flate2 crate +flate2-c = ["flate2/miniz-sys"] + +# rust backend for flate2 crate +flate2-rust = ["flate2/rust_backend"] + # failure integration. actix does not use failure anymore fail = ["failure"] @@ -77,6 +86,10 @@ tokio-timer = "0.2" tokio-current-thread = "0.1" trust-dns-resolver = { version="0.11.0-alpha.2", default-features = false } +# compression +brotli2 = { version="^0.3.2", optional = true } +flate2 = { version="^1.0.2", optional = true, default-features = false } + # optional deps cookie = { version="0.11", features=["percent-encode"], optional = true } failure = { version = "0.1.5", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs new file mode 100644 index 000000000..a922d1738 --- /dev/null +++ b/actix-http/src/encoding/decoder.rs @@ -0,0 +1,191 @@ +use std::io::{self, Write}; + +use bytes::Bytes; +use futures::{Async, Poll, Stream}; + +#[cfg(feature = "brotli")] +use brotli2::write::BrotliDecoder; +#[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] +use flate2::write::{GzDecoder, ZlibDecoder}; + +use super::Writer; +use crate::error::PayloadError; +use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}; + +pub struct Decoder { + stream: T, + decoder: Option, +} + +impl Decoder +where + T: Stream, +{ + pub fn new(stream: T, encoding: ContentEncoding) -> Self { + let decoder = match encoding { + #[cfg(feature = "brotli")] + ContentEncoding::Br => Some(ContentDecoder::Br(Box::new( + BrotliDecoder::new(Writer::new()), + ))), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new( + ZlibDecoder::new(Writer::new()), + ))), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new( + GzDecoder::new(Writer::new()), + ))), + _ => None, + }; + Decoder { stream, decoder } + } + + pub fn from_headers(headers: &HeaderMap, stream: T) -> Self { + // check content-encoding + let encoding = if let Some(enc) = headers.get(CONTENT_ENCODING) { + if let Ok(enc) = enc.to_str() { + ContentEncoding::from(enc) + } else { + ContentEncoding::Identity + } + } else { + ContentEncoding::Identity + }; + + Self::new(stream, encoding) + } +} + +impl Stream for Decoder +where + T: Stream, +{ + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match self.stream.poll()? { + Async::Ready(Some(chunk)) => { + if let Some(ref mut decoder) = self.decoder { + match decoder.feed_data(chunk) { + Ok(Some(chunk)) => return Ok(Async::Ready(Some(chunk))), + Ok(None) => continue, + Err(e) => return Err(e.into()), + } + } else { + break; + } + } + Async::Ready(None) => { + return if let Some(mut decoder) = self.decoder.take() { + match decoder.feed_eof() { + Ok(chunk) => Ok(Async::Ready(chunk)), + Err(e) => Err(e.into()), + } + } else { + Ok(Async::Ready(None)) + }; + } + Async::NotReady => break, + } + } + Ok(Async::NotReady) + } +} + +enum ContentDecoder { + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + Deflate(Box>), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + Gzip(Box>), + #[cfg(feature = "brotli")] + Br(Box>), +} + +impl ContentDecoder { + fn feed_eof(&mut self) -> io::Result> { + match self { + #[cfg(feature = "brotli")] + ContentDecoder::Br(ref mut decoder) => match decoder.finish() { + Ok(mut writer) => { + let b = writer.take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentDecoder::Gzip(ref mut decoder) => match decoder.try_finish() { + Ok(_) => { + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentDecoder::Deflate(ref mut decoder) => match decoder.try_finish() { + Ok(_) => { + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + } + } + + fn feed_data(&mut self, data: Bytes) -> io::Result> { + match self { + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentDecoder::Br(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentDecoder::Gzip(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentDecoder::Deflate(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + } + } +} diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs new file mode 100644 index 000000000..1985dcdf2 --- /dev/null +++ b/actix-http/src/encoding/encoder.rs @@ -0,0 +1,234 @@ +//! Stream encoder +use std::io::{self, Write}; + +use bytes::Bytes; +use futures::{Async, Poll}; + +#[cfg(feature = "brotli")] +use brotli2::write::BrotliEncoder; +#[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] +use flate2::write::{GzEncoder, ZlibEncoder}; + +use crate::body::{Body, BodyLength, MessageBody, ResponseBody}; +use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; +use crate::http::{HeaderValue, HttpTryFrom, StatusCode}; +use crate::{Error, Head, ResponseHead}; + +use super::Writer; + +pub struct Encoder { + body: EncoderBody, + encoder: Option, +} + +impl Encoder { + pub fn response( + encoding: ContentEncoding, + head: &mut ResponseHead, + body: ResponseBody, + ) -> ResponseBody> { + let has_ce = head.headers().contains_key(CONTENT_ENCODING); + match body { + ResponseBody::Other(b) => match b { + Body::None => ResponseBody::Other(Body::None), + Body::Empty => ResponseBody::Other(Body::Empty), + Body::Bytes(buf) => { + if !(has_ce + || encoding == ContentEncoding::Identity + || encoding == ContentEncoding::Auto) + { + let mut enc = ContentEncoder::encoder(encoding).unwrap(); + + // TODO return error! + let _ = enc.write(buf.as_ref()); + let body = enc.finish().unwrap(); + update_head(encoding, head); + ResponseBody::Other(Body::Bytes(body)) + } else { + ResponseBody::Other(Body::Bytes(buf)) + } + } + Body::Message(stream) => { + if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { + ResponseBody::Body(Encoder { + body: EncoderBody::Other(stream), + encoder: None, + }) + } else { + update_head(encoding, head); + head.no_chunking = false; + ResponseBody::Body(Encoder { + body: EncoderBody::Other(stream), + encoder: ContentEncoder::encoder(encoding), + }) + } + } + }, + ResponseBody::Body(stream) => { + if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { + ResponseBody::Body(Encoder { + body: EncoderBody::Body(stream), + encoder: None, + }) + } else { + update_head(encoding, head); + head.no_chunking = false; + ResponseBody::Body(Encoder { + body: EncoderBody::Body(stream), + encoder: ContentEncoder::encoder(encoding), + }) + } + } + } + } +} + +enum EncoderBody { + Body(B), + Other(Box), +} + +impl MessageBody for Encoder { + fn length(&self) -> BodyLength { + if self.encoder.is_none() { + match self.body { + EncoderBody::Body(ref b) => b.length(), + EncoderBody::Other(ref b) => b.length(), + } + } else { + BodyLength::Stream + } + } + + fn poll_next(&mut self) -> Poll, Error> { + loop { + let result = match self.body { + EncoderBody::Body(ref mut b) => b.poll_next()?, + EncoderBody::Other(ref mut b) => b.poll_next()?, + }; + match result { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(Some(chunk)) => { + if let Some(ref mut encoder) = self.encoder { + if encoder.write(&chunk)? { + return Ok(Async::Ready(Some(encoder.take()))); + } + } else { + return Ok(Async::Ready(Some(chunk))); + } + } + Async::Ready(None) => { + if let Some(encoder) = self.encoder.take() { + let chunk = encoder.finish()?; + if chunk.is_empty() { + return Ok(Async::Ready(None)); + } else { + return Ok(Async::Ready(Some(chunk))); + } + } else { + return Ok(Async::Ready(None)); + } + } + } + } + } +} + +fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { + head.headers_mut().insert( + CONTENT_ENCODING, + HeaderValue::try_from(Bytes::from_static(encoding.as_str().as_bytes())).unwrap(), + ); +} + +enum ContentEncoder { + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + Deflate(ZlibEncoder), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + Gzip(GzEncoder), + #[cfg(feature = "brotli")] + Br(BrotliEncoder), +} + +impl ContentEncoder { + fn encoder(encoding: ContentEncoding) -> Option { + match encoding { + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new( + Writer::new(), + flate2::Compression::fast(), + ))), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new( + Writer::new(), + flate2::Compression::fast(), + ))), + #[cfg(feature = "brotli")] + ContentEncoding::Br => { + Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3))) + } + _ => None, + } + } + + #[inline] + pub(crate) fn take(&mut self) -> Bytes { + match *self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(), + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(), + } + } + + fn finish(self) -> Result { + match self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(encoder) => match encoder.finish() { + Ok(writer) => Ok(writer.buf.freeze()), + Err(err) => Err(err), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Gzip(encoder) => match encoder.finish() { + Ok(writer) => Ok(writer.buf.freeze()), + Err(err) => Err(err), + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Deflate(encoder) => match encoder.finish() { + Ok(writer) => Ok(writer.buf.freeze()), + Err(err) => Err(err), + }, + } + } + + fn write(&mut self, data: &[u8]) -> Result { + match *self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Err(err) => { + trace!("Error decoding br encoding: {}", err); + Err(err) + } + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Err(err) => { + trace!("Error decoding gzip encoding: {}", err); + Err(err) + } + }, + #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] + ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), + Err(err) => { + trace!("Error decoding deflate encoding: {}", err); + Err(err) + } + }, + } + } +} diff --git a/actix-http/src/encoding/mod.rs b/actix-http/src/encoding/mod.rs new file mode 100644 index 000000000..b55a43a7c --- /dev/null +++ b/actix-http/src/encoding/mod.rs @@ -0,0 +1,35 @@ +//! Content-Encoding support +use std::io; + +use bytes::{Bytes, BytesMut}; + +mod decoder; +mod encoder; + +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; + +pub(self) struct Writer { + buf: BytesMut, +} + +impl Writer { + fn new() -> Writer { + Writer { + buf: BytesMut::with_capacity(8192), + } + } + fn take(&mut self) -> Bytes { + self.buf.take().freeze() + } +} + +impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index b41ce7ae8..edc06c2a6 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -12,6 +12,7 @@ pub mod body; mod builder; pub mod client; mod config; +pub mod encoding; mod extensions; mod header; mod helpers; diff --git a/src/app.rs b/src/app.rs index f46f5252f..b8efdd38b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -193,10 +193,10 @@ where } /// Register a request modifier. It can modify any request parameters - /// including payload stream type. + /// including request payload type. pub fn chain( self, - chain: C, + chain: F, ) -> App< P1, impl NewService< diff --git a/src/app_service.rs b/src/app_service.rs index 0bf3d3095..236eed9f9 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -380,7 +380,7 @@ impl

Service for AppRouting

{ } else if let Some(ref mut default) = self.default { Either::A(default.call(req)) } else { - let req = req.into_request(); + let req = req.into_parts().0; Either::B(ok(ServiceResponse::new(req, Response::NotFound().finish()))) } } diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 3c4718fed..5ffe9afb1 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -1,25 +1,14 @@ -/// `Middleware` for compressing response body. -use std::io::Write; +//! `Middleware` for compressing response body. +use std::cmp; use std::marker::PhantomData; use std::str::FromStr; -use std::{cmp, fmt, io}; -use actix_http::body::{Body, BodyLength, MessageBody, ResponseBody}; -use actix_http::http::header::{ - ContentEncoding, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, -}; -use actix_http::http::{HttpTryFrom, StatusCode}; -use actix_http::{Error, Head, ResponseHead}; +use actix_http::body::MessageBody; +use actix_http::encoding::Encoder; +use actix_http::http::header::{ContentEncoding, ACCEPT_ENCODING}; use actix_service::{Service, Transform}; -use bytes::{Bytes, BytesMut}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; -use log::trace; - -#[cfg(feature = "brotli")] -use brotli2::write::BrotliEncoder; -#[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] -use flate2::write::{GzEncoder, ZlibEncoder}; use crate::service::{ServiceRequest, ServiceResponse}; @@ -130,266 +119,11 @@ where let resp = futures::try_ready!(self.fut.poll()); Ok(Async::Ready(resp.map_body(move |head, body| { - Encoder::body(self.encoding, head, body) + Encoder::response(self.encoding, head, body) }))) } } -enum EncoderBody { - Body(B), - Other(Box), -} - -pub struct Encoder { - body: EncoderBody, - encoder: Option, -} - -impl MessageBody for Encoder { - fn length(&self) -> BodyLength { - if self.encoder.is_none() { - match self.body { - EncoderBody::Body(ref b) => b.length(), - EncoderBody::Other(ref b) => b.length(), - } - } else { - BodyLength::Stream - } - } - - fn poll_next(&mut self) -> Poll, Error> { - loop { - let result = match self.body { - EncoderBody::Body(ref mut b) => b.poll_next()?, - EncoderBody::Other(ref mut b) => b.poll_next()?, - }; - match result { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(Some(chunk)) => { - if let Some(ref mut encoder) = self.encoder { - if encoder.write(&chunk)? { - return Ok(Async::Ready(Some(encoder.take()))); - } - } else { - return Ok(Async::Ready(Some(chunk))); - } - } - Async::Ready(None) => { - if let Some(encoder) = self.encoder.take() { - let chunk = encoder.finish()?; - if chunk.is_empty() { - return Ok(Async::Ready(None)); - } else { - return Ok(Async::Ready(Some(chunk))); - } - } else { - return Ok(Async::Ready(None)); - } - } - } - } - } -} - -fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { - head.headers_mut().insert( - CONTENT_ENCODING, - HeaderValue::try_from(Bytes::from_static(encoding.as_str().as_bytes())).unwrap(), - ); -} - -impl Encoder { - fn body( - encoding: ContentEncoding, - head: &mut ResponseHead, - body: ResponseBody, - ) -> ResponseBody> { - let has_ce = head.headers().contains_key(CONTENT_ENCODING); - match body { - ResponseBody::Other(b) => match b { - Body::None => ResponseBody::Other(Body::None), - Body::Empty => ResponseBody::Other(Body::Empty), - Body::Bytes(buf) => { - if !(has_ce - || encoding == ContentEncoding::Identity - || encoding == ContentEncoding::Auto) - { - let mut enc = ContentEncoder::encoder(encoding).unwrap(); - - // TODO return error! - let _ = enc.write(buf.as_ref()); - let body = enc.finish().unwrap(); - update_head(encoding, head); - ResponseBody::Other(Body::Bytes(body)) - } else { - ResponseBody::Other(Body::Bytes(buf)) - } - } - Body::Message(stream) => { - if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { - ResponseBody::Body(Encoder { - body: EncoderBody::Other(stream), - encoder: None, - }) - } else { - update_head(encoding, head); - head.no_chunking = false; - ResponseBody::Body(Encoder { - body: EncoderBody::Other(stream), - encoder: ContentEncoder::encoder(encoding), - }) - } - } - }, - ResponseBody::Body(stream) => { - if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { - ResponseBody::Body(Encoder { - body: EncoderBody::Body(stream), - encoder: None, - }) - } else { - update_head(encoding, head); - head.no_chunking = false; - ResponseBody::Body(Encoder { - body: EncoderBody::Body(stream), - encoder: ContentEncoder::encoder(encoding), - }) - } - } - } - } -} - -pub(crate) struct Writer { - buf: BytesMut, -} - -impl Writer { - fn new() -> Writer { - Writer { - buf: BytesMut::with_capacity(8192), - } - } - fn take(&mut self) -> Bytes { - self.buf.take().freeze() - } -} - -impl io::Write for Writer { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -pub(crate) enum ContentEncoder { - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - Deflate(ZlibEncoder), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - Gzip(GzEncoder), - #[cfg(feature = "brotli")] - Br(BrotliEncoder), -} - -impl fmt::Debug for ContentEncoder { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - #[cfg(feature = "brotli")] - ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"), - } - } -} - -impl ContentEncoder { - fn encoder(encoding: ContentEncoding) -> Option { - match encoding { - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new( - Writer::new(), - flate2::Compression::fast(), - ))), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new( - Writer::new(), - flate2::Compression::fast(), - ))), - #[cfg(feature = "brotli")] - ContentEncoding::Br => { - Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3))) - } - _ => None, - } - } - - #[inline] - pub(crate) fn take(&mut self) -> Bytes { - match *self { - #[cfg(feature = "brotli")] - ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(), - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(), - } - } - - fn finish(self) -> Result { - match self { - #[cfg(feature = "brotli")] - ContentEncoder::Br(encoder) => match encoder.finish() { - Ok(writer) => Ok(writer.buf.freeze()), - Err(err) => Err(err), - }, - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Gzip(encoder) => match encoder.finish() { - Ok(writer) => Ok(writer.buf.freeze()), - Err(err) => Err(err), - }, - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Deflate(encoder) => match encoder.finish() { - Ok(writer) => Ok(writer.buf.freeze()), - Err(err) => Err(err), - }, - } - } - - fn write(&mut self, data: &[u8]) -> Result { - match *self { - #[cfg(feature = "brotli")] - ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), - Err(err) => { - trace!("Error decoding br encoding: {}", err); - Err(err) - } - }, - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), - Err(err) => { - trace!("Error decoding gzip encoding: {}", err); - Err(err) - } - }, - #[cfg(any(feature = "flate2-c", feature = "flate2-rust"))] - ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { - Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), - Err(err) => { - trace!("Error decoding deflate encoding: {}", err); - Err(err) - } - }, - } - } -} - struct AcceptEncoding { encoding: ContentEncoding, quality: f64, diff --git a/src/middleware/decompress.rs b/src/middleware/decompress.rs new file mode 100644 index 000000000..d0a9bfd24 --- /dev/null +++ b/src/middleware/decompress.rs @@ -0,0 +1,60 @@ +//! Chain service for decompressing request payload. +use std::marker::PhantomData; + +use actix_http::encoding::Decoder; +use actix_service::{NewService, Service}; +use bytes::Bytes; +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll, Stream}; + +use crate::dev::Payload; +use crate::error::{Error, PayloadError}; +use crate::service::ServiceRequest; +use crate::HttpMessage; + +pub struct Decompress

(PhantomData

); + +impl

Decompress

+where + P: Stream, +{ + pub fn new() -> Self { + Decompress(PhantomData) + } +} + +impl

NewService for Decompress

+where + P: Stream, +{ + type Request = ServiceRequest

; + type Response = ServiceRequest>>; + type Error = Error; + type InitError = (); + type Service = Decompress

; + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(Decompress(PhantomData)) + } +} + +impl

Service for Decompress

+where + P: Stream, +{ + type Request = ServiceRequest

; + type Response = ServiceRequest>>; + type Error = Error; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: ServiceRequest

) -> Self::Future { + let (req, payload) = req.into_parts(); + let payload = Decoder::from_headers(req.headers(), payload); + ok(ServiceRequest::from_parts(req, Payload::Stream(payload))) + } +} diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 998b59052..764cd9a3d 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -4,6 +4,11 @@ mod compress; #[cfg(any(feature = "brotli", feature = "flate2"))] pub use self::compress::Compress; +#[cfg(any(feature = "brotli", feature = "flate2"))] +mod decompress; +#[cfg(any(feature = "brotli", feature = "flate2"))] +pub use self::decompress::Decompress; + pub mod cors; mod defaultheaders; pub mod errhandlers; diff --git a/src/resource.rs b/src/resource.rs index 55237157f..b24e8dd51 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -507,7 +507,7 @@ impl

Service for ResourceService

{ if let Some(ref mut default) = self.default { Either::B(Either::A(default.call(req))) } else { - let req = req.into_request(); + let req = req.into_parts().0; Either::B(Either::B(ok(ServiceResponse::new( req, Response::MethodNotAllowed().finish(), diff --git a/src/scope.rs b/src/scope.rs index 8c72824f4..d45609c5e 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -489,7 +489,7 @@ impl

Service for ScopeService

{ } else if let Some(ref mut default) = self.default { Either::A(default.call(req)) } else { - let req = req.into_request(); + let req = req.into_parts().0; Either::B(ok(ServiceResponse::new(req, Response::NotFound().finish()))) } } diff --git a/src/service.rs b/src/service.rs index b8c3a1584..5a0422089 100644 --- a/src/service.rs +++ b/src/service.rs @@ -69,9 +69,14 @@ impl

ServiceRequest

{ } } - #[inline] - pub fn into_request(self) -> HttpRequest { - self.req + /// Construct service request from parts + pub fn from_parts(req: HttpRequest, payload: Payload

) -> Self { + ServiceRequest { req, payload } + } + + /// Deconstruct request into parts + pub fn into_parts(self) -> (HttpRequest, Payload

) { + (self.req, self.payload) } /// Create service response @@ -162,11 +167,6 @@ impl

ServiceRequest

{ pub fn app_config(&self) -> &AppConfig { self.req.config() } - - /// Deconstruct request into parts - pub fn into_parts(self) -> (HttpRequest, Payload

) { - (self.req, self.payload) - } } impl

Resource for ServiceRequest

{ diff --git a/src/test.rs b/src/test.rs index c5936ea35..9e1f01f90 100644 --- a/src/test.rs +++ b/src/test.rs @@ -350,7 +350,8 @@ impl TestRequest { Rc::new(self.rmap), AppConfig::new(self.config), ) - .into_request() + .into_parts() + .0 } /// Complete request creation and generate `ServiceFromRequest` instance diff --git a/tests/test_server.rs b/tests/test_server.rs index 9c0f1f655..acea029c6 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,14 +1,16 @@ use std::io::{Read, Write}; use actix_http::http::header::{ - ContentEncoding, ACCEPT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING, + ContentEncoding, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, + TRANSFER_ENCODING, }; -use actix_http::{h1, Error, Response}; +use actix_http::{h1, Error, HttpService, Response}; use actix_http_test::TestServer; -use brotli2::write::BrotliDecoder; +use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::Bytes; use flate2::read::GzDecoder; -use flate2::write::ZlibDecoder; +use flate2::write::{GzEncoder, ZlibDecoder, ZlibEncoder}; +use flate2::Compression; use futures::stream::once; //Future, Stream use rand::{distributions::Alphanumeric, Rng}; @@ -297,278 +299,246 @@ fn test_body_brotli() { assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); } -// #[test] -// fn test_gzip_encoding() { -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[test] +fn test_gzip_encoding() { + let mut srv = TestServer::new(move || { + HttpService::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// // client request -// let mut e = GzEncoder::new(Vec::new(), Compression::default()); -// e.write_all(STR.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + // client request + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(STR.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "gzip") -// .body(enc.clone()) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + let request = srv + .post() + .header(CONTENT_ENCODING, "gzip") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.block_on(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from_static(STR.as_ref())); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} -// #[test] -// fn test_gzip_encoding_large() { -// let data = STR.repeat(10); -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[test] +fn test_gzip_encoding_large() { + let data = STR.repeat(10); + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// // client request -// let mut e = GzEncoder::new(Vec::new(), Compression::default()); -// e.write_all(data.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + // client request + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "gzip") -// .body(enc.clone()) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + let request = srv + .post() + .header(CONTENT_ENCODING, "gzip") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.block_on(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from(data)); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} -// #[test] -// fn test_reading_gzip_encoding_large_random() { -// let data = rand::thread_rng() -// .sample_iter(&Alphanumeric) -// .take(60_000) -// .collect::(); +#[test] +fn test_reading_gzip_encoding_large_random() { + let data = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(60_000) + .collect::(); -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); + let mut srv = TestServer::new(move || { + HttpService::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// // client request -// let mut e = GzEncoder::new(Vec::new(), Compression::default()); -// e.write_all(data.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + // client request + let mut e = GzEncoder::new(Vec::new(), Compression::default()); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "gzip") -// .body(enc.clone()) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + let request = srv + .post() + .header(CONTENT_ENCODING, "gzip") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.block_on(response.body()).unwrap(); -// assert_eq!(bytes.len(), data.len()); -// assert_eq!(bytes, Bytes::from(data)); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes.len(), data.len()); + assert_eq!(bytes, Bytes::from(data)); +} -// #[test] -// fn test_reading_deflate_encoding() { -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[test] +fn test_reading_deflate_encoding() { + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); -// e.write_all(STR.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(STR.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// // client request -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "deflate") -// .body(enc) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + // client request + let request = srv + .post() + .header(CONTENT_ENCODING, "deflate") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.block_on(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from_static(STR.as_ref())); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} -// #[test] -// fn test_reading_deflate_encoding_large() { -// let data = STR.repeat(10); -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[test] +fn test_reading_deflate_encoding_large() { + let data = STR.repeat(10); + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); -// e.write_all(data.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// // client request -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "deflate") -// .body(enc) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + // client request + let request = srv + .post() + .header(CONTENT_ENCODING, "deflate") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.block_on(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from(data)); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} -// #[test] -// fn test_reading_deflate_encoding_large_random() { -// let data = rand::thread_rng() -// .sample_iter(&Alphanumeric) -// .take(160_000) -// .collect::(); +#[test] +fn test_reading_deflate_encoding_large_random() { + let data = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(160_000) + .collect::(); -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); -// e.write_all(data.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// // client request -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "deflate") -// .body(enc) -// .unwrap(); -// let response = srv.block_on(request.send()).unwrap(); -// assert!(response.status().is_success()); + // client request + let request = srv + .post() + .header(CONTENT_ENCODING, "deflate") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.execute(response.body()).unwrap(); -// assert_eq!(bytes.len(), data.len()); -// assert_eq!(bytes, Bytes::from(data)); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes.len(), data.len()); + assert_eq!(bytes, Bytes::from(data)); +} -// #[cfg(feature = "brotli")] -// #[test] -// fn test_brotli_encoding() { -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[cfg(feature = "brotli")] +#[test] +fn test_brotli_encoding() { + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// let mut e = BrotliEncoder::new(Vec::new(), 5); -// e.write_all(STR.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + let mut e = BrotliEncoder::new(Vec::new(), 5); + e.write_all(STR.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// // client request -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "br") -// .body(enc) -// .unwrap(); -// let response = srv.execute(request.send()).unwrap(); -// assert!(response.status().is_success()); + // client request + let request = srv + .post() + .header(CONTENT_ENCODING, "br") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.execute(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from_static(STR.as_ref())); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} -// #[cfg(feature = "brotli")] -// #[test] -// fn test_brotli_encoding_large() { -// let data = STR.repeat(10); -// let mut srv = test::TestServer::new(|app| { -// app.handler(|req: &HttpRequest| { -// req.body() -// .and_then(|bytes: Bytes| { -// Ok(HttpResponse::Ok() -// .content_encoding(http::ContentEncoding::Identity) -// .body(bytes)) -// }) -// .responder() -// }) -// }); +#[cfg(feature = "brotli")] +#[test] +fn test_brotli_encoding_large() { + let data = STR.repeat(10); + let mut srv = TestServer::new(move || { + h1::H1Service::new( + App::new().chain(middleware::Decompress::new()).service( + web::resource("/") + .route(web::to(move |body: Bytes| Response::Ok().body(body))), + ), + ) + }); -// let mut e = BrotliEncoder::new(Vec::new(), 5); -// e.write_all(data.as_ref()).unwrap(); -// let enc = e.finish().unwrap(); + let mut e = BrotliEncoder::new(Vec::new(), 5); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); -// // client request -// let request = srv -// .post() -// .header(http::header::CONTENT_ENCODING, "br") -// .body(enc) -// .unwrap(); -// let response = srv.execute(request.send()).unwrap(); -// assert!(response.status().is_success()); + // client request + let request = srv + .post() + .header(CONTENT_ENCODING, "br") + .send_body(enc.clone()); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); -// // read response -// let bytes = srv.execute(response.body()).unwrap(); -// assert_eq!(bytes, Bytes::from(data)); -// } + // read response + let bytes = srv.block_on(HttpMessageBody::new(&mut response)).unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} // #[cfg(all(feature = "brotli", feature = "ssl"))] // #[test]