1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

add empty output stream

This commit is contained in:
Nikolay Kim 2018-06-24 22:05:44 +06:00
parent 989cd61236
commit 8e8a68f90b
4 changed files with 107 additions and 92 deletions

View File

@ -221,9 +221,11 @@ fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output {
let transfer = match body { let transfer = match body {
Body::Empty => { Body::Empty => {
req.headers_mut().remove(CONTENT_LENGTH); req.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::length(0, buf) return Output::Empty(buf);
} }
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
#[cfg(any(feature = "flate2", feature = "brotli"))]
{
if encoding.is_compression() { if encoding.is_compression() {
let mut tmp = BytesMut::new(); let mut tmp = BytesMut::new();
let mut transfer = TransferEncoding::eof(tmp); let mut transfer = TransferEncoding::eof(tmp);
@ -241,8 +243,9 @@ fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output {
ContentEncoding::Br => { ContentEncoding::Br => {
ContentEncoder::Br(BrotliEncoder::new(transfer, 5)) ContentEncoder::Br(BrotliEncoder::new(transfer, 5))
} }
ContentEncoding::Identity => ContentEncoder::Identity(transfer), ContentEncoding::Auto | ContentEncoding::Identity => {
ContentEncoding::Auto => unreachable!(), unreachable!()
}
}; };
// TODO return error! // TODO return error!
let _ = enc.write(bytes.as_ref()); let _ = enc.write(bytes.as_ref());
@ -261,6 +264,11 @@ fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output {
.insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
} }
#[cfg(not(any(feature = "flate2", feature = "brotli")))]
{
TransferEncoding::eof(buf)
}
}
Body::Streaming(_) | Body::Actor(_) => { Body::Streaming(_) | Body::Actor(_) => {
if req.upgrade() { if req.upgrade() {
if version == Version::HTTP_2 { if version == Version::HTTP_2 {

View File

@ -7,7 +7,7 @@ use std::rc::Rc;
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use super::helpers; use super::helpers;
use super::output::{ContentEncoder, Output}; use super::output::Output;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};
@ -60,9 +60,7 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.flags = Flags::KEEPALIVE; self.flags = Flags::KEEPALIVE;
} }
pub fn disconnected(&mut self) { pub fn disconnected(&mut self) {}
self.buffer = Output::Empty;
}
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
@ -117,7 +115,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
encoding: ContentEncoding, encoding: ContentEncoding,
) -> io::Result<WriterState> { ) -> io::Result<WriterState> {
// prepare task // prepare task
self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); self.buffer.for_server(req, msg, encoding);
if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags = Flags::STARTED | Flags::KEEPALIVE; self.flags = Flags::STARTED | Flags::KEEPALIVE;
} else { } else {

View File

@ -12,7 +12,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD
use http::{HttpTryFrom, Version}; use http::{HttpTryFrom, Version};
use super::helpers; use super::helpers;
use super::output::{ContentEncoder, Output}; use super::output::Output;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};
@ -90,7 +90,7 @@ impl<H: 'static> Writer for H2Writer<H> {
) -> io::Result<WriterState> { ) -> io::Result<WriterState> {
// prepare response // prepare response
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); self.buffer.for_server(req, msg, encoding);
// http2 specific // http2 specific
msg.headers_mut().remove(CONNECTION); msg.headers_mut().remove(CONNECTION);

View File

@ -22,71 +22,79 @@ use httpresponse::HttpResponse;
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Output { pub(crate) enum Output {
Empty(BytesMut),
Buffer(BytesMut), Buffer(BytesMut),
Encoder(ContentEncoder), Encoder(ContentEncoder),
TE(TransferEncoding), TE(TransferEncoding),
Empty, Done,
} }
impl Output { impl Output {
pub fn take(&mut self) -> BytesMut { pub fn take(&mut self) -> BytesMut {
match mem::replace(self, Output::Empty) { match mem::replace(self, Output::Done) {
Output::Empty(bytes) => bytes,
Output::Buffer(bytes) => bytes, Output::Buffer(bytes) => bytes,
Output::Encoder(mut enc) => enc.take_buf(), Output::Encoder(mut enc) => enc.take_buf(),
Output::TE(mut te) => te.take(), Output::TE(mut te) => te.take(),
_ => panic!(), Output::Done => panic!(),
} }
} }
pub fn take_option(&mut self) -> Option<BytesMut> { pub fn take_option(&mut self) -> Option<BytesMut> {
match mem::replace(self, Output::Empty) { match mem::replace(self, Output::Done) {
Output::Empty(bytes) => Some(bytes),
Output::Buffer(bytes) => Some(bytes), Output::Buffer(bytes) => Some(bytes),
Output::Encoder(mut enc) => Some(enc.take_buf()), Output::Encoder(mut enc) => Some(enc.take_buf()),
Output::TE(mut te) => Some(te.take()), Output::TE(mut te) => Some(te.take()),
_ => None, Output::Done => None,
} }
} }
pub fn as_ref(&mut self) -> &BytesMut { pub fn as_ref(&mut self) -> &BytesMut {
match self { match self {
Output::Empty(ref mut bytes) => bytes,
Output::Buffer(ref mut bytes) => bytes, Output::Buffer(ref mut bytes) => bytes,
Output::Encoder(ref mut enc) => enc.buf_ref(), Output::Encoder(ref mut enc) => enc.buf_ref(),
Output::TE(ref mut te) => te.buf_ref(), Output::TE(ref mut te) => te.buf_ref(),
Output::Empty => panic!(), Output::Done => panic!(),
} }
} }
pub fn as_mut(&mut self) -> &mut BytesMut { pub fn as_mut(&mut self) -> &mut BytesMut {
match self { match self {
Output::Empty(ref mut bytes) => bytes,
Output::Buffer(ref mut bytes) => bytes, Output::Buffer(ref mut bytes) => bytes,
Output::Encoder(ref mut enc) => enc.buf_mut(), Output::Encoder(ref mut enc) => enc.buf_mut(),
Output::TE(ref mut te) => te.buf_mut(), Output::TE(ref mut te) => te.buf_mut(),
_ => panic!(), Output::Done => panic!(),
} }
} }
pub fn split_to(&mut self, cap: usize) -> BytesMut { pub fn split_to(&mut self, cap: usize) -> BytesMut {
match self { match self {
Output::Empty(ref mut bytes) => bytes.split_to(cap),
Output::Buffer(ref mut bytes) => bytes.split_to(cap), Output::Buffer(ref mut bytes) => bytes.split_to(cap),
Output::Encoder(ref mut enc) => enc.buf_mut().split_to(cap), Output::Encoder(ref mut enc) => enc.buf_mut().split_to(cap),
Output::TE(ref mut te) => te.buf_mut().split_to(cap), Output::TE(ref mut te) => te.buf_mut().split_to(cap),
Output::Empty => BytesMut::new(), Output::Done => BytesMut::new(),
} }
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
match self { match self {
Output::Empty(ref bytes) => bytes.len(),
Output::Buffer(ref bytes) => bytes.len(), Output::Buffer(ref bytes) => bytes.len(),
Output::Encoder(ref enc) => enc.len(), Output::Encoder(ref enc) => enc.len(),
Output::TE(ref te) => te.len(), Output::TE(ref te) => te.len(),
Output::Empty => 0, Output::Done => 0,
} }
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
match self { match self {
Output::Empty(ref bytes) => bytes.is_empty(),
Output::Buffer(ref bytes) => bytes.is_empty(), Output::Buffer(ref bytes) => bytes.is_empty(),
Output::Encoder(ref enc) => enc.is_empty(), Output::Encoder(ref enc) => enc.is_empty(),
Output::TE(ref te) => te.is_empty(), Output::TE(ref te) => te.is_empty(),
Output::Empty => true, Output::Done => true,
} }
} }
@ -98,7 +106,7 @@ impl Output {
} }
Output::Encoder(ref mut enc) => enc.write(data), Output::Encoder(ref mut enc) => enc.write(data),
Output::TE(ref mut te) => te.encode(data).map(|_| ()), Output::TE(ref mut te) => te.encode(data).map(|_| ()),
Output::Empty => Ok(()), Output::Empty(_) | Output::Done => Ok(()),
} }
} }
@ -107,40 +115,15 @@ impl Output {
Output::Buffer(_) => Ok(true), Output::Buffer(_) => Ok(true),
Output::Encoder(ref mut enc) => enc.write_eof(), Output::Encoder(ref mut enc) => enc.write_eof(),
Output::TE(ref mut te) => Ok(te.encode_eof()), Output::TE(ref mut te) => Ok(te.encode_eof()),
Output::Empty => Ok(true), Output::Empty(_) | Output::Done => Ok(true),
}
} }
} }
pub(crate) enum ContentEncoder {
#[cfg(feature = "flate2")]
Deflate(DeflateEncoder<TransferEncoding>),
#[cfg(feature = "flate2")]
Gzip(GzEncoder<TransferEncoding>),
#[cfg(feature = "brotli")]
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
impl fmt::Debug for ContentEncoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
#[cfg(feature = "brotli")]
ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"),
#[cfg(feature = "flate2")]
ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"),
#[cfg(feature = "flate2")]
ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"),
ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"),
}
}
}
impl ContentEncoder {
pub fn for_server( pub fn for_server(
buf: BytesMut, req: &HttpInnerMessage, resp: &mut HttpResponse, &mut self, req: &HttpInnerMessage, resp: &mut HttpResponse,
response_encoding: ContentEncoding, response_encoding: ContentEncoding,
) -> Output { ) {
let buf = self.take();
let version = resp.version().unwrap_or_else(|| req.version); let version = resp.version().unwrap_or_else(|| req.version);
let is_head = req.method == Method::HEAD; let is_head = req.method == Method::HEAD;
let mut len = 0; let mut len = 0;
@ -188,12 +171,13 @@ impl ContentEncoder {
let mut encoding = ContentEncoding::Identity; let mut encoding = ContentEncoding::Identity;
#[cfg_attr(feature = "cargo-clippy", allow(match_ref_pats))] #[cfg_attr(feature = "cargo-clippy", allow(match_ref_pats))]
let mut transfer = match resp.body() { let transfer = match resp.body() {
&Body::Empty => { &Body::Empty => {
if req.method != Method::HEAD { if req.method != Method::HEAD {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
} }
TransferEncoding::length(0, buf) *self = Output::Empty(buf);
return;
} }
&Body::Binary(_) => { &Body::Binary(_) => {
#[cfg(any(feature = "brotli", feature = "flate2"))] #[cfg(any(feature = "brotli", feature = "flate2"))]
@ -228,8 +212,6 @@ impl ContentEncoder {
let _ = enc.write_eof(); let _ = enc.write_eof();
let body = enc.buf_mut().take(); let body = enc.buf_mut().take();
len = body.len(); len = body.len();
encoding = ContentEncoding::Identity;
resp.replace_body(Binary::from(body)); resp.replace_body(Binary::from(body));
} }
} }
@ -241,10 +223,11 @@ impl ContentEncoder {
CONTENT_LENGTH, CONTENT_LENGTH,
HeaderValue::try_from(b.freeze()).unwrap(), HeaderValue::try_from(b.freeze()).unwrap(),
); );
*self = Output::Empty(buf);
} else { } else {
// resp.headers_mut().remove(CONTENT_LENGTH); *self = Output::Buffer(buf);
} }
TransferEncoding::eof(buf) return;
} }
&Body::Streaming(_) | &Body::Actor(_) => { &Body::Streaming(_) | &Body::Actor(_) => {
if resp.upgrade() { if resp.upgrade() {
@ -262,14 +245,15 @@ impl ContentEncoder {
{ {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
} }
ContentEncoder::streaming_encoding(buf, version, resp) Output::streaming_encoding(buf, version, resp)
} }
} }
}; };
// check for head response // check for head response
if is_head { if is_head {
resp.set_body(Body::Empty); resp.set_body(Body::Empty);
transfer.kind = TransferEncodingKind::Length(0); *self = Output::Empty(transfer.buf.unwrap());
return;
} }
let enc = match encoding { let enc = match encoding {
@ -285,10 +269,11 @@ impl ContentEncoder {
#[cfg(feature = "brotli")] #[cfg(feature = "brotli")]
ContentEncoding::Br => ContentEncoder::Br(BrotliEncoder::new(transfer, 3)), ContentEncoding::Br => ContentEncoder::Br(BrotliEncoder::new(transfer, 3)),
ContentEncoding::Identity | ContentEncoding::Auto => { ContentEncoding::Identity | ContentEncoding::Auto => {
return Output::TE(transfer) *self = Output::TE(transfer);
return;
} }
}; };
Output::Encoder(enc) *self = Output::Encoder(enc);
} }
fn streaming_encoding( fn streaming_encoding(
@ -355,6 +340,30 @@ impl ContentEncoder {
} }
} }
pub(crate) enum ContentEncoder {
#[cfg(feature = "flate2")]
Deflate(DeflateEncoder<TransferEncoding>),
#[cfg(feature = "flate2")]
Gzip(GzEncoder<TransferEncoding>),
#[cfg(feature = "brotli")]
Br(BrotliEncoder<TransferEncoding>),
Identity(TransferEncoding),
}
impl fmt::Debug for ContentEncoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
#[cfg(feature = "brotli")]
ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"),
#[cfg(feature = "flate2")]
ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"),
#[cfg(feature = "flate2")]
ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"),
ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"),
}
}
}
impl ContentEncoder { impl ContentEncoder {
#[inline] #[inline]
pub fn len(&self) -> usize { pub fn len(&self) -> usize {