1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-28 01:52:57 +01:00

Expand BigBytes usage to ws encoding

This commit is contained in:
asonix 2024-11-04 13:06:47 -06:00
parent 94c5d4d641
commit 627d113323
9 changed files with 227 additions and 153 deletions

124
actix-http/src/big_bytes.rs Normal file
View File

@ -0,0 +1,124 @@
use std::collections::VecDeque;
use bytes::{Buf, BufMut, Bytes, BytesMut};
// 64KB max capacity (arbitrarily chosen)
const MAX_CAPACITY: usize = 1024 * 64;
pub struct BigBytes {
buffer: BytesMut,
frozen: VecDeque<Bytes>,
frozen_len: usize,
}
impl BigBytes {
/// Initialize a new BigBytes with the internal buffer set to `capacity` capacity
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: BytesMut::with_capacity(capacity),
frozen: VecDeque::default(),
frozen_len: 0,
}
}
/// Clear the internal queue and buffer, resetting length to zero
///
/// if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will
/// be freed and a new buffer of capacity `new_capacity` will be allocated
pub fn clear(&mut self, new_capacity: usize) {
std::mem::take(&mut self.frozen);
self.frozen_len = 0;
self.buffer.clear();
if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) {
self.buffer = BytesMut::with_capacity(new_capacity);
}
}
/// Return a mutable reference to the underlying buffer. This should only be used when dealing
/// with small allocations (e.g. writing headers)
pub fn buffer_mut(&mut self) -> &mut BytesMut {
&mut self.buffer
}
/// Return the total length of the bytes stored in BigBytes
pub fn total_len(&mut self) -> usize {
self.frozen_len + self.buffer.len()
}
/// Return whether there are no bytes present in the BigBytes
pub fn is_empty(&self) -> bool {
self.frozen_len == 0 && self.buffer.is_empty()
}
/// Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a
/// queue, otherwise, it is added to a buffer.
pub fn put_bytes(&mut self, bytes: Bytes) {
if !self.buffer.is_empty() {
let current = self.buffer.split().freeze();
self.frozen_len += current.len();
self.frozen.push_back(current);
}
if !bytes.is_empty() {
self.frozen_len += bytes.len();
self.frozen.push_back(bytes);
}
}
/// Returns a slice of the frontmost buffer
///
/// While there are bytes present in BigBytes, front_slice is guaranteed not to return an empty
/// slice.
pub fn front_slice(&self) -> &[u8] {
if let Some(front) = self.frozen.front() {
front
} else {
&self.buffer
}
}
/// Advances the first buffer by `count` bytes. If the first buffer is advanced to completion,
/// it is popped from the queue
pub fn advance(&mut self, count: usize) {
if let Some(front) = self.frozen.front_mut() {
front.advance(count);
if front.is_empty() {
self.frozen.pop_front();
}
self.frozen_len -= count;
} else {
self.buffer.advance(count);
}
}
/// Pops the front Bytes from the BigBytes, or splits and freezes the internal buffer if no
/// Bytes are present.
pub fn pop_front(&mut self) -> Option<Bytes> {
if let Some(front) = self.frozen.pop_front() {
self.frozen_len -= front.len();
Some(front)
} else if !self.buffer.is_empty() {
Some(self.buffer.split().freeze())
} else {
None
}
}
/// Drain the BigBytes, writing everything into the provided BytesMut
pub fn write_to(&mut self, dst: &mut BytesMut) {
dst.reserve(self.total_len());
for buf in &self.frozen {
dst.put_slice(buf);
}
dst.put_slice(&self.buffer.split());
self.frozen_len = 0;
std::mem::take(&mut self.frozen);
}
}

View File

