From 33260c7b3583d341baecebff968f846d7aafbb88 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 24 Jun 2018 10:42:20 +0600 Subject: [PATCH] split encoding module --- src/client/pipeline.rs | 2 +- src/client/writer.rs | 2 +- src/server/h1.rs | 2 +- src/server/h1writer.rs | 2 +- src/server/h2.rs | 4 +- src/server/h2writer.rs | 2 +- src/server/input.rs | 357 +++++++++++++++++++++++++ src/server/mod.rs | 3 +- src/server/{encoding.rs => output.rs} | 359 +------------------------- 9 files changed, 371 insertions(+), 362 deletions(-) create mode 100644 src/server/input.rs rename src/server/{encoding.rs => output.rs} (70%) diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 4173c7d2c..2886b42f2 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -18,7 +18,7 @@ use error::Error; use error::PayloadError; use header::ContentEncoding; use httpmessage::HttpMessage; -use server::encoding::PayloadStream; +use server::input::PayloadStream; use server::WriterState; /// A set of errors that can occur during request sending and response reading diff --git a/src/client/writer.rs b/src/client/writer.rs index 653289794..d42a07d5a 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -21,7 +21,7 @@ use tokio_io::AsyncWrite; use body::{Binary, Body}; use header::ContentEncoding; -use server::encoding::{ContentEncoder, Output, TransferEncoding}; +use server::output::{ContentEncoder, Output, TransferEncoding}; use server::WriterState; use client::ClientRequest; diff --git a/src/server/h1.rs b/src/server/h1.rs index 8bca504c9..e358f84b3 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -13,9 +13,9 @@ use httpresponse::HttpResponse; use payload::{Payload, PayloadStatus, PayloadWriter}; use pipeline::Pipeline; -use super::encoding::PayloadType; use super::h1decoder::{DecoderError, H1Decoder, Message}; use super::h1writer::H1Writer; +use super::input::PayloadType; use super::settings::WorkerSettings; use super::Writer; use super::{HttpHandler, HttpHandlerTask, IoStream}; diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 502793f7a..5f5d6ec5c 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -6,8 +6,8 @@ use std::io; use std::rc::Rc; use tokio_io::AsyncWrite; -use super::encoding::{ContentEncoder, Output}; use super::helpers; +use super::output::{ContentEncoder, Output}; use super::settings::WorkerSettings; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; diff --git a/src/server/h2.rs b/src/server/h2.rs index 1904734c6..c2a385725 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -1,5 +1,3 @@ -#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] - use std::collections::VecDeque; use std::io::{Read, Write}; use std::net::SocketAddr; @@ -23,8 +21,8 @@ use payload::{Payload, PayloadStatus, PayloadWriter}; use pipeline::Pipeline; use uri::Url; -use super::encoding::PayloadType; use super::h2writer::H2Writer; +use super::input::PayloadType; use super::settings::WorkerSettings; use super::{HttpHandler, HttpHandlerTask, Writer}; diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index c44af51a7..db7755ba9 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -11,8 +11,8 @@ use std::{cmp, io}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::{HttpTryFrom, Version}; -use super::encoding::{ContentEncoder, Output}; use super::helpers; +use super::output::{ContentEncoder, Output}; use super::settings::WorkerSettings; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; diff --git a/src/server/input.rs b/src/server/input.rs new file mode 100644 index 000000000..8c11c2463 --- /dev/null +++ b/src/server/input.rs @@ -0,0 +1,357 @@ +use std::io::{Read, Write}; +use std::{cmp, io}; + +#[cfg(feature = "brotli")] +use brotli2::write::BrotliDecoder; +use bytes::{BufMut, Bytes, BytesMut}; +use error::PayloadError; +#[cfg(feature = "flate2")] +use flate2::read::GzDecoder; +#[cfg(feature = "flate2")] +use flate2::write::DeflateDecoder; +use header::ContentEncoding; +use http::header::{HeaderMap, CONTENT_ENCODING}; +use payload::{PayloadSender, PayloadStatus, PayloadWriter}; + +pub(crate) enum PayloadType { + Sender(PayloadSender), + Encoding(Box), +} + +impl PayloadType { + #[cfg(any(feature = "brotli", feature = "flate2"))] + pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType { + // check content-encoding + let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) { + if let Ok(enc) = enc.to_str() { + ContentEncoding::from(enc) + } else { + ContentEncoding::Auto + } + } else { + ContentEncoding::Auto + }; + + match enc { + ContentEncoding::Auto | ContentEncoding::Identity => { + PayloadType::Sender(sender) + } + _ => PayloadType::Encoding(Box::new(EncodedPayload::new(sender, enc))), + } + } + + #[cfg(not(any(feature = "brotli", feature = "flate2")))] + pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType { + PayloadType::Sender(sender) + } +} + +impl PayloadWriter for PayloadType { + #[inline] + fn set_error(&mut self, err: PayloadError) { + match *self { + PayloadType::Sender(ref mut sender) => sender.set_error(err), + PayloadType::Encoding(ref mut enc) => enc.set_error(err), + } + } + + #[inline] + fn feed_eof(&mut self) { + match *self { + PayloadType::Sender(ref mut sender) => sender.feed_eof(), + PayloadType::Encoding(ref mut enc) => enc.feed_eof(), + } + } + + #[inline] + fn feed_data(&mut self, data: Bytes) { + match *self { + PayloadType::Sender(ref mut sender) => sender.feed_data(data), + PayloadType::Encoding(ref mut enc) => enc.feed_data(data), + } + } + + #[inline] + fn need_read(&self) -> PayloadStatus { + match *self { + PayloadType::Sender(ref sender) => sender.need_read(), + PayloadType::Encoding(ref enc) => enc.need_read(), + } + } +} + +/// Payload wrapper with content decompression support +pub(crate) struct EncodedPayload { + inner: PayloadSender, + error: bool, + payload: PayloadStream, +} + +impl EncodedPayload { + pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { + EncodedPayload { + inner, + error: false, + payload: PayloadStream::new(enc), + } + } +} + +impl PayloadWriter for EncodedPayload { + fn set_error(&mut self, err: PayloadError) { + self.inner.set_error(err) + } + + fn feed_eof(&mut self) { + if !self.error { + match self.payload.feed_eof() { + Err(err) => { + self.error = true; + self.set_error(PayloadError::Io(err)); + } + Ok(value) => { + if let Some(b) = value { + self.inner.feed_data(b); + } + self.inner.feed_eof(); + } + } + } + } + + fn feed_data(&mut self, data: Bytes) { + if self.error { + return; + } + + match self.payload.feed_data(data) { + Ok(Some(b)) => self.inner.feed_data(b), + Ok(None) => (), + Err(e) => { + self.error = true; + self.set_error(e.into()); + } + } + } + + #[inline] + fn need_read(&self) -> PayloadStatus { + self.inner.need_read() + } +} + +pub(crate) enum Decoder { + #[cfg(feature = "flate2")] + Deflate(Box>), + #[cfg(feature = "flate2")] + Gzip(Option>>), + #[cfg(feature = "brotli")] + Br(Box>), + Identity, +} + +// should go after write::GzDecoder get implemented +#[derive(Debug)] +pub(crate) struct Wrapper { + pub buf: BytesMut, + pub eof: bool, +} + +impl io::Read for Wrapper { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let len = cmp::min(buf.len(), self.buf.len()); + buf[..len].copy_from_slice(&self.buf[..len]); + self.buf.split_to(len); + if len == 0 { + if self.eof { + Ok(0) + } else { + Err(io::Error::new(io::ErrorKind::WouldBlock, "")) + } + } else { + Ok(len) + } + } +} + +impl io::Write for Wrapper { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub(crate) struct Writer { + buf: BytesMut, +} + +impl Writer { + fn new() -> Writer { + Writer { + buf: BytesMut::with_capacity(8192), + } + } + fn take(&mut self) -> Bytes { + self.buf.take().freeze() + } +} + +impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buf.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +/// Payload stream with decompression support +pub(crate) struct PayloadStream { + decoder: Decoder, + dst: BytesMut, +} + +impl PayloadStream { + pub fn new(enc: ContentEncoding) -> PayloadStream { + let dec = match enc { + #[cfg(feature = "brotli")] + ContentEncoding::Br => { + Decoder::Br(Box::new(BrotliDecoder::new(Writer::new()))) + } + #[cfg(feature = "flate2")] + ContentEncoding::Deflate => { + Decoder::Deflate(Box::new(DeflateDecoder::new(Writer::new()))) + } + #[cfg(feature = "flate2")] + ContentEncoding::Gzip => Decoder::Gzip(None), + _ => Decoder::Identity, + }; + PayloadStream { + decoder: dec, + dst: BytesMut::new(), + } + } +} + +impl PayloadStream { + pub fn feed_eof(&mut self) -> io::Result> { + match self.decoder { + #[cfg(feature = "brotli")] + Decoder::Br(ref mut decoder) => match decoder.finish() { + Ok(mut writer) => { + let b = writer.take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(feature = "flate2")] + Decoder::Gzip(ref mut decoder) => { + if let Some(ref mut decoder) = *decoder { + decoder.as_mut().get_mut().eof = true; + + self.dst.reserve(8192); + match decoder.read(unsafe { self.dst.bytes_mut() }) { + Ok(n) => { + unsafe { self.dst.advance_mut(n) }; + return Ok(Some(self.dst.take().freeze())); + } + Err(e) => return Err(e), + } + } else { + Ok(None) + } + } + #[cfg(feature = "flate2")] + Decoder::Deflate(ref mut decoder) => match decoder.try_finish() { + Ok(_) => { + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + Decoder::Identity => Ok(None), + } + } + + pub fn feed_data(&mut self, data: Bytes) -> io::Result> { + match self.decoder { + #[cfg(feature = "brotli")] + Decoder::Br(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + #[cfg(feature = "flate2")] + Decoder::Gzip(ref mut decoder) => { + if decoder.is_none() { + *decoder = Some(Box::new(GzDecoder::new(Wrapper { + buf: BytesMut::from(data), + eof: false, + }))); + } else { + let _ = decoder.as_mut().unwrap().write(&data); + } + + loop { + self.dst.reserve(8192); + match decoder + .as_mut() + .as_mut() + .unwrap() + .read(unsafe { self.dst.bytes_mut() }) + { + Ok(n) => { + if n != 0 { + unsafe { self.dst.advance_mut(n) }; + } + if n == 0 { + return Ok(Some(self.dst.take().freeze())); + } + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock + && !self.dst.is_empty() + { + return Ok(Some(self.dst.take().freeze())); + } + return Err(e); + } + } + } + } + #[cfg(feature = "flate2")] + Decoder::Deflate(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, + Decoder::Identity => Ok(Some(data)), + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 1bbf460b0..f10dacc2e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,13 +8,14 @@ use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::TcpStream; mod channel; -pub(crate) mod encoding; pub(crate) mod h1; pub(crate) mod h1decoder; mod h1writer; mod h2; mod h2writer; pub(crate) mod helpers; +pub(crate) mod input; +pub(crate) mod output; pub(crate) mod settings; mod srv; mod worker; diff --git a/src/server/encoding.rs b/src/server/output.rs similarity index 70% rename from src/server/encoding.rs rename to src/server/output.rs index 5acce762b..7908dd38e 100644 --- a/src/server/encoding.rs +++ b/src/server/output.rs @@ -1,372 +1,24 @@ use std::fmt::Write as FmtWrite; -use std::io::{Read, Write}; +use std::io::Write; use std::str::FromStr; use std::{cmp, fmt, io, mem}; #[cfg(feature = "brotli")] -use brotli2::write::{BrotliDecoder, BrotliEncoder}; -use bytes::{BufMut, Bytes, BytesMut}; +use brotli2::write::BrotliEncoder; +use bytes::BytesMut; #[cfg(feature = "flate2")] -use flate2::read::GzDecoder; -#[cfg(feature = "flate2")] -use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder}; +use flate2::write::{DeflateEncoder, GzEncoder}; #[cfg(feature = "flate2")] use flate2::Compression; use http::header::{ - HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, - TRANSFER_ENCODING, + HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING, }; use http::{HttpTryFrom, Method, Version}; use body::{Binary, Body}; -use error::PayloadError; use header::ContentEncoding; use httprequest::HttpInnerMessage; use httpresponse::HttpResponse; -use payload::{PayloadSender, PayloadStatus, PayloadWriter}; - -pub(crate) enum PayloadType { - Sender(PayloadSender), - Encoding(Box), -} - -impl PayloadType { - #[cfg(any(feature = "brotli", feature = "flate2"))] - pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType { - // check content-encoding - let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) { - if let Ok(enc) = enc.to_str() { - ContentEncoding::from(enc) - } else { - ContentEncoding::Auto - } - } else { - ContentEncoding::Auto - }; - - match enc { - ContentEncoding::Auto | ContentEncoding::Identity => { - PayloadType::Sender(sender) - } - _ => PayloadType::Encoding(Box::new(EncodedPayload::new(sender, enc))), - } - } - - #[cfg(not(any(feature = "brotli", feature = "flate2")))] - pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType { - PayloadType::Sender(sender) - } -} - -impl PayloadWriter for PayloadType { - #[inline] - fn set_error(&mut self, err: PayloadError) { - match *self { - PayloadType::Sender(ref mut sender) => sender.set_error(err), - PayloadType::Encoding(ref mut enc) => enc.set_error(err), - } - } - - #[inline] - fn feed_eof(&mut self) { - match *self { - PayloadType::Sender(ref mut sender) => sender.feed_eof(), - PayloadType::Encoding(ref mut enc) => enc.feed_eof(), - } - } - - #[inline] - fn feed_data(&mut self, data: Bytes) { - match *self { - PayloadType::Sender(ref mut sender) => sender.feed_data(data), - PayloadType::Encoding(ref mut enc) => enc.feed_data(data), - } - } - - #[inline] - fn need_read(&self) -> PayloadStatus { - match *self { - PayloadType::Sender(ref sender) => sender.need_read(), - PayloadType::Encoding(ref enc) => enc.need_read(), - } - } -} - -/// Payload wrapper with content decompression support -pub(crate) struct EncodedPayload { - inner: PayloadSender, - error: bool, - payload: PayloadStream, -} - -impl EncodedPayload { - pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload { - EncodedPayload { - inner, - error: false, - payload: PayloadStream::new(enc), - } - } -} - -impl PayloadWriter for EncodedPayload { - fn set_error(&mut self, err: PayloadError) { - self.inner.set_error(err) - } - - fn feed_eof(&mut self) { - if !self.error { - match self.payload.feed_eof() { - Err(err) => { - self.error = true; - self.set_error(PayloadError::Io(err)); - } - Ok(value) => { - if let Some(b) = value { - self.inner.feed_data(b); - } - self.inner.feed_eof(); - } - } - } - } - - fn feed_data(&mut self, data: Bytes) { - if self.error { - return; - } - - match self.payload.feed_data(data) { - Ok(Some(b)) => self.inner.feed_data(b), - Ok(None) => (), - Err(e) => { - self.error = true; - self.set_error(e.into()); - } - } - } - - #[inline] - fn need_read(&self) -> PayloadStatus { - self.inner.need_read() - } -} - -pub(crate) enum Decoder { - #[cfg(feature = "flate2")] - Deflate(Box>), - #[cfg(feature = "flate2")] - Gzip(Option>>), - #[cfg(feature = "brotli")] - Br(Box>), - Identity, -} - -// should go after write::GzDecoder get implemented -#[derive(Debug)] -pub(crate) struct Wrapper { - pub buf: BytesMut, - pub eof: bool, -} - -impl io::Read for Wrapper { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let len = cmp::min(buf.len(), self.buf.len()); - buf[..len].copy_from_slice(&self.buf[..len]); - self.buf.split_to(len); - if len == 0 { - if self.eof { - Ok(0) - } else { - Err(io::Error::new(io::ErrorKind::WouldBlock, "")) - } - } else { - Ok(len) - } - } -} - -impl io::Write for Wrapper { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -pub(crate) struct Writer { - buf: BytesMut, -} - -impl Writer { - fn new() -> Writer { - Writer { - buf: BytesMut::with_capacity(8192), - } - } - fn take(&mut self) -> Bytes { - self.buf.take().freeze() - } -} - -impl io::Write for Writer { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.buf.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -/// Payload stream with decompression support -pub(crate) struct PayloadStream { - decoder: Decoder, - dst: BytesMut, -} - -impl PayloadStream { - pub fn new(enc: ContentEncoding) -> PayloadStream { - let dec = match enc { - #[cfg(feature = "brotli")] - ContentEncoding::Br => { - Decoder::Br(Box::new(BrotliDecoder::new(Writer::new()))) - } - #[cfg(feature = "flate2")] - ContentEncoding::Deflate => { - Decoder::Deflate(Box::new(DeflateDecoder::new(Writer::new()))) - } - #[cfg(feature = "flate2")] - ContentEncoding::Gzip => Decoder::Gzip(None), - _ => Decoder::Identity, - }; - PayloadStream { - decoder: dec, - dst: BytesMut::new(), - } - } -} - -impl PayloadStream { - pub fn feed_eof(&mut self) -> io::Result> { - match self.decoder { - #[cfg(feature = "brotli")] - Decoder::Br(ref mut decoder) => match decoder.finish() { - Ok(mut writer) => { - let b = writer.take(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - } - Err(e) => Err(e), - }, - #[cfg(feature = "flate2")] - Decoder::Gzip(ref mut decoder) => { - if let Some(ref mut decoder) = *decoder { - decoder.as_mut().get_mut().eof = true; - - self.dst.reserve(8192); - match decoder.read(unsafe { self.dst.bytes_mut() }) { - Ok(n) => { - unsafe { self.dst.advance_mut(n) }; - return Ok(Some(self.dst.take().freeze())); - } - Err(e) => return Err(e), - } - } else { - Ok(None) - } - } - #[cfg(feature = "flate2")] - Decoder::Deflate(ref mut decoder) => match decoder.try_finish() { - Ok(_) => { - let b = decoder.get_mut().take(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - } - Err(e) => Err(e), - }, - Decoder::Identity => Ok(None), - } - } - - pub fn feed_data(&mut self, data: Bytes) -> io::Result> { - match self.decoder { - #[cfg(feature = "brotli")] - Decoder::Br(ref mut decoder) => match decoder.write_all(&data) { - Ok(_) => { - decoder.flush()?; - let b = decoder.get_mut().take(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - } - Err(e) => Err(e), - }, - #[cfg(feature = "flate2")] - Decoder::Gzip(ref mut decoder) => { - if decoder.is_none() { - *decoder = Some(Box::new(GzDecoder::new(Wrapper { - buf: BytesMut::from(data), - eof: false, - }))); - } else { - let _ = decoder.as_mut().unwrap().write(&data); - } - - loop { - self.dst.reserve(8192); - match decoder - .as_mut() - .as_mut() - .unwrap() - .read(unsafe { self.dst.bytes_mut() }) - { - Ok(n) => { - if n != 0 { - unsafe { self.dst.advance_mut(n) }; - } - if n == 0 { - return Ok(Some(self.dst.take().freeze())); - } - } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock - && !self.dst.is_empty() - { - return Ok(Some(self.dst.take().freeze())); - } - return Err(e); - } - } - } - } - #[cfg(feature = "flate2")] - Decoder::Deflate(ref mut decoder) => match decoder.write_all(&data) { - Ok(_) => { - decoder.flush()?; - let b = decoder.get_mut().take(); - if !b.is_empty() { - Ok(Some(b)) - } else { - Ok(None) - } - } - Err(e) => Err(e), - }, - Decoder::Identity => Ok(Some(data)), - } - } -} #[derive(Debug)] pub(crate) enum Output { @@ -1071,6 +723,7 @@ impl AcceptEncoding { #[cfg(test)] mod tests { use super::*; + use bytes::Bytes; #[test] fn test_chunked_te() {