From a0e6313d56e7719741cfd7ee13b72f772ee1dfb4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 6 Mar 2018 11:02:03 -0800 Subject: [PATCH] Fix compression #103 and #104 --- CHANGES.md | 2 + src/client/parser.rs | 1 + src/lib.rs | 2 +- src/server/encoding.rs | 52 ++++++++++++------------ tests/test_client.rs | 92 +++++++++++++++++++++++++++++++++++++++++- tests/test_server.rs | 45 ++++++++++++++++++--- 6 files changed, 162 insertions(+), 32 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 76cda851c..ea9a06f33 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## 0.4.5 (2018-03-xx) +* Fix compression #103 and #104 + * Enable compression support for `NamedFile` * Better support for `NamedFile` type diff --git a/src/client/parser.rs b/src/client/parser.rs index 8fe399009..3952ed3b7 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -78,6 +78,7 @@ impl HttpResponseParser { -> Poll, PayloadError> where T: IoStream { + println!("PARSE payload, {:?}", self.decoder.is_some()); if self.decoder.is_some() { loop { // read payload diff --git a/src/lib.rs b/src/lib.rs index 378f45a5b..1e0224f4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -154,7 +154,7 @@ pub(crate) const HAS_OPENSSL: bool = false; // #[cfg(not(feature="tls"))] // pub(crate) const HAS_TLS: bool = false; - +#[doc(hidden)] #[deprecated(since="0.4.4", note="please use `actix::header` module")] pub mod headers { //! Headers implementation diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 45dfff141..df901818d 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -246,18 +246,14 @@ impl PayloadStream { if let Some(ref mut decoder) = *decoder { decoder.as_mut().get_mut().eof = true; - loop { - self.dst.reserve(8192); - match decoder.read(unsafe{self.dst.bytes_mut()}) { - Ok(n) => { - if n == 0 { - return Ok(Some(self.dst.take().freeze())) - } else { - unsafe{self.dst.advance_mut(n)}; - } - } - Err(e) => return Err(e), + self.dst.reserve(8192); + match decoder.read(unsafe{self.dst.bytes_mut()}) { + Ok(n) => { + unsafe{self.dst.advance_mut(n)}; + return Ok(Some(self.dst.take().freeze())) } + Err(e) => + return Err(e), } } else { Ok(None) @@ -283,8 +279,9 @@ impl PayloadStream { pub fn feed_data(&mut self, data: Bytes) -> io::Result> { match self.decoder { Decoder::Br(ref mut decoder) => { - match decoder.write(&data).and_then(|_| decoder.flush()) { + match decoder.write_all(&data) { Ok(_) => { + decoder.flush()?; let b = decoder.get_mut().take(); if !b.is_empty() { Ok(Some(b)) @@ -306,23 +303,31 @@ impl PayloadStream { loop { self.dst.reserve(8192); - match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) { + match decoder.as_mut() + .as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) + { Ok(n) => { + if n != 0 { + unsafe{self.dst.advance_mut(n)}; + } if n == 0 { return Ok(Some(self.dst.take().freeze())); - } else { - unsafe{self.dst.advance_mut(n)}; } } Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock && !self.dst.is_empty() + { + return Ok(Some(self.dst.take().freeze())); + } return Err(e) } } } }, Decoder::Deflate(ref mut decoder) => { - match decoder.write(&data).and_then(|_| decoder.flush()) { + match decoder.write_all(&data) { Ok(_) => { + decoder.flush()?; let b = decoder.get_mut().take(); if !b.is_empty() { Ok(Some(b)) @@ -590,9 +595,8 @@ impl ContentEncoder { pub fn write(&mut self, data: Binary) -> Result<(), io::Error> { match *self { ContentEncoder::Br(ref mut encoder) => { - match encoder.write(data.as_ref()) { - Ok(_) => - encoder.flush(), + match encoder.write_all(data.as_ref()) { + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding br encoding: {}", err); Err(err) @@ -600,9 +604,8 @@ impl ContentEncoder { } }, ContentEncoder::Gzip(ref mut encoder) => { - match encoder.write(data.as_ref()) { - Ok(_) => - encoder.flush(), + match encoder.write_all(data.as_ref()) { + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding gzip encoding: {}", err); Err(err) @@ -610,9 +613,8 @@ impl ContentEncoder { } } ContentEncoder::Deflate(ref mut encoder) => { - match encoder.write(data.as_ref()) { - Ok(_) => - encoder.flush(), + match encoder.write_all(data.as_ref()) { + Ok(_) => Ok(()), Err(err) => { trace!("Error decoding deflate encoding: {}", err); Err(err) diff --git a/tests/test_client.rs b/tests/test_client.rs index aaa3fa786..6118bc339 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -3,6 +3,7 @@ extern crate actix_web; extern crate bytes; extern crate futures; extern crate flate2; +extern crate rand; use std::io::Read; @@ -10,6 +11,7 @@ use bytes::Bytes; use futures::Future; use futures::stream::once; use flate2::read::GzDecoder; +use rand::Rng; use actix_web::*; @@ -143,7 +145,12 @@ fn test_client_gzip_encoding_large() { } #[test] -fn test_client_brotli_encoding() { +fn test_client_gzip_encoding_large_random() { + let data = rand::thread_rng() + .gen_ascii_chars() + .take(100_000) + .collect::(); + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { req.body() .and_then(|bytes: Bytes| { @@ -154,6 +161,30 @@ fn test_client_brotli_encoding() { }).responder()} )); + // client request + let request = srv.post() + .content_encoding(headers::ContentEncoding::Gzip) + .body(data.clone()).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} + +#[test] +fn test_client_brotli_encoding() { + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Gzip) + .body(bytes)) + }).responder()} + )); + // client request let request = srv.client(Method::POST, "/") .content_encoding(headers::ContentEncoding::Br) @@ -166,6 +197,36 @@ fn test_client_brotli_encoding() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } +#[test] +fn test_client_brotli_encoding_large_random() { + let data = rand::thread_rng() + .gen_ascii_chars() + .take(70_000) + .collect::(); + + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(move |bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Gzip) + .body(bytes)) + }).responder()} + )); + + // client request + let request = srv.client(Method::POST, "/") + .content_encoding(headers::ContentEncoding::Br) + .body(data.clone()).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes.len(), data.len()); + assert_eq!(bytes, Bytes::from(data)); +} + #[test] fn test_client_deflate_encoding() { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { @@ -190,6 +251,35 @@ fn test_client_deflate_encoding() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } +#[test] +fn test_client_deflate_encoding_large_random() { + let data = rand::thread_rng() + .gen_ascii_chars() + .take(70_000) + .collect::(); + + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Br) + .body(bytes)) + }).responder()} + )); + + // client request + let request = srv.post() + .content_encoding(headers::ContentEncoding::Deflate) + .body(data.clone()).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} + #[test] fn test_client_streaming_explicit() { let mut srv = test::TestServer::new( diff --git a/tests/test_server.rs b/tests/test_server.rs index cafbea740..cf682468a 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -241,7 +241,7 @@ fn test_body_gzip_large() { fn test_body_gzip_large_random() { let data = rand::thread_rng() .gen_ascii_chars() - .take(70000) + .take(70_000) .collect::(); let srv_data = Arc::new(data.clone()); @@ -525,10 +525,10 @@ fn test_gzip_encoding_large() { } #[test] -fn test_gzip_encoding_large_random() { +fn test_reading_gzip_encoding_large_random() { let data = rand::thread_rng() .gen_ascii_chars() - .take(6000) + .take(60_000) .collect::(); let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { @@ -554,11 +554,12 @@ fn test_gzip_encoding_large_random() { // read response let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes.len(), data.len()); assert_eq!(bytes, Bytes::from(data)); } #[test] -fn test_deflate_encoding() { +fn test_reading_deflate_encoding() { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { req.body() .and_then(|bytes: Bytes| { @@ -586,7 +587,7 @@ fn test_deflate_encoding() { } #[test] -fn test_deflate_encoding_large() { +fn test_reading_deflate_encoding_large() { let data = STR.repeat(10); let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { req.body() @@ -614,6 +615,40 @@ fn test_deflate_encoding_large() { assert_eq!(bytes, Bytes::from(data)); } +#[test] +fn test_reading_deflate_encoding_large_random() { + let data = rand::thread_rng() + .gen_ascii_chars() + .take(160_000) + .collect::(); + + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Identity) + .body(bytes)) + }).responder()} + )); + + let mut e = DeflateEncoder::new(Vec::new(), Compression::default()); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); + + // client request + let request = srv.post() + .header(header::CONTENT_ENCODING, "deflate") + .body(enc).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes.len(), data.len()); + assert_eq!(bytes, Bytes::from(data)); +} + #[test] fn test_brotli_encoding() { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {