mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-23 23:51:06 +01:00
refactor content encoding
This commit is contained in:
parent
994d0afd80
commit
a65fd695e1
354
src/encoding.rs
Normal file
354
src/encoding.rs
Normal file
@ -0,0 +1,354 @@
|
|||||||
|
use std::{io, cmp};
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
|
||||||
|
use http::header::{HeaderMap, CONTENT_ENCODING};
|
||||||
|
use flate2::read::{GzDecoder};
|
||||||
|
use flate2::write::{DeflateDecoder};
|
||||||
|
use brotli2::write::BrotliDecoder;
|
||||||
|
use bytes::{Bytes, BytesMut, BufMut, Writer};
|
||||||
|
|
||||||
|
use payload::{PayloadSender, PayloadWriter, PayloadError};
|
||||||
|
|
||||||
|
/// Represents various types of connection
|
||||||
|
#[derive(Copy, Clone, PartialEq, Debug)]
|
||||||
|
pub enum ContentEncoding {
|
||||||
|
/// Automatically select encoding based on encoding negotiation
|
||||||
|
Auto,
|
||||||
|
/// A format using the Brotli algorithm
|
||||||
|
Br,
|
||||||
|
/// A format using the zlib structure with deflate algorithm
|
||||||
|
Deflate,
|
||||||
|
/// Gzip algorithm
|
||||||
|
Gzip,
|
||||||
|
/// Indicates the identity function (i.e. no compression, nor modification)
|
||||||
|
Identity,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<&'a str> for ContentEncoding {
|
||||||
|
fn from(s: &'a str) -> ContentEncoding {
|
||||||
|
match s.trim().to_lowercase().as_ref() {
|
||||||
|
"br" => ContentEncoding::Br,
|
||||||
|
"gzip" => ContentEncoding::Gzip,
|
||||||
|
"deflate" => ContentEncoding::Deflate,
|
||||||
|
"identity" => ContentEncoding::Identity,
|
||||||
|
_ => ContentEncoding::Auto,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub(crate) enum PayloadType {
|
||||||
|
Sender(PayloadSender),
|
||||||
|
Encoding(EncodedPayload),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadType {
|
||||||
|
|
||||||
|
pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType {
|
||||||
|
// check content-encoding
|
||||||
|
let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) {
|
||||||
|
if let Ok(enc) = enc.to_str() {
|
||||||
|
ContentEncoding::from(enc)
|
||||||
|
} else {
|
||||||
|
ContentEncoding::Auto
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ContentEncoding::Auto
|
||||||
|
};
|
||||||
|
|
||||||
|
match enc {
|
||||||
|
ContentEncoding::Auto | ContentEncoding::Identity =>
|
||||||
|
PayloadType::Sender(sender),
|
||||||
|
_ => PayloadType::Encoding(EncodedPayload::new(sender, enc)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadWriter for PayloadType {
|
||||||
|
fn set_error(&mut self, err: PayloadError) {
|
||||||
|
match *self {
|
||||||
|
PayloadType::Sender(ref mut sender) => sender.set_error(err),
|
||||||
|
PayloadType::Encoding(ref mut enc) => enc.set_error(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn feed_eof(&mut self) {
|
||||||
|
match *self {
|
||||||
|
PayloadType::Sender(ref mut sender) => sender.feed_eof(),
|
||||||
|
PayloadType::Encoding(ref mut enc) => enc.feed_eof(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn feed_data(&mut self, data: Bytes) {
|
||||||
|
match *self {
|
||||||
|
PayloadType::Sender(ref mut sender) => sender.feed_data(data),
|
||||||
|
PayloadType::Encoding(ref mut enc) => enc.feed_data(data),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn capacity(&self) -> usize {
|
||||||
|
match *self {
|
||||||
|
PayloadType::Sender(ref sender) => sender.capacity(),
|
||||||
|
PayloadType::Encoding(ref enc) => enc.capacity(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Decoder {
|
||||||
|
Zlib(DeflateDecoder<BytesWriter>),
|
||||||
|
Gzip(Option<GzDecoder<Wrapper>>),
|
||||||
|
Br(Rc<RefCell<BytesMut>>, BrotliDecoder<WrapperRc>),
|
||||||
|
Identity,
|
||||||
|
}
|
||||||
|
|
||||||
|
// should go after write::GzDecoder get implemented
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Wrapper {
|
||||||
|
buf: BytesMut
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Read for Wrapper {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
let len = cmp::min(buf.len(), self.buf.len());
|
||||||
|
buf[..len].copy_from_slice(&self.buf[..len]);
|
||||||
|
self.buf.split_to(len);
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BytesWriter {
|
||||||
|
buf: BytesMut,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for BytesWriter {
|
||||||
|
fn default() -> BytesWriter {
|
||||||
|
BytesWriter{buf: BytesMut::with_capacity(8192)}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Write for BytesWriter {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
self.buf.extend(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// should go after brotli2::write::BrotliDecoder::get_mut get implemented
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct WrapperRc {
|
||||||
|
buf: Rc<RefCell<BytesMut>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Write for WrapperRc {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
self.buf.borrow_mut().extend(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct EncodedPayload {
|
||||||
|
inner: PayloadSender,
|
||||||
|
decoder: Decoder,
|
||||||
|
dst: Writer<BytesMut>,
|
||||||
|
error: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EncodedPayload {
|
||||||
|
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
|
||||||
|
let dec = match enc {
|
||||||
|
ContentEncoding::Deflate => Decoder::Zlib(
|
||||||
|
DeflateDecoder::new(BytesWriter::default())),
|
||||||
|
ContentEncoding::Gzip => Decoder::Gzip(None),
|
||||||
|
ContentEncoding::Br => {
|
||||||
|
let buf = Rc::new(RefCell::new(BytesMut::new()));
|
||||||
|
let buf2 = Rc::clone(&buf);
|
||||||
|
Decoder::Br(buf, BrotliDecoder::new(WrapperRc{buf: buf2}))
|
||||||
|
}
|
||||||
|
_ => Decoder::Identity,
|
||||||
|
};
|
||||||
|
EncodedPayload {
|
||||||
|
inner: inner,
|
||||||
|
decoder: dec,
|
||||||
|
error: false,
|
||||||
|
dst: BytesMut::new().writer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadWriter for EncodedPayload {
|
||||||
|
|
||||||
|
fn set_error(&mut self, err: PayloadError) {
|
||||||
|
self.inner.set_error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn feed_eof(&mut self) {
|
||||||
|
if self.error {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
let err = match self.decoder {
|
||||||
|
Decoder::Br(ref mut buf, ref mut decoder) => {
|
||||||
|
match decoder.flush() {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = buf.borrow_mut().take().freeze();
|
||||||
|
if !b.is_empty() {
|
||||||
|
self.inner.feed_data(b);
|
||||||
|
}
|
||||||
|
self.inner.feed_eof();
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Err(err) => Some(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Decoder::Gzip(ref mut decoder) => {
|
||||||
|
if decoder.is_none() {
|
||||||
|
self.inner.feed_eof();
|
||||||
|
return
|
||||||
|
}
|
||||||
|
loop {
|
||||||
|
let len = self.dst.get_ref().len();
|
||||||
|
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
||||||
|
|
||||||
|
if len < len_buf * 2 {
|
||||||
|
self.dst.get_mut().reserve(len_buf * 2 - len);
|
||||||
|
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
||||||
|
}
|
||||||
|
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
self.inner.feed_eof();
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => break Some(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Decoder::Zlib(ref mut decoder) => {
|
||||||
|
match decoder.flush() {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = decoder.get_mut().buf.take().freeze();
|
||||||
|
if !b.is_empty() {
|
||||||
|
self.inner.feed_data(b);
|
||||||
|
}
|
||||||
|
self.inner.feed_eof();
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Err(err) => Some(err),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Identity => {
|
||||||
|
self.inner.feed_eof();
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.error = true;
|
||||||
|
self.decoder = Decoder::Identity;
|
||||||
|
if let Some(err) = err {
|
||||||
|
self.set_error(PayloadError::ParseError(err));
|
||||||
|
} else {
|
||||||
|
self.set_error(PayloadError::Incomplete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn feed_data(&mut self, data: Bytes) {
|
||||||
|
if self.error {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
match self.decoder {
|
||||||
|
Decoder::Br(ref mut buf, ref mut decoder) => {
|
||||||
|
match decoder.write(&data) {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = buf.borrow_mut().take().freeze();
|
||||||
|
if !b.is_empty() {
|
||||||
|
self.inner.feed_data(b);
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
trace!("Error decoding br encoding: {}", err);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Decoder::Gzip(ref mut decoder) => {
|
||||||
|
if decoder.is_none() {
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
buf.extend(data);
|
||||||
|
*decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
|
||||||
|
} else {
|
||||||
|
decoder.as_mut().unwrap().get_mut().buf.extend(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
||||||
|
if len_buf == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = self.dst.get_ref().len();
|
||||||
|
if len < len_buf * 2 {
|
||||||
|
self.dst.get_mut().reserve(len_buf * 2 - len);
|
||||||
|
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
||||||
|
}
|
||||||
|
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Decoder::Zlib(ref mut decoder) => {
|
||||||
|
match decoder.write(&data) {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = decoder.get_mut().buf.take().freeze();
|
||||||
|
if !b.is_empty() {
|
||||||
|
self.inner.feed_data(b);
|
||||||
|
}
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
trace!("Error decoding deflate encoding: {}", err);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Decoder::Identity => {
|
||||||
|
self.inner.feed_data(data);
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.error = true;
|
||||||
|
self.decoder = Decoder::Identity;
|
||||||
|
self.set_error(PayloadError::EncodingCorrupted);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn capacity(&self) -> usize {
|
||||||
|
match self.decoder {
|
||||||
|
Decoder::Br(ref buf, _) => {
|
||||||
|
buf.borrow().len() + self.inner.capacity()
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.inner.capacity()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
59
src/h1.rs
59
src/h1.rs
@ -7,7 +7,7 @@ use std::collections::VecDeque;
|
|||||||
use actix::Arbiter;
|
use actix::Arbiter;
|
||||||
use httparse;
|
use httparse;
|
||||||
use http::{Method, Version, HttpTryFrom, HeaderMap};
|
use http::{Method, Version, HttpTryFrom, HeaderMap};
|
||||||
use http::header::{self, HeaderName, HeaderValue, CONTENT_ENCODING};
|
use http::header::{self, HeaderName, HeaderValue};
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
use futures::{Future, Poll, Async};
|
use futures::{Future, Poll, Async};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
@ -20,9 +20,8 @@ use error::ParseError;
|
|||||||
use h1writer::H1Writer;
|
use h1writer::H1Writer;
|
||||||
use httpcodes::HTTPNotFound;
|
use httpcodes::HTTPNotFound;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::ContentEncoding;
|
use encoding::PayloadType;
|
||||||
use payload::{Payload, PayloadError, PayloadSender,
|
use payload::{Payload, PayloadError, PayloadWriter, DEFAULT_BUFFER_SIZE};
|
||||||
PayloadWriter, EncodedPayload, DEFAULT_BUFFER_SIZE};
|
|
||||||
|
|
||||||
const KEEPALIVE_PERIOD: u64 = 15; // seconds
|
const KEEPALIVE_PERIOD: u64 = 15; // seconds
|
||||||
const INIT_BUFFER_SIZE: usize = 8192;
|
const INIT_BUFFER_SIZE: usize = 8192;
|
||||||
@ -286,25 +285,10 @@ enum Decoding {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct PayloadInfo {
|
struct PayloadInfo {
|
||||||
tx: PayloadInfoItem,
|
tx: PayloadType,
|
||||||
decoder: Decoder,
|
decoder: Decoder,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum PayloadInfoItem {
|
|
||||||
Sender(PayloadSender),
|
|
||||||
Encoding(EncodedPayload),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PayloadInfo {
|
|
||||||
|
|
||||||
fn as_mut(&mut self) -> &mut PayloadWriter {
|
|
||||||
match self.tx {
|
|
||||||
PayloadInfoItem::Sender(ref mut sender) => sender,
|
|
||||||
PayloadInfoItem::Encoding(ref mut enc) => enc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum ReaderError {
|
enum ReaderError {
|
||||||
Disconnect,
|
Disconnect,
|
||||||
@ -330,21 +314,21 @@ impl Reader {
|
|||||||
fn decode(&mut self, buf: &mut BytesMut) -> std::result::Result<Decoding, ReaderError>
|
fn decode(&mut self, buf: &mut BytesMut) -> std::result::Result<Decoding, ReaderError>
|
||||||
{
|
{
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
if payload.as_mut().capacity() > DEFAULT_BUFFER_SIZE {
|
if payload.tx.capacity() > DEFAULT_BUFFER_SIZE {
|
||||||
return Ok(Decoding::Paused)
|
return Ok(Decoding::Paused)
|
||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
match payload.decoder.decode(buf) {
|
match payload.decoder.decode(buf) {
|
||||||
Ok(Async::Ready(Some(bytes))) => {
|
Ok(Async::Ready(Some(bytes))) => {
|
||||||
payload.as_mut().feed_data(bytes)
|
payload.tx.feed_data(bytes)
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
payload.as_mut().feed_eof();
|
payload.tx.feed_eof();
|
||||||
return Ok(Decoding::Ready)
|
return Ok(Decoding::Ready)
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
|
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
payload.as_mut().set_error(err.into());
|
payload.tx.set_error(err.into());
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -368,7 +352,7 @@ impl Reader {
|
|||||||
match self.read_from_io(io, buf) {
|
match self.read_from_io(io, buf) {
|
||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
payload.as_mut().set_error(PayloadError::Incomplete);
|
payload.tx.set_error(PayloadError::Incomplete);
|
||||||
}
|
}
|
||||||
// http channel should not deal with payload errors
|
// http channel should not deal with payload errors
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
@ -379,7 +363,7 @@ impl Reader {
|
|||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
payload.as_mut().set_error(err.into());
|
payload.tx.set_error(err.into());
|
||||||
}
|
}
|
||||||
// http channel should not deal with payload errors
|
// http channel should not deal with payload errors
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
@ -394,25 +378,8 @@ impl Reader {
|
|||||||
Message::Http1(msg, decoder) => {
|
Message::Http1(msg, decoder) => {
|
||||||
let payload = if let Some(decoder) = decoder {
|
let payload = if let Some(decoder) = decoder {
|
||||||
let (tx, rx) = Payload::new(false);
|
let (tx, rx) = Payload::new(false);
|
||||||
|
|
||||||
// Content-Encoding
|
|
||||||
let enc = if let Some(enc) = msg.headers().get(CONTENT_ENCODING) {
|
|
||||||
if let Ok(enc) = enc.to_str() {
|
|
||||||
ContentEncoding::from(enc)
|
|
||||||
} else {
|
|
||||||
ContentEncoding::Auto
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ContentEncoding::Auto
|
|
||||||
};
|
|
||||||
|
|
||||||
let tx = match enc {
|
|
||||||
ContentEncoding::Auto => PayloadInfoItem::Sender(tx),
|
|
||||||
_ => PayloadInfoItem::Encoding(EncodedPayload::new(tx, enc)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let payload = PayloadInfo {
|
let payload = PayloadInfo {
|
||||||
tx: tx,
|
tx: PayloadType::new(msg.headers(), tx),
|
||||||
decoder: decoder,
|
decoder: decoder,
|
||||||
};
|
};
|
||||||
self.payload = Some(payload);
|
self.payload = Some(payload);
|
||||||
@ -430,7 +397,7 @@ impl Reader {
|
|||||||
Ok(Async::Ready(0)) => {
|
Ok(Async::Ready(0)) => {
|
||||||
trace!("parse eof");
|
trace!("parse eof");
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
payload.as_mut().set_error(
|
payload.tx.set_error(
|
||||||
PayloadError::Incomplete);
|
PayloadError::Incomplete);
|
||||||
}
|
}
|
||||||
// http channel should deal with payload errors
|
// http channel should deal with payload errors
|
||||||
@ -442,7 +409,7 @@ impl Reader {
|
|||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
if let Some(ref mut payload) = self.payload {
|
if let Some(ref mut payload) = self.payload {
|
||||||
payload.as_mut().set_error(err.into());
|
payload.tx.set_error(err.into());
|
||||||
}
|
}
|
||||||
// http channel should deal with payload errors
|
// http channel should deal with payload errors
|
||||||
return Err(ReaderError::Payload)
|
return Err(ReaderError::Payload)
|
||||||
|
52
src/h2.rs
52
src/h2.rs
@ -7,7 +7,6 @@ use std::collections::VecDeque;
|
|||||||
|
|
||||||
use actix::Arbiter;
|
use actix::Arbiter;
|
||||||
use http::request::Parts;
|
use http::request::Parts;
|
||||||
use http::header::CONTENT_ENCODING;
|
|
||||||
use http2::{Reason, RecvStream};
|
use http2::{Reason, RecvStream};
|
||||||
use http2::server::{Server, Handshake, Respond};
|
use http2::server::{Server, Handshake, Respond};
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
@ -20,8 +19,8 @@ use h2writer::H2Writer;
|
|||||||
use channel::HttpHandler;
|
use channel::HttpHandler;
|
||||||
use httpcodes::HTTPNotFound;
|
use httpcodes::HTTPNotFound;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::ContentEncoding;
|
use encoding::PayloadType;
|
||||||
use payload::{Payload, PayloadError, PayloadSender, PayloadWriter, EncodedPayload};
|
use payload::{Payload, PayloadError, PayloadWriter};
|
||||||
|
|
||||||
const KEEPALIVE_PERIOD: u64 = 15; // seconds
|
const KEEPALIVE_PERIOD: u64 = 15; // seconds
|
||||||
|
|
||||||
@ -141,8 +140,7 @@ impl<T, A, H> Http2<T, A, H>
|
|||||||
}
|
}
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
// start keep-alive timer
|
// start keep-alive timer
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() && self.keepalive_timer.is_none() {
|
||||||
if self.keepalive_timer.is_none() {
|
|
||||||
trace!("Start keep-alive timer");
|
trace!("Start keep-alive timer");
|
||||||
let mut timeout = Timeout::new(
|
let mut timeout = Timeout::new(
|
||||||
Duration::new(KEEPALIVE_PERIOD, 0),
|
Duration::new(KEEPALIVE_PERIOD, 0),
|
||||||
@ -152,7 +150,6 @@ impl<T, A, H> Http2<T, A, H>
|
|||||||
self.keepalive_timer = Some(timeout);
|
self.keepalive_timer = Some(timeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
trace!("Connection error: {}", err);
|
trace!("Connection error: {}", err);
|
||||||
self.disconnected = true;
|
self.disconnected = true;
|
||||||
@ -195,26 +192,10 @@ impl<T, A, H> Http2<T, A, H>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PayloadInfo(PayloadInfoItem);
|
|
||||||
enum PayloadInfoItem {
|
|
||||||
Sender(PayloadSender),
|
|
||||||
Encoding(EncodedPayload),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PayloadInfo {
|
|
||||||
|
|
||||||
fn as_mut(&mut self) -> &mut PayloadWriter {
|
|
||||||
match self.0 {
|
|
||||||
PayloadInfoItem::Sender(ref mut sender) => sender,
|
|
||||||
PayloadInfoItem::Encoding(ref mut enc) => enc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Entry {
|
struct Entry {
|
||||||
task: Task,
|
task: Task,
|
||||||
req: UnsafeCell<HttpRequest>,
|
req: UnsafeCell<HttpRequest>,
|
||||||
payload: PayloadInfo,
|
payload: PayloadType,
|
||||||
recv: RecvStream,
|
recv: RecvStream,
|
||||||
stream: H2Writer,
|
stream: H2Writer,
|
||||||
eof: bool,
|
eof: bool,
|
||||||
@ -239,20 +220,6 @@ impl Entry {
|
|||||||
|
|
||||||
// Payload and Content-Encoding
|
// Payload and Content-Encoding
|
||||||
let (psender, payload) = Payload::new(false);
|
let (psender, payload) = Payload::new(false);
|
||||||
let enc = if let Some(enc) = req.headers().get(CONTENT_ENCODING) {
|
|
||||||
if let Ok(enc) = enc.to_str() {
|
|
||||||
ContentEncoding::from(enc)
|
|
||||||
} else {
|
|
||||||
ContentEncoding::Auto
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ContentEncoding::Auto
|
|
||||||
};
|
|
||||||
let psender = match enc {
|
|
||||||
ContentEncoding::Auto | ContentEncoding::Identity =>
|
|
||||||
PayloadInfoItem::Sender(psender),
|
|
||||||
_ => PayloadInfoItem::Encoding(EncodedPayload::new(psender, enc)),
|
|
||||||
};
|
|
||||||
|
|
||||||
// start request processing
|
// start request processing
|
||||||
let mut task = None;
|
let mut task = None;
|
||||||
@ -262,10 +229,11 @@ impl Entry {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let psender = PayloadType::new(req.headers(), psender);
|
||||||
|
|
||||||
Entry {task: task.unwrap_or_else(|| Task::reply(HTTPNotFound)),
|
Entry {task: task.unwrap_or_else(|| Task::reply(HTTPNotFound)),
|
||||||
req: UnsafeCell::new(req),
|
req: UnsafeCell::new(req),
|
||||||
payload: PayloadInfo(psender),
|
payload: psender,
|
||||||
recv: recv,
|
recv: recv,
|
||||||
stream: H2Writer::new(resp),
|
stream: H2Writer::new(resp),
|
||||||
eof: false,
|
eof: false,
|
||||||
@ -280,22 +248,22 @@ impl Entry {
|
|||||||
if !self.reof {
|
if !self.reof {
|
||||||
match self.recv.poll() {
|
match self.recv.poll() {
|
||||||
Ok(Async::Ready(Some(chunk))) => {
|
Ok(Async::Ready(Some(chunk))) => {
|
||||||
self.payload.as_mut().feed_data(chunk);
|
self.payload.feed_data(chunk);
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
self.reof = true;
|
self.reof = true;
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.payload.as_mut().set_error(PayloadError::Http2(err))
|
self.payload.set_error(PayloadError::Http2(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let capacity = self.payload.as_mut().capacity();
|
let capacity = self.payload.capacity();
|
||||||
if self.capacity != capacity {
|
if self.capacity != capacity {
|
||||||
self.capacity = capacity;
|
self.capacity = capacity;
|
||||||
if let Err(err) = self.recv.release_capacity().release_capacity(capacity) {
|
if let Err(err) = self.recv.release_capacity().release_capacity(capacity) {
|
||||||
self.payload.as_mut().set_error(PayloadError::Http2(err))
|
self.payload.set_error(PayloadError::Http2(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ use http::header::{self, HeaderName, HeaderValue};
|
|||||||
use Cookie;
|
use Cookie;
|
||||||
use body::Body;
|
use body::Body;
|
||||||
use route::Frame;
|
use route::Frame;
|
||||||
|
use encoding::ContentEncoding;
|
||||||
|
|
||||||
/// Represents various types of connection
|
/// Represents various types of connection
|
||||||
#[derive(Copy, Clone, PartialEq, Debug)]
|
#[derive(Copy, Clone, PartialEq, Debug)]
|
||||||
@ -23,33 +23,6 @@ pub enum ConnectionType {
|
|||||||
Upgrade,
|
Upgrade,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents various types of connection
|
|
||||||
#[derive(Copy, Clone, PartialEq, Debug)]
|
|
||||||
pub enum ContentEncoding {
|
|
||||||
/// Automatically select encoding based on encoding negotiation
|
|
||||||
Auto,
|
|
||||||
/// A format using the Brotli algorithm
|
|
||||||
Br,
|
|
||||||
/// A format using the zlib structure with deflate algorithm
|
|
||||||
Deflate,
|
|
||||||
/// Gzip algorithm
|
|
||||||
Gzip,
|
|
||||||
/// Indicates the identity function (i.e. no compression, nor modification)
|
|
||||||
Identity,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> From<&'a str> for ContentEncoding {
|
|
||||||
fn from(s: &'a str) -> ContentEncoding {
|
|
||||||
match s.trim().to_lowercase().as_ref() {
|
|
||||||
"br" => ContentEncoding::Br,
|
|
||||||
"gzip" => ContentEncoding::Gzip,
|
|
||||||
"deflate" => ContentEncoding::Deflate,
|
|
||||||
"identity" => ContentEncoding::Identity,
|
|
||||||
_ => ContentEncoding::Auto,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// An HTTP Response
|
/// An HTTP Response
|
||||||
pub struct HttpResponse {
|
pub struct HttpResponse {
|
||||||
|
@ -39,6 +39,7 @@ mod body;
|
|||||||
mod context;
|
mod context;
|
||||||
mod error;
|
mod error;
|
||||||
mod date;
|
mod date;
|
||||||
|
mod encoding;
|
||||||
mod httprequest;
|
mod httprequest;
|
||||||
mod httpresponse;
|
mod httpresponse;
|
||||||
mod logger;
|
mod logger;
|
||||||
@ -61,6 +62,7 @@ pub mod ws;
|
|||||||
pub mod dev;
|
pub mod dev;
|
||||||
pub mod httpcodes;
|
pub mod httpcodes;
|
||||||
pub mod multipart;
|
pub mod multipart;
|
||||||
|
pub use encoding::ContentEncoding;
|
||||||
pub use error::ParseError;
|
pub use error::ParseError;
|
||||||
pub use body::{Body, BinaryBody};
|
pub use body::{Body, BinaryBody};
|
||||||
pub use application::{Application, ApplicationBuilder, Middleware};
|
pub use application::{Application, ApplicationBuilder, Middleware};
|
||||||
|
266
src/payload.rs
266
src/payload.rs
@ -1,19 +1,15 @@
|
|||||||
use std::{io, fmt, cmp};
|
use std::{fmt, cmp};
|
||||||
use std::rc::{Rc, Weak};
|
use std::rc::{Rc, Weak};
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::io::{Read, Write, Error as IoError};
|
use std::io::{Error as IoError};
|
||||||
use bytes::{Bytes, BytesMut, BufMut, Writer};
|
use bytes::{Bytes, BytesMut};
|
||||||
use http2::Error as Http2Error;
|
use http2::Error as Http2Error;
|
||||||
use futures::{Async, Poll, Stream};
|
use futures::{Async, Poll, Stream};
|
||||||
use futures::task::{Task, current as current_task};
|
use futures::task::{Task, current as current_task};
|
||||||
use flate2::read::{GzDecoder};
|
|
||||||
use flate2::write::{DeflateDecoder};
|
|
||||||
use brotli2::write::BrotliDecoder;
|
|
||||||
|
|
||||||
use actix::ResponseType;
|
use actix::ResponseType;
|
||||||
use httpresponse::ContentEncoding;
|
|
||||||
|
|
||||||
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
|
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
|
||||||
|
|
||||||
@ -205,262 +201,6 @@ impl PayloadWriter for PayloadSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Decoder {
|
|
||||||
Zlib(DeflateDecoder<BytesWriter>),
|
|
||||||
Gzip(Option<GzDecoder<Wrapper>>),
|
|
||||||
Br(Rc<RefCell<BytesMut>>, BrotliDecoder<WrapperRc>),
|
|
||||||
Identity,
|
|
||||||
}
|
|
||||||
|
|
||||||
// should go after write::GzDecoder get implemented
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Wrapper {
|
|
||||||
buf: BytesMut
|
|
||||||
}
|
|
||||||
|
|
||||||
impl io::Read for Wrapper {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
let len = cmp::min(buf.len(), self.buf.len());
|
|
||||||
buf[..len].copy_from_slice(&self.buf[..len]);
|
|
||||||
self.buf.split_to(len);
|
|
||||||
Ok(len)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BytesWriter {
|
|
||||||
buf: BytesMut,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for BytesWriter {
|
|
||||||
fn default() -> BytesWriter {
|
|
||||||
BytesWriter{buf: BytesMut::with_capacity(8192)}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl io::Write for BytesWriter {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
||||||
self.buf.extend(buf);
|
|
||||||
Ok(buf.len())
|
|
||||||
}
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// should go after brotli2::write::BrotliDecoder::get_mut get implemented
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct WrapperRc {
|
|
||||||
buf: Rc<RefCell<BytesMut>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl io::Write for WrapperRc {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
||||||
self.buf.borrow_mut().extend(buf);
|
|
||||||
Ok(buf.len())
|
|
||||||
}
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct EncodedPayload {
|
|
||||||
inner: PayloadSender,
|
|
||||||
decoder: Decoder,
|
|
||||||
dst: Writer<BytesMut>,
|
|
||||||
error: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EncodedPayload {
|
|
||||||
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
|
|
||||||
let dec = match enc {
|
|
||||||
ContentEncoding::Deflate => Decoder::Zlib(
|
|
||||||
DeflateDecoder::new(BytesWriter::default())),
|
|
||||||
ContentEncoding::Gzip => Decoder::Gzip(None),
|
|
||||||
ContentEncoding::Br => {
|
|
||||||
let buf = Rc::new(RefCell::new(BytesMut::new()));
|
|
||||||
let buf2 = Rc::clone(&buf);
|
|
||||||
Decoder::Br(buf, BrotliDecoder::new(WrapperRc{buf: buf2}))
|
|
||||||
}
|
|
||||||
_ => Decoder::Identity,
|
|
||||||
};
|
|
||||||
EncodedPayload {
|
|
||||||
inner: inner,
|
|
||||||
decoder: dec,
|
|
||||||
error: false,
|
|
||||||
dst: BytesMut::new().writer(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PayloadWriter for EncodedPayload {
|
|
||||||
|
|
||||||
fn set_error(&mut self, err: PayloadError) {
|
|
||||||
self.inner.set_error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn feed_eof(&mut self) {
|
|
||||||
if self.error {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
let err = match self.decoder {
|
|
||||||
Decoder::Br(ref mut buf, ref mut decoder) => {
|
|
||||||
match decoder.flush() {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = buf.borrow_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
self.inner.feed_data(b);
|
|
||||||
}
|
|
||||||
self.inner.feed_eof();
|
|
||||||
return
|
|
||||||
},
|
|
||||||
Err(err) => Some(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Decoder::Gzip(ref mut decoder) => {
|
|
||||||
if decoder.is_none() {
|
|
||||||
self.inner.feed_eof();
|
|
||||||
return
|
|
||||||
}
|
|
||||||
loop {
|
|
||||||
let len = self.dst.get_ref().len();
|
|
||||||
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
|
||||||
|
|
||||||
if len < len_buf * 2 {
|
|
||||||
self.dst.get_mut().reserve(len_buf * 2 - len);
|
|
||||||
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
|
||||||
}
|
|
||||||
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
self.inner.feed_eof();
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => break Some(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Decoder::Zlib(ref mut decoder) => {
|
|
||||||
match decoder.flush() {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = decoder.get_mut().buf.take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
self.inner.feed_data(b);
|
|
||||||
}
|
|
||||||
self.inner.feed_eof();
|
|
||||||
return
|
|
||||||
},
|
|
||||||
Err(err) => Some(err),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Identity => {
|
|
||||||
self.inner.feed_eof();
|
|
||||||
return
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.error = true;
|
|
||||||
self.decoder = Decoder::Identity;
|
|
||||||
if let Some(err) = err {
|
|
||||||
self.set_error(PayloadError::ParseError(err));
|
|
||||||
} else {
|
|
||||||
self.set_error(PayloadError::Incomplete);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn feed_data(&mut self, data: Bytes) {
|
|
||||||
if self.error {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
match self.decoder {
|
|
||||||
Decoder::Br(ref mut buf, ref mut decoder) => {
|
|
||||||
match decoder.write(&data) {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = buf.borrow_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
self.inner.feed_data(b);
|
|
||||||
}
|
|
||||||
return
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
trace!("Error decoding br encoding: {}", err);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Decoder::Gzip(ref mut decoder) => {
|
|
||||||
if decoder.is_none() {
|
|
||||||
let mut buf = BytesMut::new();
|
|
||||||
buf.extend(data);
|
|
||||||
*decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
|
|
||||||
} else {
|
|
||||||
decoder.as_mut().unwrap().get_mut().buf.extend(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
|
|
||||||
if len_buf == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
let len = self.dst.get_ref().len();
|
|
||||||
if len < len_buf * 2 {
|
|
||||||
self.dst.get_mut().reserve(len_buf * 2 - len);
|
|
||||||
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
|
|
||||||
}
|
|
||||||
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Decoder::Zlib(ref mut decoder) => {
|
|
||||||
match decoder.write(&data) {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = decoder.get_mut().buf.take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
self.inner.feed_data(b);
|
|
||||||
}
|
|
||||||
return
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
trace!("Error decoding deflate encoding: {}", err);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Decoder::Identity => {
|
|
||||||
self.inner.feed_data(data);
|
|
||||||
return
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.error = true;
|
|
||||||
self.decoder = Decoder::Identity;
|
|
||||||
self.set_error(PayloadError::EncodingCorrupted);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn capacity(&self) -> usize {
|
|
||||||
match self.decoder {
|
|
||||||
Decoder::Br(ref buf, _) => {
|
|
||||||
buf.borrow().len() + self.inner.capacity()
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
self.inner.capacity()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Inner {
|
struct Inner {
|
||||||
|
Loading…
Reference in New Issue
Block a user