From 6b044507038b02df8680d865a1fb1ba05a0c84d5 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 13 May 2024 11:49:34 -0500 Subject: [PATCH] Enable sending Continuations from actix-ws (#431) * Enable sending continuations from an actix-ws Session * actix-ws: Allow sending continuations from Session * Convert ignored doctests to no_run doctests --------- Co-authored-by: Rob Ede --- actix-ws/CHANGELOG.md | 1 + actix-ws/src/fut.rs | 5 +++- actix-ws/src/lib.rs | 2 +- actix-ws/src/session.rs | 61 ++++++++++++++++++++++++++++++++++++----- 4 files changed, 60 insertions(+), 9 deletions(-) diff --git a/actix-ws/CHANGELOG.md b/actix-ws/CHANGELOG.md index 985050ec9..97e4ccf46 100644 --- a/actix-ws/CHANGELOG.md +++ b/actix-ws/CHANGELOG.md @@ -4,6 +4,7 @@ - Remove type parameters from `Session::{text, binary}()` methods, replacing with equivalent `impl Trait` parameters. - `Session::text()` now receives an `impl Into`, making broadcasting text messages more efficient. +- Allow sending continuations via `Session::continuation()` ## 0.2.5 diff --git a/actix-ws/src/fut.rs b/actix-ws/src/fut.rs index 92b762c36..a025a429b 100644 --- a/actix-ws/src/fut.rs +++ b/actix-ws/src/fut.rs @@ -65,10 +65,13 @@ impl MessageStream { /// Wait for the next item from the message stream /// - /// ```rust,ignore + /// ```rust,no_run + /// # use actix_ws::MessageStream; + /// # async fn test(mut stream: MessageStream) { /// while let Some(Ok(msg)) = stream.recv().await { /// // handle message /// } + /// # } /// ``` pub async fn recv(&mut self) -> Option> { poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await diff --git a/actix-ws/src/lib.rs b/actix-ws/src/lib.rs index 00dd20f39..67ce756f2 100644 --- a/actix-ws/src/lib.rs +++ b/actix-ws/src/lib.rs @@ -8,7 +8,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![cfg_attr(docsrs, feature(doc_auto_cfg))] -pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError}; +pub use actix_http::ws::{CloseCode, CloseReason, Item, Message, ProtocolError}; use actix_http::{ body::{BodyStream, MessageBody}, ws::handshake, diff --git a/actix-ws/src/session.rs b/actix-ws/src/session.rs index c5bf64f8b..3bbfbabbc 100644 --- a/actix-ws/src/session.rs +++ b/actix-ws/src/session.rs @@ -3,7 +3,7 @@ use std::sync::{ Arc, }; -use actix_http::ws::{CloseReason, Message}; +use actix_http::ws::{CloseReason, Item, Message}; use actix_web::web::Bytes; use bytestring::ByteString; use tokio::sync::mpsc::Sender; @@ -45,10 +45,13 @@ impl Session { /// Send text into the websocket /// - /// ```rust,ignore + /// ```rust,no_run + /// # use actix_ws::Session; + /// # async fn test(mut session: Session) { /// if session.text("Some text").await.is_err() { /// // session closed /// } + /// # } /// ``` pub async fn text(&mut self, msg: impl Into) -> Result<(), Closed> { self.pre_check(); @@ -64,10 +67,13 @@ impl Session { /// Send raw bytes into the websocket /// - /// ```rust,ignore - /// if session.binary(b"some bytes").await.is_err() { + /// ```rust,no_run + /// # use actix_ws::Session; + /// # async fn test(mut session: Session) { + /// if session.binary(&b"some bytes"[..]).await.is_err() { /// // session closed /// } + /// # } /// ``` pub async fn binary(&mut self, msg: impl Into) -> Result<(), Closed> { self.pre_check(); @@ -86,10 +92,13 @@ impl Session { /// For many applications, it will be important to send regular pings to keep track of if the /// client has disconnected /// - /// ```rust,ignore + /// ```rust,no_run + /// # use actix_ws::Session; + /// # async fn test(mut session: Session) { /// if session.ping(b"").await.is_err() { /// // session is closed /// } + /// # } /// ``` pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> { self.pre_check(); @@ -105,13 +114,16 @@ impl Session { /// Pong the client /// - /// ```rust,ignore + /// ```rust,no_run + /// # use actix_ws::{Message, Session}; + /// # async fn test(mut session: Session, msg: Message) { /// match msg { /// Message::Ping(bytes) => { /// let _ = session.pong(&bytes).await; /// } /// _ => (), /// } + /// # } pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> { self.pre_check(); if let Some(inner) = self.inner.as_mut() { @@ -124,12 +136,47 @@ impl Session { } } + /// Manually control sending continuations + /// + /// Be wary of this method. Continuations represent multiple frames that, when combined, are + /// presented as a single message. They are useful when the entire contents of a message are + /// not avilable all at once. However, continuations MUST NOT be interrupted by other Text or + /// Binary messages. Control messages such as Ping, Pong, or Close are allowed to interrupt a + /// continuation. + /// + /// Continuations must be initialized with a First variant, and must be terminated by a Last + /// variant, with only Continue variants sent in between. + /// + /// ```rust,no_run + /// # use actix_ws::{Item, Session}; + /// # async fn test(mut session: Session) -> Result<(), Box> { + /// session.continuation(Item::FirstText("Hello".into())).await?; + /// session.continuation(Item::Continue(b", World"[..].into())).await?; + /// session.continuation(Item::Last(b"!"[..].into())).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn continuation(&mut self, msg: Item) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.as_mut() { + inner + .send(Message::Continuation(msg)) + .await + .map_err(|_| Closed) + } else { + Err(Closed) + } + } + /// Send a close message, and consume the session /// /// All clones will return `Err(Closed)` if used after this call /// - /// ```rust,ignore + /// ```rust,no_run + /// # use actix_ws::{Closed, Session}; + /// # async fn test(mut session: Session) -> Result<(), Closed> { /// session.close(None).await + /// # } /// ``` pub async fn close(mut self, reason: Option) -> Result<(), Closed> { self.pre_check();