1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

refactor and simplify content encoding

This commit is contained in:
Nikolay Kim 2018-02-25 11:43:00 +03:00
parent 141b992450
commit ab5ed27bf1
6 changed files with 129 additions and 280 deletions

View File

@ -1,142 +0,0 @@
use std::io;
use std::io::{Read, Write};
use bytes::{Bytes, BytesMut, BufMut};
use flate2::read::GzDecoder;
use flate2::write::DeflateDecoder;
use brotli2::write::BrotliDecoder;
use headers::ContentEncoding;
use server::encoding::{Decoder, Wrapper};
/// Payload wrapper with content decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Deflate => Decoder::Deflate(
Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream{ decoder: dec, dst: BytesMut::new() }
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
let b = writer.get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
loop {
self.dst.reserve(8192);
match decoder.read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.take().freeze()))
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(err) => return Err(err),
}
}
} else {
Ok(None)
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err)
}
},
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(
Box::new(GzDecoder::new(
Wrapper{buf: BytesMut::from(data), eof: false})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.split_to(n).freeze()));
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(e) => return Err(e),
}
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(Some(data)),
}
}
}

View File

@ -1,6 +1,5 @@
//! Http client //! Http client
mod connector; mod connector;
mod encoding;
mod parser; mod parser;
mod request; mod request;
mod response; mod response;

View File

@ -13,11 +13,11 @@ use headers::ContentEncoding;
use error::PayloadError; use error::PayloadError;
use server::WriterState; use server::WriterState;
use server::shared::SharedBytes; use server::shared::SharedBytes;
use server::encoding::PayloadStream;
use super::{ClientRequest, ClientResponse}; use super::{ClientRequest, ClientResponse};
use super::{Connect, Connection, ClientConnector, ClientConnectorError}; use super::{Connect, Connection, ClientConnector, ClientConnectorError};
use super::HttpClientWriter; use super::HttpClientWriter;
use super::{HttpResponseParser, HttpResponseParserError}; use super::{HttpResponseParser, HttpResponseParserError};
use super::encoding::PayloadStream;
/// A set of errors that can occur during sending request and reading response /// A set of errors that can occur during sending request and reading response
#[derive(Fail, Debug)] #[derive(Fail, Debug)]

View File

