1
0
mirror of https://github.com/actix/examples synced 2025-05-11 16:02:58 +02:00

refactor: improve select usage with tokio::select! macro

This commit is contained in:
destinyFvcker 2025-05-06 10:37:24 +08:00
parent 00aedf05ea
commit 9b6dd7b9b7

View File

@ -1,13 +1,7 @@
use std::{
pin::pin,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use actix_ws::AggregatedMessage;
use futures_util::{
StreamExt as _,
future::{Either, select},
};
use futures_util::StreamExt as _;
use tokio::{sync::mpsc, time::interval};
use crate::{ChatServerHandle, ConnId};
@ -27,7 +21,7 @@ pub async fn chat_ws(
) {
log::info!("connected");
let mut name = None;
let mut name: Option<String> = None;
let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);
@ -36,31 +30,19 @@ pub async fn chat_ws(
// unwrap: chat server is not dropped before the HTTP server
let conn_id = chat_server.connect(conn_tx).await;
let msg_stream = msg_stream
let mut msg_stream = msg_stream
.max_frame_size(128 * 1024)
.aggregate_continuations()
.max_continuation_size(2 * 1024 * 1024);
let mut msg_stream = pin!(msg_stream);
let close_reason = loop {
// most of the futures we process need to be stack-pinned to work with select()
let tick = pin!(interval.tick());
let msg_rx = pin!(conn_rx.recv());
// TODO: nested select is pretty gross for readability on the match
let messages = pin!(select(msg_stream.next(), msg_rx));
match select(messages, tick).await {
// commands & messages received from client
Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
tokio::select! {
Some(Ok(msg)) = msg_stream.next() => {
log::debug!("msg: {msg:?}");
match msg {
AggregatedMessage::Ping(bytes) => {
last_heartbeat = Instant::now();
// unwrap:
session.pong(&bytes).await.unwrap();
}
@ -81,39 +63,21 @@ pub async fn chat_ws(
}
}
// client WebSocket stream error
Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
log::error!("{}", err);
break None;
Some(chat_msg) = conn_rx.recv() => {
session.text(chat_msg).await.unwrap();
}
// client WebSocket stream ended
Either::Left((Either::Left((None, _)), _)) => break None,
// chat messages received from other room participants
Either::Left((Either::Right((Some(chat_msg), _)), _)) => {
session.text(chat_msg).await.unwrap();
}
// all connection's message senders were dropped
Either::Left((Either::Right((None, _)), _)) => unreachable!(
"all connection message senders were dropped; chat server may have panicked"
),
// heartbeat internal tick
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection
_ = interval.tick() => {
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);
break None;
}
// send heartbeat ping
let _ = session.ping(b"").await;
}
};
else => {
break None;
}
}
};
chat_server.disconnect(conn_id);