From 7cfed73be8d4349d0e2795e4fde1b3beba5e77c0 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 19:20:35 -0800 Subject: [PATCH] fix memory usage for h1 and read bug on buffer size. (#1929) --- actix-http/src/h1/decoder.rs | 27 ++++++++++++++++-------- actix-http/src/h1/dispatcher.rs | 37 +++++++++++++++++++++++++-------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index 85379b08..9da95856 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -14,7 +14,7 @@ use crate::header::HeaderMap; use crate::message::{ConnectionType, ResponseHead}; use crate::request::Request; -const MAX_BUFFER_SIZE: usize = 131_072; +pub(crate) const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; /// Incoming message decoder @@ -203,7 +203,15 @@ impl MessageType for Request { (len, method, uri, version, req.headers.len()) } - httparse::Status::Partial => return Ok(None), + httparse::Status::Partial => { + return if src.len() >= MAX_BUFFER_SIZE { + trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + Err(ParseError::TooLarge) + } else { + // Return None to notify more read are needed for parsing request + Ok(None) + }; + } } }; @@ -222,9 +230,6 @@ impl MessageType for Request { PayloadLength::None => { if method == Method::CONNECT { PayloadType::Stream(PayloadDecoder::eof()) - } else if src.len() >= MAX_BUFFER_SIZE { - trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - return Err(ParseError::TooLarge); } else { PayloadType::None } @@ -273,7 +278,14 @@ impl MessageType for ResponseHead { (len, version, status, res.headers.len()) } - httparse::Status::Partial => return Ok(None), + httparse::Status::Partial => { + return if src.len() >= MAX_BUFFER_SIZE { + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + Err(ParseError::TooLarge) + } else { + Ok(None) + } + } } }; @@ -289,9 +301,6 @@ impl MessageType for ResponseHead { } else if status == StatusCode::SWITCHING_PROTOCOLS { // switching protocol or connect PayloadType::Stream(PayloadDecoder::eof()) - } else if src.len() >= MAX_BUFFER_SIZE { - error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - return Err(ParseError::TooLarge); } else { // for HTTP/1.0 read to eof and close connection if msg.version == Version::HTTP_10 { diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 90a5f0e0..e65a1bd5 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -29,8 +29,8 @@ use super::codec::Codec; use super::payload::{Payload, PayloadSender, PayloadStatus}; use super::{Message, MessageType}; -const LW_BUFFER_SIZE: usize = 4096; -const HW_BUFFER_SIZE: usize = 32_768; +const LW_BUFFER_SIZE: usize = 1024; +const HW_BUFFER_SIZE: usize = 1024 * 8; const MAX_PIPELINED_MESSAGES: usize = 16; bitflags! { @@ -404,7 +404,7 @@ where }, StateProj::SendPayload(mut stream) => { loop { - if this.write_buf.len() < HW_BUFFER_SIZE { + if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { match stream.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec.encode( @@ -605,6 +605,8 @@ where } } } + // decode is partial and buffer is not full yet. + // break and wait for more read. Ok(None) => break, Err(ParseError::Io(e)) => { self.as_mut().client_disconnected(); @@ -612,6 +614,18 @@ where *this.error = Some(DispatchError::Io(e)); break; } + Err(ParseError::TooLarge) => { + if let Some(mut payload) = this.payload.take() { + payload.set_error(PayloadError::Overflow); + } + // Requests overflow buffer size should be responded with 413 + this.messages.push_back(DispatcherMessage::Error( + Response::PayloadTooLarge().finish().drop_body(), + )); + this.flags.insert(Flags::READ_DISCONNECT); + *this.error = Some(ParseError::TooLarge.into()); + break; + } Err(e) => { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); @@ -890,12 +904,7 @@ where let mut read_some = false; loop { - // If buf is full return but do not disconnect since - // there is more reading to be done - if buf.len() >= HW_BUFFER_SIZE { - return Ok(Some(false)); - } - + // reserve capacity for buffer let remaining = buf.capacity() - buf.len(); if remaining < LW_BUFFER_SIZE { buf.reserve(HW_BUFFER_SIZE - remaining); @@ -909,6 +918,16 @@ where if n == 0 { return Ok(Some(true)); } else { + // If buf is full return but do not disconnect since + // there is more reading to be done + if buf.len() >= super::decoder::MAX_BUFFER_SIZE { + // at this point it's not known io is still scheduled to + // be waked up. so force wake up dispatcher just in case. + // TODO: figure out the overhead. + cx.waker().wake_by_ref(); + return Ok(Some(false)); + } + read_some = true; } }