@ -169,16 +169,14 @@ impl io::Write for Wrapper {
} }
} }
/// Payload wrapper with content decompression support /// Payload stream with decompression support
pub(crate) struct EncodedPayload { pub(crate) struct PayloadStream {
inner: PayloadSender,
decoder: Decoder, decoder: Decoder,
dst: BytesMut, dst: BytesMut,
error: bool,
} }
impl EncodedPayload { impl PayloadStream {
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc { let dec = match enc {
ContentEncoding::Br => Decoder::Br( ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))), Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
@ -187,32 +185,25 @@ impl EncodedPayload {
ContentEncoding::Gzip => Decoder::Gzip(None), ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity, _ => Decoder::Identity,
}; };
EncodedPayload{ inner: inner, decoder: dec, error: false, dst: BytesMut::new() } PayloadStream{ decoder: dec, dst: BytesMut::new() }
} }
} }
impl PayloadWriter for EncodedPayload { impl PayloadStream {
fn set_error(&mut self, err: PayloadError) { pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
self.inner.set_error(err) match self.decoder {
}
fn feed_eof(&mut self) {
if self.error {
return
}
let err = match self.decoder {
Decoder::Br(ref mut decoder) => { Decoder::Br(ref mut decoder) => {
match decoder.finish() { match decoder.finish() {
Ok(mut writer) => { Ok(mut writer) => {
let b = writer.get_mut().take().freeze(); let b = writer.get_mut().take().freeze();
if !b.is_empty() { if !b.is_empty() {
self.inner.feed_data(b); Ok(Some(b))
} else {
Ok(None)
} }
self.inner.feed_eof();
return
}, },
Err(err) => Some(err), Err(err) => Err(err),
} }
}, },
Decoder::Gzip(ref mut decoder) => { Decoder::Gzip(ref mut decoder) => {
@ -224,20 +215,16 @@ impl PayloadWriter for EncodedPayload {
match decoder.read(unsafe{self.dst.bytes_mut()}) { match decoder.read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => { Ok(n) => {
if n == 0 { if n == 0 {
self.inner.feed_eof(); return Ok(Some(self.dst.take().freeze()))
return
} else { } else {
unsafe{self.dst.set_len(n)}; unsafe{self.dst.set_len(n)};
self.inner.feed_data(self.dst.split_to(n).freeze());
} }
} }
Err(err) => { Err(err) => return Err(err),
break Some(err);
}
} }
} }
} else { } else {
return Ok(None)
} }
}, },
Decoder::Deflate(ref mut decoder) => { Decoder::Deflate(ref mut decoder) => {
@ -245,45 +232,33 @@ impl PayloadWriter for EncodedPayload {
Ok(_) => { Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze(); let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() { if !b.is_empty() {
self.inner.feed_data(b); Ok(Some(b))
} else {
Ok(None)
} }
self.inner.feed_eof();
return
}, },
Err(err) => Some(err), Err(err) => Err(err),
} }
}, },
Decoder::Identity => { Decoder::Identity => Ok(None),
self.inner.feed_eof();
return
}
};
self.error = true;
self.decoder = Decoder::Identity;
if let Some(err) = err {
self.set_error(PayloadError::Io(err));
} else {
self.set_error(PayloadError::Incomplete);
} }
} }
fn feed_data(&mut self, data: Bytes) { pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
if self.error {
return
}
match self.decoder { match self.decoder {
Decoder::Br(ref mut decoder) => { Decoder::Br(ref mut decoder) => {
if decoder.write(&data).is_ok() && decoder.flush().is_ok() { match decoder.write(&data).and_then(|_| decoder.flush()) {
let b = decoder.get_mut().get_mut().take().freeze(); Ok(_) => {
if !b.is_empty() { let b = decoder.get_mut().get_mut().take().freeze();
self.inner.feed_data(b); if !b.is_empty() {
} Ok(Some(b))
return } else {
Ok(None)
}
},
Err(err) => Err(err)
} }
trace!("Error decoding br encoding"); },
}
Decoder::Gzip(ref mut decoder) => { Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() { if decoder.is_none() {
*decoder = Some( *decoder = Some(
@ -298,41 +273,82 @@ impl PayloadWriter for EncodedPayload {
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) { match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => { Ok(n) => {
if n == 0 { if n == 0 {
return return Ok(Some(self.dst.split_to(n).freeze()));
} else { } else {
unsafe{self.dst.set_len(n)}; unsafe{self.dst.set_len(n)};
self.inner.feed_data(self.dst.split_to(n).freeze());
} }
} }
Err(e) => { Err(e) => return Err(e),
if e.kind() == io::ErrorKind::WouldBlock {
return
}
break
}
} }
} }
} },
Decoder::Deflate(ref mut decoder) => { Decoder::Deflate(ref mut decoder) => {
if decoder.write(&data).is_ok() && decoder.flush().is_ok() { match decoder.write(&data).and_then(|_| decoder.flush()) {
let b = decoder.get_mut().get_mut().take().freeze(); Ok(_) => {
if !b.is_empty() { let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(Some(data)),
}
}
}
/// Payload wrapper with content decompression support
pub(crate) struct EncodedPayload {
inner: PayloadSender,
error: bool,
payload: PayloadStream,
}
impl EncodedPayload {
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
EncodedPayload{ inner: inner, error: false, payload: PayloadStream::new(enc) }
}
}
impl PayloadWriter for EncodedPayload {
fn set_error(&mut self, err: PayloadError) {
self.inner.set_error(err)
}
fn feed_eof(&mut self) {
if !self.error {
match self.payload.feed_eof() {
Err(err) => {
self.error = true;
self.set_error(PayloadError::Io(err));
},
Ok(value) => {
if let Some(b) = value {
self.inner.feed_data(b); self.inner.feed_data(b);
} }
return self.inner.feed_eof();
} }
trace!("Error decoding deflate encoding");
} }
Decoder::Identity => { }
self.inner.feed_data(data); }
return
}
};
self.error = true; fn feed_data(&mut self, data: Bytes) {
self.decoder = Decoder::Identity; if self.error {
self.set_error(PayloadError::EncodingCorrupted); return
}
match self.payload.feed_data(data) {
Ok(Some(b)) => self.inner.feed_data(b),
Ok(None) => (),
Err(e) => {
self.error = true;
self.set_error(e.into());
}
}
} }
fn capacity(&self) -> usize { fn capacity(&self) -> usize {
@ -340,18 +356,23 @@ impl PayloadWriter for EncodedPayload {
} }
} }
pub(crate) struct PayloadEncoder(ContentEncoder); pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>),
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
impl PayloadEncoder { impl ContentEncoder {
pub fn empty(bytes: SharedBytes) -> PayloadEncoder { pub fn empty(bytes: SharedBytes) -> ContentEncoder {
PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes))) ContentEncoder::Identity(TransferEncoding::eof(bytes))
} }
pub fn new(buf: SharedBytes, pub fn for_server(buf: SharedBytes,
req: &HttpMessage, req: &HttpMessage,
resp: &mut HttpResponse, resp: &mut HttpResponse,
response_encoding: ContentEncoding) -> PayloadEncoder response_encoding: ContentEncoding) -> ContentEncoder
{ {
let version = resp.version().unwrap_or_else(|| req.version); let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty); let mut body = resp.replace_body(Body::Empty);
@ -440,7 +461,7 @@ impl PayloadEncoder {
} }
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
} else { } else {
PayloadEncoder::streaming_encoding(buf, version, resp) ContentEncoder::streaming_encoding(buf, version, resp)
} }
} }
}; };
@ -451,18 +472,16 @@ impl PayloadEncoder {
resp.replace_body(body); resp.replace_body(body);
} }
PayloadEncoder( match encoding {
match encoding { ContentEncoding::Deflate => ContentEncoder::Deflate(
ContentEncoding::Deflate => ContentEncoder::Deflate( DeflateEncoder::new(transfer, Compression::default())),
DeflateEncoder::new(transfer, Compression::default())), ContentEncoding::Gzip => ContentEncoder::Gzip(
ContentEncoding::Gzip => ContentEncoder::Gzip( GzEncoder::new(transfer, Compression::default())),
GzEncoder::new(transfer, Compression::default())), ContentEncoding::Br => ContentEncoder::Br(
ContentEncoding::Br => ContentEncoder::Br( BrotliEncoder::new(transfer, 5)),
BrotliEncoder::new(transfer, 5)), ContentEncoding::Identity => ContentEncoder::Identity(transfer),
ContentEncoding::Identity => ContentEncoder::Identity(transfer), ContentEncoding::Auto => unreachable!()
ContentEncoding::Auto => unreachable!() }
}
)
} }
fn streaming_encoding(buf: SharedBytes, version: Version, fn streaming_encoding(buf: SharedBytes, version: Version,
@ -527,33 +546,6 @@ impl PayloadEncoder {
} }
} }
impl PayloadEncoder {
#[inline]
pub fn is_eof(&self) -> bool {
self.0.is_eof()
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, payload: Binary) -> Result<(), io::Error> {
self.0.write(payload)
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> {
self.0.write_eof()
}
}
pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>),
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
impl ContentEncoder { impl ContentEncoder {
#[inline] #[inline]

View File

@ -12,7 +12,7 @@ use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes; use super::shared::SharedBytes;
use super::encoding::PayloadEncoder; use super::encoding::ContentEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -28,7 +28,7 @@ bitflags! {
pub(crate) struct H1Writer<T: AsyncWrite> { pub(crate) struct H1Writer<T: AsyncWrite> {
flags: Flags, flags: Flags,
stream: T, stream: T,
encoder: PayloadEncoder, encoder: ContentEncoder,
written: u64, written: u64,
headers_size: u32, headers_size: u32,
buffer: SharedBytes, buffer: SharedBytes,
@ -40,7 +40,7 @@ impl<T: AsyncWrite> H1Writer<T> {
H1Writer { H1Writer {
flags: Flags::empty(), flags: Flags::empty(),
stream: stream, stream: stream,
encoder: PayloadEncoder::empty(buf.clone()), encoder: ContentEncoder::empty(buf.clone()),
written: 0, written: 0,
headers_size: 0, headers_size: 0,
buffer: buf, buffer: buf,
@ -101,7 +101,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
encoding: ContentEncoding) -> io::Result<WriterState> encoding: ContentEncoding) -> io::Result<WriterState>
{ {
// prepare task // prepare task
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg, encoding); self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding);
if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags.insert(Flags::STARTED | Flags::KEEPALIVE); self.flags.insert(Flags::STARTED | Flags::KEEPALIVE);
} else { } else {

View File

@ -11,7 +11,7 @@ use body::{Body, Binary};
use headers::ContentEncoding; use headers::ContentEncoding;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder; use super::encoding::ContentEncoder;
use super::shared::SharedBytes; use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
@ -28,7 +28,7 @@ bitflags! {
pub(crate) struct H2Writer { pub(crate) struct H2Writer {
respond: SendResponse<Bytes>, respond: SendResponse<Bytes>,
stream: Option<SendStream<Bytes>>, stream: Option<SendStream<Bytes>>,
encoder: PayloadEncoder, encoder: ContentEncoder,
flags: Flags, flags: Flags,
written: u64, written: u64,
buffer: SharedBytes, buffer: SharedBytes,
@ -40,7 +40,7 @@ impl H2Writer {
H2Writer { H2Writer {
respond: respond, respond: respond,
stream: None, stream: None,
encoder: PayloadEncoder::empty(buf.clone()), encoder: ContentEncoder::empty(buf.clone()),
flags: Flags::empty(), flags: Flags::empty(),
written: 0, written: 0,
buffer: buf, buffer: buf,
@ -113,7 +113,7 @@ impl Writer for H2Writer {
-> io::Result<WriterState> { -> io::Result<WriterState> {
// prepare response // prepare response
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg, encoding); self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding);
if let Body::Empty = *msg.body() { if let Body::Empty = *msg.body() {
self.flags.insert(Flags::EOF); self.flags.insert(Flags::EOF);
} }