mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-28 01:52:57 +01:00
Simplify bigbytes - always put bytes into queue
This commit is contained in:
parent
a6b5c9893d
commit
96f5ebb549
@ -2,8 +2,6 @@ use std::collections::VecDeque;
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
const SIXTYFOUR_KB: usize = 1024 * 64;
|
||||
|
||||
pub(crate) struct BigBytes {
|
||||
buffer: BytesMut,
|
||||
frozen: VecDeque<Bytes>,
|
||||
@ -32,13 +30,6 @@ impl BigBytes {
|
||||
&mut self.buffer
|
||||
}
|
||||
|
||||
// Reserve the requested size, if fewer than 64KB
|
||||
pub(super) fn reserve(&mut self, count: usize) {
|
||||
if count < SIXTYFOUR_KB {
|
||||
self.buffer.reserve(count);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn total_len(&mut self) -> usize {
|
||||
self.frozen_len + self.buffer.len()
|
||||
}
|
||||
@ -50,23 +41,14 @@ impl BigBytes {
|
||||
// Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a
|
||||
// queue, otherwise, it is added to a buffer.
|
||||
pub(super) fn put_bytes(&mut self, bytes: Bytes) {
|
||||
if bytes.len() < SIXTYFOUR_KB {
|
||||
self.buffer.extend_from_slice(&bytes);
|
||||
} else {
|
||||
if !self.buffer.is_empty() {
|
||||
let current = self.buffer.split().freeze();
|
||||
self.frozen_len += current.len();
|
||||
self.frozen.push_back(current);
|
||||
}
|
||||
|
||||
self.frozen_len += bytes.len();
|
||||
self.frozen.push_back(bytes);
|
||||
if !self.buffer.is_empty() {
|
||||
let current = self.buffer.split().freeze();
|
||||
self.frozen_len += current.len();
|
||||
self.frozen.push_back(current);
|
||||
}
|
||||
}
|
||||
|
||||
// Put a slice into the internal structure. This is always added to the internal buffer
|
||||
pub(super) fn extend_from_slice(&mut self, slice: &[u8]) {
|
||||
self.buffer.extend_from_slice(slice);
|
||||
self.frozen_len += bytes.len();
|
||||
self.frozen.push_back(bytes);
|
||||
}
|
||||
|
||||
// Returns a slice of the frontmost buffer
|
||||
|
@ -170,8 +170,8 @@ impl Codec {
|
||||
};
|
||||
|
||||
// encode message
|
||||
self.encoder.encode_bigbytes(
|
||||
dst,
|
||||
self.encoder.encode(
|
||||
dst.buffer_mut(),
|
||||
&mut res,
|
||||
self.flags.contains(Flags::HEAD),
|
||||
self.flags.contains(Flags::STREAM),
|
||||
@ -187,7 +187,7 @@ impl Codec {
|
||||
}
|
||||
|
||||
Message::Chunk(None) => {
|
||||
self.encoder.encode_eof_bigbytes(dst)?;
|
||||
self.encoder.encode_eof(dst.buffer_mut())?;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -414,6 +414,7 @@ where
|
||||
fn send_continue(self: Pin<&mut Self>) {
|
||||
self.project()
|
||||
.write_buf
|
||||
.buffer_mut()
|
||||
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
|
||||
}
|
||||
|
||||
@ -575,6 +576,7 @@ where
|
||||
// to service call.
|
||||
Poll::Ready(Ok(req)) => {
|
||||
this.write_buf
|
||||
.buffer_mut()
|
||||
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
|
||||
let fut = this.flow.service.call(req);
|
||||
this.state.set(State::ServiceCall { fut });
|
||||
|
@ -50,183 +50,8 @@ pub(crate) trait MessageType: Sized {
|
||||
|
||||
fn chunked(&self) -> bool;
|
||||
|
||||
fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()>;
|
||||
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()>;
|
||||
|
||||
fn encode_headers_bigbytes(
|
||||
&mut self,
|
||||
dst: &mut BigBytes,
|
||||
version: Version,
|
||||
mut length: BodySize,
|
||||
conn_type: ConnectionType,
|
||||
config: &ServiceConfig,
|
||||
) -> io::Result<()> {
|
||||
let chunked = self.chunked();
|
||||
let mut skip_len = length != BodySize::Stream;
|
||||
let camel_case = self.camel_case();
|
||||
|
||||
// Content length
|
||||
if let Some(status) = self.status() {
|
||||
match status {
|
||||
StatusCode::CONTINUE
|
||||
| StatusCode::SWITCHING_PROTOCOLS
|
||||
| StatusCode::PROCESSING
|
||||
| StatusCode::NO_CONTENT => {
|
||||
// skip content-length and transfer-encoding headers
|
||||
// see https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.1
|
||||
// and https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2
|
||||
skip_len = true;
|
||||
length = BodySize::None
|
||||
}
|
||||
|
||||
StatusCode::NOT_MODIFIED => {
|
||||
// 304 responses should never have a body but should retain a manually set
|
||||
// content-length header
|
||||
// see https://datatracker.ietf.org/doc/html/rfc7232#section-4.1
|
||||
skip_len = false;
|
||||
length = BodySize::None;
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
match length {
|
||||
BodySize::Stream => {
|
||||
if chunked {
|
||||
skip_len = true;
|
||||
if camel_case {
|
||||
dst.extend_from_slice(b"\r\nTransfer-Encoding: chunked\r\n")
|
||||
} else {
|
||||
dst.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
|
||||
}
|
||||
} else {
|
||||
skip_len = false;
|
||||
dst.extend_from_slice(b"\r\n");
|
||||
}
|
||||
}
|
||||
BodySize::Sized(0) if camel_case => dst.extend_from_slice(b"\r\nContent-Length: 0\r\n"),
|
||||
BodySize::Sized(0) => dst.extend_from_slice(b"\r\ncontent-length: 0\r\n"),
|
||||
BodySize::Sized(len) => {
|
||||
helpers::write_content_length(len, dst.buffer_mut(), camel_case)
|
||||
}
|
||||
BodySize::None => dst.extend_from_slice(b"\r\n"),
|
||||
}
|
||||
|
||||
// Connection
|
||||
match conn_type {
|
||||
ConnectionType::Upgrade => dst.extend_from_slice(b"connection: upgrade\r\n"),
|
||||
ConnectionType::KeepAlive if version < Version::HTTP_11 => {
|
||||
if camel_case {
|
||||
dst.extend_from_slice(b"Connection: keep-alive\r\n")
|
||||
} else {
|
||||
dst.extend_from_slice(b"connection: keep-alive\r\n")
|
||||
}
|
||||
}
|
||||
ConnectionType::Close if version >= Version::HTTP_11 => {
|
||||
if camel_case {
|
||||
dst.extend_from_slice(b"Connection: close\r\n")
|
||||
} else {
|
||||
dst.extend_from_slice(b"connection: close\r\n")
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// write headers
|
||||
|
||||
let mut has_date = false;
|
||||
|
||||
let dst = dst.buffer_mut();
|
||||
|
||||
let mut buf = dst.chunk_mut().as_mut_ptr();
|
||||
let mut remaining = dst.capacity() - dst.len();
|
||||
|
||||
// tracks bytes written since last buffer resize
|
||||
// since buf is a raw pointer to a bytes container storage but is written to without the
|
||||
// container's knowledge, this is used to sync the containers cursor after data is written
|
||||
let mut pos = 0;
|
||||
|
||||
self.write_headers(|key, value| {
|
||||
match *key {
|
||||
CONNECTION => return,
|
||||
TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return,
|
||||
DATE => has_date = true,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let k = key.as_str().as_bytes();
|
||||
let k_len = k.len();
|
||||
|
||||
for val in value.iter() {
|
||||
let v = val.as_ref();
|
||||
let v_len = v.len();
|
||||
|
||||
// key length + value length + colon + space + \r\n
|
||||
let len = k_len + v_len + 4;
|
||||
|
||||
if len > remaining {
|
||||
// SAFETY: all the bytes written up to position "pos" are initialized
|
||||
// the written byte count and pointer advancement are kept in sync
|
||||
unsafe {
|
||||
dst.advance_mut(pos);
|
||||
}
|
||||
|
||||
pos = 0;
|
||||
dst.reserve(len * 2);
|
||||
remaining = dst.capacity() - dst.len();
|
||||
|
||||
// re-assign buf raw pointer since it's possible that the buffer was
|
||||
// reallocated and/or resized
|
||||
buf = dst.chunk_mut().as_mut_ptr();
|
||||
}
|
||||
|
||||
// SAFETY: on each write, it is enough to ensure that the advancement of
|
||||
// the cursor matches the number of bytes written
|
||||
unsafe {
|
||||
if camel_case {
|
||||
// use Camel-Case headers
|
||||
write_camel_case(k, buf, k_len);
|
||||
} else {
|
||||
write_data(k, buf, k_len);
|
||||
}
|
||||
|
||||
buf = buf.add(k_len);
|
||||
|
||||
write_data(b": ", buf, 2);
|
||||
buf = buf.add(2);
|
||||
|
||||
write_data(v, buf, v_len);
|
||||
buf = buf.add(v_len);
|
||||
|
||||
write_data(b"\r\n", buf, 2);
|
||||
buf = buf.add(2);
|
||||
};
|
||||
|
||||
pos += len;
|
||||
remaining -= len;
|
||||
}
|
||||
});
|
||||
|
||||
// final cursor synchronization with the bytes container
|
||||
//
|
||||
// SAFETY: all the bytes written up to position "pos" are initialized
|
||||
// the written byte count and pointer advancement are kept in sync
|
||||
unsafe {
|
||||
dst.advance_mut(pos);
|
||||
}
|
||||
|
||||
if !has_date {
|
||||
// optimized date header, write_date_header writes its own \r\n
|
||||
config.write_date_header(dst, camel_case);
|
||||
}
|
||||
|
||||
// end-of-headers marker
|
||||
dst.extend_from_slice(b"\r\n");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn encode_headers(
|
||||
&mut self,
|
||||
dst: &mut BytesMut,
|
||||
@ -439,17 +264,6 @@ impl MessageType for Response<()> {
|
||||
.contains(crate::message::Flags::CAMEL_CASE)
|
||||
}
|
||||
|
||||
fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> {
|
||||
let head = self.head();
|
||||
let reason = head.reason().as_bytes();
|
||||
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE + reason.len());
|
||||
|
||||
// status line
|
||||
helpers::write_status_line(head.version, head.status.as_u16(), dst.buffer_mut());
|
||||
dst.extend_from_slice(reason);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> {
|
||||
let head = self.head();
|
||||
let reason = head.reason().as_bytes();
|
||||
@ -483,26 +297,6 @@ impl MessageType for RequestHeadType {
|
||||
self.extra_headers()
|
||||
}
|
||||
|
||||
fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> {
|
||||
let head = self.as_ref();
|
||||
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
|
||||
write!(
|
||||
helpers::MutWriter(dst.buffer_mut()),
|
||||
"{} {} {}",
|
||||
head.method,
|
||||
head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"),
|
||||
match head.version {
|
||||
Version::HTTP_09 => "HTTP/0.9",
|
||||
Version::HTTP_10 => "HTTP/1.0",
|
||||
Version::HTTP_11 => "HTTP/1.1",
|
||||
Version::HTTP_2 => "HTTP/2.0",
|
||||
Version::HTTP_3 => "HTTP/3.0",
|
||||
_ => return Err(io::Error::new(io::ErrorKind::Other, "unsupported version")),
|
||||
}
|
||||
)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> {
|
||||
let head = self.as_ref();
|
||||
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
|
||||
@ -543,44 +337,6 @@ impl<T: MessageType> MessageEncoder<T> {
|
||||
self.te.encode_eof(buf)
|
||||
}
|
||||
|
||||
pub(super) fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> {
|
||||
self.te.encode_eof_bigbytes(buf)
|
||||
}
|
||||
|
||||
/// Encode message.
|
||||
pub(super) fn encode_bigbytes(
|
||||
&mut self,
|
||||
dst: &mut BigBytes,
|
||||
message: &mut T,
|
||||
head: bool,
|
||||
stream: bool,
|
||||
version: Version,
|
||||
length: BodySize,
|
||||
conn_type: ConnectionType,
|
||||
config: &ServiceConfig,
|
||||
) -> io::Result<()> {
|
||||
// transfer encoding
|
||||
if !head {
|
||||
self.te = match length {
|
||||
BodySize::Sized(0) => TransferEncoding::empty(),
|
||||
BodySize::Sized(len) => TransferEncoding::length(len),
|
||||
BodySize::Stream => {
|
||||
if message.chunked() && !stream {
|
||||
TransferEncoding::chunked()
|
||||
} else {
|
||||
TransferEncoding::eof()
|
||||
}
|
||||
}
|
||||
BodySize::None => TransferEncoding::empty(),
|
||||
};
|
||||
} else {
|
||||
self.te = TransferEncoding::empty();
|
||||
}
|
||||
|
||||
message.encode_status_bigbytes(dst)?;
|
||||
message.encode_headers_bigbytes(dst, version, length, conn_type, config)
|
||||
}
|
||||
|
||||
/// Encode message.
|
||||
pub fn encode(
|
||||
&mut self,
|
||||
@ -683,14 +439,13 @@ impl TransferEncoding {
|
||||
|
||||
if msg.is_empty() {
|
||||
*eof = true;
|
||||
buf.extend_from_slice(b"0\r\n\r\n");
|
||||
buf.buffer_mut().extend_from_slice(b"0\r\n\r\n");
|
||||
} else {
|
||||
writeln!(helpers::MutWriter(buf.buffer_mut()), "{:X}\r", msg.len())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
buf.reserve(msg.len() + 2);
|
||||
buf.put_bytes(msg);
|
||||
buf.extend_from_slice(b"\r\n");
|
||||
buf.buffer_mut().extend_from_slice(b"\r\n");
|
||||
}
|
||||
Ok(*eof)
|
||||
}
|
||||
@ -757,28 +512,6 @@ impl TransferEncoding {
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode eof. Return `EOF` state of encoder
|
||||
#[inline]
|
||||
pub fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> {
|
||||
match self.kind {
|
||||
TransferEncodingKind::Eof => Ok(()),
|
||||
TransferEncodingKind::Length(rem) => {
|
||||
if rem != 0 {
|
||||
Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
TransferEncodingKind::Chunked(ref mut eof) => {
|
||||
if !*eof {
|
||||
*eof = true;
|
||||
buf.extend_from_slice(b"0\r\n\r\n");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode eof. Return `EOF` state of encoder
|
||||
#[inline]
|
||||
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
|
||||
|
Loading…
Reference in New Issue
Block a user