From 8e8a68f90b49d0d01aa522efd8546c15ef453669 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 24 Jun 2018 22:05:44 +0600 Subject: [PATCH] add empty output stream --- src/client/writer.rs | 78 ++++++++++++++++------------- src/server/h1writer.rs | 8 ++- src/server/h2writer.rs | 4 +- src/server/output.rs | 109 ++++++++++++++++++++++------------------- 4 files changed, 107 insertions(+), 92 deletions(-) diff --git a/src/client/writer.rs b/src/client/writer.rs index d42a07d5..173b47e1 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -221,45 +221,53 @@ fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output { let transfer = match body { Body::Empty => { req.headers_mut().remove(CONTENT_LENGTH); - TransferEncoding::length(0, buf) + return Output::Empty(buf); } Body::Binary(ref mut bytes) => { - if encoding.is_compression() { - let mut tmp = BytesMut::new(); - let mut transfer = TransferEncoding::eof(tmp); - let mut enc = match encoding { - #[cfg(feature = "flate2")] - ContentEncoding::Deflate => ContentEncoder::Deflate( - DeflateEncoder::new(transfer, Compression::default()), - ), - #[cfg(feature = "flate2")] - ContentEncoding::Gzip => ContentEncoder::Gzip(GzEncoder::new( - transfer, - Compression::default(), - )), - #[cfg(feature = "brotli")] - ContentEncoding::Br => { - ContentEncoder::Br(BrotliEncoder::new(transfer, 5)) - } - ContentEncoding::Identity => ContentEncoder::Identity(transfer), - ContentEncoding::Auto => unreachable!(), - }; - // TODO return error! - let _ = enc.write(bytes.as_ref()); - let _ = enc.write_eof(); - *bytes = Binary::from(enc.buf_mut().take()); + #[cfg(any(feature = "flate2", feature = "brotli"))] + { + if encoding.is_compression() { + let mut tmp = BytesMut::new(); + let mut transfer = TransferEncoding::eof(tmp); + let mut enc = match encoding { + #[cfg(feature = "flate2")] + ContentEncoding::Deflate => ContentEncoder::Deflate( + DeflateEncoder::new(transfer, Compression::default()), + ), + #[cfg(feature = "flate2")] + ContentEncoding::Gzip => ContentEncoder::Gzip(GzEncoder::new( + transfer, + Compression::default(), + )), + #[cfg(feature = "brotli")] + ContentEncoding::Br => { + ContentEncoder::Br(BrotliEncoder::new(transfer, 5)) + } + ContentEncoding::Auto | ContentEncoding::Identity => { + unreachable!() + } + }; + // TODO return error! + let _ = enc.write(bytes.as_ref()); + let _ = enc.write_eof(); + *bytes = Binary::from(enc.buf_mut().take()); - req.headers_mut().insert( - CONTENT_ENCODING, - HeaderValue::from_static(encoding.as_str()), - ); - encoding = ContentEncoding::Identity; + req.headers_mut().insert( + CONTENT_ENCODING, + HeaderValue::from_static(encoding.as_str()), + ); + encoding = ContentEncoding::Identity; + } + let mut b = BytesMut::new(); + let _ = write!(b, "{}", bytes.len()); + req.headers_mut() + .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); + TransferEncoding::eof(buf) + } + #[cfg(not(any(feature = "flate2", feature = "brotli")))] + { + TransferEncoding::eof(buf) } - let mut b = BytesMut::new(); - let _ = write!(b, "{}", bytes.len()); - req.headers_mut() - .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); - TransferEncoding::eof(buf) } Body::Streaming(_) | Body::Actor(_) => { if req.upgrade() { diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 5f5d6ec5..b87891c2 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use tokio_io::AsyncWrite; use super::helpers; -use super::output::{ContentEncoder, Output}; +use super::output::Output; use super::settings::WorkerSettings; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; @@ -60,9 +60,7 @@ impl H1Writer { self.flags = Flags::KEEPALIVE; } - pub fn disconnected(&mut self) { - self.buffer = Output::Empty; - } + pub fn disconnected(&mut self) {} pub fn keepalive(&self) -> bool { self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) @@ -117,7 +115,7 @@ impl Writer for H1Writer { encoding: ContentEncoding, ) -> io::Result { // prepare task - self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); + self.buffer.for_server(req, msg, encoding); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { self.flags = Flags::STARTED | Flags::KEEPALIVE; } else { diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index db7755ba..9a02bbf4 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -12,7 +12,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD use http::{HttpTryFrom, Version}; use super::helpers; -use super::output::{ContentEncoder, Output}; +use super::output::Output; use super::settings::WorkerSettings; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; @@ -90,7 +90,7 @@ impl Writer for H2Writer { ) -> io::Result { // prepare response self.flags.insert(Flags::STARTED); - self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); + self.buffer.for_server(req, msg, encoding); // http2 specific msg.headers_mut().remove(CONNECTION); diff --git a/src/server/output.rs b/src/server/output.rs index e84b55b8..ad0b80b6 100644 --- a/src/server/output.rs +++ b/src/server/output.rs @@ -22,71 +22,79 @@ use httpresponse::HttpResponse; #[derive(Debug)] pub(crate) enum Output { + Empty(BytesMut), Buffer(BytesMut), Encoder(ContentEncoder), TE(TransferEncoding), - Empty, + Done, } impl Output { pub fn take(&mut self) -> BytesMut { - match mem::replace(self, Output::Empty) { + match mem::replace(self, Output::Done) { + Output::Empty(bytes) => bytes, Output::Buffer(bytes) => bytes, Output::Encoder(mut enc) => enc.take_buf(), Output::TE(mut te) => te.take(), - _ => panic!(), + Output::Done => panic!(), } } pub fn take_option(&mut self) -> Option { - match mem::replace(self, Output::Empty) { + match mem::replace(self, Output::Done) { + Output::Empty(bytes) => Some(bytes), Output::Buffer(bytes) => Some(bytes), Output::Encoder(mut enc) => Some(enc.take_buf()), Output::TE(mut te) => Some(te.take()), - _ => None, + Output::Done => None, } } pub fn as_ref(&mut self) -> &BytesMut { match self { + Output::Empty(ref mut bytes) => bytes, Output::Buffer(ref mut bytes) => bytes, Output::Encoder(ref mut enc) => enc.buf_ref(), Output::TE(ref mut te) => te.buf_ref(), - Output::Empty => panic!(), + Output::Done => panic!(), } } pub fn as_mut(&mut self) -> &mut BytesMut { match self { + Output::Empty(ref mut bytes) => bytes, Output::Buffer(ref mut bytes) => bytes, Output::Encoder(ref mut enc) => enc.buf_mut(), Output::TE(ref mut te) => te.buf_mut(), - _ => panic!(), + Output::Done => panic!(), } } pub fn split_to(&mut self, cap: usize) -> BytesMut { match self { + Output::Empty(ref mut bytes) => bytes.split_to(cap), Output::Buffer(ref mut bytes) => bytes.split_to(cap), Output::Encoder(ref mut enc) => enc.buf_mut().split_to(cap), Output::TE(ref mut te) => te.buf_mut().split_to(cap), - Output::Empty => BytesMut::new(), + Output::Done => BytesMut::new(), } } pub fn len(&self) -> usize { match self { + Output::Empty(ref bytes) => bytes.len(), Output::Buffer(ref bytes) => bytes.len(), Output::Encoder(ref enc) => enc.len(), Output::TE(ref te) => te.len(), - Output::Empty => 0, + Output::Done => 0, } } pub fn is_empty(&self) -> bool { match self { + Output::Empty(ref bytes) => bytes.is_empty(), Output::Buffer(ref bytes) => bytes.is_empty(), Output::Encoder(ref enc) => enc.is_empty(), Output::TE(ref te) => te.is_empty(), - Output::Empty => true, + Output::Done => true, } } @@ -98,7 +106,7 @@ impl Output { } Output::Encoder(ref mut enc) => enc.write(data), Output::TE(ref mut te) => te.encode(data).map(|_| ()), - Output::Empty => Ok(()), + Output::Empty(_) | Output::Done => Ok(()), } } @@ -107,40 +115,15 @@ impl Output { Output::Buffer(_) => Ok(true), Output::Encoder(ref mut enc) => enc.write_eof(), Output::TE(ref mut te) => Ok(te.encode_eof()), - Output::Empty => Ok(true), + Output::Empty(_) | Output::Done => Ok(true), } } -} -pub(crate) enum ContentEncoder { - #[cfg(feature = "flate2")] - Deflate(DeflateEncoder), - #[cfg(feature = "flate2")] - Gzip(GzEncoder), - #[cfg(feature = "brotli")] - Br(BrotliEncoder), - Identity(TransferEncoding), -} - -impl fmt::Debug for ContentEncoder { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - #[cfg(feature = "brotli")] - ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"), - #[cfg(feature = "flate2")] - ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"), - #[cfg(feature = "flate2")] - ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"), - ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"), - } - } -} - -impl ContentEncoder { pub fn for_server( - buf: BytesMut, req: &HttpInnerMessage, resp: &mut HttpResponse, + &mut self, req: &HttpInnerMessage, resp: &mut HttpResponse, response_encoding: ContentEncoding, - ) -> Output { + ) { + let buf = self.take(); let version = resp.version().unwrap_or_else(|| req.version); let is_head = req.method == Method::HEAD; let mut len = 0; @@ -188,12 +171,13 @@ impl ContentEncoder { let mut encoding = ContentEncoding::Identity; #[cfg_attr(feature = "cargo-clippy", allow(match_ref_pats))] - let mut transfer = match resp.body() { + let transfer = match resp.body() { &Body::Empty => { if req.method != Method::HEAD { resp.headers_mut().remove(CONTENT_LENGTH); } - TransferEncoding::length(0, buf) + *self = Output::Empty(buf); + return; } &Body::Binary(_) => { #[cfg(any(feature = "brotli", feature = "flate2"))] @@ -228,8 +212,6 @@ impl ContentEncoder { let _ = enc.write_eof(); let body = enc.buf_mut().take(); len = body.len(); - - encoding = ContentEncoding::Identity; resp.replace_body(Binary::from(body)); } } @@ -241,10 +223,11 @@ impl ContentEncoder { CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap(), ); + *self = Output::Empty(buf); } else { - // resp.headers_mut().remove(CONTENT_LENGTH); + *self = Output::Buffer(buf); } - TransferEncoding::eof(buf) + return; } &Body::Streaming(_) | &Body::Actor(_) => { if resp.upgrade() { @@ -262,14 +245,15 @@ impl ContentEncoder { { resp.headers_mut().remove(CONTENT_LENGTH); } - ContentEncoder::streaming_encoding(buf, version, resp) + Output::streaming_encoding(buf, version, resp) } } }; // check for head response if is_head { resp.set_body(Body::Empty); - transfer.kind = TransferEncodingKind::Length(0); + *self = Output::Empty(transfer.buf.unwrap()); + return; } let enc = match encoding { @@ -285,10 +269,11 @@ impl ContentEncoder { #[cfg(feature = "brotli")] ContentEncoding::Br => ContentEncoder::Br(BrotliEncoder::new(transfer, 3)), ContentEncoding::Identity | ContentEncoding::Auto => { - return Output::TE(transfer) + *self = Output::TE(transfer); + return; } }; - Output::Encoder(enc) + *self = Output::Encoder(enc); } fn streaming_encoding( @@ -355,6 +340,30 @@ impl ContentEncoder { } } +pub(crate) enum ContentEncoder { + #[cfg(feature = "flate2")] + Deflate(DeflateEncoder), + #[cfg(feature = "flate2")] + Gzip(GzEncoder), + #[cfg(feature = "brotli")] + Br(BrotliEncoder), + Identity(TransferEncoding), +} + +impl fmt::Debug for ContentEncoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"), + #[cfg(feature = "flate2")] + ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"), + #[cfg(feature = "flate2")] + ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"), + ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"), + } + } +} + impl ContentEncoder { #[inline] pub fn len(&self) -> usize {