use std::time::{Duration, Instant}; use actix_ws::Message; use futures_util::{ future::{self, Either}, StreamExt as _, }; use tokio::{pin, time::interval}; /// How often heartbeat pings are sent. /// /// Should be half (or less) of the acceptable client timeout. const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout. const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// Echo text & binary messages received from the client, respond to ping messages, and monitor /// connection health to detect network issues and free up resources. pub async fn echo_heartbeat_ws( mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream, ) { log::info!("connected"); let mut last_heartbeat = Instant::now(); let mut interval = interval(HEARTBEAT_INTERVAL); let reason = loop { // create "next client timeout check" future let tick = interval.tick(); // required for select() pin!(tick); // waits for either `msg_stream` to receive a message from the client or the heartbeat // interval timer to tick, yielding the value of whichever one is ready first match future::select(msg_stream.next(), tick).await { // received message from WebSocket client Either::Left((Some(Ok(msg)), _)) => { log::debug!("msg: {msg:?}"); match msg { Message::Text(text) => { session.text(text).await.unwrap(); } Message::Binary(bin) => { session.binary(bin).await.unwrap(); } Message::Close(reason) => { break reason; } Message::Ping(bytes) => { last_heartbeat = Instant::now(); let _ = session.pong(&bytes).await; } Message::Pong(_) => { last_heartbeat = Instant::now(); } Message::Continuation(_) => { log::warn!("no support for continuation frames"); } // no-op; ignore Message::Nop => {} }; } // client WebSocket stream error Either::Left((Some(Err(err)), _)) => { log::error!("{}", err); break None; } // client WebSocket stream ended Either::Left((None, _)) => break None, // heartbeat interval ticked Either::Right((_inst, _)) => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { log::info!( "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" ); break None; } // send heartbeat ping let _ = session.ping(b"").await; } } }; // attempt to close connection gracefully let _ = session.close(reason).await; log::info!("disconnected"); } /// Echo text & binary messages received from the client and respond to ping messages. /// /// This example is just for demonstration of simplicity. In reality, you likely want to include /// some handling of heartbeats for connection health tracking to free up server resources when /// connections die or network issues arise. /// /// See [`echo_heartbeat_ws`] for a more realistic implementation. pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) { log::info!("connected"); let close_reason = loop { match msg_stream.next().await { Some(Ok(msg)) => { log::debug!("msg: {msg:?}"); match msg { Message::Text(text) => { session.text(text).await.unwrap(); } Message::Binary(bin) => { session.binary(bin).await.unwrap(); } Message::Close(reason) => { break reason; } Message::Ping(bytes) => { let _ = session.pong(&bytes).await; } Message::Pong(_) => {} Message::Continuation(_) => { log::warn!("no support for continuation frames"); } // no-op; ignore Message::Nop => {} }; } // error or end of stream _ => break None, } }; // attempt to close connection gracefully let _ = session.close(close_reason).await; log::info!("disconnected"); }