diff --git a/src/encoding.rs b/src/encoding.rs new file mode 100644 index 000000000..55b3981df --- /dev/null +++ b/src/encoding.rs @@ -0,0 +1,354 @@ +use std::{io, cmp}; +use std::rc::Rc; +use std::cell::RefCell; +use std::io::{Read, Write}; + +use http::header::{HeaderMap, CONTENT_ENCODING}; +use flate2::read::{GzDecoder}; +use flate2::write::{DeflateDecoder}; +use brotli2::write::BrotliDecoder; +use bytes::{Bytes, BytesMut, BufMut, Writer}; + +use payload::{PayloadSender, PayloadWriter, PayloadError}; + +/// Represents various types of connection +#[derive(Copy, Clone, PartialEq, Debug)] +pub enum ContentEncoding { + /// Automatically select encoding based on encoding negotiation + Auto, + /// A format using the Brotli algorithm + Br, + /// A format using the zlib structure with deflate algorithm + Deflate, + /// Gzip algorithm + Gzip, + /// Indicates the identity function (i.e. no compression, nor modification) + Identity, +} + +impl<'a> From<&'a str> for ContentEncoding { + fn from(s: &'a str) -> ContentEncoding { + match s.trim().to_lowercase().as_ref() { + "br" => ContentEncoding::Br, + "gzip" => ContentEncoding::Gzip, + "deflate" => ContentEncoding::Deflate, + "identity" => ContentEncoding::Identity, + _ => ContentEncoding::Auto, + } + } +} + + +pub(crate) enum PayloadType { + Sender(PayloadSender), + Encoding(EncodedPayload), +} + +impl PayloadType { + + pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType { + // check content-encoding + let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) { + if let Ok(enc) = enc.to_str() { + ContentEncoding::from(enc) + } else { + ContentEncoding::Auto + } + } else { + ContentEncoding::Auto + }; + + match enc { + ContentEncoding::Auto | ContentEncoding::Identity => + PayloadType::Sender(sender), + _ => PayloadType::Encoding(EncodedPayload::new(sender, enc)), + } + } +} + +impl PayloadWriter for PayloadType { + fn set_error(&mut self, err: PayloadError) { + match *self { + PayloadType::Sender(ref mut sender) => sender.set_error(err), + PayloadType::Encoding(ref mut enc) => enc.set_error(err), + } + } + + fn feed_eof(&mut self) { + match *self { + PayloadType::Sender(ref mut sender) => sender.feed_eof(), + PayloadType::Encoding(ref mut enc) => enc.feed_eof(), + } + } + + fn feed_data(&mut self, data: Bytes) { + match *self { + PayloadType::Sender(ref mut sender) => sender.feed_data(data), + PayloadType::Encoding(ref mut enc) => enc.feed_data(data), + } + } + + fn capacity(&self) -> usize { + match *self { + PayloadType::Sender(ref sender) => sender.capacity(), + PayloadType::Encoding(ref enc) => enc.capacity(), + } + } +} + +enum Decoder { + Zlib(DeflateDecoder), + Gzip(Option>), + Br(Rc>, BrotliDecoder), + Identity, +} + +// should go after write::GzDecoder get implemented +#[derive(Debug)] +struct Wrapper { + buf: BytesMut +} + +impl io::Read for Wrapper { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let len = cmp::min(buf.len(), self.buf.len()); + buf[..len].copy_from_slice(&self.buf[..len]); + self.buf.split_to(len); + Ok(len) + } +} + +struct BytesWriter { + buf: BytesMut, +} + +impl Default for BytesWriter { + fn default() -> BytesWriter { + BytesWriter{buf: BytesMut::with_capacity(8192)} + } +} + +impl io::Write for BytesWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.extend(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + + +// should go after brotli2::write::BrotliDecoder::get_mut get implemented +#[derive(Debug)] +struct WrapperRc { + buf: Rc>, +} + +impl io::Write for WrapperRc { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.borrow_mut().extend(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub(crate) struct EncodedPayload { + inner: PayloadSender, + decoder: Decoder, + dst: Writer, + error: bool, +} + +impl EncodedPayload { + pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { + let dec = match enc { + ContentEncoding::Deflate => Decoder::Zlib( + DeflateDecoder::new(BytesWriter::default())), + ContentEncoding::Gzip => Decoder::Gzip(None), + ContentEncoding::Br => { + let buf = Rc::new(RefCell::new(BytesMut::new())); + let buf2 = Rc::clone(&buf); + Decoder::Br(buf, BrotliDecoder::new(WrapperRc{buf: buf2})) + } + _ => Decoder::Identity, + }; + EncodedPayload { + inner: inner, + decoder: dec, + error: false, + dst: BytesMut::new().writer(), + } + } +} + +impl PayloadWriter for EncodedPayload { + + fn set_error(&mut self, err: PayloadError) { + self.inner.set_error(err) + } + + fn feed_eof(&mut self) { + if self.error { + return + } + let err = match self.decoder { + Decoder::Br(ref mut buf, ref mut decoder) => { + match decoder.flush() { + Ok(_) => { + let b = buf.borrow_mut().take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } + self.inner.feed_eof(); + return + }, + Err(err) => Some(err), + } + } + + Decoder::Gzip(ref mut decoder) => { + if decoder.is_none() { + self.inner.feed_eof(); + return + } + loop { + let len = self.dst.get_ref().len(); + let len_buf = decoder.as_mut().unwrap().get_mut().buf.len(); + + if len < len_buf * 2 { + self.dst.get_mut().reserve(len_buf * 2 - len); + unsafe{self.dst.get_mut().set_len(len_buf * 2)}; + } + match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) { + Ok(n) => { + if n == 0 { + self.inner.feed_eof(); + return + } else { + self.inner.feed_data(self.dst.get_mut().split_to(n).freeze()); + } + } + Err(err) => break Some(err) + } + } + } + Decoder::Zlib(ref mut decoder) => { + match decoder.flush() { + Ok(_) => { + let b = decoder.get_mut().buf.take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } + self.inner.feed_eof(); + return + }, + Err(err) => Some(err), + } + }, + Decoder::Identity => { + self.inner.feed_eof(); + return + } + }; + + self.error = true; + self.decoder = Decoder::Identity; + if let Some(err) = err { + self.set_error(PayloadError::ParseError(err)); + } else { + self.set_error(PayloadError::Incomplete); + } + } + + fn feed_data(&mut self, data: Bytes) { + if self.error { + return + } + match self.decoder { + Decoder::Br(ref mut buf, ref mut decoder) => { + match decoder.write(&data) { + Ok(_) => { + let b = buf.borrow_mut().take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } + return + }, + Err(err) => { + trace!("Error decoding br encoding: {}", err); + }, + } + } + + Decoder::Gzip(ref mut decoder) => { + if decoder.is_none() { + let mut buf = BytesMut::new(); + buf.extend(data); + *decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap()); + } else { + decoder.as_mut().unwrap().get_mut().buf.extend(data); + } + + loop { + let len_buf = decoder.as_mut().unwrap().get_mut().buf.len(); + if len_buf == 0 { + return + } + + let len = self.dst.get_ref().len(); + if len < len_buf * 2 { + self.dst.get_mut().reserve(len_buf * 2 - len); + unsafe{self.dst.get_mut().set_len(len_buf * 2)}; + } + match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) { + Ok(n) => { + if n == 0 { + return + } else { + self.inner.feed_data(self.dst.get_mut().split_to(n).freeze()); + } + } + Err(_) => break + } + } + } + + Decoder::Zlib(ref mut decoder) => { + match decoder.write(&data) { + Ok(_) => { + let b = decoder.get_mut().buf.take().freeze(); + if !b.is_empty() { + self.inner.feed_data(b); + } + return + }, + Err(err) => { + trace!("Error decoding deflate encoding: {}", err); + }, + } + } + Decoder::Identity => { + self.inner.feed_data(data); + return + } + }; + + self.error = true; + self.decoder = Decoder::Identity; + self.set_error(PayloadError::EncodingCorrupted); + } + + fn capacity(&self) -> usize { + match self.decoder { + Decoder::Br(ref buf, _) => { + buf.borrow().len() + self.inner.capacity() + } + _ => { + self.inner.capacity() + } + } + } +} diff --git a/src/h1.rs b/src/h1.rs index 63d878ffd..6ade25b76 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -7,7 +7,7 @@ use std::collections::VecDeque; use actix::Arbiter; use httparse; use http::{Method, Version, HttpTryFrom, HeaderMap}; -use http::header::{self, HeaderName, HeaderValue, CONTENT_ENCODING}; +use http::header::{self, HeaderName, HeaderValue}; use bytes::{Bytes, BytesMut, BufMut}; use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -20,9 +20,8 @@ use error::ParseError; use h1writer::H1Writer; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; -use httpresponse::ContentEncoding; -use payload::{Payload, PayloadError, PayloadSender, - PayloadWriter, EncodedPayload, DEFAULT_BUFFER_SIZE}; +use encoding::PayloadType; +use payload::{Payload, PayloadError, PayloadWriter, DEFAULT_BUFFER_SIZE}; const KEEPALIVE_PERIOD: u64 = 15; // seconds const INIT_BUFFER_SIZE: usize = 8192; @@ -286,25 +285,10 @@ enum Decoding { } struct PayloadInfo { - tx: PayloadInfoItem, + tx: PayloadType, decoder: Decoder, } -enum PayloadInfoItem { - Sender(PayloadSender), - Encoding(EncodedPayload), -} - -impl PayloadInfo { - - fn as_mut(&mut self) -> &mut PayloadWriter { - match self.tx { - PayloadInfoItem::Sender(ref mut sender) => sender, - PayloadInfoItem::Encoding(ref mut enc) => enc, - } - } -} - #[derive(Debug)] enum ReaderError { Disconnect, @@ -330,21 +314,21 @@ impl Reader { fn decode(&mut self, buf: &mut BytesMut) -> std::result::Result { if let Some(ref mut payload) = self.payload { - if payload.as_mut().capacity() > DEFAULT_BUFFER_SIZE { + if payload.tx.capacity() > DEFAULT_BUFFER_SIZE { return Ok(Decoding::Paused) } loop { match payload.decoder.decode(buf) { Ok(Async::Ready(Some(bytes))) => { - payload.as_mut().feed_data(bytes) + payload.tx.feed_data(bytes) }, Ok(Async::Ready(None)) => { - payload.as_mut().feed_eof(); + payload.tx.feed_eof(); return Ok(Decoding::Ready) }, Ok(Async::NotReady) => return Ok(Decoding::NotReady), Err(err) => { - payload.as_mut().set_error(err.into()); + payload.tx.set_error(err.into()); return Err(ReaderError::Payload) } } @@ -368,7 +352,7 @@ impl Reader { match self.read_from_io(io, buf) { Ok(Async::Ready(0)) => { if let Some(ref mut payload) = self.payload { - payload.as_mut().set_error(PayloadError::Incomplete); + payload.tx.set_error(PayloadError::Incomplete); } // http channel should not deal with payload errors return Err(ReaderError::Payload) @@ -379,7 +363,7 @@ impl Reader { Ok(Async::NotReady) => break, Err(err) => { if let Some(ref mut payload) = self.payload { - payload.as_mut().set_error(err.into()); + payload.tx.set_error(err.into()); } // http channel should not deal with payload errors return Err(ReaderError::Payload) @@ -394,25 +378,8 @@ impl Reader { Message::Http1(msg, decoder) => { let payload = if let Some(decoder) = decoder { let (tx, rx) = Payload::new(false); - - // Content-Encoding - let enc = if let Some(enc) = msg.headers().get(CONTENT_ENCODING) { - if let Ok(enc) = enc.to_str() { - ContentEncoding::from(enc) - } else { - ContentEncoding::Auto - } - } else { - ContentEncoding::Auto - }; - - let tx = match enc { - ContentEncoding::Auto => PayloadInfoItem::Sender(tx), - _ => PayloadInfoItem::Encoding(EncodedPayload::new(tx, enc)), - }; - let payload = PayloadInfo { - tx: tx, + tx: PayloadType::new(msg.headers(), tx), decoder: decoder, }; self.payload = Some(payload); @@ -430,7 +397,7 @@ impl Reader { Ok(Async::Ready(0)) => { trace!("parse eof"); if let Some(ref mut payload) = self.payload { - payload.as_mut().set_error( + payload.tx.set_error( PayloadError::Incomplete); } // http channel should deal with payload errors @@ -442,7 +409,7 @@ impl Reader { Ok(Async::NotReady) => break, Err(err) => { if let Some(ref mut payload) = self.payload { - payload.as_mut().set_error(err.into()); + payload.tx.set_error(err.into()); } // http channel should deal with payload errors return Err(ReaderError::Payload) diff --git a/src/h2.rs b/src/h2.rs index 2de7e102d..4d16f70fc 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -7,7 +7,6 @@ use std::collections::VecDeque; use actix::Arbiter; use http::request::Parts; -use http::header::CONTENT_ENCODING; use http2::{Reason, RecvStream}; use http2::server::{Server, Handshake, Respond}; use bytes::{Buf, Bytes}; @@ -20,8 +19,8 @@ use h2writer::H2Writer; use channel::HttpHandler; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; -use httpresponse::ContentEncoding; -use payload::{Payload, PayloadError, PayloadSender, PayloadWriter, EncodedPayload}; +use encoding::PayloadType; +use payload::{Payload, PayloadError, PayloadWriter}; const KEEPALIVE_PERIOD: u64 = 15; // seconds @@ -141,16 +140,14 @@ impl Http2 } Ok(Async::NotReady) => { // start keep-alive timer - if self.tasks.is_empty() { - if self.keepalive_timer.is_none() { - trace!("Start keep-alive timer"); - let mut timeout = Timeout::new( - Duration::new(KEEPALIVE_PERIOD, 0), - Arbiter::handle()).unwrap(); - // register timeout - let _ = timeout.poll(); - self.keepalive_timer = Some(timeout); - } + if self.tasks.is_empty() && self.keepalive_timer.is_none() { + trace!("Start keep-alive timer"); + let mut timeout = Timeout::new( + Duration::new(KEEPALIVE_PERIOD, 0), + Arbiter::handle()).unwrap(); + // register timeout + let _ = timeout.poll(); + self.keepalive_timer = Some(timeout); } } Err(err) => { @@ -195,26 +192,10 @@ impl Http2 } } -struct PayloadInfo(PayloadInfoItem); -enum PayloadInfoItem { - Sender(PayloadSender), - Encoding(EncodedPayload), -} - -impl PayloadInfo { - - fn as_mut(&mut self) -> &mut PayloadWriter { - match self.0 { - PayloadInfoItem::Sender(ref mut sender) => sender, - PayloadInfoItem::Encoding(ref mut enc) => enc, - } - } -} - struct Entry { task: Task, req: UnsafeCell, - payload: PayloadInfo, + payload: PayloadType, recv: RecvStream, stream: H2Writer, eof: bool, @@ -239,20 +220,6 @@ impl Entry { // Payload and Content-Encoding let (psender, payload) = Payload::new(false); - let enc = if let Some(enc) = req.headers().get(CONTENT_ENCODING) { - if let Ok(enc) = enc.to_str() { - ContentEncoding::from(enc) - } else { - ContentEncoding::Auto - } - } else { - ContentEncoding::Auto - }; - let psender = match enc { - ContentEncoding::Auto | ContentEncoding::Identity => - PayloadInfoItem::Sender(psender), - _ => PayloadInfoItem::Encoding(EncodedPayload::new(psender, enc)), - }; // start request processing let mut task = None; @@ -262,10 +229,11 @@ impl Entry { break } } + let psender = PayloadType::new(req.headers(), psender); Entry {task: task.unwrap_or_else(|| Task::reply(HTTPNotFound)), req: UnsafeCell::new(req), - payload: PayloadInfo(psender), + payload: psender, recv: recv, stream: H2Writer::new(resp), eof: false, @@ -280,22 +248,22 @@ impl Entry { if !self.reof { match self.recv.poll() { Ok(Async::Ready(Some(chunk))) => { - self.payload.as_mut().feed_data(chunk); + self.payload.feed_data(chunk); }, Ok(Async::Ready(None)) => { self.reof = true; }, Ok(Async::NotReady) => (), Err(err) => { - self.payload.as_mut().set_error(PayloadError::Http2(err)) + self.payload.set_error(PayloadError::Http2(err)) } } - let capacity = self.payload.as_mut().capacity(); + let capacity = self.payload.capacity(); if self.capacity != capacity { self.capacity = capacity; if let Err(err) = self.recv.release_capacity().release_capacity(capacity) { - self.payload.as_mut().set_error(PayloadError::Http2(err)) + self.payload.set_error(PayloadError::Http2(err)) } } } diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 0bf4a649f..3bc7a089e 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -10,7 +10,7 @@ use http::header::{self, HeaderName, HeaderValue}; use Cookie; use body::Body; use route::Frame; - +use encoding::ContentEncoding; /// Represents various types of connection #[derive(Copy, Clone, PartialEq, Debug)] @@ -23,33 +23,6 @@ pub enum ConnectionType { Upgrade, } -/// Represents various types of connection -#[derive(Copy, Clone, PartialEq, Debug)] -pub enum ContentEncoding { - /// Automatically select encoding based on encoding negotiation - Auto, - /// A format using the Brotli algorithm - Br, - /// A format using the zlib structure with deflate algorithm - Deflate, - /// Gzip algorithm - Gzip, - /// Indicates the identity function (i.e. no compression, nor modification) - Identity, -} - -impl<'a> From<&'a str> for ContentEncoding { - fn from(s: &'a str) -> ContentEncoding { - match s.trim().to_lowercase().as_ref() { - "br" => ContentEncoding::Br, - "gzip" => ContentEncoding::Gzip, - "deflate" => ContentEncoding::Deflate, - "identity" => ContentEncoding::Identity, - _ => ContentEncoding::Auto, - } - } -} - #[derive(Debug)] /// An HTTP Response pub struct HttpResponse { @@ -496,5 +469,5 @@ mod tests { let resp = HttpResponse::builder(StatusCode::OK) .content_encoding(ContentEncoding::Br).finish().unwrap(); assert_eq!(*resp.content_encoding(), ContentEncoding::Br); -} + } } diff --git a/src/lib.rs b/src/lib.rs index d1e6450fd..85a20aa2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ mod body; mod context; mod error; mod date; +mod encoding; mod httprequest; mod httpresponse; mod logger; @@ -61,6 +62,7 @@ pub mod ws; pub mod dev; pub mod httpcodes; pub mod multipart; +pub use encoding::ContentEncoding; pub use error::ParseError; pub use body::{Body, BinaryBody}; pub use application::{Application, ApplicationBuilder, Middleware}; diff --git a/src/payload.rs b/src/payload.rs index 9299dc692..2a00458b7 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,19 +1,15 @@ -use std::{io, fmt, cmp}; +use std::{fmt, cmp}; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::VecDeque; use std::error::Error; -use std::io::{Read, Write, Error as IoError}; -use bytes::{Bytes, BytesMut, BufMut, Writer}; +use std::io::{Error as IoError}; +use bytes::{Bytes, BytesMut}; use http2::Error as Http2Error; use futures::{Async, Poll, Stream}; use futures::task::{Task, current as current_task}; -use flate2::read::{GzDecoder}; -use flate2::write::{DeflateDecoder}; -use brotli2::write::BrotliDecoder; use actix::ResponseType; -use httpresponse::ContentEncoding; pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k @@ -205,262 +201,6 @@ impl PayloadWriter for PayloadSender { } } -enum Decoder { - Zlib(DeflateDecoder), - Gzip(Option>), - Br(Rc>, BrotliDecoder), - Identity, -} - -// should go after write::GzDecoder get implemented -#[derive(Debug)] -struct Wrapper { - buf: BytesMut -} - -impl io::Read for Wrapper { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let len = cmp::min(buf.len(), self.buf.len()); - buf[..len].copy_from_slice(&self.buf[..len]); - self.buf.split_to(len); - Ok(len) - } -} - -struct BytesWriter { - buf: BytesMut, -} - -impl Default for BytesWriter { - fn default() -> BytesWriter { - BytesWriter{buf: BytesMut::with_capacity(8192)} - } -} - -impl io::Write for BytesWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.extend(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - - -// should go after brotli2::write::BrotliDecoder::get_mut get implemented -#[derive(Debug)] -struct WrapperRc { - buf: Rc>, -} - -impl io::Write for WrapperRc { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.borrow_mut().extend(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -pub(crate) struct EncodedPayload { - inner: PayloadSender, - decoder: Decoder, - dst: Writer, - error: bool, -} - -impl EncodedPayload { - pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { - let dec = match enc { - ContentEncoding::Deflate => Decoder::Zlib( - DeflateDecoder::new(BytesWriter::default())), - ContentEncoding::Gzip => Decoder::Gzip(None), - ContentEncoding::Br => { - let buf = Rc::new(RefCell::new(BytesMut::new())); - let buf2 = Rc::clone(&buf); - Decoder::Br(buf, BrotliDecoder::new(WrapperRc{buf: buf2})) - } - _ => Decoder::Identity, - }; - EncodedPayload { - inner: inner, - decoder: dec, - error: false, - dst: BytesMut::new().writer(), - } - } -} - -impl PayloadWriter for EncodedPayload { - - fn set_error(&mut self, err: PayloadError) { - self.inner.set_error(err) - } - - fn feed_eof(&mut self) { - if self.error { - return - } - let err = match self.decoder { - Decoder::Br(ref mut buf, ref mut decoder) => { - match decoder.flush() { - Ok(_) => { - let b = buf.borrow_mut().take().freeze(); - if !b.is_empty() { - self.inner.feed_data(b); - } - self.inner.feed_eof(); - return - }, - Err(err) => Some(err), - } - } - - Decoder::Gzip(ref mut decoder) => { - if decoder.is_none() { - self.inner.feed_eof(); - return - } - loop { - let len = self.dst.get_ref().len(); - let len_buf = decoder.as_mut().unwrap().get_mut().buf.len(); - - if len < len_buf * 2 { - self.dst.get_mut().reserve(len_buf * 2 - len); - unsafe{self.dst.get_mut().set_len(len_buf * 2)}; - } - match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) { - Ok(n) => { - if n == 0 { - self.inner.feed_eof(); - return - } else { - self.inner.feed_data(self.dst.get_mut().split_to(n).freeze()); - } - } - Err(err) => break Some(err) - } - } - } - Decoder::Zlib(ref mut decoder) => { - match decoder.flush() { - Ok(_) => { - let b = decoder.get_mut().buf.take().freeze(); - if !b.is_empty() { - self.inner.feed_data(b); - } - self.inner.feed_eof(); - return - }, - Err(err) => Some(err), - } - }, - Decoder::Identity => { - self.inner.feed_eof(); - return - } - }; - - self.error = true; - self.decoder = Decoder::Identity; - if let Some(err) = err { - self.set_error(PayloadError::ParseError(err)); - } else { - self.set_error(PayloadError::Incomplete); - } - } - - fn feed_data(&mut self, data: Bytes) { - if self.error { - return - } - match self.decoder { - Decoder::Br(ref mut buf, ref mut decoder) => { - match decoder.write(&data) { - Ok(_) => { - let b = buf.borrow_mut().take().freeze(); - if !b.is_empty() { - self.inner.feed_data(b); - } - return - }, - Err(err) => { - trace!("Error decoding br encoding: {}", err); - }, - } - } - - Decoder::Gzip(ref mut decoder) => { - if decoder.is_none() { - let mut buf = BytesMut::new(); - buf.extend(data); - *decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap()); - } else { - decoder.as_mut().unwrap().get_mut().buf.extend(data); - } - - loop { - let len_buf = decoder.as_mut().unwrap().get_mut().buf.len(); - if len_buf == 0 { - return - } - - let len = self.dst.get_ref().len(); - if len < len_buf * 2 { - self.dst.get_mut().reserve(len_buf * 2 - len); - unsafe{self.dst.get_mut().set_len(len_buf * 2)}; - } - match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) { - Ok(n) => { - if n == 0 { - return - } else { - self.inner.feed_data(self.dst.get_mut().split_to(n).freeze()); - } - } - Err(_) => break - } - } - } - - Decoder::Zlib(ref mut decoder) => { - match decoder.write(&data) { - Ok(_) => { - let b = decoder.get_mut().buf.take().freeze(); - if !b.is_empty() { - self.inner.feed_data(b); - } - return - }, - Err(err) => { - trace!("Error decoding deflate encoding: {}", err); - }, - } - } - Decoder::Identity => { - self.inner.feed_data(data); - return - } - }; - - self.error = true; - self.decoder = Decoder::Identity; - self.set_error(PayloadError::EncodingCorrupted); - } - - fn capacity(&self) -> usize { - match self.decoder { - Decoder::Br(ref buf, _) => { - buf.borrow().len() + self.inner.capacity() - } - _ => { - self.inner.capacity() - } - } - } -} #[derive(Debug)] struct Inner {