From bddd8e9c2e643696e56fa61af7ff56cccfc74ca4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 6 Nov 2017 09:24:19 -0800 Subject: [PATCH] better deflate decoding --- src/payload.rs | 109 ++++++++++++++++++++----------------------------- 1 file changed, 44 insertions(+), 65 deletions(-) diff --git a/src/payload.rs b/src/payload.rs index 0f539c8c6..98adc1692 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -8,8 +8,8 @@ use bytes::{Bytes, BytesMut, BufMut, Writer}; use http2::Error as Http2Error; use futures::{Async, Poll, Stream}; use futures::task::{Task, current as current_task}; -use flate2::{FlateReadExt, Flush, Decompress, Status as DecompressStatus}; -use flate2::read::GzDecoder; +use flate2::read::{GzDecoder}; +use flate2::write::{DeflateDecoder}; use brotli2::write::BrotliDecoder; use actix::ResponseType; @@ -206,7 +206,7 @@ impl PayloadWriter for PayloadSender { } enum Decoder { - Zlib(Decompress), + Zlib(DeflateDecoder), Gzip(Option>), Br(Rc>, BrotliDecoder), Identity, @@ -226,6 +226,27 @@ impl io::Read for Wrapper { } } +struct BytesWriter { + buf: BytesMut, +} + +impl Default for BytesWriter { + fn default() -> BytesWriter { + BytesWriter{buf: BytesMut::with_capacity(8192)} + } +} + +impl io::Write for BytesWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.extend(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + + #[derive(Debug)] struct WrapperRc { buf: Rc>, @@ -245,14 +266,14 @@ pub(crate) struct EncodedPayload { inner: PayloadSender, decoder: Decoder, dst: Writer, - buffer: BytesMut, error: bool, } impl EncodedPayload { pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { let dec = match enc { - ContentEncoding::Deflate => Decoder::Zlib(Decompress::new(false)), + ContentEncoding::Deflate => Decoder::Zlib( + DeflateDecoder::new(BytesWriter::default())), ContentEncoding::Gzip => Decoder::Gzip(None), ContentEncoding::Br => { let buf = Rc::new(RefCell::new(BytesMut::new())); @@ -266,7 +287,6 @@ impl EncodedPayload { decoder: dec, error: false, dst: BytesMut::new().writer(), - buffer: BytesMut::new(), } } } @@ -323,32 +343,16 @@ impl PayloadWriter for EncodedPayload { } } Decoder::Zlib(ref mut decoder) => { - let len = self.dst.get_ref().len(); - if len < self.buffer.len() * 2 { - self.dst.get_mut().reserve(self.buffer.len() * 2 - len); - unsafe{self.dst.get_mut().set_len(self.buffer.len() * 2)}; - } - - let len = self.dst.get_ref().len(); - let before_in = decoder.total_in(); - let before_out = decoder.total_out(); - let ret = decoder.decompress( - self.buffer.as_ref(), &mut self.dst.get_mut()[..len], Flush::Finish); - let read = (decoder.total_out() - before_out) as usize; - let consumed = (decoder.total_in() - before_in) as usize; - - let ch = self.dst.get_mut().split_to(read).freeze(); - if !ch.is_empty() { - self.inner.feed_data(ch); - } - self.buffer.split_to(consumed); - - match ret { - Ok(DecompressStatus::Ok) | Ok(DecompressStatus::StreamEnd) => { + match decoder.flush() { + Ok(_) => { + let b = decoder.get_mut().buf.take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } self.inner.feed_eof(); return }, - _ => None, + Err(err) => Some(err), } }, Decoder::Identity => { @@ -390,7 +394,7 @@ impl PayloadWriter for EncodedPayload { if decoder.is_none() { let mut buf = BytesMut::new(); buf.extend(data); - *decoder = Some(Wrapper{buf: buf}.gz_decode().unwrap()); + *decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap()); } else { decoder.as_mut().unwrap().get_mut().buf.extend(data); } @@ -420,42 +424,17 @@ impl PayloadWriter for EncodedPayload { } Decoder::Zlib(ref mut decoder) => { - self.buffer.extend(data); - - loop { - if self.buffer.is_empty() { + match decoder.write(&data) { + Ok(_) => { + let b = decoder.get_mut().buf.take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } return - } - - let ret = { - let len = self.dst.get_ref().len(); - if len < self.buffer.len() * 2 { - self.dst.get_mut().reserve(self.buffer.len() * 2 - len); - unsafe{self.dst.get_mut().set_len(self.buffer.len() * 2)}; - } - let before_out = decoder.total_out(); - let before_in = decoder.total_in(); - - let len = self.dst.get_ref().len(); - let ret = decoder.decompress( - self.buffer.as_ref(), &mut self.dst.get_mut()[..len], Flush::None); - let read = (decoder.total_out() - before_out) as usize; - let consumed = (decoder.total_in() - before_in) as usize; - - let ch = self.dst.get_mut().split_to(read).freeze(); - if !ch.is_empty() { - self.inner.feed_data(ch); - } - if self.buffer.len() > consumed { - self.buffer.split_to(consumed); - } - ret - }; - - match ret { - Ok(DecompressStatus::Ok) => continue, - _ => break, - } + }, + Err(err) => { + trace!("Error decoding deflate encoding: {}", err); + }, } } Decoder::Identity => {