diff --git a/websockets/chat-actorless/src/handler.rs b/websockets/chat-actorless/src/handler.rs index f3da3be9..092c38af 100644 --- a/websockets/chat-actorless/src/handler.rs +++ b/websockets/chat-actorless/src/handler.rs @@ -1,13 +1,7 @@ -use std::{ - pin::pin, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use actix_ws::AggregatedMessage; -use futures_util::{ - StreamExt as _, - future::{Either, select}, -}; +use futures_util::StreamExt as _; use tokio::{sync::mpsc, time::interval}; use crate::{ChatServerHandle, ConnId}; @@ -27,7 +21,7 @@ pub async fn chat_ws( ) { log::info!("connected"); - let mut name = None; + let mut name: Option = None; let mut last_heartbeat = Instant::now(); let mut interval = interval(HEARTBEAT_INTERVAL); @@ -36,31 +30,19 @@ pub async fn chat_ws( // unwrap: chat server is not dropped before the HTTP server let conn_id = chat_server.connect(conn_tx).await; - let msg_stream = msg_stream + let mut msg_stream = msg_stream .max_frame_size(128 * 1024) .aggregate_continuations() .max_continuation_size(2 * 1024 * 1024); - let mut msg_stream = pin!(msg_stream); - let close_reason = loop { - // most of the futures we process need to be stack-pinned to work with select() - - let tick = pin!(interval.tick()); - let msg_rx = pin!(conn_rx.recv()); - - // TODO: nested select is pretty gross for readability on the match - let messages = pin!(select(msg_stream.next(), msg_rx)); - - match select(messages, tick).await { - // commands & messages received from client - Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => { + tokio::select! { + Some(Ok(msg)) = msg_stream.next() => { log::debug!("msg: {msg:?}"); match msg { AggregatedMessage::Ping(bytes) => { last_heartbeat = Instant::now(); - // unwrap: session.pong(&bytes).await.unwrap(); } @@ -81,39 +63,21 @@ pub async fn chat_ws( } } - // client WebSocket stream error - Either::Left((Either::Left((Some(Err(err)), _)), _)) => { - log::error!("{}", err); - break None; + Some(chat_msg) = conn_rx.recv() => { + session.text(chat_msg).await.unwrap(); } - // client WebSocket stream ended - Either::Left((Either::Left((None, _)), _)) => break None, - - // chat messages received from other room participants - Either::Left((Either::Right((Some(chat_msg), _)), _)) => { - session.text(chat_msg).await.unwrap(); - } - - // all connection's message senders were dropped - Either::Left((Either::Right((None, _)), _)) => unreachable!( - "all connection message senders were dropped; chat server may have panicked" - ), - - // heartbeat internal tick - Either::Right((_inst, _)) => { - // if no heartbeat ping/pong received recently, close the connection + _ = interval.tick() => { if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { - log::info!( - "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" - ); break None; } - - // send heartbeat ping let _ = session.ping(b"").await; } - }; + + else => { + break None; + } + } }; chat_server.disconnect(conn_id);