diff --git a/websockets/chat-actorless/Cargo.toml b/websockets/chat-actorless/Cargo.toml index 36fdacbf..0c9f0ad4 100644 --- a/websockets/chat-actorless/Cargo.toml +++ b/websockets/chat-actorless/Cargo.toml @@ -11,7 +11,7 @@ awc = "3" derive_more = "0.99.5" env_logger = "0.9" -futures-util = { version = "0.3.17", default-features = false, features = ["std", "sink"] } +futures-util = { version = "0.3.17", default-features = false, features = ["std"] } log = "0.4" rand = "0.8" tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] } diff --git a/websockets/chat-actorless/src/handler.rs b/websockets/chat-actorless/src/handler.rs index f362f67e..f89ba8cb 100644 --- a/websockets/chat-actorless/src/handler.rs +++ b/websockets/chat-actorless/src/handler.rs @@ -1,14 +1,17 @@ use std::time::{Duration, Instant}; -use actix_web::rt; use actix_ws::Message; -use futures_util::stream::StreamExt as _; +use futures_util::{ + future::{select, Either}, + StreamExt as _, +}; use tokio::{ - select, + pin, sync::{ mpsc::{self, UnboundedSender}, oneshot, }, + time::interval, }; use crate::{Command, ConnId, RoomId}; @@ -31,27 +34,41 @@ pub async fn chat_ws( let mut name = None; let mut room = "main".to_owned(); let mut last_heartbeat = Instant::now(); - let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); + let mut interval = interval(HEARTBEAT_INTERVAL); let (conn_tx, mut conn_rx) = mpsc::unbounded_channel(); let (res_tx, res_rx) = oneshot::channel(); + // unwrap: chat server is not dropped before the HTTP server server_tx .send(Command::Connect { conn_tx, res_tx }) .unwrap(); + // unwrap: chat server does not drop our response channel let conn_id = res_rx.await.unwrap(); - loop { - select! { + let close_reason = loop { + // most of the futures we process need to be stack-pinned to work with select() + + let tick = interval.tick(); + pin!(tick); + + let msg_rx = conn_rx.recv(); + pin!(msg_rx); + + let messages = select(msg_stream.next(), msg_rx); + pin!(messages); + + match select(messages, tick).await { // commands & messages received from client - Some(Ok(msg)) = msg_stream.next() => { + Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => { log::debug!("msg: {msg:?}"); match msg { Message::Ping(bytes) => { last_heartbeat = Instant::now(); - let _ = session.pong(&bytes).await; + // unwrap: + session.pong(&bytes).await.unwrap(); } Message::Pong(_) => { @@ -59,47 +76,64 @@ pub async fn chat_ws( } Message::Text(text) => { - process_text_msg(&server_tx, &mut session, &text, conn_id, &mut room, &mut name).await; + process_text_msg( + &server_tx, + &mut session, + &text, + conn_id, + &mut room, + &mut name, + ) + .await; } Message::Binary(_bin) => { log::warn!("unexpected binary message"); } - Message::Close(reason) => { - let _ = session.close(reason).await; - break; - } + Message::Close(reason) => break reason, _ => { - let _ = session.close(None).await; - break; + break None; } } } + // client WebSocket stream error + Either::Left((Either::Left((Some(Err(err)), _)), _)) => { + log::error!("{}", err); + break None; + } + + // client WebSocket stream ended + Either::Left((Either::Left((None, _)), _)) => break None, + // chat messages received from other room participants - Some(chat_msg) = conn_rx.recv() => { + Either::Left((Either::Right((Some(chat_msg), _)), _)) => { session.text(chat_msg).await.unwrap(); } - // heartbeat - _ = interval.tick() => { + // all connection's msg senders were dropped + Either::Left((Either::Right((None, _)), _)) => unreachable!(), + + // heartbeat internal tick + Either::Right((_inst, _)) => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { - log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"); - let _ = session.close(None).await; - break; + log::info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); + break None; } // send heartbeat ping let _ = session.ping(b"").await; - - // reset interval duration - interval.reset(); } }; - } + }; + + // attempt to close connection gracefully + let _ = session.close(close_reason).await; } async fn process_text_msg( @@ -110,12 +144,14 @@ async fn process_text_msg( room: &mut RoomId, name: &mut Option, ) { + // strip leading and trailing whitespace (spaces, newlines, etc.) let msg = text.trim(); // we check for / type of messages if msg.starts_with('/') { let mut cmd_args = msg.splitn(2, ' '); + // unwrap: we have guaranteed non-zero string length already match cmd_args.next().unwrap() { "/list" => { // Send ListRooms message to chat server and wait for @@ -124,6 +160,7 @@ async fn process_text_msg( let (res_tx, res_rx) = oneshot::channel(); server_tx.send(Command::List { res_tx }).unwrap(); + // unwrap: chat server does not drop our response channel let rooms = res_rx.await.unwrap(); for room in rooms { @@ -145,6 +182,7 @@ async fn process_text_msg( }) .unwrap(); + // unwrap: chat server does not drop our response channel res_rx.await.unwrap(); session.text(format!("joined {room_id}")).await.unwrap(); @@ -171,6 +209,7 @@ async fn process_text_msg( } } } else { + // prefix message with our name, if assigned let msg = match name { Some(ref name) => format!("{name}: {msg}"), None => msg.to_owned(), @@ -186,8 +225,10 @@ async fn process_text_msg( skip: conn, res_tx, }) + // unwrap: chat server is not dropped before the HTTP server .unwrap(); + // unwrap: chat server does not drop our response channel res_rx.await.unwrap(); } } diff --git a/websockets/chat-actorless/src/main.rs b/websockets/chat-actorless/src/main.rs index e427b156..9342e2dd 100644 --- a/websockets/chat-actorless/src/main.rs +++ b/websockets/chat-actorless/src/main.rs @@ -3,10 +3,12 @@ //! Open `http://localhost:8080/` in browser to test. use actix_files::NamedFile; -use actix_web::{ - middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, +use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder}; +use tokio::{ + sync::mpsc::UnboundedSender, + task::{spawn, spawn_local}, + try_join, }; -use tokio::sync::mpsc::UnboundedSender; mod command; mod handler; @@ -37,7 +39,7 @@ async fn chat_ws( let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; // spawn websocket handler (and don't await it) so that the response is returned immediately - rt::spawn(handler::chat_ws((**server_tx).clone(), session, msg_stream)); + spawn_local(handler::chat_ws((**server_tx).clone(), session, msg_stream)); Ok(res) } @@ -51,9 +53,9 @@ async fn main() -> std::io::Result<()> { let (chat_server, server_tx) = ChatServer::new(); - rt::spawn(chat_server.run()); + let chat_server = spawn(chat_server.run()); - HttpServer::new(move || { + let http_server = HttpServer::new(move || { App::new() .app_data(web::Data::new(server_tx.clone())) // WebSocket UI HTML file @@ -65,6 +67,9 @@ async fn main() -> std::io::Result<()> { }) .workers(2) .bind(("127.0.0.1", 8080))? - .run() - .await + .run(); + + try_join!(http_server, async move { chat_server.await.unwrap() })?; + + Ok(()) } diff --git a/websockets/chat-actorless/src/server.rs b/websockets/chat-actorless/src/server.rs index c9670f24..c7fbcc83 100644 --- a/websockets/chat-actorless/src/server.rs +++ b/websockets/chat-actorless/src/server.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashMap, HashSet}, + io, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -62,7 +63,8 @@ impl ChatServer { for conn_id in sessions { if *conn_id != skip_id { if let Some(tx) = self.sessions.get(conn_id) { - tx.send(msg.clone()).unwrap(); + // errors if client disconnected abruptly and hasn't been timed-out yet + let _ = tx.send(msg.clone()); } } } @@ -152,12 +154,17 @@ impl ChatServer { self.send_message(&room, "Someone connected", conn_id).await; } - pub async fn run(mut self) { + pub async fn run(mut self) -> io::Result<()> { loop { - match self.rx.recv().await.unwrap() { + let cmd = match self.rx.recv().await { + Some(cmd) => cmd, + None => break, + }; + + match cmd { Command::Connect { conn_tx, res_tx } => { let conn_id = self.connect(conn_tx).await; - res_tx.send(conn_id).unwrap(); + let _ = res_tx.send(conn_id); } Command::Disconnect { conn } => { @@ -165,12 +172,12 @@ impl ChatServer { } Command::List { res_tx } => { - res_tx.send(self.list_rooms()).unwrap(); + let _ = res_tx.send(self.list_rooms()); } Command::Join { conn, room, res_tx } => { self.join_room(conn, room).await; - res_tx.send(()).unwrap(); + let _ = res_tx.send(()); } Command::Message { @@ -180,9 +187,11 @@ impl ChatServer { res_tx, } => { self.send_message(&room, msg, skip).await; - res_tx.send(()).unwrap(); + let _ = res_tx.send(()); } } } + + Ok(()) } } diff --git a/websockets/echo-actorless/src/handler.rs b/websockets/echo-actorless/src/handler.rs index 6eac3e63..2fd51934 100644 --- a/websockets/echo-actorless/src/handler.rs +++ b/websockets/echo-actorless/src/handler.rs @@ -73,6 +73,7 @@ pub async fn echo_heartbeat_ws( // client WebSocket stream error Either::Left((Some(Err(err)), _)) => { log::error!("{}", err); + break None; } // client WebSocket stream ended