@ -1,105 +0,0 @@
use std::collections::VecDeque;
use bytes::{Buf, BufMut, Bytes, BytesMut};
// 64KB max capacity (arbitrarily chosen)
const MAX_CAPACITY: usize = 1024 * 64;
pub(crate) struct BigBytes {
buffer: BytesMut,
frozen: VecDeque<Bytes>,
frozen_len: usize,
}
impl BigBytes {
pub(super) fn with_capacity(capacity: usize) -> Self {
Self {
buffer: BytesMut::with_capacity(capacity),
frozen: VecDeque::default(),
frozen_len: 0,
}
}
// Clear the internal queue and buffer, resetting length to zero
//
// if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will
// be freed and a new buffer of capacity `new_capacity` will be allocated
pub(super) fn clear(&mut self, new_capacity: usize) {
std::mem::take(&mut self.frozen);
self.frozen_len = 0;
self.buffer.clear();
if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) {
self.buffer = BytesMut::with_capacity(new_capacity);
}
}
// Return a mutable reference to the underlying buffer. This should only be used when dealing
// with small allocations (e.g. writing headers)
pub(super) fn buffer_mut(&mut self) -> &mut BytesMut {
&mut self.buffer
}
pub(super) fn total_len(&mut self) -> usize {
self.frozen_len + self.buffer.len()
}
pub(super) fn is_empty(&self) -> bool {
self.frozen_len == 0 && self.buffer.is_empty()
}
// Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a
// queue, otherwise, it is added to a buffer.
pub(super) fn put_bytes(&mut self, bytes: Bytes) {
if !self.buffer.is_empty() {
let current = self.buffer.split().freeze();
self.frozen_len += current.len();
self.frozen.push_back(current);
}
if !bytes.is_empty() {
self.frozen_len += bytes.len();
self.frozen.push_back(bytes);
}
}
// Returns a slice of the frontmost buffer
pub(super) fn front_slice(&self) -> &[u8] {
if let Some(front) = self.frozen.front() {
front
} else {
&self.buffer
}
}
// Advances the first buffer by `count` bytes. If the first buffer is advanced to completion,
// it is popped from the queue
pub(super) fn advance(&mut self, count: usize) {
if let Some(front) = self.frozen.front_mut() {
front.advance(count);
if front.is_empty() {
self.frozen.pop_front();
}
self.frozen_len -= count;
} else {
self.buffer.advance(count);
}
}
// Drain the BibBytes, writing everything into the provided BytesMut
pub(super) fn write_to(&mut self, dst: &mut BytesMut) {
dst.reserve(self.total_len());
for buf in &self.frozen {
dst.put_slice(buf);
}
dst.put_slice(&self.buffer.split());
self.frozen_len = 0;
std::mem::take(&mut self.frozen);
}
}

View File

