diff --git a/src/types/payload.rs b/src/types/payload.rs index fd4d3e945..9228b37aa 100644 --- a/src/types/payload.rs +++ b/src/types/payload.rs @@ -10,11 +10,8 @@ use bytes::{Bytes, BytesMut}; use encoding_rs::{Encoding, UTF_8}; use futures_core::stream::Stream; use futures_util::{ - future::{ - err, ok, Either, ErrInto, FutureExt as _, LocalBoxFuture, Ready, - TryFutureExt as _, - }, - stream::StreamExt as _, + future::{err, ok, Either, ErrInto, Ready, TryFutureExt as _}, + ready, }; use mime::Mime; @@ -305,10 +302,12 @@ impl PayloadConfig { // Allow shared refs to default. const DEFAULT_CONFIG: PayloadConfig = PayloadConfig { - limit: 262_144, // 2^18 bytes (~256kB) + limit: DEFAULT_CONFIG_LIMIT, mimetype: None, }; +const DEFAULT_CONFIG_LIMIT: usize = 262_144; // 2^18 bytes (~256kB) + impl Default for PayloadConfig { fn default() -> Self { DEFAULT_CONFIG.clone() @@ -326,99 +325,83 @@ pub struct HttpMessageBody { limit: usize, length: Option, #[cfg(feature = "compress")] - stream: Option>, + stream: dev::Decompress, #[cfg(not(feature = "compress"))] - stream: Option, + stream: dev::Payload, + buf: BytesMut, err: Option, - fut: Option>>, } impl HttpMessageBody { /// Create `MessageBody` for request. #[allow(clippy::borrow_interior_mutable_const)] pub fn new(req: &HttpRequest, payload: &mut dev::Payload) -> HttpMessageBody { - let mut len = None; + let mut length = None; + let mut err = None; + if let Some(l) = req.headers().get(&header::CONTENT_LENGTH) { - if let Ok(s) = l.to_str() { - if let Ok(l) = s.parse::() { - len = Some(l) - } else { - return Self::err(PayloadError::UnknownLength); - } - } else { - return Self::err(PayloadError::UnknownLength); + match l.to_str() { + Ok(s) => match s.parse::() { + Ok(l) if l > DEFAULT_CONFIG_LIMIT => { + err = Some(PayloadError::Overflow) + } + Ok(l) => length = Some(l), + Err(_) => err = Some(PayloadError::UnknownLength), + }, + Err(_) => err = Some(PayloadError::UnknownLength), } } #[cfg(feature = "compress")] - let stream = Some(dev::Decompress::from_headers(payload.take(), req.headers())); + let stream = dev::Decompress::from_headers(payload.take(), req.headers()); #[cfg(not(feature = "compress"))] - let stream = Some(payload.take()); + let stream = payload.take(); HttpMessageBody { stream, - limit: 262_144, - length: len, - fut: None, - err: None, + limit: DEFAULT_CONFIG_LIMIT, + length, + buf: BytesMut::with_capacity(8192), + err, } } /// Change max size of payload. By default max size is 256Kb pub fn limit(mut self, limit: usize) -> Self { + if let Some(l) = self.length { + if l > limit { + self.err = Some(PayloadError::Overflow); + } + } self.limit = limit; self } - - fn err(e: PayloadError) -> Self { - HttpMessageBody { - stream: None, - limit: 262_144, - fut: None, - err: Some(e), - length: None, - } - } } impl Future for HttpMessageBody { type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(ref mut fut) = self.fut { - return Pin::new(fut).poll(cx); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if let Some(e) = this.err.take() { + return Poll::Ready(Err(e)); } - if let Some(err) = self.err.take() { - return Poll::Ready(Err(err)); - } - - if let Some(len) = self.length.take() { - if len > self.limit { - return Poll::Ready(Err(PayloadError::Overflow)); - } - } - - // future - let limit = self.limit; - let mut stream = self.stream.take().unwrap(); - self.fut = Some( - async move { - let mut body = BytesMut::with_capacity(8192); - - while let Some(item) = stream.next().await { - let chunk = item?; - if body.len() + chunk.len() > limit { - return Err(PayloadError::Overflow); + loop { + let res = ready!(Pin::new(&mut this.stream).poll_next(cx)); + match res { + Some(chunk) => { + let chunk = chunk?; + if this.buf.len() + chunk.len() > this.limit { + return Poll::Ready(Err(PayloadError::Overflow)); } else { - body.extend_from_slice(&chunk); + this.buf.extend_from_slice(&chunk); } } - Ok(body.freeze()) + None => return Poll::Ready(Ok(this.buf.split().freeze())), } - .boxed_local(), - ); - self.poll(cx) + } } }