1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-23 23:51:06 +01:00

actix-ws: use BigBytes to avoid copying ws messages between various buffers

This commit is contained in:
asonix 2024-11-04 17:42:58 -06:00
parent 77406cbb71
commit a47d61c8d6

View File

@ -1,13 +1,14 @@
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
future::poll_fn, future::poll_fn,
io, mem, io,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_codec::{Decoder, Encoder}; use actix_codec::Decoder;
use actix_http::{ use actix_http::{
big_bytes::BigBytes,
ws::{Codec, Frame, Message, ProtocolError}, ws::{Codec, Frame, Message, ProtocolError},
Payload, Payload,
}; };
@ -24,8 +25,7 @@ use crate::AggregatedMessageStream;
/// Response body for a WebSocket. /// Response body for a WebSocket.
pub struct StreamingBody { pub struct StreamingBody {
session_rx: Receiver<Message>, session_rx: Receiver<Message>,
messages: VecDeque<Message>, buf: BigBytes,
buf: BytesMut,
codec: Codec, codec: Codec,
closing: bool, closing: bool,
} }
@ -34,8 +34,7 @@ impl StreamingBody {
pub(super) fn new(session_rx: Receiver<Message>) -> Self { pub(super) fn new(session_rx: Receiver<Message>) -> Self {
StreamingBody { StreamingBody {
session_rx, session_rx,
messages: VecDeque::new(), buf: BigBytes::with_capacity(0),
buf: BytesMut::new(),
codec: Codec::new(), codec: Codec::new(),
closing: false, closing: false,
} }
@ -118,14 +117,12 @@ impl Stream for StreamingBody {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut(); let this = self.get_mut();
if this.closing { while !this.closing {
return Poll::Ready(None);
}
loop {
match Pin::new(&mut this.session_rx).poll_recv(cx) { match Pin::new(&mut this.session_rx).poll_recv(cx) {
Poll::Ready(Some(msg)) => { Poll::Ready(Some(msg)) => {
this.messages.push_back(msg); if let Err(err) = this.codec.encode_bigbytes(msg, &mut this.buf) {
return Poll::Ready(Some(Err(err.into())));
}
} }
Poll::Ready(None) => { Poll::Ready(None) => {
this.closing = true; this.closing = true;
@ -135,14 +132,12 @@ impl Stream for StreamingBody {
} }
} }
while let Some(msg) = this.messages.pop_front() { if let Some(bytes) = this.buf.pop_front() {
if let Err(err) = this.codec.encode(msg, &mut this.buf) { return Poll::Ready(Some(Ok(bytes)));
return Poll::Ready(Some(Err(err.into())));
}
} }
if !this.buf.is_empty() { if this.closing {
return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze()))); return Poll::Ready(None);
} }
Poll::Pending Poll::Pending