diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index 755b42b9..63fb78f0 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -2,8 +2,6 @@ use std::collections::VecDeque; use bytes::{Buf, BufMut, Bytes, BytesMut}; -const SIXTYFOUR_KB: usize = 1024 * 64; - pub(crate) struct BigBytes { buffer: BytesMut, frozen: VecDeque, @@ -32,13 +30,6 @@ impl BigBytes { &mut self.buffer } - // Reserve the requested size, if fewer than 64KB - pub(super) fn reserve(&mut self, count: usize) { - if count < SIXTYFOUR_KB { - self.buffer.reserve(count); - } - } - pub(super) fn total_len(&mut self) -> usize { self.frozen_len + self.buffer.len() } @@ -50,23 +41,14 @@ impl BigBytes { // Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a // queue, otherwise, it is added to a buffer. pub(super) fn put_bytes(&mut self, bytes: Bytes) { - if bytes.len() < SIXTYFOUR_KB { - self.buffer.extend_from_slice(&bytes); - } else { - if !self.buffer.is_empty() { - let current = self.buffer.split().freeze(); - self.frozen_len += current.len(); - self.frozen.push_back(current); - } - - self.frozen_len += bytes.len(); - self.frozen.push_back(bytes); + if !self.buffer.is_empty() { + let current = self.buffer.split().freeze(); + self.frozen_len += current.len(); + self.frozen.push_back(current); } - } - // Put a slice into the internal structure. This is always added to the internal buffer - pub(super) fn extend_from_slice(&mut self, slice: &[u8]) { - self.buffer.extend_from_slice(slice); + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); } // Returns a slice of the frontmost buffer diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index a6ac3f89..a648bd49 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -170,8 +170,8 @@ impl Codec { }; // encode message - self.encoder.encode_bigbytes( - dst, + self.encoder.encode( + dst.buffer_mut(), &mut res, self.flags.contains(Flags::HEAD), self.flags.contains(Flags::STREAM), @@ -187,7 +187,7 @@ impl Codec { } Message::Chunk(None) => { - self.encoder.encode_eof_bigbytes(dst)?; + self.encoder.encode_eof(dst.buffer_mut())?; } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 30d8c61d..64a5d460 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -414,6 +414,7 @@ where fn send_continue(self: Pin<&mut Self>) { self.project() .write_buf + .buffer_mut() .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } @@ -575,6 +576,7 @@ where // to service call. Poll::Ready(Ok(req)) => { this.write_buf + .buffer_mut() .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 6fce4700..97ba0adf 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -50,183 +50,8 @@ pub(crate) trait MessageType: Sized { fn chunked(&self) -> bool; - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()>; fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()>; - fn encode_headers_bigbytes( - &mut self, - dst: &mut BigBytes, - version: Version, - mut length: BodySize, - conn_type: ConnectionType, - config: &ServiceConfig, - ) -> io::Result<()> { - let chunked = self.chunked(); - let mut skip_len = length != BodySize::Stream; - let camel_case = self.camel_case(); - - // Content length - if let Some(status) = self.status() { - match status { - StatusCode::CONTINUE - | StatusCode::SWITCHING_PROTOCOLS - | StatusCode::PROCESSING - | StatusCode::NO_CONTENT => { - // skip content-length and transfer-encoding headers - // see https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.1 - // and https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2 - skip_len = true; - length = BodySize::None - } - - StatusCode::NOT_MODIFIED => { - // 304 responses should never have a body but should retain a manually set - // content-length header - // see https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 - skip_len = false; - length = BodySize::None; - } - - _ => {} - } - } - - match length { - BodySize::Stream => { - if chunked { - skip_len = true; - if camel_case { - dst.extend_from_slice(b"\r\nTransfer-Encoding: chunked\r\n") - } else { - dst.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") - } - } else { - skip_len = false; - dst.extend_from_slice(b"\r\n"); - } - } - BodySize::Sized(0) if camel_case => dst.extend_from_slice(b"\r\nContent-Length: 0\r\n"), - BodySize::Sized(0) => dst.extend_from_slice(b"\r\ncontent-length: 0\r\n"), - BodySize::Sized(len) => { - helpers::write_content_length(len, dst.buffer_mut(), camel_case) - } - BodySize::None => dst.extend_from_slice(b"\r\n"), - } - - // Connection - match conn_type { - ConnectionType::Upgrade => dst.extend_from_slice(b"connection: upgrade\r\n"), - ConnectionType::KeepAlive if version < Version::HTTP_11 => { - if camel_case { - dst.extend_from_slice(b"Connection: keep-alive\r\n") - } else { - dst.extend_from_slice(b"connection: keep-alive\r\n") - } - } - ConnectionType::Close if version >= Version::HTTP_11 => { - if camel_case { - dst.extend_from_slice(b"Connection: close\r\n") - } else { - dst.extend_from_slice(b"connection: close\r\n") - } - } - _ => {} - } - - // write headers - - let mut has_date = false; - - let dst = dst.buffer_mut(); - - let mut buf = dst.chunk_mut().as_mut_ptr(); - let mut remaining = dst.capacity() - dst.len(); - - // tracks bytes written since last buffer resize - // since buf is a raw pointer to a bytes container storage but is written to without the - // container's knowledge, this is used to sync the containers cursor after data is written - let mut pos = 0; - - self.write_headers(|key, value| { - match *key { - CONNECTION => return, - TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return, - DATE => has_date = true, - _ => {} - } - - let k = key.as_str().as_bytes(); - let k_len = k.len(); - - for val in value.iter() { - let v = val.as_ref(); - let v_len = v.len(); - - // key length + value length + colon + space + \r\n - let len = k_len + v_len + 4; - - if len > remaining { - // SAFETY: all the bytes written up to position "pos" are initialized - // the written byte count and pointer advancement are kept in sync - unsafe { - dst.advance_mut(pos); - } - - pos = 0; - dst.reserve(len * 2); - remaining = dst.capacity() - dst.len(); - - // re-assign buf raw pointer since it's possible that the buffer was - // reallocated and/or resized - buf = dst.chunk_mut().as_mut_ptr(); - } - - // SAFETY: on each write, it is enough to ensure that the advancement of - // the cursor matches the number of bytes written - unsafe { - if camel_case { - // use Camel-Case headers - write_camel_case(k, buf, k_len); - } else { - write_data(k, buf, k_len); - } - - buf = buf.add(k_len); - - write_data(b": ", buf, 2); - buf = buf.add(2); - - write_data(v, buf, v_len); - buf = buf.add(v_len); - - write_data(b"\r\n", buf, 2); - buf = buf.add(2); - }; - - pos += len; - remaining -= len; - } - }); - - // final cursor synchronization with the bytes container - // - // SAFETY: all the bytes written up to position "pos" are initialized - // the written byte count and pointer advancement are kept in sync - unsafe { - dst.advance_mut(pos); - } - - if !has_date { - // optimized date header, write_date_header writes its own \r\n - config.write_date_header(dst, camel_case); - } - - // end-of-headers marker - dst.extend_from_slice(b"\r\n"); - - Ok(()) - } - fn encode_headers( &mut self, dst: &mut BytesMut, @@ -439,17 +264,6 @@ impl MessageType for Response<()> { .contains(crate::message::Flags::CAMEL_CASE) } - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { - let head = self.head(); - let reason = head.reason().as_bytes(); - dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE + reason.len()); - - // status line - helpers::write_status_line(head.version, head.status.as_u16(), dst.buffer_mut()); - dst.extend_from_slice(reason); - Ok(()) - } - fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.head(); let reason = head.reason().as_bytes(); @@ -483,26 +297,6 @@ impl MessageType for RequestHeadType { self.extra_headers() } - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { - let head = self.as_ref(); - dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); - write!( - helpers::MutWriter(dst.buffer_mut()), - "{} {} {}", - head.method, - head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), - match head.version { - Version::HTTP_09 => "HTTP/0.9", - Version::HTTP_10 => "HTTP/1.0", - Version::HTTP_11 => "HTTP/1.1", - Version::HTTP_2 => "HTTP/2.0", - Version::HTTP_3 => "HTTP/3.0", - _ => return Err(io::Error::new(io::ErrorKind::Other, "unsupported version")), - } - ) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - } - fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.as_ref(); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); @@ -543,44 +337,6 @@ impl MessageEncoder { self.te.encode_eof(buf) } - pub(super) fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { - self.te.encode_eof_bigbytes(buf) - } - - /// Encode message. - pub(super) fn encode_bigbytes( - &mut self, - dst: &mut BigBytes, - message: &mut T, - head: bool, - stream: bool, - version: Version, - length: BodySize, - conn_type: ConnectionType, - config: &ServiceConfig, - ) -> io::Result<()> { - // transfer encoding - if !head { - self.te = match length { - BodySize::Sized(0) => TransferEncoding::empty(), - BodySize::Sized(len) => TransferEncoding::length(len), - BodySize::Stream => { - if message.chunked() && !stream { - TransferEncoding::chunked() - } else { - TransferEncoding::eof() - } - } - BodySize::None => TransferEncoding::empty(), - }; - } else { - self.te = TransferEncoding::empty(); - } - - message.encode_status_bigbytes(dst)?; - message.encode_headers_bigbytes(dst, version, length, conn_type, config) - } - /// Encode message. pub fn encode( &mut self, @@ -683,14 +439,13 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - buf.extend_from_slice(b"0\r\n\r\n"); + buf.buffer_mut().extend_from_slice(b"0\r\n\r\n"); } else { writeln!(helpers::MutWriter(buf.buffer_mut()), "{:X}\r", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - buf.reserve(msg.len() + 2); buf.put_bytes(msg); - buf.extend_from_slice(b"\r\n"); + buf.buffer_mut().extend_from_slice(b"\r\n"); } Ok(*eof) } @@ -757,28 +512,6 @@ impl TransferEncoding { } } - /// Encode eof. Return `EOF` state of encoder - #[inline] - pub fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { - match self.kind { - TransferEncodingKind::Eof => Ok(()), - TransferEncodingKind::Length(rem) => { - if rem != 0 { - Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")) - } else { - Ok(()) - } - } - TransferEncodingKind::Chunked(ref mut eof) => { - if !*eof { - *eof = true; - buf.extend_from_slice(b"0\r\n\r\n"); - } - Ok(()) - } - } - } - /// Encode eof. Return `EOF` state of encoder #[inline] pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {