1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

Allow to explicitly disable chunked encoding

This commit is contained in:
Nikolay Kim 2018-01-13 16:17:33 -08:00
parent 305666067e
commit 93fdb596d4
6 changed files with 131 additions and 94 deletions

View File

@ -6,6 +6,8 @@
* Do not enable chunked encoding for HTTP/1.0 * Do not enable chunked encoding for HTTP/1.0
* Allow to explicitly disable chunked encoding
## 0.3.0 (2018-01-12) ## 0.3.0 (2018-01-12)

View File

@ -1,9 +1,9 @@
use std; use std;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::collections::VecDeque;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use smallvec::SmallVec;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
@ -18,7 +18,7 @@ use httprequest::HttpRequest;
pub trait ActorHttpContext: 'static { pub trait ActorHttpContext: 'static {
fn disconnected(&mut self); fn disconnected(&mut self);
fn poll(&mut self) -> Poll<Option<Frame>, Error>; fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error>;
} }
#[derive(Debug)] #[derive(Debug)]
@ -31,7 +31,7 @@ pub enum Frame {
pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>, pub struct HttpContext<A, S=()> where A: Actor<Context=HttpContext<A, S>>,
{ {
inner: ContextImpl<A>, inner: ContextImpl<A>,
stream: VecDeque<Frame>, stream: Option<SmallVec<[Frame; 2]>>,
request: HttpRequest<S>, request: HttpRequest<S>,
disconnected: bool, disconnected: bool,
} }
@ -91,7 +91,7 @@ impl<A, S: 'static> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> { pub fn from_request(req: HttpRequest<S>) -> HttpContext<A, S> {
HttpContext { HttpContext {
inner: ContextImpl::new(None), inner: ContextImpl::new(None),
stream: VecDeque::new(), stream: None,
request: req, request: req,
disconnected: false, disconnected: false,
} }
@ -121,7 +121,7 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
#[inline] #[inline]
pub fn write<B: Into<Binary>>(&mut self, data: B) { pub fn write<B: Into<Binary>>(&mut self, data: B) {
if !self.disconnected { if !self.disconnected {
self.stream.push_back(Frame::Chunk(Some(data.into()))); self.add_frame(Frame::Chunk(Some(data.into())));
} else { } else {
warn!("Trying to write to disconnected response"); warn!("Trying to write to disconnected response");
} }
@ -130,14 +130,14 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
/// Indicate end of streamimng payload. Also this method calls `Self::close`. /// Indicate end of streamimng payload. Also this method calls `Self::close`.
#[inline] #[inline]
pub fn write_eof(&mut self) { pub fn write_eof(&mut self) {
self.stream.push_back(Frame::Chunk(None)); self.add_frame(Frame::Chunk(None));
} }
/// Returns drain future /// Returns drain future
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> Drain<A> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.inner.modify(); self.inner.modify();
self.stream.push_back(Frame::Drain(tx)); self.add_frame(Frame::Drain(tx));
Drain::new(rx) Drain::new(rx)
} }
@ -146,6 +146,14 @@ impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
pub fn connected(&self) -> bool { pub fn connected(&self) -> bool {
!self.disconnected !self.disconnected
} }
#[inline]
fn add_frame(&mut self, frame: Frame) {
if self.stream.is_none() {
self.stream = Some(SmallVec::new());
}
self.stream.as_mut().map(|s| s.push(frame));
}
} }
impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> { impl<A, S> HttpContext<A, S> where A: Actor<Context=Self> {
@ -176,7 +184,7 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
self.stop(); self.stop();
} }
fn poll(&mut self) -> Poll<Option<Frame>, Error> { fn poll(&mut self) -> Poll<Option<SmallVec<[Frame; 2]>>, Error> {
let ctx: &mut HttpContext<A, S> = unsafe { let ctx: &mut HttpContext<A, S> = unsafe {
std::mem::transmute(self as &mut HttpContext<A, S>) std::mem::transmute(self as &mut HttpContext<A, S>)
}; };
@ -189,8 +197,8 @@ impl<A, S> ActorHttpContext for HttpContext<A, S> where A: Actor<Context=Self>,
} }
// frames // frames
if let Some(frame) = self.stream.pop_front() { if let Some(data) = self.stream.take() {
Ok(Async::Ready(Some(frame))) Ok(Async::Ready(Some(data)))
} else if self.inner.alive() { } else if self.inner.alive() {
Ok(Async::NotReady) Ok(Async::NotReady)
} else { } else {

View File

@ -158,7 +158,7 @@ impl HttpResponse {
/// is chunked encoding enabled /// is chunked encoding enabled
#[inline] #[inline]
pub fn chunked(&self) -> bool { pub fn chunked(&self) -> Option<bool> {
self.get_ref().chunked self.get_ref().chunked
} }
@ -329,7 +329,16 @@ impl HttpResponseBuilder {
#[inline] #[inline]
pub fn chunked(&mut self) -> &mut Self { pub fn chunked(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) { if let Some(parts) = parts(&mut self.response, &self.err) {
parts.chunked = true; parts.chunked = Some(true);
}
self
}
/// Force disable chunked encoding
#[inline]
pub fn no_chunking(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.chunked = Some(false);
} }
self self
} }
@ -641,7 +650,7 @@ struct InnerHttpResponse {
status: StatusCode, status: StatusCode,
reason: Option<&'static str>, reason: Option<&'static str>,
body: Body, body: Body,
chunked: bool, chunked: Option<bool>,
encoding: ContentEncoding, encoding: ContentEncoding,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
response_size: u64, response_size: u64,
@ -658,7 +667,7 @@ impl InnerHttpResponse {
status: status, status: status,
reason: None, reason: None,
body: body, body: body,
chunked: false, chunked: None,
encoding: ContentEncoding::Auto, encoding: ContentEncoding::Auto,
connection_type: None, connection_type: None,
response_size: 0, response_size: 0,
@ -709,7 +718,7 @@ impl Pool {
if v.len() < 128 { if v.len() < 128 {
inner.headers.clear(); inner.headers.clear();
inner.version = None; inner.version = None;
inner.chunked = false; inner.chunked = None;
inner.reason = None; inner.reason = None;
inner.encoding = ContentEncoding::Auto; inner.encoding = ContentEncoding::Auto;
inner.connection_type = None; inner.connection_type = None;

View File

@ -439,8 +439,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: None, drain: None, _s: PhantomData, _h: PhantomData})
_s: PhantomData, _h: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
@ -448,7 +447,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
{ {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
loop { 'outter: loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) { let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => { IOState::Response => {
let result = match io.start(info.req_mut().get_inner(), &mut self.resp) { let result = match io.start(info.req_mut().get_inner(), &mut self.resp) {
@ -504,35 +503,44 @@ impl<S: 'static, H> ProcessResponse<S, H> {
ctx.disconnected(); ctx.disconnected();
} }
match ctx.poll() { match ctx.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(vec))) => {
match frame { if vec.is_empty() {
Frame::Chunk(None) => { self.iostate = IOState::Actor(ctx);
info.context = Some(ctx); break
self.iostate = IOState::Done; }
if let Err(err) = io.write_eof() { let mut res = None;
info.error = Some(err.into()); for frame in vec {
return Ok( match frame {
FinishingMiddlewares::init(info, self.resp)) Frame::Chunk(None) => {
} info.context = Some(ctx);
break self.iostate = IOState::Done;
}, if let Err(err) = io.write_eof() {
Frame::Chunk(Some(chunk)) => {
self.iostate = IOState::Actor(ctx);
match io.write(chunk.as_ref()) {
Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init(info, self.resp)) FinishingMiddlewares::init(info, self.resp))
}, }
Ok(result) => result break 'outter
} },
}, Frame::Chunk(Some(chunk)) => {
Frame::Drain(fut) => { match io.write(chunk.as_ref()) {
self.drain = Some(fut); Err(err) => {
self.iostate = IOState::Actor(ctx); info.error = Some(err.into());
break return Ok(
FinishingMiddlewares::init(info, self.resp))
},
Ok(result) => res = Some(result),
}
},
Frame::Drain(fut) =>
self.drain = Some(fut),
} }
} }
self.iostate = IOState::Actor(ctx);
if self.drain.is_some() {
self.running.resume();
break 'outter
}
res.unwrap()
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
@ -677,6 +685,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
} }
#[derive(Debug)]
struct Completed<S, H>(PhantomData<S>, PhantomData<H>); struct Completed<S, H>(PhantomData<S>, PhantomData<H>);
impl<S, H> Completed<S, H> { impl<S, H> Completed<S, H> {

View File

@ -378,9 +378,6 @@ impl PayloadEncoder {
let transfer = match body { let transfer = match body {
Body::Empty => { Body::Empty => {
if resp.chunked() {
error!("Chunked transfer is enabled but body is set to Empty");
}
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
}, },
@ -444,54 +441,59 @@ impl PayloadEncoder {
fn streaming_encoding(buf: SharedBytes, version: Version, fn streaming_encoding(buf: SharedBytes, version: Version,
resp: &mut HttpResponse) -> TransferEncoding { resp: &mut HttpResponse) -> TransferEncoding {
if resp.chunked() { match resp.chunked() {
// Enable transfer encoding Some(true) => {
resp.headers_mut().remove(CONTENT_LENGTH); // Enable transfer encoding
if version == Version::HTTP_2 { resp.headers_mut().remove(CONTENT_LENGTH);
resp.headers_mut().remove(TRANSFER_ENCODING); if version == Version::HTTP_2 {
TransferEncoding::eof(buf) resp.headers_mut().remove(TRANSFER_ENCODING);
} else { TransferEncoding::eof(buf)
resp.headers_mut().insert( } else {
TRANSFER_ENCODING, HeaderValue::from_static("chunked")); resp.headers_mut().insert(
TransferEncoding::chunked(buf) TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
} TransferEncoding::chunked(buf)
} else { }
// if Content-Length is specified, then use it as length hint },
let (len, chunked) = Some(false) =>
if let Some(len) = resp.headers().get(CONTENT_LENGTH) { TransferEncoding::eof(buf),
// Content-Length None => {
if let Ok(s) = len.to_str() { // if Content-Length is specified, then use it as length hint
if let Ok(len) = s.parse::<u64>() { let (len, chunked) =
(Some(len), false) if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
(Some(len), false)
} else {
error!("illegal Content-Length: {:?}", len);
(None, false)
}
} else { } else {
error!("illegal Content-Length: {:?}", len); error!("illegal Content-Length: {:?}", len);
(None, false) (None, false)
} }
} else { } else {
error!("illegal Content-Length: {:?}", len); (None, true)
(None, false) };
if !chunked {
if let Some(len) = len {
TransferEncoding::length(len, buf)
} else {
TransferEncoding::eof(buf)
} }
} else { } else {
(None, true) // Enable transfer encoding
}; match version {
Version::HTTP_11 => {
if !chunked { resp.headers_mut().insert(
if let Some(len) = len { TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::length(len, buf) TransferEncoding::chunked(buf)
} else { },
TransferEncoding::eof(buf) _ => {
} resp.headers_mut().remove(TRANSFER_ENCODING);
} else { TransferEncoding::eof(buf)
// Enable transfer encoding }
match version {
Version::HTTP_11 => {
resp.headers_mut().insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf)
},
_ => {
resp.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
} }
} }
} }

View File

@ -1,8 +1,8 @@
use std::mem; use std::mem;
use std::collections::VecDeque;
use futures::{Async, Poll}; use futures::{Async, Poll};
use futures::sync::oneshot::Sender; use futures::sync::oneshot::Sender;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use smallvec::SmallVec;
use actix::{Actor, ActorState, ActorContext, AsyncContext, use actix::{Actor, ActorState, ActorContext, AsyncContext,
Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle};
@ -23,7 +23,7 @@ use ws::proto::{OpCode, CloseCode};
pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>, pub struct WebsocketContext<A, S=()> where A: Actor<Context=WebsocketContext<A, S>>,
{ {
inner: ContextImpl<A>, inner: ContextImpl<A>,
stream: VecDeque<ContextFrame>, stream: Option<SmallVec<[ContextFrame; 2]>>,
request: HttpRequest<S>, request: HttpRequest<S>,
disconnected: bool, disconnected: bool,
} }
@ -88,7 +88,7 @@ impl<A, S: 'static> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn from_request(req: HttpRequest<S>) -> WebsocketContext<A, S> { pub fn from_request(req: HttpRequest<S>) -> WebsocketContext<A, S> {
WebsocketContext { WebsocketContext {
inner: ContextImpl::new(None), inner: ContextImpl::new(None),
stream: VecDeque::new(), stream: None,
request: req, request: req,
disconnected: false, disconnected: false,
} }
@ -107,7 +107,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
#[inline] #[inline]
fn write<B: Into<Binary>>(&mut self, data: B) { fn write<B: Into<Binary>>(&mut self, data: B) {
if !self.disconnected { if !self.disconnected {
self.stream.push_back(ContextFrame::Chunk(Some(data.into()))); self.add_frame(ContextFrame::Chunk(Some(data.into())));
} else { } else {
warn!("Trying to write to disconnected response"); warn!("Trying to write to disconnected response");
} }
@ -173,7 +173,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn drain(&mut self) -> Drain<A> { pub fn drain(&mut self) -> Drain<A> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.inner.modify(); self.inner.modify();
self.stream.push_back(ContextFrame::Drain(tx)); self.add_frame(ContextFrame::Drain(tx));
Drain::new(rx) Drain::new(rx)
} }
@ -182,6 +182,13 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
pub fn connected(&self) -> bool { pub fn connected(&self) -> bool {
!self.disconnected !self.disconnected
} }
fn add_frame(&mut self, frame: ContextFrame) {
if self.stream.is_none() {
self.stream = Some(SmallVec::new());
}
self.stream.as_mut().map(|s| s.push(frame));
}
} }
impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> { impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
@ -212,7 +219,7 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
self.stop(); self.stop();
} }
fn poll(&mut self) -> Poll<Option<ContextFrame>, Error> { fn poll(&mut self) -> Poll<Option<SmallVec<[ContextFrame;2]>>, Error> {
let ctx: &mut WebsocketContext<A, S> = unsafe { let ctx: &mut WebsocketContext<A, S> = unsafe {
mem::transmute(self as &mut WebsocketContext<A, S>) mem::transmute(self as &mut WebsocketContext<A, S>)
}; };
@ -225,8 +232,8 @@ impl<A, S> ActorHttpContext for WebsocketContext<A, S> where A: Actor<Context=Se
} }
// frames // frames
if let Some(frame) = self.stream.pop_front() { if let Some(data) = self.stream.take() {
Ok(Async::Ready(Some(frame))) Ok(Async::Ready(Some(data)))
} else if self.inner.alive() { } else if self.inner.alive() {
Ok(Async::NotReady) Ok(Async::NotReady)
} else { } else {