1
0
mirror of https://github.com/actix/examples synced 2025-01-22 14:05:55 +01:00

add broadcast example to actorless echo websocket

This commit is contained in:
Rob Ede 2023-08-30 15:36:21 +01:00
parent 72d0d1eb4b
commit 8de8672d6e
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
2 changed files with 140 additions and 2 deletions

View File

@ -1,11 +1,12 @@
use std::time::{Duration, Instant};
use actix_web::web;
use actix_ws::Message;
use futures_util::{
future::{self, Either},
StreamExt as _,
};
use tokio::{pin, time::interval};
use tokio::{pin, select, sync::broadcast, time::interval};
/// How often heartbeat pings are sent.
///
@ -155,3 +156,109 @@ pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::M
log::info!("disconnected");
}
/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn broadcast_ws(
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
mut rx: broadcast::Receiver<web::Bytes>,
) {
log::info!("connected");
let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);
let reason = loop {
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
// to send a message, or the heartbeat interval timer to tick, yielding the value of
// whichever one is ready first
select! {
broadcast_msg = rx.recv() => {
let msg = match broadcast_msg {
Ok(msg) => msg,
Err(broadcast::error::RecvError::Closed) => break None,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
};
let res = match std::str::from_utf8(&msg) {
Ok(val) => session.text(val).await,
Err(_) => session.binary(msg).await,
};
if let Err(err) = res {
log::error!("{err}");
break None;
}
}
// heartbeat interval ticked
_tick = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);
break None;
}
// send heartbeat ping
let _ = session.ping(b"").await;
},
msg = msg_stream.next() => {
let msg = match msg {
// received message from WebSocket client
Some(Ok(msg)) => msg,
// client WebSocket stream error
Some(Err(err)) => {
log::error!("{err}");
break None;
}
// client WebSocket stream ended
None => break None
};
log::debug!("msg: {msg:?}");
match msg {
Message::Text(_) => {
// drop client's text messages
}
Message::Binary(_) => {
// drop client's binary messages
}
Message::Close(reason) => {
break reason;
}
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}
// no-op; ignore
Message::Nop => {}
};
}
}
};
// attempt to close connection gracefully
let _ = session.close(reason).await;
log::info!("disconnected");
}

View File

@ -6,6 +6,7 @@ use actix_files::NamedFile;
use actix_web::{
middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use tokio::sync::broadcast;
mod handler;
@ -37,6 +38,31 @@ async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse,
Ok(res)
}
/// Handshake and start broadcast WebSocket handler with heartbeats.
async fn send(
body: web::Bytes,
tx: web::Data<broadcast::Sender<web::Bytes>>,
) -> Result<impl Responder, Error> {
tx.send(body)
.map_err(actix_web::error::ErrorInternalServerError)?;
Ok(HttpResponse::NoContent())
}
/// Handshake and start broadcast WebSocket handler with heartbeats.
async fn broadcast_ws(
req: HttpRequest,
stream: web::Payload,
tx: web::Data<broadcast::Sender<web::Bytes>>,
) -> Result<HttpResponse, Error> {
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately
rt::spawn(handler::broadcast_ws(session, msg_stream, tx.subscribe()));
Ok(res)
}
// note that the `actix` based WebSocket handling would NOT work under `tokio::main`
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
@ -44,13 +70,18 @@ async fn main() -> std::io::Result<()> {
log::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(|| {
let (tx, _) = broadcast::channel::<web::Bytes>(128);
HttpServer::new(move || {
App::new()
// WebSocket UI HTML file
.service(web::resource("/").to(index))
// websocket routes
.service(web::resource("/ws").route(web::get().to(echo_heartbeat_ws)))
.service(web::resource("/ws-basic").route(web::get().to(echo_ws)))
.app_data(web::Data::new(tx.clone()))
.service(web::resource("/ws-broadcast").route(web::get().to(broadcast_ws)))
.service(web::resource("/send").route(web::post().to(send)))
// enable logger
.wrap(middleware::Logger::default())
})