From 4e41e13baf77ee13e7948b6739b328d77e551ed6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 24 Feb 2018 07:29:35 +0300 Subject: [PATCH] refactor client payload processing --- CHANGES.md | 4 +- Cargo.toml | 1 + src/client/encoding.rs | 142 +++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 1 + src/client/parser.rs | 26 +++++--- src/client/pipeline.rs | 110 +++++++++++++++++++++++++----- src/client/request.rs | 46 ++++++++++++- src/error.rs | 2 + src/lib.rs | 2 + src/server/encoding.rs | 8 +-- tests/test_client.rs | 148 +++++++++++++++++++++++++++++++++++++++++ tests/test_server.rs | 127 +++-------------------------------- 12 files changed, 468 insertions(+), 149 deletions(-) create mode 100644 src/client/encoding.rs diff --git a/CHANGES.md b/CHANGES.md index b623c163f..a406bdfa0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,11 +16,11 @@ * Added http client -* Added basic websocket client +* Added websocket client * Added TestServer::ws(), test websockets client -* Added TestServer test http client +* Added TestServer http client support * Allow to override content encoding on application level diff --git a/Cargo.toml b/Cargo.toml index b7999b743..d8a4b93a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ tokio-tls = { version="0.1", optional = true } openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } +backtrace="*" [dependencies.actix] version = "0.5" diff --git a/src/client/encoding.rs b/src/client/encoding.rs new file mode 100644 index 000000000..4764d67b1 --- /dev/null +++ b/src/client/encoding.rs @@ -0,0 +1,142 @@ +use std::io; +use std::io::{Read, Write}; +use bytes::{Bytes, BytesMut, BufMut}; + +use flate2::read::GzDecoder; +use flate2::write::DeflateDecoder; +use brotli2::write::BrotliDecoder; + +use headers::ContentEncoding; +use server::encoding::{Decoder, Wrapper}; + + +/// Payload wrapper with content decompression support +pub(crate) struct PayloadStream { + decoder: Decoder, + dst: BytesMut, +} + +impl PayloadStream { + pub fn new(enc: ContentEncoding) -> PayloadStream { + let dec = match enc { + ContentEncoding::Br => Decoder::Br( + Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))), + ContentEncoding::Deflate => Decoder::Deflate( + Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))), + ContentEncoding::Gzip => Decoder::Gzip(None), + _ => Decoder::Identity, + }; + PayloadStream{ decoder: dec, dst: BytesMut::new() } + } +} + +impl PayloadStream { + + pub fn feed_eof(&mut self) -> io::Result> { + match self.decoder { + Decoder::Br(ref mut decoder) => { + match decoder.finish() { + Ok(mut writer) => { + let b = writer.get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(err) => Err(err), + } + }, + Decoder::Gzip(ref mut decoder) => { + if let Some(ref mut decoder) = *decoder { + decoder.as_mut().get_mut().eof = true; + + loop { + self.dst.reserve(8192); + match decoder.read(unsafe{self.dst.bytes_mut()}) { + Ok(n) => { + if n == 0 { + return Ok(Some(self.dst.take().freeze())) + } else { + unsafe{self.dst.set_len(n)}; + } + } + Err(err) => return Err(err), + } + } + } else { + Ok(None) + } + }, + Decoder::Deflate(ref mut decoder) => { + match decoder.try_finish() { + Ok(_) => { + let b = decoder.get_mut().get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(err) => Err(err), + } + }, + Decoder::Identity => Ok(None), + } + } + + pub fn feed_data(&mut self, data: Bytes) -> io::Result> { + match self.decoder { + Decoder::Br(ref mut decoder) => { + match decoder.write(&data).and_then(|_| decoder.flush()) { + Ok(_) => { + let b = decoder.get_mut().get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(err) => Err(err) + } + }, + Decoder::Gzip(ref mut decoder) => { + if decoder.is_none() { + *decoder = Some( + Box::new(GzDecoder::new( + Wrapper{buf: BytesMut::from(data), eof: false}))); + } else { + let _ = decoder.as_mut().unwrap().write(&data); + } + + loop { + self.dst.reserve(8192); + match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) { + Ok(n) => { + if n == 0 { + return Ok(Some(self.dst.split_to(n).freeze())); + } else { + unsafe{self.dst.set_len(n)}; + } + } + Err(e) => return Err(e), + } + } + }, + Decoder::Deflate(ref mut decoder) => { + match decoder.write(&data).and_then(|_| decoder.flush()) { + Ok(_) => { + let b = decoder.get_mut().get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(e) => Err(e), + } + }, + Decoder::Identity => Ok(Some(data)), + } + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index f7b735437..8a8e9f500 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,6 @@ //! Http client mod connector; +mod encoding; mod parser; mod request; mod response; diff --git a/src/client/parser.rs b/src/client/parser.rs index b4ce9b2b2..03ba23f99 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -83,20 +83,34 @@ impl HttpResponseParser { -> Poll, PayloadError> where T: IoStream { - if let Some(ref mut decoder) = self.decoder { + if self.decoder.is_some() { // read payload match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => return Err(PayloadError::Incomplete), + Ok(Async::Ready(0)) => { + if buf.is_empty() { + return Err(PayloadError::Incomplete) + } + } Err(err) => return Err(err.into()), _ => (), } - decoder.decode(buf).map_err(|e| e.into()) + + match self.decoder.as_mut().unwrap().decode(buf) { + Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))), + Ok(Async::Ready(None)) => { + self.decoder.take(); + Ok(Async::Ready(None)) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } } else { Ok(Async::Ready(None)) } } - fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option), ParseError> { + fn parse_message(buf: &mut BytesMut) -> Poll<(ClientResponse, Option), ParseError> + { // Parse http message let bytes_ptr = buf.as_ref().as_ptr() as usize; let mut headers: [httparse::Header; MAX_HEADERS] = @@ -160,10 +174,6 @@ impl HttpResponseParser { }; if let Some(decoder) = decoder { - //let info = PayloadInfo { - //tx: PayloadType::new(&hdrs, psender), - // decoder: decoder, - //}; Ok(Async::Ready( (ClientResponse::new( ClientMessage{status: status, version: version, diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index d0f339d7f..c2b3f7bdc 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -1,5 +1,6 @@ use std::{io, mem}; use bytes::{Bytes, BytesMut}; +use http::header::CONTENT_ENCODING; use futures::{Async, Future, Poll}; use futures::unsync::oneshot; @@ -8,6 +9,7 @@ use actix::prelude::*; use error::Error; use body::{Body, BodyStream}; use context::{Frame, ActorHttpContext}; +use headers::ContentEncoding; use error::PayloadError; use server::WriterState; use server::shared::SharedBytes; @@ -15,6 +17,7 @@ use super::{ClientRequest, ClientResponse}; use super::{Connect, Connection, ClientConnector, ClientConnectorError}; use super::HttpClientWriter; use super::{HttpResponseParser, HttpResponseParserError}; +use super::encoding::PayloadStream; /// A set of errors that can occur during sending request and reading response #[derive(Fail, Debug)] @@ -114,11 +117,13 @@ impl Future for SendRequest { body: body, conn: stream, writer: writer, - parser: HttpResponseParser::default(), + parser: Some(HttpResponseParser::default()), parser_buf: BytesMut::new(), disconnected: false, - running: RunningState::Running, drain: None, + decompress: None, + should_decompress: self.req.response_decompress(), + write_state: RunningState::Running, }); self.state = State::Send(pl); }, @@ -150,11 +155,13 @@ pub(crate) struct Pipeline { body: IoBody, conn: Connection, writer: HttpClientWriter, - parser: HttpResponseParser, + parser: Option, parser_buf: BytesMut, disconnected: bool, - running: RunningState, drain: Option>, + decompress: Option, + should_decompress: bool, + write_state: RunningState, } enum IoBody { @@ -163,7 +170,7 @@ enum IoBody { Done, } -#[derive(PartialEq)] +#[derive(Debug, PartialEq)] enum RunningState { Running, Paused, @@ -189,25 +196,90 @@ impl Pipeline { #[inline] pub fn parse(&mut self) -> Poll { - self.parser.parse(&mut self.conn, &mut self.parser_buf) + match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) { + Ok(Async::Ready(resp)) => { + // check content-encoding + if self.should_decompress { + if let Some(enc) = resp.headers().get(CONTENT_ENCODING) { + if let Ok(enc) = enc.to_str() { + match ContentEncoding::from(enc) { + ContentEncoding::Auto | ContentEncoding::Identity => (), + enc => self.decompress = Some(PayloadStream::new(enc)), + } + } + } + } + + Ok(Async::Ready(resp)) + } + val => val, + } } #[inline] pub fn poll(&mut self) -> Poll, PayloadError> { - self.poll_write() - .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e).as_str()))?; - Ok(self.parser.parse_payload(&mut self.conn, &mut self.parser_buf)?) + let mut need_run = false; + + // need write? + match self.poll_write() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))? + { + Async::NotReady => need_run = true, + _ => (), + } + + // need read? + if self.parser.is_some() { + loop { + match self.parser.as_mut().unwrap() + .parse_payload(&mut self.conn, &mut self.parser_buf)? + { + Async::Ready(Some(b)) => { + if let Some(ref mut decompress) = self.decompress { + match decompress.feed_data(b) { + Ok(Some(b)) => return Ok(Async::Ready(Some(b))), + Ok(None) => return Ok(Async::NotReady), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => + continue, + Err(err) => return Err(err.into()), + } + } else { + return Ok(Async::Ready(Some(b))) + } + }, + Async::Ready(None) => { + let _ = self.parser.take(); + break + } + Async::NotReady => return Ok(Async::NotReady), + } + } + } + + // eof + if let Some(mut decompress) = self.decompress.take() { + let res = decompress.feed_eof(); + if let Some(b) = res? { + return Ok(Async::Ready(Some(b))) + } + } + + if need_run { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } } #[inline] pub fn poll_write(&mut self) -> Poll<(), Error> { - if self.running == RunningState::Done { + if self.write_state == RunningState::Done { return Ok(Async::Ready(())) } let mut done = false; - if self.drain.is_none() && self.running != RunningState::Paused { + if self.drain.is_none() && self.write_state != RunningState::Paused { 'outter: loop { let result = match mem::replace(&mut self.body, IoBody::Done) { IoBody::Payload(mut body) => { @@ -243,6 +315,7 @@ impl Pipeline { match frame { Frame::Chunk(None) => { // info.context = Some(ctx); + self.disconnected = true; self.writer.write_eof()?; break 'outter }, @@ -253,7 +326,7 @@ impl Pipeline { } self.body = IoBody::Actor(ctx); if self.drain.is_some() { - self.running.resume(); + self.write_state.resume(); break } res.unwrap() @@ -270,6 +343,7 @@ impl Pipeline { } }, IoBody::Done => { + self.disconnected = true; done = true; break } @@ -277,11 +351,11 @@ impl Pipeline { match result { WriterState::Pause => { - self.running.pause(); + self.write_state.pause(); break } WriterState::Done => { - self.running.resume() + self.write_state.resume() }, } } @@ -290,14 +364,18 @@ impl Pipeline { // flush io but only if we need to match self.writer.poll_completed(&mut self.conn, false) { Ok(Async::Ready(_)) => { - self.running.resume(); + if self.disconnected { + self.write_state = RunningState::Done; + } else { + self.write_state.resume(); + } // resolve drain futures if let Some(tx) = self.drain.take() { let _ = tx.send(()); } // restart io processing - if !done { + if !done || self.write_state == RunningState::Done { self.poll_write() } else { Ok(Async::NotReady) diff --git a/src/client/request.rs b/src/client/request.rs index 37d95fa74..fd1d40c5f 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -4,7 +4,7 @@ use std::io::Write; use actix::{Addr, Unsync}; use cookie::{Cookie, CookieJar}; use bytes::{BytesMut, BufMut}; -use http::{HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError}; +use http::{uri, HeaderMap, Method, Version, Uri, HttpTryFrom, Error as HttpError}; use http::header::{self, HeaderName, HeaderValue}; use serde_json; use serde::Serialize; @@ -25,6 +25,7 @@ pub struct ClientRequest { chunked: bool, upgrade: bool, encoding: ContentEncoding, + response_decompress: bool, } impl Default for ClientRequest { @@ -39,6 +40,7 @@ impl Default for ClientRequest { chunked: false, upgrade: false, encoding: ContentEncoding::Auto, + response_decompress: true, } } } @@ -89,6 +91,7 @@ impl ClientRequest { request: Some(ClientRequest::default()), err: None, cookies: None, + default_headers: true, } } @@ -158,6 +161,12 @@ impl ClientRequest { self.encoding } + /// Decompress response payload + #[inline] + pub fn response_decompress(&self) -> bool { + self.response_decompress + } + /// Get body os this response #[inline] pub fn body(&self) -> &Body { @@ -216,6 +225,7 @@ pub struct ClientRequestBuilder { request: Option, err: Option, cookies: Option, + default_headers: bool, } impl ClientRequestBuilder { @@ -409,6 +419,22 @@ impl ClientRequestBuilder { self } + /// Do not add default request headers. + /// By default `Accept-Encoding` header is set. + pub fn no_default_headers(&mut self) -> &mut Self { + self.default_headers = false; + self + } + + /// Disable automatic decompress response body + pub fn disable_decompress(&mut self) -> &mut Self { + if let Some(parts) = parts(&mut self.request, &self.err) { + parts.response_decompress = false; + } + self + } + + /// This method calls provided closure with builder reference if value is true. pub fn if_true(&mut self, value: bool, f: F) -> &mut Self where F: FnOnce(&mut ClientRequestBuilder) @@ -437,6 +463,23 @@ impl ClientRequestBuilder { return Err(e) } + if self.default_headers { + // enable br only for https + let https = + if let Some(parts) = parts(&mut self.request, &self.err) { + parts.uri.scheme_part() + .map(|s| s == &uri::Scheme::HTTPS).unwrap_or(true) + } else { + true + }; + + if https { + self.header(header::ACCEPT_ENCODING, "br, gzip, deflate"); + } else { + self.header(header::ACCEPT_ENCODING, "gzip, deflate"); + } + } + let mut request = self.request.take().expect("cannot reuse request builder"); // set cookies @@ -482,6 +525,7 @@ impl ClientRequestBuilder { request: self.request.take(), err: self.err.take(), cookies: self.cookies.take(), + default_headers: self.default_headers, } } } diff --git a/src/error.rs b/src/error.rs index 513c0f4d0..a37727cbd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -237,6 +237,8 @@ pub enum PayloadError { impl From for PayloadError { fn from(err: IoError) -> PayloadError { + use backtrace; + println!("IO ERROR {:?}", backtrace::Backtrace::new()); PayloadError::Io(err) } } diff --git a/src/lib.rs b/src/lib.rs index c9819aef3..ab683b6d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,6 +97,8 @@ extern crate openssl; #[cfg(feature="openssl")] extern crate tokio_openssl; +extern crate backtrace; + mod application; mod body; mod context; diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 964754ab0..d3c78f405 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -128,7 +128,7 @@ impl PayloadWriter for PayloadType { } } -enum Decoder { +pub(crate) enum Decoder { Deflate(Box>>), Gzip(Option>>), Br(Box>>), @@ -137,9 +137,9 @@ enum Decoder { // should go after write::GzDecoder get implemented #[derive(Debug)] -struct Wrapper { - buf: BytesMut, - eof: bool, +pub(crate) struct Wrapper { + pub buf: BytesMut, + pub eof: bool, } impl io::Read for Wrapper { diff --git a/tests/test_client.rs b/tests/test_client.rs index 02a18f40b..cac1ab78e 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -2,8 +2,14 @@ extern crate actix; extern crate actix_web; extern crate bytes; extern crate futures; +extern crate flate2; + +use std::io::Read; use bytes::Bytes; +use futures::Future; +use futures::stream::once; +use flate2::read::GzDecoder; use actix_web::*; @@ -57,3 +63,145 @@ fn test_simple() { let bytes = srv.execute(response.body()).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } + +#[test] +fn test_no_decompress() { + let mut srv = test::TestServer::new( + |app| app.handler(|_| httpcodes::HTTPOk.build().body(STR))); + + let request = srv.get().disable_decompress().finish().unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + + let mut e = GzDecoder::new(&bytes[..]); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + // POST + let request = srv.post().disable_decompress().finish().unwrap(); + let response = srv.execute(request.send()).unwrap(); + + let bytes = srv.execute(response.body()).unwrap(); + let mut e = GzDecoder::new(&bytes[..]); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); +} + +#[test] +fn test_client_gzip_encoding() { + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Deflate) + .body(bytes)) + }).responder()} + )); + + // client request + let request = srv.post() + .content_encoding(headers::ContentEncoding::Gzip) + .body(STR).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[test] +fn test_client_brotli_encoding() { + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Deflate) + .body(bytes)) + }).responder()} + )); + + // client request + let request = srv.client(Method::POST, "/") + .content_encoding(headers::ContentEncoding::Br) + .body(STR).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[test] +fn test_client_deflate_encoding() { + let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { + req.body() + .and_then(|bytes: Bytes| { + Ok(httpcodes::HTTPOk + .build() + .content_encoding(headers::ContentEncoding::Br) + .body(bytes)) + }).responder()} + )); + + // client request + let request = srv.post() + .content_encoding(headers::ContentEncoding::Deflate) + .body(STR).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[test] +fn test_client_streaming_explicit() { + let mut srv = test::TestServer::new( + |app| app.handler( + |req: HttpRequest| req.body() + .map_err(Error::from) + .and_then(|body| { + Ok(httpcodes::HTTPOk.build() + .chunked() + .content_encoding(headers::ContentEncoding::Identity) + .body(body)?)}) + .responder())); + + let body = once(Ok(Bytes::from_static(STR.as_ref()))); + + let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[test] +fn test_body_streaming_implicit() { + let mut srv = test::TestServer::new( + |app| app.handler(|_| { + let body = once(Ok(Bytes::from_static(STR.as_ref()))); + httpcodes::HTTPOk.build() + .content_encoding(headers::ContentEncoding::Gzip) + .body(Body::Streaming(Box::new(body)))})); + + let request = srv.get().finish().unwrap(); + let response = srv.execute(request.send()).unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = srv.execute(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} diff --git a/tests/test_server.rs b/tests/test_server.rs index 2cbeba8fc..ad2028a2b 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -123,7 +123,7 @@ fn test_body_gzip() { .content_encoding(headers::ContentEncoding::Gzip) .body(STR))); - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -138,7 +138,7 @@ fn test_body_gzip() { } #[test] -fn test_body_streaming_implicit() { +fn test_body_chunked_implicit() { let mut srv = test::TestServer::new( |app| app.handler(|_| { let body = once(Ok(Bytes::from_static(STR.as_ref()))); @@ -146,7 +146,7 @@ fn test_body_streaming_implicit() { .content_encoding(headers::ContentEncoding::Gzip) .body(Body::Streaming(Box::new(body)))})); - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -169,7 +169,7 @@ fn test_body_br_streaming() { .content_encoding(headers::ContentEncoding::Br) .body(Body::Streaming(Box::new(body)))})); - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -252,6 +252,7 @@ fn test_body_length() { let body = once(Ok(Bytes::from_static(STR.as_ref()))); httpcodes::HTTPOk.build() .content_length(STR.len() as u64) + .content_encoding(headers::ContentEncoding::Identity) .body(Body::Streaming(Box::new(body)))})); let request = srv.get().finish().unwrap(); @@ -264,7 +265,7 @@ fn test_body_length() { } #[test] -fn test_body_streaming_explicit() { +fn test_body_chunked_explicit() { let mut srv = test::TestServer::new( |app| app.handler(|_| { let body = once(Ok(Bytes::from_static(STR.as_ref()))); @@ -273,7 +274,7 @@ fn test_body_streaming_explicit() { .content_encoding(headers::ContentEncoding::Gzip) .body(Body::Streaming(Box::new(body)))})); - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -297,7 +298,7 @@ fn test_body_deflate() { .body(STR))); // client request - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -321,7 +322,7 @@ fn test_body_brotli() { .body(STR))); // client request - let request = srv.get().finish().unwrap(); + let request = srv.get().disable_decompress().finish().unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -363,34 +364,6 @@ fn test_gzip_encoding() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } -#[test] -fn test_client_gzip_encoding() { - let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { - req.body() - .and_then(|bytes: Bytes| { - Ok(httpcodes::HTTPOk - .build() - .content_encoding(headers::ContentEncoding::Deflate) - .body(bytes)) - }).responder()} - )); - - // client request - let request = srv.post() - .content_encoding(headers::ContentEncoding::Gzip) - .body(STR).unwrap(); - let response = srv.execute(request.send()).unwrap(); - assert!(response.status().is_success()); - - // read response - let bytes = srv.execute(response.body()).unwrap(); - - let mut e = DeflateDecoder::new(Vec::new()); - e.write_all(bytes.as_ref()).unwrap(); - let dec = e.finish().unwrap(); - assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); -} - #[test] fn test_deflate_encoding() { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { @@ -419,35 +392,6 @@ fn test_deflate_encoding() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } -#[test] -fn test_client_deflate_encoding() { - let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { - req.body() - .and_then(|bytes: Bytes| { - Ok(httpcodes::HTTPOk - .build() - .content_encoding(headers::ContentEncoding::Br) - .body(bytes)) - }).responder()} - )); - - // client request - let request = srv.post() - .content_encoding(headers::ContentEncoding::Deflate) - .body(STR).unwrap(); - let response = srv.execute(request.send()).unwrap(); - assert!(response.status().is_success()); - - // read response - let bytes = srv.execute(response.body()).unwrap(); - - // decode brotli - let mut e = BrotliDecoder::new(Vec::with_capacity(2048)); - e.write_all(bytes.as_ref()).unwrap(); - let dec = e.finish().unwrap(); - assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); -} - #[test] fn test_brotli_encoding() { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { @@ -476,35 +420,6 @@ fn test_brotli_encoding() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } -#[test] -fn test_client_brotli_encoding() { - let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { - req.body() - .and_then(|bytes: Bytes| { - Ok(httpcodes::HTTPOk - .build() - .content_encoding(headers::ContentEncoding::Deflate) - .body(bytes)) - }).responder()} - )); - - // client request - let request = srv.client(Method::POST, "/") - .content_encoding(headers::ContentEncoding::Br) - .body(STR).unwrap(); - let response = srv.execute(request.send()).unwrap(); - assert!(response.status().is_success()); - - // read response - let bytes = srv.execute(response.body()).unwrap(); - - // decode brotli - let mut e = DeflateDecoder::new(Vec::with_capacity(2048)); - e.write_all(bytes.as_ref()).unwrap(); - let dec = e.finish().unwrap(); - assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); -} - #[test] fn test_h2() { let srv = test::TestServer::new(|app| app.handler(|_|{ @@ -545,30 +460,6 @@ fn test_h2() { // assert_eq!(_res.unwrap(), Bytes::from_static(STR.as_ref())); } -#[test] -fn test_client_streaming_explicit() { - let mut srv = test::TestServer::new( - |app| app.handler( - |req: HttpRequest| req.body() - .map_err(Error::from) - .and_then(|body| { - Ok(httpcodes::HTTPOk.build() - .chunked() - .content_encoding(headers::ContentEncoding::Identity) - .body(body)?)}) - .responder())); - - let body = once(Ok(Bytes::from_static(STR.as_ref()))); - - let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap(); - let response = srv.execute(request.send()).unwrap(); - assert!(response.status().is_success()); - - // read response - let bytes = srv.execute(response.body()).unwrap(); - assert_eq!(bytes, Bytes::from_static(STR.as_ref())); -} - #[test] fn test_application() { let mut srv = test::TestServer::with_factory(