From ab5ed27bf1fe9d7b266b389672b34f8cd9914964 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 25 Feb 2018 11:43:00 +0300 Subject: [PATCH] refactor and simplify content encoding --- src/client/encoding.rs | 142 ----------------------- src/client/mod.rs | 1 - src/client/pipeline.rs | 2 +- src/server/encoding.rs | 248 ++++++++++++++++++++--------------------- src/server/h1writer.rs | 8 +- src/server/h2writer.rs | 8 +- 6 files changed, 129 insertions(+), 280 deletions(-) delete mode 100644 src/client/encoding.rs diff --git a/src/client/encoding.rs b/src/client/encoding.rs deleted file mode 100644 index 4764d67b1..000000000 --- a/src/client/encoding.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::io; -use std::io::{Read, Write}; -use bytes::{Bytes, BytesMut, BufMut}; - -use flate2::read::GzDecoder; -use flate2::write::DeflateDecoder; -use brotli2::write::BrotliDecoder; - -use headers::ContentEncoding; -use server::encoding::{Decoder, Wrapper}; - - -/// Payload wrapper with content decompression support -pub(crate) struct PayloadStream { - decoder: Decoder, - dst: BytesMut, -} - -impl PayloadStream { - pub fn new(enc: ContentEncoding) -> PayloadStream { - let dec = match enc { - ContentEncoding::Br => Decoder::Br( - Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))), - ContentEncoding::Deflate => Decoder::Deflate( - Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))), - ContentEncoding::Gzip => Decoder::Gzip(None), - _ => Decoder::Identity, - }; - PayloadStream{ decoder: dec, dst: BytesMut::new() } - } -} - -impl PayloadStream { - - pub fn feed_eof(&mut self) -> io::Result> { - match self.decoder { - Decoder::Br(ref mut decoder) => { - match decoder.finish() { - Ok(mut writer) => { - let b = writer.get_mut().take().freeze(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - }, - Err(err) => Err(err), - } - }, - Decoder::Gzip(ref mut decoder) => { - if let Some(ref mut decoder) = *decoder { - decoder.as_mut().get_mut().eof = true; - - loop { - self.dst.reserve(8192); - match decoder.read(unsafe{self.dst.bytes_mut()}) { - Ok(n) => { - if n == 0 { - return Ok(Some(self.dst.take().freeze())) - } else { - unsafe{self.dst.set_len(n)}; - } - } - Err(err) => return Err(err), - } - } - } else { - Ok(None) - } - }, - Decoder::Deflate(ref mut decoder) => { - match decoder.try_finish() { - Ok(_) => { - let b = decoder.get_mut().get_mut().take().freeze(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - }, - Err(err) => Err(err), - } - }, - Decoder::Identity => Ok(None), - } - } - - pub fn feed_data(&mut self, data: Bytes) -> io::Result> { - match self.decoder { - Decoder::Br(ref mut decoder) => { - match decoder.write(&data).and_then(|_| decoder.flush()) { - Ok(_) => { - let b = decoder.get_mut().get_mut().take().freeze(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - }, - Err(err) => Err(err) - } - }, - Decoder::Gzip(ref mut decoder) => { - if decoder.is_none() { - *decoder = Some( - Box::new(GzDecoder::new( - Wrapper{buf: BytesMut::from(data), eof: false}))); - } else { - let _ = decoder.as_mut().unwrap().write(&data); - } - - loop { - self.dst.reserve(8192); - match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) { - Ok(n) => { - if n == 0 { - return Ok(Some(self.dst.split_to(n).freeze())); - } else { - unsafe{self.dst.set_len(n)}; - } - } - Err(e) => return Err(e), - } - } - }, - Decoder::Deflate(ref mut decoder) => { - match decoder.write(&data).and_then(|_| decoder.flush()) { - Ok(_) => { - let b = decoder.get_mut().get_mut().take().freeze(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - }, - Err(e) => Err(e), - } - }, - Decoder::Identity => Ok(Some(data)), - } - } -} diff --git a/src/client/mod.rs b/src/client/mod.rs index 8a8e9f500..f7b735437 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,6 +1,5 @@ //! Http client mod connector; -mod encoding; mod parser; mod request; mod response; diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index c2b3f7bdc..46705134d 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -13,11 +13,11 @@ use headers::ContentEncoding; use error::PayloadError; use server::WriterState; use server::shared::SharedBytes; +use server::encoding::PayloadStream; use super::{ClientRequest, ClientResponse}; use super::{Connect, Connection, ClientConnector, ClientConnectorError}; use super::HttpClientWriter; use super::{HttpResponseParser, HttpResponseParserError}; -use super::encoding::PayloadStream; /// A set of errors that can occur during sending request and reading response #[derive(Fail, Debug)] diff --git a/src/server/encoding.rs b/src/server/encoding.rs index d3c78f405..9137bb420 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -169,16 +169,14 @@ impl io::Write for Wrapper { } } -/// Payload wrapper with content decompression support -pub(crate) struct EncodedPayload { - inner: PayloadSender, +/// Payload stream with decompression support +pub(crate) struct PayloadStream { decoder: Decoder, dst: BytesMut, - error: bool, } -impl EncodedPayload { - pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { +impl PayloadStream { + pub fn new(enc: ContentEncoding) -> PayloadStream { let dec = match enc { ContentEncoding::Br => Decoder::Br( Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))), @@ -187,32 +185,25 @@ impl EncodedPayload { ContentEncoding::Gzip => Decoder::Gzip(None), _ => Decoder::Identity, }; - EncodedPayload{ inner: inner, decoder: dec, error: false, dst: BytesMut::new() } + PayloadStream{ decoder: dec, dst: BytesMut::new() } } } -impl PayloadWriter for EncodedPayload { +impl PayloadStream { - fn set_error(&mut self, err: PayloadError) { - self.inner.set_error(err) - } - - fn feed_eof(&mut self) { - if self.error { - return - } - let err = match self.decoder { + pub fn feed_eof(&mut self) -> io::Result> { + match self.decoder { Decoder::Br(ref mut decoder) => { match decoder.finish() { Ok(mut writer) => { let b = writer.get_mut().take().freeze(); if !b.is_empty() { - self.inner.feed_data(b); + Ok(Some(b)) + } else { + Ok(None) } - self.inner.feed_eof(); - return }, - Err(err) => Some(err), + Err(err) => Err(err), } }, Decoder::Gzip(ref mut decoder) => { @@ -224,20 +215,16 @@ impl PayloadWriter for EncodedPayload { match decoder.read(unsafe{self.dst.bytes_mut()}) { Ok(n) => { if n == 0 { - self.inner.feed_eof(); - return + return Ok(Some(self.dst.take().freeze())) } else { unsafe{self.dst.set_len(n)}; - self.inner.feed_data(self.dst.split_to(n).freeze()); } } - Err(err) => { - break Some(err); - } + Err(err) => return Err(err), } } } else { - return + Ok(None) } }, Decoder::Deflate(ref mut decoder) => { @@ -245,45 +232,33 @@ impl PayloadWriter for EncodedPayload { Ok(_) => { let b = decoder.get_mut().get_mut().take().freeze(); if !b.is_empty() { - self.inner.feed_data(b); + Ok(Some(b)) + } else { + Ok(None) } - self.inner.feed_eof(); - return }, - Err(err) => Some(err), + Err(err) => Err(err), } }, - Decoder::Identity => { - self.inner.feed_eof(); - return - } - }; - - self.error = true; - self.decoder = Decoder::Identity; - if let Some(err) = err { - self.set_error(PayloadError::Io(err)); - } else { - self.set_error(PayloadError::Incomplete); + Decoder::Identity => Ok(None), } } - fn feed_data(&mut self, data: Bytes) { - if self.error { - return - } + pub fn feed_data(&mut self, data: Bytes) -> io::Result> { match self.decoder { Decoder::Br(ref mut decoder) => { - if decoder.write(&data).is_ok() && decoder.flush().is_ok() { - let b = decoder.get_mut().get_mut().take().freeze(); - if !b.is_empty() { - self.inner.feed_data(b); - } - return + match decoder.write(&data).and_then(|_| decoder.flush()) { + Ok(_) => { + let b = decoder.get_mut().get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(err) => Err(err) } - trace!("Error decoding br encoding"); - } - + }, Decoder::Gzip(ref mut decoder) => { if decoder.is_none() { *decoder = Some( @@ -298,41 +273,82 @@ impl PayloadWriter for EncodedPayload { match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) { Ok(n) => { if n == 0 { - return + return Ok(Some(self.dst.split_to(n).freeze())); } else { unsafe{self.dst.set_len(n)}; - self.inner.feed_data(self.dst.split_to(n).freeze()); } } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - return - } - break - } + Err(e) => return Err(e), } } - } - + }, Decoder::Deflate(ref mut decoder) => { - if decoder.write(&data).is_ok() && decoder.flush().is_ok() { - let b = decoder.get_mut().get_mut().take().freeze(); - if !b.is_empty() { + match decoder.write(&data).and_then(|_| decoder.flush()) { + Ok(_) => { + let b = decoder.get_mut().get_mut().take().freeze(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + }, + Err(e) => Err(e), + } + }, + Decoder::Identity => Ok(Some(data)), + } + } +} + +/// Payload wrapper with content decompression support +pub(crate) struct EncodedPayload { + inner: PayloadSender, + error: bool, + payload: PayloadStream, +} + +impl EncodedPayload { + pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { + EncodedPayload{ inner: inner, error: false, payload: PayloadStream::new(enc) } + } +} + +impl PayloadWriter for EncodedPayload { + + fn set_error(&mut self, err: PayloadError) { + self.inner.set_error(err) + } + + fn feed_eof(&mut self) { + if !self.error { + match self.payload.feed_eof() { + Err(err) => { + self.error = true; + self.set_error(PayloadError::Io(err)); + }, + Ok(value) => { + if let Some(b) = value { self.inner.feed_data(b); } - return + self.inner.feed_eof(); } - trace!("Error decoding deflate encoding"); } - Decoder::Identity => { - self.inner.feed_data(data); - return - } - }; + } + } - self.error = true; - self.decoder = Decoder::Identity; - self.set_error(PayloadError::EncodingCorrupted); + fn feed_data(&mut self, data: Bytes) { + if self.error { + return + } + + match self.payload.feed_data(data) { + Ok(Some(b)) => self.inner.feed_data(b), + Ok(None) => (), + Err(e) => { + self.error = true; + self.set_error(e.into()); + } + } } fn capacity(&self) -> usize { @@ -340,18 +356,23 @@ impl PayloadWriter for EncodedPayload { } } -pub(crate) struct PayloadEncoder(ContentEncoder); +pub(crate) enum ContentEncoder { + Deflate(DeflateEncoder), + Gzip(GzEncoder), + Br(BrotliEncoder), + Identity(TransferEncoding), +} -impl PayloadEncoder { +impl ContentEncoder { - pub fn empty(bytes: SharedBytes) -> PayloadEncoder { - PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes))) + pub fn empty(bytes: SharedBytes) -> ContentEncoder { + ContentEncoder::Identity(TransferEncoding::eof(bytes)) } - pub fn new(buf: SharedBytes, - req: &HttpMessage, - resp: &mut HttpResponse, - response_encoding: ContentEncoding) -> PayloadEncoder + pub fn for_server(buf: SharedBytes, + req: &HttpMessage, + resp: &mut HttpResponse, + response_encoding: ContentEncoding) -> ContentEncoder { let version = resp.version().unwrap_or_else(|| req.version); let mut body = resp.replace_body(Body::Empty); @@ -440,7 +461,7 @@ impl PayloadEncoder { } TransferEncoding::eof(buf) } else { - PayloadEncoder::streaming_encoding(buf, version, resp) + ContentEncoder::streaming_encoding(buf, version, resp) } } }; @@ -451,18 +472,16 @@ impl PayloadEncoder { resp.replace_body(body); } - PayloadEncoder( - match encoding { - ContentEncoding::Deflate => ContentEncoder::Deflate( - DeflateEncoder::new(transfer, Compression::default())), - ContentEncoding::Gzip => ContentEncoder::Gzip( - GzEncoder::new(transfer, Compression::default())), - ContentEncoding::Br => ContentEncoder::Br( - BrotliEncoder::new(transfer, 5)), - ContentEncoding::Identity => ContentEncoder::Identity(transfer), - ContentEncoding::Auto => unreachable!() - } - ) + match encoding { + ContentEncoding::Deflate => ContentEncoder::Deflate( + DeflateEncoder::new(transfer, Compression::default())), + ContentEncoding::Gzip => ContentEncoder::Gzip( + GzEncoder::new(transfer, Compression::default())), + ContentEncoding::Br => ContentEncoder::Br( + BrotliEncoder::new(transfer, 5)), + ContentEncoding::Identity => ContentEncoder::Identity(transfer), + ContentEncoding::Auto => unreachable!() + } } fn streaming_encoding(buf: SharedBytes, version: Version, @@ -527,33 +546,6 @@ impl PayloadEncoder { } } -impl PayloadEncoder { - - #[inline] - pub fn is_eof(&self) -> bool { - self.0.is_eof() - } - - #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] - #[inline(always)] - pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> { - self.0.write(payload) - } - - #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] - #[inline(always)] - pub fn write_eof(&mut self) -> Result<(), io::Error> { - self.0.write_eof() - } -} - -pub(crate) enum ContentEncoder { - Deflate(DeflateEncoder), - Gzip(GzEncoder), - Br(BrotliEncoder), - Identity(TransferEncoding), -} - impl ContentEncoder { #[inline] diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index aa9c819d7..b2b79c5f9 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -12,7 +12,7 @@ use httprequest::HttpMessage; use httpresponse::HttpResponse; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::shared::SharedBytes; -use super::encoding::PayloadEncoder; +use super::encoding::ContentEncoder; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -28,7 +28,7 @@ bitflags! { pub(crate) struct H1Writer { flags: Flags, stream: T, - encoder: PayloadEncoder, + encoder: ContentEncoder, written: u64, headers_size: u32, buffer: SharedBytes, @@ -40,7 +40,7 @@ impl H1Writer { H1Writer { flags: Flags::empty(), stream: stream, - encoder: PayloadEncoder::empty(buf.clone()), + encoder: ContentEncoder::empty(buf.clone()), written: 0, headers_size: 0, buffer: buf, @@ -101,7 +101,7 @@ impl Writer for H1Writer { encoding: ContentEncoding) -> io::Result { // prepare task - self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg, encoding); + self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { self.flags.insert(Flags::STARTED | Flags::KEEPALIVE); } else { diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 00a981915..466f6520b 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -11,7 +11,7 @@ use body::{Body, Binary}; use headers::ContentEncoding; use httprequest::HttpMessage; use httpresponse::HttpResponse; -use super::encoding::PayloadEncoder; +use super::encoding::ContentEncoder; use super::shared::SharedBytes; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; @@ -28,7 +28,7 @@ bitflags! { pub(crate) struct H2Writer { respond: SendResponse, stream: Option>, - encoder: PayloadEncoder, + encoder: ContentEncoder, flags: Flags, written: u64, buffer: SharedBytes, @@ -40,7 +40,7 @@ impl H2Writer { H2Writer { respond: respond, stream: None, - encoder: PayloadEncoder::empty(buf.clone()), + encoder: ContentEncoder::empty(buf.clone()), flags: Flags::empty(), written: 0, buffer: buf, @@ -113,7 +113,7 @@ impl Writer for H2Writer { -> io::Result { // prepare response self.flags.insert(Flags::STARTED); - self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg, encoding); + self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding); if let Body::Empty = *msg.body() { self.flags.insert(Flags::EOF); }