1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-30 18:44:35 +01:00

fix memory usage for h1 and read bug on buffer size. (#1929)

This commit is contained in:
fakeshadow 2021-02-06 19:20:35 -08:00 committed by GitHub
parent 41bc04b1c4
commit 7cfed73be8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 18 deletions

View File

@ -14,7 +14,7 @@ use crate::header::HeaderMap;
use crate::message::{ConnectionType, ResponseHead}; use crate::message::{ConnectionType, ResponseHead};
use crate::request::Request; use crate::request::Request;
const MAX_BUFFER_SIZE: usize = 131_072; pub(crate) const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 96; const MAX_HEADERS: usize = 96;
/// Incoming message decoder /// Incoming message decoder
@ -203,7 +203,15 @@ impl MessageType for Request {
(len, method, uri, version, req.headers.len()) (len, method, uri, version, req.headers.len())
} }
httparse::Status::Partial => return Ok(None), httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Err(ParseError::TooLarge)
} else {
// Return None to notify more read are needed for parsing request
Ok(None)
};
}
} }
}; };
@ -222,9 +230,6 @@ impl MessageType for Request {
PayloadLength::None => { PayloadLength::None => {
if method == Method::CONNECT { if method == Method::CONNECT {
PayloadType::Stream(PayloadDecoder::eof()) PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else { } else {
PayloadType::None PayloadType::None
} }
@ -273,7 +278,14 @@ impl MessageType for ResponseHead {
(len, version, status, res.headers.len()) (len, version, status, res.headers.len())
} }
httparse::Status::Partial => return Ok(None), httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
Err(ParseError::TooLarge)
} else {
Ok(None)
}
}
} }
}; };
@ -289,9 +301,6 @@ impl MessageType for ResponseHead {
} else if status == StatusCode::SWITCHING_PROTOCOLS { } else if status == StatusCode::SWITCHING_PROTOCOLS {
// switching protocol or connect // switching protocol or connect
PayloadType::Stream(PayloadDecoder::eof()) PayloadType::Stream(PayloadDecoder::eof())
} else if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ParseError::TooLarge);
} else { } else {
// for HTTP/1.0 read to eof and close connection // for HTTP/1.0 read to eof and close connection
if msg.version == Version::HTTP_10 { if msg.version == Version::HTTP_10 {

View File

@ -29,8 +29,8 @@ use super::codec::Codec;
use super::payload::{Payload, PayloadSender, PayloadStatus}; use super::payload::{Payload, PayloadSender, PayloadStatus};
use super::{Message, MessageType}; use super::{Message, MessageType};
const LW_BUFFER_SIZE: usize = 4096; const LW_BUFFER_SIZE: usize = 1024;
const HW_BUFFER_SIZE: usize = 32_768; const HW_BUFFER_SIZE: usize = 1024 * 8;
const MAX_PIPELINED_MESSAGES: usize = 16; const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! { bitflags! {
@ -404,7 +404,7 @@ where
}, },
StateProj::SendPayload(mut stream) => { StateProj::SendPayload(mut stream) => {
loop { loop {
if this.write_buf.len() < HW_BUFFER_SIZE { if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) { match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
this.codec.encode( this.codec.encode(
@ -605,6 +605,8 @@ where
} }
} }
} }
// decode is partial and buffer is not full yet.
// break and wait for more read.
Ok(None) => break, Ok(None) => break,
Err(ParseError::Io(e)) => { Err(ParseError::Io(e)) => {
self.as_mut().client_disconnected(); self.as_mut().client_disconnected();
@ -612,6 +614,18 @@ where
*this.error = Some(DispatchError::Io(e)); *this.error = Some(DispatchError::Io(e));
break; break;
} }
Err(ParseError::TooLarge) => {
if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Overflow);
}
// Requests overflow buffer size should be responded with 413
this.messages.push_back(DispatcherMessage::Error(
Response::PayloadTooLarge().finish().drop_body(),
));
this.flags.insert(Flags::READ_DISCONNECT);
*this.error = Some(ParseError::TooLarge.into());
break;
}
Err(e) => { Err(e) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted); payload.set_error(PayloadError::EncodingCorrupted);
@ -890,12 +904,7 @@ where
let mut read_some = false; let mut read_some = false;
loop { loop {
// If buf is full return but do not disconnect since // reserve capacity for buffer
// there is more reading to be done
if buf.len() >= HW_BUFFER_SIZE {
return Ok(Some(false));
}
let remaining = buf.capacity() - buf.len(); let remaining = buf.capacity() - buf.len();
if remaining < LW_BUFFER_SIZE { if remaining < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE - remaining); buf.reserve(HW_BUFFER_SIZE - remaining);
@ -909,6 +918,16 @@ where
if n == 0 { if n == 0 {
return Ok(Some(true)); return Ok(Some(true));
} else { } else {
// If buf is full return but do not disconnect since
// there is more reading to be done
if buf.len() >= super::decoder::MAX_BUFFER_SIZE {
// at this point it's not known io is still scheduled to
// be waked up. so force wake up dispatcher just in case.
// TODO: figure out the overhead.
cx.waker().wake_by_ref();
return Ok(Some(false));
}
read_some = true; read_some = true;
} }
} }