diff --git a/actix-ws/src/stream.rs b/actix-ws/src/stream.rs index e95c17f43..27a3cdff9 100644 --- a/actix-ws/src/stream.rs +++ b/actix-ws/src/stream.rs @@ -1,13 +1,14 @@ use std::{ collections::VecDeque, future::poll_fn, - io, mem, + io, pin::Pin, task::{Context, Poll}, }; -use actix_codec::{Decoder, Encoder}; +use actix_codec::Decoder; use actix_http::{ + big_bytes::BigBytes, ws::{Codec, Frame, Message, ProtocolError}, Payload, }; @@ -24,8 +25,7 @@ use crate::AggregatedMessageStream; /// Response body for a WebSocket. pub struct StreamingBody { session_rx: Receiver, - messages: VecDeque, - buf: BytesMut, + buf: BigBytes, codec: Codec, closing: bool, } @@ -34,8 +34,7 @@ impl StreamingBody { pub(super) fn new(session_rx: Receiver) -> Self { StreamingBody { session_rx, - messages: VecDeque::new(), - buf: BytesMut::new(), + buf: BigBytes::with_capacity(0), codec: Codec::new(), closing: false, } @@ -118,14 +117,12 @@ impl Stream for StreamingBody { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if this.closing { - return Poll::Ready(None); - } - - loop { + while !this.closing { match Pin::new(&mut this.session_rx).poll_recv(cx) { Poll::Ready(Some(msg)) => { - this.messages.push_back(msg); + if let Err(err) = this.codec.encode_bigbytes(msg, &mut this.buf) { + return Poll::Ready(Some(Err(err.into()))); + } } Poll::Ready(None) => { this.closing = true; @@ -135,14 +132,12 @@ impl Stream for StreamingBody { } } - while let Some(msg) = this.messages.pop_front() { - if let Err(err) = this.codec.encode(msg, &mut this.buf) { - return Poll::Ready(Some(Err(err.into()))); - } + if let Some(bytes) = this.buf.pop_front() { + return Poll::Ready(Some(Ok(bytes))); } - if !this.buf.is_empty() { - return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze()))); + if this.closing { + return Poll::Ready(None); } Poll::Pending