1
0
mirror of https://github.com/actix/examples synced 2025-05-12 16:32:48 +02:00

Merge pull request #1082 from destinyFvcker/refactor/websocket-select-idiomatic

refactor: improve select usage with tokio::select! macro in websockets/actor-actorless example
This commit is contained in:
Rob Ede 2025-05-09 20:14:10 +00:00 committed by GitHub
commit f0035f7188
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,13 +1,7 @@
use std::{ use std::time::{Duration, Instant};
pin::pin,
time::{Duration, Instant},
};
use actix_ws::AggregatedMessage; use actix_ws::AggregatedMessage;
use futures_util::{ use futures_util::StreamExt as _;
StreamExt as _,
future::{Either, select},
};
use tokio::{sync::mpsc, time::interval}; use tokio::{sync::mpsc, time::interval};
use crate::{ChatServerHandle, ConnId}; use crate::{ChatServerHandle, ConnId};
@ -27,7 +21,7 @@ pub async fn chat_ws(
) { ) {
log::info!("connected"); log::info!("connected");
let mut name = None; let mut name: Option<String> = None;
let mut last_heartbeat = Instant::now(); let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL); let mut interval = interval(HEARTBEAT_INTERVAL);
@ -36,31 +30,19 @@ pub async fn chat_ws(
// unwrap: chat server is not dropped before the HTTP server // unwrap: chat server is not dropped before the HTTP server
let conn_id = chat_server.connect(conn_tx).await; let conn_id = chat_server.connect(conn_tx).await;
let msg_stream = msg_stream let mut msg_stream = msg_stream
.max_frame_size(128 * 1024) .max_frame_size(128 * 1024)
.aggregate_continuations() .aggregate_continuations()
.max_continuation_size(2 * 1024 * 1024); .max_continuation_size(2 * 1024 * 1024);
let mut msg_stream = pin!(msg_stream);
let close_reason = loop { let close_reason = loop {
// most of the futures we process need to be stack-pinned to work with select() tokio::select! {
Some(Ok(msg)) = msg_stream.next() => {
let tick = pin!(interval.tick());
let msg_rx = pin!(conn_rx.recv());
// TODO: nested select is pretty gross for readability on the match
let messages = pin!(select(msg_stream.next(), msg_rx));
match select(messages, tick).await {
// commands & messages received from client
Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
log::debug!("msg: {msg:?}"); log::debug!("msg: {msg:?}");
match msg { match msg {
AggregatedMessage::Ping(bytes) => { AggregatedMessage::Ping(bytes) => {
last_heartbeat = Instant::now(); last_heartbeat = Instant::now();
// unwrap:
session.pong(&bytes).await.unwrap(); session.pong(&bytes).await.unwrap();
} }
@ -81,39 +63,21 @@ pub async fn chat_ws(
} }
} }
// client WebSocket stream error Some(chat_msg) = conn_rx.recv() => {
Either::Left((Either::Left((Some(Err(err)), _)), _)) => { session.text(chat_msg).await.unwrap();
log::error!("{}", err);
break None;
} }
// client WebSocket stream ended _ = interval.tick() => {
Either::Left((Either::Left((None, _)), _)) => break None,
// chat messages received from other room participants
Either::Left((Either::Right((Some(chat_msg), _)), _)) => {
session.text(chat_msg).await.unwrap();
}
// all connection's message senders were dropped
Either::Left((Either::Right((None, _)), _)) => unreachable!(
"all connection message senders were dropped; chat server may have panicked"
),
// heartbeat internal tick
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);
break None; break None;
} }
// send heartbeat ping
let _ = session.ping(b"").await; let _ = session.ping(b"").await;
} }
};
else => {
break None;
}
}
}; };
chat_server.disconnect(conn_id); chat_server.disconnect(conn_id);