diff --git a/CHANGES.md b/CHANGES.md index 95eecbf84..00d8de588 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,8 @@ * Do not enable chunked encoding for HTTP/1.0 +* Allow to explicitly disable chunked encoding + ## 0.3.0 (2018-01-12) diff --git a/src/context.rs b/src/context.rs index 13eb884cd..f518e24ad 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,9 +1,9 @@ use std; use std::marker::PhantomData; -use std::collections::VecDeque; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; use futures::unsync::oneshot; +use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; @@ -18,7 +18,7 @@ use httprequest::HttpRequest; pub trait ActorHttpContext: 'static { fn disconnected(&mut self); - fn poll(&mut self) -> Poll, Error>; + fn poll(&mut self) -> Poll>, Error>; } #[derive(Debug)] @@ -31,7 +31,7 @@ pub enum Frame { pub struct HttpContext where A: Actor>, { inner: ContextImpl, - stream: VecDeque, + stream: Option>, request: HttpRequest, disconnected: bool, } @@ -91,7 +91,7 @@ impl HttpContext where A: Actor { pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { inner: ContextImpl::new(None), - stream: VecDeque::new(), + stream: None, request: req, disconnected: false, } @@ -121,7 +121,7 @@ impl HttpContext where A: Actor { #[inline] pub fn write>(&mut self, data: B) { if !self.disconnected { - self.stream.push_back(Frame::Chunk(Some(data.into()))); + self.add_frame(Frame::Chunk(Some(data.into()))); } else { warn!("Trying to write to disconnected response"); } @@ -130,14 +130,14 @@ impl HttpContext where A: Actor { /// Indicate end of streamimng payload. Also this method calls `Self::close`. #[inline] pub fn write_eof(&mut self) { - self.stream.push_back(Frame::Chunk(None)); + self.add_frame(Frame::Chunk(None)); } /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.inner.modify(); - self.stream.push_back(Frame::Drain(tx)); + self.add_frame(Frame::Drain(tx)); Drain::new(rx) } @@ -146,6 +146,14 @@ impl HttpContext where A: Actor { pub fn connected(&self) -> bool { !self.disconnected } + + #[inline] + fn add_frame(&mut self, frame: Frame) { + if self.stream.is_none() { + self.stream = Some(SmallVec::new()); + } + self.stream.as_mut().map(|s| s.push(frame)); + } } impl HttpContext where A: Actor { @@ -176,7 +184,7 @@ impl ActorHttpContext for HttpContext where A: Actor, self.stop(); } - fn poll(&mut self) -> Poll, Error> { + fn poll(&mut self) -> Poll>, Error> { let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) }; @@ -189,8 +197,8 @@ impl ActorHttpContext for HttpContext where A: Actor, } // frames - if let Some(frame) = self.stream.pop_front() { - Ok(Async::Ready(Some(frame))) + if let Some(data) = self.stream.take() { + Ok(Async::Ready(Some(data))) } else if self.inner.alive() { Ok(Async::NotReady) } else { diff --git a/src/httpresponse.rs b/src/httpresponse.rs index e015275f2..e356aad35 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -158,7 +158,7 @@ impl HttpResponse { /// is chunked encoding enabled #[inline] - pub fn chunked(&self) -> bool { + pub fn chunked(&self) -> Option { self.get_ref().chunked } @@ -329,7 +329,16 @@ impl HttpResponseBuilder { #[inline] pub fn chunked(&mut self) -> &mut Self { if let Some(parts) = parts(&mut self.response, &self.err) { - parts.chunked = true; + parts.chunked = Some(true); + } + self + } + + /// Force disable chunked encoding + #[inline] + pub fn no_chunking(&mut self) -> &mut Self { + if let Some(parts) = parts(&mut self.response, &self.err) { + parts.chunked = Some(false); } self } @@ -641,7 +650,7 @@ struct InnerHttpResponse { status: StatusCode, reason: Option<&'static str>, body: Body, - chunked: bool, + chunked: Option, encoding: ContentEncoding, connection_type: Option, response_size: u64, @@ -658,7 +667,7 @@ impl InnerHttpResponse { status: status, reason: None, body: body, - chunked: false, + chunked: None, encoding: ContentEncoding::Auto, connection_type: None, response_size: 0, @@ -709,7 +718,7 @@ impl Pool { if v.len() < 128 { inner.headers.clear(); inner.version = None; - inner.chunked = false; + inner.chunked = None; inner.reason = None; inner.encoding = ContentEncoding::Auto; inner.connection_type = None; diff --git a/src/pipeline.rs b/src/pipeline.rs index 0cd6f7531..85aa744ce 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -439,8 +439,7 @@ impl ProcessResponse { ProcessResponse{ resp: resp, iostate: IOState::Response, running: RunningState::Running, - drain: None, - _s: PhantomData, _h: PhantomData}) + drain: None, _s: PhantomData, _h: PhantomData}) } fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo) @@ -448,7 +447,7 @@ impl ProcessResponse { { if self.drain.is_none() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full - loop { + 'outter: loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { let result = match io.start(info.req_mut().get_inner(), &mut self.resp) { @@ -504,35 +503,44 @@ impl ProcessResponse { ctx.disconnected(); } match ctx.poll() { - Ok(Async::Ready(Some(frame))) => { - match frame { - Frame::Chunk(None) => { - info.context = Some(ctx); - self.iostate = IOState::Done; - if let Err(err) = io.write_eof() { - info.error = Some(err.into()); - return Ok( - FinishingMiddlewares::init(info, self.resp)) - } - break - }, - Frame::Chunk(Some(chunk)) => { - self.iostate = IOState::Actor(ctx); - match io.write(chunk.as_ref()) { - Err(err) => { + Ok(Async::Ready(Some(vec))) => { + if vec.is_empty() { + self.iostate = IOState::Actor(ctx); + break + } + let mut res = None; + for frame in vec { + match frame { + Frame::Chunk(None) => { + info.context = Some(ctx); + self.iostate = IOState::Done; + if let Err(err) = io.write_eof() { info.error = Some(err.into()); return Ok( FinishingMiddlewares::init(info, self.resp)) - }, - Ok(result) => result - } - }, - Frame::Drain(fut) => { - self.drain = Some(fut); - self.iostate = IOState::Actor(ctx); - break + } + break 'outter + }, + Frame::Chunk(Some(chunk)) => { + match io.write(chunk.as_ref()) { + Err(err) => { + info.error = Some(err.into()); + return Ok( + FinishingMiddlewares::init(info, self.resp)) + }, + Ok(result) => res = Some(result), + } + }, + Frame::Drain(fut) => + self.drain = Some(fut), } } + self.iostate = IOState::Actor(ctx); + if self.drain.is_some() { + self.running.resume(); + break 'outter + } + res.unwrap() }, Ok(Async::Ready(None)) => { self.iostate = IOState::Done; @@ -677,6 +685,7 @@ impl FinishingMiddlewares { } } +#[derive(Debug)] struct Completed(PhantomData, PhantomData); impl Completed { diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 1fda25a33..c0cd6fd32 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -378,9 +378,6 @@ impl PayloadEncoder { let transfer = match body { Body::Empty => { - if resp.chunked() { - error!("Chunked transfer is enabled but body is set to Empty"); - } resp.headers_mut().remove(CONTENT_LENGTH); TransferEncoding::eof(buf) }, @@ -444,54 +441,59 @@ impl PayloadEncoder { fn streaming_encoding(buf: SharedBytes, version: Version, resp: &mut HttpResponse) -> TransferEncoding { - if resp.chunked() { - // Enable transfer encoding - resp.headers_mut().remove(CONTENT_LENGTH); - if version == Version::HTTP_2 { - resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) - } else { - resp.headers_mut().insert( - TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked(buf) - } - } else { - // if Content-Length is specified, then use it as length hint - let (len, chunked) = - if let Some(len) = resp.headers().get(CONTENT_LENGTH) { - // Content-Length - if let Ok(s) = len.to_str() { - if let Ok(len) = s.parse::() { - (Some(len), false) + match resp.chunked() { + Some(true) => { + // Enable transfer encoding + resp.headers_mut().remove(CONTENT_LENGTH); + if version == Version::HTTP_2 { + resp.headers_mut().remove(TRANSFER_ENCODING); + TransferEncoding::eof(buf) + } else { + resp.headers_mut().insert( + TRANSFER_ENCODING, HeaderValue::from_static("chunked")); + TransferEncoding::chunked(buf) + } + }, + Some(false) => + TransferEncoding::eof(buf), + None => { + // if Content-Length is specified, then use it as length hint + let (len, chunked) = + if let Some(len) = resp.headers().get(CONTENT_LENGTH) { + // Content-Length + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + (Some(len), false) + } else { + error!("illegal Content-Length: {:?}", len); + (None, false) + } } else { error!("illegal Content-Length: {:?}", len); (None, false) } } else { - error!("illegal Content-Length: {:?}", len); - (None, false) + (None, true) + }; + + if !chunked { + if let Some(len) = len { + TransferEncoding::length(len, buf) + } else { + TransferEncoding::eof(buf) } } else { - (None, true) - }; - - if !chunked { - if let Some(len) = len { - TransferEncoding::length(len, buf) - } else { - TransferEncoding::eof(buf) - } - } else { - // Enable transfer encoding - match version { - Version::HTTP_11 => { - resp.headers_mut().insert( - TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked(buf) - }, - _ => { - resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) + // Enable transfer encoding + match version { + Version::HTTP_11 => { + resp.headers_mut().insert( + TRANSFER_ENCODING, HeaderValue::from_static("chunked")); + TransferEncoding::chunked(buf) + }, + _ => { + resp.headers_mut().remove(TRANSFER_ENCODING); + TransferEncoding::eof(buf) + } } } } diff --git a/src/ws/context.rs b/src/ws/context.rs index d557255dc..f77a3f2bd 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -1,8 +1,8 @@ use std::mem; -use std::collections::VecDeque; use futures::{Async, Poll}; use futures::sync::oneshot::Sender; use futures::unsync::oneshot; +use smallvec::SmallVec; use actix::{Actor, ActorState, ActorContext, AsyncContext, Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; @@ -23,7 +23,7 @@ use ws::proto::{OpCode, CloseCode}; pub struct WebsocketContext where A: Actor>, { inner: ContextImpl, - stream: VecDeque, + stream: Option>, request: HttpRequest, disconnected: bool, } @@ -88,7 +88,7 @@ impl WebsocketContext where A: Actor { pub fn from_request(req: HttpRequest) -> WebsocketContext { WebsocketContext { inner: ContextImpl::new(None), - stream: VecDeque::new(), + stream: None, request: req, disconnected: false, } @@ -107,7 +107,7 @@ impl WebsocketContext where A: Actor { #[inline] fn write>(&mut self, data: B) { if !self.disconnected { - self.stream.push_back(ContextFrame::Chunk(Some(data.into()))); + self.add_frame(ContextFrame::Chunk(Some(data.into()))); } else { warn!("Trying to write to disconnected response"); } @@ -173,7 +173,7 @@ impl WebsocketContext where A: Actor { pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.inner.modify(); - self.stream.push_back(ContextFrame::Drain(tx)); + self.add_frame(ContextFrame::Drain(tx)); Drain::new(rx) } @@ -182,6 +182,13 @@ impl WebsocketContext where A: Actor { pub fn connected(&self) -> bool { !self.disconnected } + + fn add_frame(&mut self, frame: ContextFrame) { + if self.stream.is_none() { + self.stream = Some(SmallVec::new()); + } + self.stream.as_mut().map(|s| s.push(frame)); + } } impl WebsocketContext where A: Actor { @@ -212,7 +219,7 @@ impl ActorHttpContext for WebsocketContext where A: Actor Poll, Error> { + fn poll(&mut self) -> Poll>, Error> { let ctx: &mut WebsocketContext = unsafe { mem::transmute(self as &mut WebsocketContext) }; @@ -225,8 +232,8 @@ impl ActorHttpContext for WebsocketContext where A: Actor