diff --git a/websockets/echo-actorless/src/handler.rs b/websockets/echo-actorless/src/handler.rs index 2fd5193..1d9bac2 100644 --- a/websockets/echo-actorless/src/handler.rs +++ b/websockets/echo-actorless/src/handler.rs @@ -1,11 +1,12 @@ use std::time::{Duration, Instant}; +use actix_web::web; use actix_ws::Message; use futures_util::{ future::{self, Either}, StreamExt as _, }; -use tokio::{pin, time::interval}; +use tokio::{pin, select, sync::broadcast, time::interval}; /// How often heartbeat pings are sent. /// @@ -155,3 +156,109 @@ pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::M log::info!("disconnected"); } + +/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor +/// connection health to detect network issues and free up resources. +pub async fn broadcast_ws( + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, + mut rx: broadcast::Receiver, +) { + log::info!("connected"); + + let mut last_heartbeat = Instant::now(); + let mut interval = interval(HEARTBEAT_INTERVAL); + + let reason = loop { + // waits for either `msg_stream` to receive a message from the client, the broadcast channel + // to send a message, or the heartbeat interval timer to tick, yielding the value of + // whichever one is ready first + select! { + broadcast_msg = rx.recv() => { + let msg = match broadcast_msg { + Ok(msg) => msg, + Err(broadcast::error::RecvError::Closed) => break None, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + }; + + let res = match std::str::from_utf8(&msg) { + Ok(val) => session.text(val).await, + Err(_) => session.binary(msg).await, + }; + + if let Err(err) = res { + log::error!("{err}"); + break None; + } + } + + // heartbeat interval ticked + _tick = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + log::info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); + + break None; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + }, + + msg = msg_stream.next() => { + let msg = match msg { + // received message from WebSocket client + Some(Ok(msg)) => msg, + + // client WebSocket stream error + Some(Err(err)) => { + log::error!("{err}"); + break None; + } + + // client WebSocket stream ended + None => break None + }; + + log::debug!("msg: {msg:?}"); + + match msg { + Message::Text(_) => { + // drop client's text messages + } + + Message::Binary(_) => { + // drop client's binary messages + } + + Message::Close(reason) => { + break reason; + } + + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Continuation(_) => { + log::warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; + } + } + }; + + // attempt to close connection gracefully + let _ = session.close(reason).await; + + log::info!("disconnected"); +} diff --git a/websockets/echo-actorless/src/main.rs b/websockets/echo-actorless/src/main.rs index 034aca8..f449b60 100644 --- a/websockets/echo-actorless/src/main.rs +++ b/websockets/echo-actorless/src/main.rs @@ -6,6 +6,7 @@ use actix_files::NamedFile; use actix_web::{ middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, }; +use tokio::sync::broadcast; mod handler; @@ -37,6 +38,31 @@ async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result>, +) -> Result { + tx.send(body) + .map_err(actix_web::error::ErrorInternalServerError)?; + + Ok(HttpResponse::NoContent()) +} + +/// Handshake and start broadcast WebSocket handler with heartbeats. +async fn broadcast_ws( + req: HttpRequest, + stream: web::Payload, + tx: web::Data>, +) -> Result { + let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + + // spawn websocket handler (and don't await it) so that the response is returned immediately + rt::spawn(handler::broadcast_ws(session, msg_stream, tx.subscribe())); + + Ok(res) +} + // note that the `actix` based WebSocket handling would NOT work under `tokio::main` #[tokio::main(flavor = "current_thread")] async fn main() -> std::io::Result<()> { @@ -44,13 +70,18 @@ async fn main() -> std::io::Result<()> { log::info!("starting HTTP server at http://localhost:8080"); - HttpServer::new(|| { + let (tx, _) = broadcast::channel::(128); + + HttpServer::new(move || { App::new() // WebSocket UI HTML file .service(web::resource("/").to(index)) // websocket routes .service(web::resource("/ws").route(web::get().to(echo_heartbeat_ws))) .service(web::resource("/ws-basic").route(web::get().to(echo_ws))) + .app_data(web::Data::new(tx.clone())) + .service(web::resource("/ws-broadcast").route(web::get().to(broadcast_ws))) + .service(web::resource("/send").route(web::post().to(send))) // enable logger .wrap(middleware::Logger::default()) })