From 627d11332379f4e718ceee5d7ddde76171f2a6f3 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Nov 2024 13:06:47 -0600 Subject: [PATCH] Expand BigBytes usage to ws encoding --- actix-http/src/big_bytes.rs | 124 ++++++++++++++++++++++++++++++++ actix-http/src/h1/big_bytes.rs | 105 --------------------------- actix-http/src/h1/codec.rs | 6 +- actix-http/src/h1/dispatcher.rs | 2 +- actix-http/src/h1/encoder.rs | 2 +- actix-http/src/h1/mod.rs | 1 - actix-http/src/lib.rs | 1 + actix-http/src/ws/codec.rs | 60 ++++++++++------ actix-http/src/ws/frame.rs | 79 ++++++++++++++------ 9 files changed, 227 insertions(+), 153 deletions(-) create mode 100644 actix-http/src/big_bytes.rs delete mode 100644 actix-http/src/h1/big_bytes.rs diff --git a/actix-http/src/big_bytes.rs b/actix-http/src/big_bytes.rs new file mode 100644 index 00000000..49839ab4 --- /dev/null +++ b/actix-http/src/big_bytes.rs @@ -0,0 +1,124 @@ +use std::collections::VecDeque; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +// 64KB max capacity (arbitrarily chosen) +const MAX_CAPACITY: usize = 1024 * 64; + +pub struct BigBytes { + buffer: BytesMut, + frozen: VecDeque, + frozen_len: usize, +} + +impl BigBytes { + /// Initialize a new BigBytes with the internal buffer set to `capacity` capacity + pub fn with_capacity(capacity: usize) -> Self { + Self { + buffer: BytesMut::with_capacity(capacity), + frozen: VecDeque::default(), + frozen_len: 0, + } + } + + /// Clear the internal queue and buffer, resetting length to zero + /// + /// if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will + /// be freed and a new buffer of capacity `new_capacity` will be allocated + pub fn clear(&mut self, new_capacity: usize) { + std::mem::take(&mut self.frozen); + self.frozen_len = 0; + self.buffer.clear(); + + if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) { + self.buffer = BytesMut::with_capacity(new_capacity); + } + } + + /// Return a mutable reference to the underlying buffer. This should only be used when dealing + /// with small allocations (e.g. writing headers) + pub fn buffer_mut(&mut self) -> &mut BytesMut { + &mut self.buffer + } + + /// Return the total length of the bytes stored in BigBytes + pub fn total_len(&mut self) -> usize { + self.frozen_len + self.buffer.len() + } + + /// Return whether there are no bytes present in the BigBytes + pub fn is_empty(&self) -> bool { + self.frozen_len == 0 && self.buffer.is_empty() + } + + /// Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a + /// queue, otherwise, it is added to a buffer. + pub fn put_bytes(&mut self, bytes: Bytes) { + if !self.buffer.is_empty() { + let current = self.buffer.split().freeze(); + self.frozen_len += current.len(); + self.frozen.push_back(current); + } + + if !bytes.is_empty() { + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); + } + } + + /// Returns a slice of the frontmost buffer + /// + /// While there are bytes present in BigBytes, front_slice is guaranteed not to return an empty + /// slice. + pub fn front_slice(&self) -> &[u8] { + if let Some(front) = self.frozen.front() { + front + } else { + &self.buffer + } + } + + /// Advances the first buffer by `count` bytes. If the first buffer is advanced to completion, + /// it is popped from the queue + pub fn advance(&mut self, count: usize) { + if let Some(front) = self.frozen.front_mut() { + front.advance(count); + + if front.is_empty() { + self.frozen.pop_front(); + } + + self.frozen_len -= count; + } else { + self.buffer.advance(count); + } + } + + /// Pops the front Bytes from the BigBytes, or splits and freezes the internal buffer if no + /// Bytes are present. + pub fn pop_front(&mut self) -> Option { + if let Some(front) = self.frozen.pop_front() { + self.frozen_len -= front.len(); + Some(front) + } else if !self.buffer.is_empty() { + Some(self.buffer.split().freeze()) + } else { + None + } + } + + /// Drain the BigBytes, writing everything into the provided BytesMut + pub fn write_to(&mut self, dst: &mut BytesMut) { + dst.reserve(self.total_len()); + + for buf in &self.frozen { + dst.put_slice(buf); + } + + dst.put_slice(&self.buffer.split()); + + self.frozen_len = 0; + + std::mem::take(&mut self.frozen); + } +} diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs deleted file mode 100644 index 04ae8a6c..00000000 --- a/actix-http/src/h1/big_bytes.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::collections::VecDeque; - -use bytes::{Buf, BufMut, Bytes, BytesMut}; - -// 64KB max capacity (arbitrarily chosen) -const MAX_CAPACITY: usize = 1024 * 64; - -pub(crate) struct BigBytes { - buffer: BytesMut, - frozen: VecDeque, - frozen_len: usize, -} - -impl BigBytes { - pub(super) fn with_capacity(capacity: usize) -> Self { - Self { - buffer: BytesMut::with_capacity(capacity), - frozen: VecDeque::default(), - frozen_len: 0, - } - } - - // Clear the internal queue and buffer, resetting length to zero - // - // if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will - // be freed and a new buffer of capacity `new_capacity` will be allocated - pub(super) fn clear(&mut self, new_capacity: usize) { - std::mem::take(&mut self.frozen); - self.frozen_len = 0; - self.buffer.clear(); - - if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) { - self.buffer = BytesMut::with_capacity(new_capacity); - } - } - - // Return a mutable reference to the underlying buffer. This should only be used when dealing - // with small allocations (e.g. writing headers) - pub(super) fn buffer_mut(&mut self) -> &mut BytesMut { - &mut self.buffer - } - - pub(super) fn total_len(&mut self) -> usize { - self.frozen_len + self.buffer.len() - } - - pub(super) fn is_empty(&self) -> bool { - self.frozen_len == 0 && self.buffer.is_empty() - } - - // Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a - // queue, otherwise, it is added to a buffer. - pub(super) fn put_bytes(&mut self, bytes: Bytes) { - if !self.buffer.is_empty() { - let current = self.buffer.split().freeze(); - self.frozen_len += current.len(); - self.frozen.push_back(current); - } - - if !bytes.is_empty() { - self.frozen_len += bytes.len(); - self.frozen.push_back(bytes); - } - } - - // Returns a slice of the frontmost buffer - pub(super) fn front_slice(&self) -> &[u8] { - if let Some(front) = self.frozen.front() { - front - } else { - &self.buffer - } - } - - // Advances the first buffer by `count` bytes. If the first buffer is advanced to completion, - // it is popped from the queue - pub(super) fn advance(&mut self, count: usize) { - if let Some(front) = self.frozen.front_mut() { - front.advance(count); - - if front.is_empty() { - self.frozen.pop_front(); - } - - self.frozen_len -= count; - } else { - self.buffer.advance(count); - } - } - - // Drain the BibBytes, writing everything into the provided BytesMut - pub(super) fn write_to(&mut self, dst: &mut BytesMut) { - dst.reserve(self.total_len()); - - for buf in &self.frozen { - dst.put_slice(buf); - } - - dst.put_slice(&self.buffer.split()); - - self.frozen_len = 0; - - std::mem::take(&mut self.frozen); - } -} diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index a648bd49..b097ddf9 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -6,11 +6,13 @@ use http::{Method, Version}; use tokio_util::codec::{Decoder, Encoder}; use super::{ - big_bytes::BigBytes, decoder::{self, PayloadDecoder, PayloadItem, PayloadType}, encoder, Message, MessageType, }; -use crate::{body::BodySize, error::ParseError, ConnectionType, Request, Response, ServiceConfig}; +use crate::{ + big_bytes::BigBytes, body::BodySize, error::ParseError, ConnectionType, Request, Response, + ServiceConfig, +}; bitflags! { #[derive(Debug, Clone, Copy)] diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index d1bff8db..40b47293 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -20,7 +20,6 @@ use tokio_util::codec::Decoder as _; use tracing::{error, trace}; use super::{ - big_bytes::BigBytes, codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, @@ -28,6 +27,7 @@ use super::{ Message, MessageType, }; use crate::{ + big_bytes::BigBytes, body::{BodySize, BoxBody, MessageBody}, config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 75aa8a82..42d7d677 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -8,8 +8,8 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; -use super::big_bytes::BigBytes; use crate::{ + big_bytes::BigBytes, body::BodySize, header::{ map::Value, HeaderMap, HeaderName, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 267b2012..9e44608d 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -2,7 +2,6 @@ use bytes::{Bytes, BytesMut}; -mod big_bytes; mod chunked; mod client; mod codec; diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index f9697c4d..5d287a17 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -33,6 +33,7 @@ pub use http::{uri, uri::Uri, Method, StatusCode, Version}; +pub mod big_bytes; pub mod body; mod builder; mod config; diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index ad487e40..0a2c8996 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -4,6 +4,8 @@ use bytestring::ByteString; use tokio_util::codec::{Decoder, Encoder}; use tracing::error; +use crate::big_bytes::BigBytes; + use super::{ frame::Parser, proto::{CloseReason, OpCode}, @@ -116,51 +118,55 @@ impl Default for Codec { } } -impl Encoder for Codec { - type Error = ProtocolError; - - fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { +impl Codec { + pub fn encode_bigbytes( + &mut self, + item: Message, + dst: &mut BigBytes, + ) -> Result<(), ProtocolError> { match item { - Message::Text(txt) => Parser::write_message( + Message::Text(txt) => Parser::write_message_bigbytes( dst, - txt, + txt.into_bytes(), OpCode::Text, true, !self.flags.contains(Flags::SERVER), ), - Message::Binary(bin) => Parser::write_message( + Message::Binary(bin) => Parser::write_message_bigbytes( dst, bin, OpCode::Binary, true, !self.flags.contains(Flags::SERVER), ), - Message::Ping(txt) => Parser::write_message( + Message::Ping(txt) => Parser::write_message_bigbytes( dst, txt, OpCode::Ping, true, !self.flags.contains(Flags::SERVER), ), - Message::Pong(txt) => Parser::write_message( + Message::Pong(txt) => Parser::write_message_bigbytes( dst, txt, OpCode::Pong, true, !self.flags.contains(Flags::SERVER), ), - Message::Close(reason) => { - Parser::write_close(dst, reason, !self.flags.contains(Flags::SERVER)) - } + Message::Close(reason) => Parser::write_close( + dst.buffer_mut(), + reason, + !self.flags.contains(Flags::SERVER), + ), Message::Continuation(cont) => match cont { Item::FirstText(data) => { if self.flags.contains(Flags::W_CONTINUATION) { return Err(ProtocolError::ContinuationStarted); } else { self.flags.insert(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Text, false, !self.flags.contains(Flags::SERVER), @@ -172,9 +178,9 @@ impl Encoder for Codec { return Err(ProtocolError::ContinuationStarted); } else { self.flags.insert(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Binary, false, !self.flags.contains(Flags::SERVER), @@ -183,9 +189,9 @@ impl Encoder for Codec { } Item::Continue(data) => { if self.flags.contains(Flags::W_CONTINUATION) { - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Continue, false, !self.flags.contains(Flags::SERVER), @@ -197,9 +203,9 @@ impl Encoder for Codec { Item::Last(data) => { if self.flags.contains(Flags::W_CONTINUATION) { self.flags.remove(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Continue, true, !self.flags.contains(Flags::SERVER), @@ -215,6 +221,20 @@ impl Encoder for Codec { } } +impl Encoder for Codec { + type Error = ProtocolError; + + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut big_bytes = BigBytes::with_capacity(0); + + self.encode_bigbytes(item, &mut big_bytes)?; + + big_bytes.write_to(dst); + + Ok(()) + } +} + impl Decoder for Codec { type Item = Frame; type Error = ProtocolError; diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index c9fb0cde..a70063a4 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -1,8 +1,10 @@ use std::cmp::min; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use tracing::debug; +use crate::big_bytes::BigBytes; + use super::{ mask::apply_mask, proto::{CloseCode, CloseReason, OpCode}, @@ -156,21 +158,19 @@ impl Parser { } } - /// Generate binary representation - pub fn write_message>( - dst: &mut BytesMut, - pl: B, + pub fn write_message_bigbytes( + dst: &mut BigBytes, + pl: Bytes, op: OpCode, fin: bool, mask: bool, ) { - let payload = pl.as_ref(); let one: u8 = if fin { 0x80 | Into::::into(op) } else { op.into() }; - let payload_len = payload.len(); + let payload_len = pl.len(); let (two, p_len) = if mask { (0x80, payload_len + 4) } else { @@ -178,29 +178,50 @@ impl Parser { }; if payload_len < 126 { - dst.reserve(p_len + 2 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | payload_len as u8]); + dst.buffer_mut() + .reserve(p_len + 2 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | payload_len as u8]); } else if payload_len <= 65_535 { - dst.reserve(p_len + 4 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | 126]); - dst.put_u16(payload_len as u16); + dst.buffer_mut() + .reserve(p_len + 4 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | 126]); + dst.buffer_mut().put_u16(payload_len as u16); } else { - dst.reserve(p_len + 10 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | 127]); - dst.put_u64(payload_len as u64); + dst.buffer_mut() + .reserve(p_len + 10 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | 127]); + dst.buffer_mut().put_u64(payload_len as u64); }; if mask { let mask = rand::random::<[u8; 4]>(); - dst.put_slice(mask.as_ref()); - dst.put_slice(payload.as_ref()); - let pos = dst.len() - payload_len; - apply_mask(&mut dst[pos..], mask); + dst.buffer_mut().put_slice(mask.as_ref()); + + match pl.try_into_mut() { + Ok(mut pl_mut) => { + apply_mask(&mut pl_mut, mask); + dst.put_bytes(pl_mut.freeze()); + } + Err(pl) => { + dst.buffer_mut().put_slice(pl.as_ref()); + let pos = dst.buffer_mut().len() - payload_len; + apply_mask(&mut dst.buffer_mut()[pos..], mask); + } + } } else { - dst.put_slice(payload.as_ref()); + dst.put_bytes(pl) } } + /// Generate binary representation + pub fn write_message(dst: &mut BytesMut, pl: Bytes, op: OpCode, fin: bool, mask: bool) { + let mut big_bytes = BigBytes::with_capacity(0); + + Self::write_message_bigbytes(&mut big_bytes, pl, op, fin, mask); + + big_bytes.write_to(dst); + } + /// Create a new Close control frame. #[inline] pub fn write_close(dst: &mut BytesMut, reason: Option, mask: bool) { @@ -215,7 +236,7 @@ impl Parser { } }; - Parser::write_message(dst, payload, OpCode::Close, true, mask) + Parser::write_message(dst, Bytes::from(payload), OpCode::Close, true, mask) } } @@ -368,7 +389,13 @@ mod tests { #[test] fn test_ping_frame() { let mut buf = BytesMut::new(); - Parser::write_message(&mut buf, Vec::from("data"), OpCode::Ping, true, false); + Parser::write_message( + &mut buf, + Bytes::from(Vec::from("data")), + OpCode::Ping, + true, + false, + ); let mut v = vec![137u8, 4u8]; v.extend(b"data"); @@ -378,7 +405,13 @@ mod tests { #[test] fn test_pong_frame() { let mut buf = BytesMut::new(); - Parser::write_message(&mut buf, Vec::from("data"), OpCode::Pong, true, false); + Parser::write_message( + &mut buf, + Bytes::from(Vec::from("data")), + OpCode::Pong, + true, + false, + ); let mut v = vec![138u8, 4u8]; v.extend(b"data");