diff --git a/Cargo.toml b/Cargo.toml index 645d4f39d..4793d4ee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "actix-session", "actix-settings", "actix-web-httpauth", + "actix-ws", ] [workspace.package] diff --git a/LICENSE-APACHE b/LICENSE-APACHE index d0eb30d70..fb96878c0 100644 --- a/LICENSE-APACHE +++ b/LICENSE-APACHE @@ -186,8 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2017-NOW Nikolay Kim - Copyright 2017-NOW svartalf and Actix team + Copyright 2017-NOW Actix team Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/LICENSE-MIT b/LICENSE-MIT index e57de3745..255963b91 100644 --- a/LICENSE-MIT +++ b/LICENSE-MIT @@ -1,5 +1,4 @@ -Copyright (c) 2017 Nikolay Kim -Copyright (c) 2017 svartalf and Actix team +Copyright (c) 2023 Actix team Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated diff --git a/README.md b/README.md index a52e568fc..a6d55decd 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ | [actix-session] | [![crates.io](https://img.shields.io/crates/v/actix-session?label=latest)](https://crates.io/crates/actix-session) [![dependency status](https://deps.rs/crate/actix-session/latest/status.svg)](https://deps.rs/crate/actix-session) | Session management. | | [actix-settings] | [![crates.io](https://img.shields.io/crates/v/actix-settings?label=latest)](https://crates.io/crates/actix-settings) [![dependency status](https://deps.rs/crate/actix-settings/latest/status.svg)](https://deps.rs/crate/actix-settings) | Easily manage Actix Web's settings from a TOML file and environment variables. | | [actix-web-httpauth] | [![crates.io](https://img.shields.io/crates/v/actix-web-httpauth?label=latest)](https://crates.io/crates/actix-web-httpauth) [![dependency status](https://deps.rs/crate/actix-web-httpauth/latest/status.svg)](https://deps.rs/crate/actix-web-httpauth) | HTTP authentication schemes. | +| [actix-ws] | [![crates.io](https://img.shields.io/crates/v/actix-ws?label=latest)][actix-ws] [![dependency status](https://deps.rs/crate/actix-ws/latest/status.svg)](https://deps.rs/crate/actix-ws) | WebSockets for Actix Web, without actors. | --- @@ -30,7 +31,7 @@ These crates are provided by the community. | -------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------- | | [actix-web-lab] | [![crates.io](https://img.shields.io/crates/v/actix-web-lab?label=latest)][actix-web-lab] [![dependency status](https://deps.rs/crate/actix-web-lab/latest/status.svg)](https://deps.rs/crate/actix-web-lab) | Experimental extractors, middleware, and other extras for possible inclusion in Actix Web. | | [actix-multipart-extract] | [![crates.io](https://img.shields.io/crates/v/actix-multipart-extract?label=latest)][actix-multipart-extract] [![dependency status](https://deps.rs/crate/actix-multipart-extract/latest/status.svg)](https://deps.rs/crate/actix-multipart-extract) | Better multipart form support for Actix Web. | -| [actix-form-data] | [![crates.io](https://img.shields.io/crates/v/actix-form-data?label=latest)][actix-form-data] [![dependency status](https://deps.rs/crate/actix-form-data/latest/status.svg)](https://deps.rs/crate/actix-form-data) | Multipart form data from actix multipart streams | +| [actix-form-data] | [![crates.io](https://img.shields.io/crates/v/actix-form-data?label=latest)][actix-form-data] [![dependency status](https://deps.rs/crate/actix-form-data/latest/status.svg)](https://deps.rs/crate/actix-form-data) | Multipart form data from actix multipart streams | | [actix-governor] | [![crates.io](https://img.shields.io/crates/v/actix-governor?label=latest)][actix-governor] [![dependency status](https://deps.rs/crate/actix-governor/latest/status.svg)](https://deps.rs/crate/actix-governor) | Rate-limiting backed by governor. | | [actix-casbin] | [![crates.io](https://img.shields.io/crates/v/actix-casbin?label=latest)][actix-casbin] [![dependency status](https://deps.rs/crate/actix-casbin/latest/status.svg)](https://deps.rs/crate/actix-casbin) | Authorization library that supports access control models like ACL, RBAC & ABAC. | | [actix-ip-filter] | [![crates.io](https://img.shields.io/crates/v/actix-ip-filter?label=latest)][actix-ip-filter] [![dependency status](https://deps.rs/crate/actix-ip-filter/latest/status.svg)](https://deps.rs/crate/actix-ip-filter) | IP address filter. Supports glob patterns. | @@ -40,7 +41,6 @@ These crates are provided by the community. | [actix-web-flash-messages] | [![crates.io](https://img.shields.io/crates/v/actix-web-flash-messages?label=latest)][actix-web-flash-messages] [![dependency status](https://deps.rs/crate/actix-web-flash-messages/latest/status.svg)](https://deps.rs/crate/actix-web-flash-messages) | Support for flash messages/one-time notifications in `actix-web`. | | [awmp] | [![crates.io](https://img.shields.io/crates/v/awmp?label=latest)][awmp] [![dependency status](https://deps.rs/crate/awmp/latest/status.svg)](https://deps.rs/crate/awmp) | An easy to use wrapper around multipart fields for Actix Web. | | [tracing-actix-web] | [![crates.io](https://img.shields.io/crates/v/tracing-actix-web?label=latest)][tracing-actix-web] [![dependency status](https://deps.rs/crate/tracing-actix-web/latest/status.svg)](https://deps.rs/crate/tracing-actix-web) | A middleware to collect telemetry data from applications built on top of the Actix Web framework. | -| [actix-ws] | [![crates.io](https://img.shields.io/crates/v/actix-ws?label=latest)][actix-ws] [![dependency status](https://deps.rs/crate/actix-ws/latest/status.svg)](https://deps.rs/crate/actix-ws) | Actor-less WebSockets for the Actix Runtime. | | [actix-hash] | [![crates.io](https://img.shields.io/crates/v/actix-hash?label=latest)][actix-hash] [![dependency status](https://deps.rs/crate/actix-hash/latest/status.svg)](https://deps.rs/crate/actix-hash) | Hashing utilities for Actix Web. | | [actix-bincode] | ![crates.io](https://img.shields.io/crates/v/actix-bincode?label=latest) [![dependency status](https://deps.rs/crate/actix-bincode/latest/status.svg)](https://deps.rs/crate/actix-bincode) | Bincode payload extractor for Actix Web | | [sentinel-actix] | ![crates.io](https://img.shields.io/crates/v/sentinel-actix?label=latest) [![dependency status](https://deps.rs/crate/sentinel-actix/latest/status.svg)](https://deps.rs/crate/sentinel-actix) | General and flexible protection for Actix Web | diff --git a/actix-web-httpauth/Cargo.toml b/actix-web-httpauth/Cargo.toml index f666b6238..f054cdbc8 100644 --- a/actix-web-httpauth/Cargo.toml +++ b/actix-web-httpauth/Cargo.toml @@ -1,15 +1,15 @@ [package] name = "actix-web-httpauth" version = "0.8.1" +description = "HTTP authentication schemes for Actix Web" +categories = ["web-programming"] +keywords = ["http", "web", "framework", "authentication", "security"] authors = [ "svartalf ", "Yuki Okushi ", ] -description = "HTTP authentication schemes for Actix Web" -keywords = ["http", "web", "framework", "authentication", "security"] homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-extras.git" -categories = ["web-programming::http-server"] +repository = "https://github.com/actix/actix-extras" license.workspace = true edition.workspace = true rust-version.workspace = true diff --git a/actix-ws/CHANGELOG.md b/actix-ws/CHANGELOG.md new file mode 100644 index 000000000..985050ec9 --- /dev/null +++ b/actix-ws/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +## Unreleased + +- Remove type parameters from `Session::{text, binary}()` methods, replacing with equivalent `impl Trait` parameters. +- `Session::text()` now receives an `impl Into`, making broadcasting text messages more efficient. + +## 0.2.5 + +- Adopted into @actix org from . diff --git a/actix-ws/Cargo.toml b/actix-ws/Cargo.toml new file mode 100644 index 000000000..a59415cff --- /dev/null +++ b/actix-ws/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "actix-ws" +version = "0.2.0" +description = "WebSockets for Actix Web, without actors" +categories = ["web-programming::websocket"] +keywords = ["actix", "web", "websocket", "websockets", "http"] +authors = [ + "asonix ", + "Rob Ede ", +] +repository = "https://github.com/actix/actix-extras" +license.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +actix-codec = "0.5" +actix-http = { version = "3", default-features = false, features = ["ws"] } +actix-web = { version = "4", default-features = false } +bytestring = "1" +futures-core = "0.3.17" +tokio = { version = "1", features = ["sync"] } + +[dev-dependencies] +actix-rt = "2.6" +actix-web = "4.0.1" +anyhow = "1.0" +futures = "0.3" +log = "0.4" +pretty_env_logger = "0.5" +tokio = { version = "1", features = ["sync"] } diff --git a/actix-ws/LICENSE-APACHE b/actix-ws/LICENSE-APACHE new file mode 120000 index 000000000..965b606f3 --- /dev/null +++ b/actix-ws/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/actix-ws/LICENSE-MIT b/actix-ws/LICENSE-MIT new file mode 120000 index 000000000..76219eb72 --- /dev/null +++ b/actix-ws/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/actix-ws/README.md b/actix-ws/README.md new file mode 100644 index 000000000..6117d53a9 --- /dev/null +++ b/actix-ws/README.md @@ -0,0 +1,74 @@ +# Actix WS (Next Gen) + +> WebSockets for Actix Web, without actors. + +[![crates.io](https://img.shields.io/crates/v/actix-ws?label=latest)](https://crates.io/crates/actix-ws) +[![Documentation](https://docs.rs/actix-ws/badge.svg?version=0.2.0)](https://docs.rs/actix-ws/0.2.0) +![Apache 2.0 or MIT licensed](https://img.shields.io/crates/l/actix-ws) +[![Dependency Status](https://deps.rs/crate/actix-ws/0.2.0/status.svg)](https://deps.rs/crate/actix-ws/0.2.0) + +## Documentation & Resources + +- [API Documentation](https://docs.rs/actix-ws) +- [Example Projects](https://github.com/actix/examples/tree/master/websockets) +- Minimum Supported Rust Version (MSRV): 1.68 + +## Usage + +```toml +# Cargo.toml +anyhow = "1" +actix-web = "4" +actix-ws-ng = "0.3" +``` + +```rust +// main.rs +use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_ws::Message; + +async fn ws(req: HttpRequest, body: web::Payload) -> Result { + let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; + + actix_rt::spawn(async move { + while let Some(Ok(msg)) = msg_stream.next().await { + match msg { + Message::Ping(bytes) => { + if session.pong(&bytes).await.is_err() { + return; + } + } + Message::Text(s) => println!("Got text, {}", s), + _ => break, + } + } + + let _ = session.close(None).await; + }); + + Ok(response) +} + +#[actix_web::main] +async fn main() -> Result<(), anyhow::Error> { + HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .route("/ws", web::get().to(ws)) + }) + .bind("127.0.0.1:8080")? + .run() + .await?; + + Ok(()) +} +``` + +## License + +This project is licensed under either of + +- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) +- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT) + +at your option. diff --git a/actix-ws/examples/chat.rs b/actix-ws/examples/chat.rs new file mode 100644 index 000000000..035ce7a49 --- /dev/null +++ b/actix-ws/examples/chat.rs @@ -0,0 +1,208 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use actix_web::{middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer}; +use actix_ws::{Message, Session}; +use futures::stream::{FuturesUnordered, StreamExt as _}; +use log::info; +use tokio::sync::Mutex; + +#[derive(Clone)] +struct Chat { + inner: Arc>, +} + +struct ChatInner { + sessions: Vec, +} + +impl Chat { + fn new() -> Self { + Chat { + inner: Arc::new(Mutex::new(ChatInner { + sessions: Vec::new(), + })), + } + } + + async fn insert(&self, session: Session) { + self.inner.lock().await.sessions.push(session); + } + + async fn send(&self, msg: String) { + let mut inner = self.inner.lock().await; + let mut unordered = FuturesUnordered::new(); + + for mut session in inner.sessions.drain(..) { + let msg = msg.clone(); + unordered.push(async move { + let res = session.text(msg).await; + res.map(|_| session).map_err(|_| info!("Dropping session")) + }); + } + + while let Some(res) = unordered.next().await { + if let Ok(session) = res { + inner.sessions.push(session); + } + } + } +} + +async fn ws( + req: HttpRequest, + body: web::Payload, + chat: web::Data, +) -> Result { + let (response, mut session, mut stream) = actix_ws::handle(&req, body)?; + + chat.insert(session.clone()).await; + info!("Inserted session"); + + let alive = Arc::new(Mutex::new(Instant::now())); + + let mut session2 = session.clone(); + let alive2 = alive.clone(); + actix_rt::spawn(async move { + let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + if session2.ping(b"").await.is_err() { + break; + } + + if Instant::now().duration_since(*alive2.lock().await) > Duration::from_secs(10) { + let _ = session2.close(None).await; + break; + } + } + }); + + actix_rt::spawn(async move { + while let Some(Ok(msg)) = stream.next().await { + match msg { + Message::Ping(bytes) => { + if session.pong(&bytes).await.is_err() { + return; + } + } + Message::Text(s) => { + info!("Relaying text, {}", s); + let s: &str = s.as_ref(); + chat.send(s.into()).await; + } + Message::Close(reason) => { + let _ = session.close(reason).await; + info!("Got close, bailing"); + return; + } + Message::Continuation(_) => { + let _ = session.close(None).await; + info!("Got continuation, bailing"); + return; + } + Message::Pong(_) => { + *alive.lock().await = Instant::now(); + } + _ => (), + }; + } + let _ = session.close(None).await; + }); + info!("Spawned"); + + Ok(response) +} + +async fn index() -> HttpResponse { + let s = r#" + + + + Chat + + + + +
    +
+ + + "#; + + HttpResponse::Ok().content_type("text/html").body(s) +} + +#[actix_rt::main] +async fn main() -> Result<(), anyhow::Error> { + std::env::set_var("RUST_LOG", "info"); + pretty_env_logger::init(); + let chat = Chat::new(); + + HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .app_data(web::Data::new(chat.clone())) + .route("/", web::get().to(index)) + .route("/ws", web::get().to(ws)) + }) + .bind("127.0.0.1:8080")? + .run() + .await?; + + Ok(()) +} diff --git a/actix-ws/src/fut.rs b/actix-ws/src/fut.rs new file mode 100644 index 000000000..92b762c36 --- /dev/null +++ b/actix-ws/src/fut.rs @@ -0,0 +1,183 @@ +use std::{ + collections::VecDeque, + future::poll_fn, + io, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_codec::{Decoder, Encoder}; +use actix_http::{ + ws::{Codec, Frame, Message, ProtocolError}, + Payload, +}; +use actix_web::{ + web::{Bytes, BytesMut}, + Error, +}; +use futures_core::stream::Stream; +use tokio::sync::mpsc::Receiver; + +/// A response body for Websocket HTTP Requests +pub struct StreamingBody { + session_rx: Receiver, + + messages: VecDeque, + buf: BytesMut, + codec: Codec, + closing: bool, +} + +/// A stream of Messages from a websocket client +/// +/// Messages can be accessed via the stream's `.next()` method +pub struct MessageStream { + payload: Payload, + + messages: VecDeque, + buf: BytesMut, + codec: Codec, + closing: bool, +} + +impl StreamingBody { + pub(super) fn new(session_rx: Receiver) -> Self { + StreamingBody { + session_rx, + messages: VecDeque::new(), + buf: BytesMut::new(), + codec: Codec::new(), + closing: false, + } + } +} + +impl MessageStream { + pub(super) fn new(payload: Payload) -> Self { + MessageStream { + payload, + messages: VecDeque::new(), + buf: BytesMut::new(), + codec: Codec::new(), + closing: false, + } + } + + /// Wait for the next item from the message stream + /// + /// ```rust,ignore + /// while let Some(Ok(msg)) = stream.recv().await { + /// // handle message + /// } + /// ``` + pub async fn recv(&mut self) -> Option> { + poll_fn(|cx| Pin::new(&mut *self).poll_next(cx)).await + } +} + +impl Stream for StreamingBody { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + if this.closing { + return Poll::Ready(None); + } + + loop { + match Pin::new(&mut this.session_rx).poll_recv(cx) { + Poll::Ready(Some(msg)) => { + this.messages.push_back(msg); + } + Poll::Ready(None) => { + this.closing = true; + break; + } + Poll::Pending => break, + } + } + + while let Some(msg) = this.messages.pop_front() { + if let Err(e) = this.codec.encode(msg, &mut this.buf) { + return Poll::Ready(Some(Err(e.into()))); + } + } + + if !this.buf.is_empty() { + return Poll::Ready(Some(Ok(this.buf.split().freeze()))); + } + + Poll::Pending + } +} + +impl Stream for MessageStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + // Return the first message in the queue if one exists + // + // This is faster than polling and parsing + if let Some(msg) = this.messages.pop_front() { + return Poll::Ready(Some(Ok(msg))); + } + + if !this.closing { + // Read in bytes until there's nothing left to read + loop { + match Pin::new(&mut this.payload).poll_next(cx) { + Poll::Ready(Some(Ok(bytes))) => { + this.buf.extend_from_slice(&bytes); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(ProtocolError::Io(io::Error::new( + io::ErrorKind::Other, + e.to_string(), + ))))); + } + Poll::Ready(None) => { + this.closing = true; + break; + } + Poll::Pending => break, + } + } + } + + // Create messages until there's no more bytes left + while let Some(frame) = this.codec.decode(&mut this.buf)? { + let message = match frame { + Frame::Text(bytes) => { + let s = std::str::from_utf8(&bytes) + .map_err(|e| { + ProtocolError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())) + })? + .to_string(); + Message::Text(s.into()) + } + Frame::Binary(bytes) => Message::Binary(bytes), + Frame::Ping(bytes) => Message::Ping(bytes), + Frame::Pong(bytes) => Message::Pong(bytes), + Frame::Close(reason) => Message::Close(reason), + Frame::Continuation(item) => Message::Continuation(item), + }; + + this.messages.push_back(message); + } + + // Return the first message in the queue + if let Some(msg) = this.messages.pop_front() { + return Poll::Ready(Some(Ok(msg))); + } + + // If we've exhausted our message queue and we're closing, close the stream + if this.closing { + return Poll::Ready(None); + } + + Poll::Pending + } +} diff --git a/actix-ws/src/lib.rs b/actix-ws/src/lib.rs new file mode 100644 index 000000000..20cc016a7 --- /dev/null +++ b/actix-ws/src/lib.rs @@ -0,0 +1,84 @@ +//! WebSockets for Actix Web, without actors. +//! +//! For usage, see documentation on [`handle()`]. + +#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)] +#![warn(missing_docs)] +#![doc(html_logo_url = "https://actix.rs/img/logo.png")] +#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError}; +use actix_http::{ + body::{BodyStream, MessageBody}, + ws::handshake, +}; +use actix_web::{web, HttpRequest, HttpResponse}; +use tokio::sync::mpsc::channel; + +mod fut; +mod session; + +pub use self::{ + fut::{MessageStream, StreamingBody}, + session::{Closed, Session}, +}; + +/// Begin handling websocket traffic +/// +/// ```no_run +/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +/// use actix_ws::Message; +/// use futures::stream::StreamExt as _; +/// +/// async fn ws(req: HttpRequest, body: web::Payload) -> Result { +/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; +/// +/// actix_rt::spawn(async move { +/// while let Some(Ok(msg)) = msg_stream.next().await { +/// match msg { +/// Message::Ping(bytes) => { +/// if session.pong(&bytes).await.is_err() { +/// return; +/// } +/// } +/// Message::Text(s) => println!("Got text, {}", s), +/// _ => break, +/// } +/// } +/// +/// let _ = session.close(None).await; +/// }); +/// +/// Ok(response) +/// } +/// +/// #[actix_rt::main] +/// async fn main() -> Result<(), anyhow::Error> { +/// HttpServer::new(move || { +/// App::new() +/// .wrap(Logger::default()) +/// .route("/ws", web::get().to(ws)) +/// }) +/// .bind("127.0.0.1:8080")? +/// .run() +/// .await?; +/// +/// Ok(()) +/// } +/// ``` +pub fn handle( + req: &HttpRequest, + body: web::Payload, +) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> { + let mut response = handshake(req.head())?; + let (tx, rx) = channel(32); + + Ok(( + response + .message_body(BodyStream::new(StreamingBody::new(rx)).boxed())? + .into(), + Session::new(tx), + MessageStream::new(body.into_inner()), + )) +} diff --git a/actix-ws/src/session.rs b/actix-ws/src/session.rs new file mode 100644 index 000000000..c5bf64f8b --- /dev/null +++ b/actix-ws/src/session.rs @@ -0,0 +1,143 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + +use actix_http::ws::{CloseReason, Message}; +use actix_web::web::Bytes; +use bytestring::ByteString; +use tokio::sync::mpsc::Sender; + +/// A handle into the websocket session. +/// +/// This type can be used to send messages into the websocket. +#[derive(Clone)] +pub struct Session { + inner: Option>, + closed: Arc, +} + +/// The error representing a closed websocket session +#[derive(Debug)] +pub struct Closed; + +impl std::fmt::Display for Closed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Session is closed") + } +} + +impl std::error::Error for Closed {} + +impl Session { + pub(super) fn new(inner: Sender) -> Self { + Session { + inner: Some(inner), + closed: Arc::new(AtomicBool::new(false)), + } + } + + fn pre_check(&mut self) { + if self.closed.load(Ordering::Relaxed) { + self.inner.take(); + } + } + + /// Send text into the websocket + /// + /// ```rust,ignore + /// if session.text("Some text").await.is_err() { + /// // session closed + /// } + /// ``` + pub async fn text(&mut self, msg: impl Into) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.as_mut() { + inner + .send(Message::Text(msg.into())) + .await + .map_err(|_| Closed) + } else { + Err(Closed) + } + } + + /// Send raw bytes into the websocket + /// + /// ```rust,ignore + /// if session.binary(b"some bytes").await.is_err() { + /// // session closed + /// } + /// ``` + pub async fn binary(&mut self, msg: impl Into) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.as_mut() { + inner + .send(Message::Binary(msg.into())) + .await + .map_err(|_| Closed) + } else { + Err(Closed) + } + } + + /// Ping the client + /// + /// For many applications, it will be important to send regular pings to keep track of if the + /// client has disconnected + /// + /// ```rust,ignore + /// if session.ping(b"").await.is_err() { + /// // session is closed + /// } + /// ``` + pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.as_mut() { + inner + .send(Message::Ping(Bytes::copy_from_slice(msg))) + .await + .map_err(|_| Closed) + } else { + Err(Closed) + } + } + + /// Pong the client + /// + /// ```rust,ignore + /// match msg { + /// Message::Ping(bytes) => { + /// let _ = session.pong(&bytes).await; + /// } + /// _ => (), + /// } + pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.as_mut() { + inner + .send(Message::Pong(Bytes::copy_from_slice(msg))) + .await + .map_err(|_| Closed) + } else { + Err(Closed) + } + } + + /// Send a close message, and consume the session + /// + /// All clones will return `Err(Closed)` if used after this call + /// + /// ```rust,ignore + /// session.close(None).await + /// ``` + pub async fn close(mut self, reason: Option) -> Result<(), Closed> { + self.pre_check(); + if let Some(inner) = self.inner.take() { + self.closed.store(true, Ordering::Relaxed); + inner.send(Message::Close(reason)).await.map_err(|_| Closed) + } else { + Err(Closed) + } + } +}