use std::time::{Duration, Instant}; use actix_ws::AggregatedMessage; use futures_util::StreamExt as _; use tokio::{sync::mpsc, time::interval}; use crate::{ChatServerHandle, ConnId}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// Echo text & binary messages received from the client, respond to ping messages, and monitor /// connection health to detect network issues and free up resources. pub async fn chat_ws( chat_server: ChatServerHandle, mut session: actix_ws::Session, msg_stream: actix_ws::MessageStream, ) { log::info!("connected"); let mut name: Option = None; let mut last_heartbeat = Instant::now(); let mut interval = interval(HEARTBEAT_INTERVAL); let (conn_tx, mut conn_rx) = mpsc::unbounded_channel(); // unwrap: chat server is not dropped before the HTTP server let conn_id = chat_server.connect(conn_tx).await; let mut msg_stream = msg_stream .max_frame_size(128 * 1024) .aggregate_continuations() .max_continuation_size(2 * 1024 * 1024); let close_reason = loop { tokio::select! { Some(Ok(msg)) = msg_stream.next() => { log::debug!("msg: {msg:?}"); match msg { AggregatedMessage::Ping(bytes) => { last_heartbeat = Instant::now(); session.pong(&bytes).await.unwrap(); } AggregatedMessage::Pong(_) => { last_heartbeat = Instant::now(); } AggregatedMessage::Text(text) => { process_text_msg(&chat_server, &mut session, &text, conn_id, &mut name) .await; } AggregatedMessage::Binary(_bin) => { log::warn!("unexpected binary message"); } AggregatedMessage::Close(reason) => break reason, } } Some(chat_msg) = conn_rx.recv() => { session.text(chat_msg).await.unwrap(); } _ = interval.tick() => { if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { break None; } let _ = session.ping(b"").await; } else => { break None; } } }; chat_server.disconnect(conn_id); // attempt to close connection gracefully let _ = session.close(close_reason).await; } async fn process_text_msg( chat_server: &ChatServerHandle, session: &mut actix_ws::Session, text: &str, conn: ConnId, name: &mut Option, ) { // strip leading and trailing whitespace (spaces, newlines, etc.) let msg = text.trim(); // we check for / type of messages if msg.starts_with('/') { let mut cmd_args = msg.splitn(2, ' '); // unwrap: we have guaranteed non-zero string length already match cmd_args.next().unwrap() { "/list" => { log::info!("conn {conn}: listing rooms"); let rooms = chat_server.list_rooms().await; for room in rooms { session.text(room).await.unwrap(); } } "/join" => match cmd_args.next() { Some(room) => { log::info!("conn {conn}: joining room {room}"); chat_server.join_room(conn, room).await; session.text(format!("joined {room}")).await.unwrap(); } None => { session.text("!!! room name is required").await.unwrap(); } }, "/name" => match cmd_args.next() { Some(new_name) => { log::info!("conn {conn}: setting name to: {new_name}"); name.replace(new_name.to_owned()); } None => { session.text("!!! name is required").await.unwrap(); } }, _ => { session .text(format!("!!! unknown command: {msg}")) .await .unwrap(); } } } else { // prefix message with our name, if assigned let msg = match name { Some(name) => format!("{name}: {msg}"), None => msg.to_owned(), }; chat_server.send_message(conn, msg).await } }