@ -6,11 +6,13 @@ use http::{Method, Version};
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use super::{ use super::{
big_bytes::BigBytes,
decoder::{self, PayloadDecoder, PayloadItem, PayloadType}, decoder::{self, PayloadDecoder, PayloadItem, PayloadType},
encoder, Message, MessageType, encoder, Message, MessageType,
}; };
use crate::{body::BodySize, error::ParseError, ConnectionType, Request, Response, ServiceConfig}; use crate::{
big_bytes::BigBytes, body::BodySize, error::ParseError, ConnectionType, Request, Response,
ServiceConfig,
};
bitflags! { bitflags! {
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]

View File

@ -20,7 +20,6 @@ use tokio_util::codec::Decoder as _;
use tracing::{error, trace}; use tracing::{error, trace};
use super::{ use super::{
big_bytes::BigBytes,
codec::Codec, codec::Codec,
decoder::MAX_BUFFER_SIZE, decoder::MAX_BUFFER_SIZE,
payload::{Payload, PayloadSender, PayloadStatus}, payload::{Payload, PayloadSender, PayloadStatus},
@ -28,6 +27,7 @@ use super::{
Message, MessageType, Message, MessageType,
}; };
use crate::{ use crate::{
big_bytes::BigBytes,
body::{BodySize, BoxBody, MessageBody}, body::{BodySize, BoxBody, MessageBody},
config::ServiceConfig, config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError}, error::{DispatchError, ParseError, PayloadError},

View File

@ -8,8 +8,8 @@ use std::{
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use super::big_bytes::BigBytes;
use crate::{ use crate::{
big_bytes::BigBytes,
body::BodySize, body::BodySize,
header::{ header::{
map::Value, HeaderMap, HeaderName, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, map::Value, HeaderMap, HeaderName, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING,

View File

@ -2,7 +2,6 @@
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
mod big_bytes;
mod chunked; mod chunked;
mod client; mod client;
mod codec; mod codec;

View File

@ -33,6 +33,7 @@
pub use http::{uri, uri::Uri, Method, StatusCode, Version}; pub use http::{uri, uri::Uri, Method, StatusCode, Version};
pub mod big_bytes;
pub mod body; pub mod body;
mod builder; mod builder;
mod config; mod config;

View File

@ -4,6 +4,8 @@ use bytestring::ByteString;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use tracing::error; use tracing::error;
use crate::big_bytes::BigBytes;
use super::{ use super::{
frame::Parser, frame::Parser,
proto::{CloseReason, OpCode}, proto::{CloseReason, OpCode},
@ -116,51 +118,55 @@ impl Default for Codec {
} }
} }
impl Encoder<Message> for Codec { impl Codec {
type Error = ProtocolError; pub fn encode_bigbytes(
&mut self,
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { item: Message,
dst: &mut BigBytes,
) -> Result<(), ProtocolError> {
match item { match item {
Message::Text(txt) => Parser::write_message( Message::Text(txt) => Parser::write_message_bigbytes(
dst, dst,
txt, txt.into_bytes(),
OpCode::Text, OpCode::Text,
true, true,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
), ),
Message::Binary(bin) => Parser::write_message( Message::Binary(bin) => Parser::write_message_bigbytes(
dst, dst,
bin, bin,
OpCode::Binary, OpCode::Binary,
true, true,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
), ),
Message::Ping(txt) => Parser::write_message( Message::Ping(txt) => Parser::write_message_bigbytes(
dst, dst,
txt, txt,
OpCode::Ping, OpCode::Ping,
true, true,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
), ),
Message::Pong(txt) => Parser::write_message( Message::Pong(txt) => Parser::write_message_bigbytes(
dst, dst,
txt, txt,
OpCode::Pong, OpCode::Pong,
true, true,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
), ),
Message::Close(reason) => { Message::Close(reason) => Parser::write_close(
Parser::write_close(dst, reason, !self.flags.contains(Flags::SERVER)) dst.buffer_mut(),
} reason,
!self.flags.contains(Flags::SERVER),
),
Message::Continuation(cont) => match cont { Message::Continuation(cont) => match cont {
Item::FirstText(data) => { Item::FirstText(data) => {
if self.flags.contains(Flags::W_CONTINUATION) { if self.flags.contains(Flags::W_CONTINUATION) {
return Err(ProtocolError::ContinuationStarted); return Err(ProtocolError::ContinuationStarted);
} else { } else {
self.flags.insert(Flags::W_CONTINUATION); self.flags.insert(Flags::W_CONTINUATION);
Parser::write_message( Parser::write_message_bigbytes(
dst, dst,
&data[..], data,
OpCode::Text, OpCode::Text,
false, false,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
@ -172,9 +178,9 @@ impl Encoder<Message> for Codec {
return Err(ProtocolError::ContinuationStarted); return Err(ProtocolError::ContinuationStarted);
} else { } else {
self.flags.insert(Flags::W_CONTINUATION); self.flags.insert(Flags::W_CONTINUATION);
Parser::write_message( Parser::write_message_bigbytes(
dst, dst,
&data[..], data,
OpCode::Binary, OpCode::Binary,
false, false,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
@ -183,9 +189,9 @@ impl Encoder<Message> for Codec {
} }
Item::Continue(data) => { Item::Continue(data) => {
if self.flags.contains(Flags::W_CONTINUATION) { if self.flags.contains(Flags::W_CONTINUATION) {
Parser::write_message( Parser::write_message_bigbytes(
dst, dst,
&data[..], data,
OpCode::Continue, OpCode::Continue,
false, false,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
@ -197,9 +203,9 @@ impl Encoder<Message> for Codec {
Item::Last(data) => { Item::Last(data) => {
if self.flags.contains(Flags::W_CONTINUATION) { if self.flags.contains(Flags::W_CONTINUATION) {
self.flags.remove(Flags::W_CONTINUATION); self.flags.remove(Flags::W_CONTINUATION);
Parser::write_message( Parser::write_message_bigbytes(
dst, dst,
&data[..], data,
OpCode::Continue, OpCode::Continue,
true, true,
!self.flags.contains(Flags::SERVER), !self.flags.contains(Flags::SERVER),
@ -215,6 +221,20 @@ impl Encoder<Message> for Codec {
} }
} }
impl Encoder<Message> for Codec {
type Error = ProtocolError;
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut big_bytes = BigBytes::with_capacity(0);
self.encode_bigbytes(item, &mut big_bytes)?;
big_bytes.write_to(dst);
Ok(())
}
}
impl Decoder for Codec { impl Decoder for Codec {
type Item = Frame; type Item = Frame;
type Error = ProtocolError; type Error = ProtocolError;

View File

@ -1,8 +1,10 @@
use std::cmp::min; use std::cmp::min;
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use tracing::debug; use tracing::debug;
use crate::big_bytes::BigBytes;
use super::{ use super::{
mask::apply_mask, mask::apply_mask,
proto::{CloseCode, CloseReason, OpCode}, proto::{CloseCode, CloseReason, OpCode},
@ -156,21 +158,19 @@ impl Parser {
} }
} }
/// Generate binary representation pub fn write_message_bigbytes(
pub fn write_message<B: AsRef<[u8]>>( dst: &mut BigBytes,
dst: &mut BytesMut, pl: Bytes,
pl: B,
op: OpCode, op: OpCode,
fin: bool, fin: bool,
mask: bool, mask: bool,
) { ) {
let payload = pl.as_ref();
let one: u8 = if fin { let one: u8 = if fin {
0x80 | Into::<u8>::into(op) 0x80 | Into::<u8>::into(op)
} else { } else {
op.into() op.into()
}; };
let payload_len = payload.len(); let payload_len = pl.len();
let (two, p_len) = if mask { let (two, p_len) = if mask {
(0x80, payload_len + 4) (0x80, payload_len + 4)
} else { } else {
@ -178,27 +178,48 @@ impl Parser {
}; };
if payload_len < 126 { if payload_len < 126 {
dst.reserve(p_len + 2 + if mask { 4 } else { 0 }); dst.buffer_mut()
dst.put_slice(&[one, two | payload_len as u8]); .reserve(p_len + 2 + if mask { 4 } else { 0 });
dst.buffer_mut().put_slice(&[one, two | payload_len as u8]);
} else if payload_len <= 65_535 { } else if payload_len <= 65_535 {
dst.reserve(p_len + 4 + if mask { 4 } else { 0 }); dst.buffer_mut()
dst.put_slice(&[one, two | 126]); .reserve(p_len + 4 + if mask { 4 } else { 0 });
dst.put_u16(payload_len as u16); dst.buffer_mut().put_slice(&[one, two | 126]);
dst.buffer_mut().put_u16(payload_len as u16);
} else { } else {
dst.reserve(p_len + 10 + if mask { 4 } else { 0 }); dst.buffer_mut()
dst.put_slice(&[one, two | 127]); .reserve(p_len + 10 + if mask { 4 } else { 0 });
dst.put_u64(payload_len as u64); dst.buffer_mut().put_slice(&[one, two | 127]);
dst.buffer_mut().put_u64(payload_len as u64);
}; };
if mask { if mask {
let mask = rand::random::<[u8; 4]>(); let mask = rand::random::<[u8; 4]>();
dst.put_slice(mask.as_ref()); dst.buffer_mut().put_slice(mask.as_ref());
dst.put_slice(payload.as_ref());
let pos = dst.len() - payload_len; match pl.try_into_mut() {
apply_mask(&mut dst[pos..], mask); Ok(mut pl_mut) => {
} else { apply_mask(&mut pl_mut, mask);
dst.put_slice(payload.as_ref()); dst.put_bytes(pl_mut.freeze());
} }
Err(pl) => {
dst.buffer_mut().put_slice(pl.as_ref());
let pos = dst.buffer_mut().len() - payload_len;
apply_mask(&mut dst.buffer_mut()[pos..], mask);
}
}
} else {
dst.put_bytes(pl)
}
}
/// Generate binary representation
pub fn write_message(dst: &mut BytesMut, pl: Bytes, op: OpCode, fin: bool, mask: bool) {
let mut big_bytes = BigBytes::with_capacity(0);
Self::write_message_bigbytes(&mut big_bytes, pl, op, fin, mask);
big_bytes.write_to(dst);
} }
/// Create a new Close control frame. /// Create a new Close control frame.
@ -215,7 +236,7 @@ impl Parser {
} }
}; };
Parser::write_message(dst, payload, OpCode::Close, true, mask) Parser::write_message(dst, Bytes::from(payload), OpCode::Close, true, mask)
} }
} }
@ -368,7 +389,13 @@ mod tests {
#[test] #[test]
fn test_ping_frame() { fn test_ping_frame() {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
Parser::write_message(&mut buf, Vec::from("data"), OpCode::Ping, true, false); Parser::write_message(
&mut buf,
Bytes::from(Vec::from("data")),
OpCode::Ping,
true,
false,
);
let mut v = vec![137u8, 4u8]; let mut v = vec![137u8, 4u8];
v.extend(b"data"); v.extend(b"data");
@ -378,7 +405,13 @@ mod tests {
#[test] #[test]
fn test_pong_frame() { fn test_pong_frame() {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
Parser::write_message(&mut buf, Vec::from("data"), OpCode::Pong, true, false); Parser::write_message(
&mut buf,
Bytes::from(Vec::from("data")),
OpCode::Pong,
true,
false,
);
let mut v = vec![138u8, 4u8]; let mut v = vec![138u8, 4u8];
v.extend(b"data"); v.extend(b"data");