diff --git a/Cargo.lock b/Cargo.lock index 907a3d74..ff0d0739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2446,6 +2446,21 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite 0.2.9", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.21" @@ -3985,6 +4000,12 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -6849,6 +6870,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -7033,6 +7060,22 @@ dependencies = [ "log", ] +[[package]] +name = "websocket-chat-actorless-example" +version = "1.0.0" +dependencies = [ + "actix-files", + "actix-web", + "actix-ws", + "awc", + "derive_more", + "env_logger 0.9.0", + "futures-util", + "log", + "rand 0.8.5", + "tokio 1.19.2", +] + [[package]] name = "websocket-chat-broker" version = "1.0.0" @@ -7056,10 +7099,10 @@ dependencies = [ "actix-ws", "awc", "env_logger 0.9.0", + "futures-lite", "futures-util", "log", "tokio 1.19.2", - "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0233bf75..f538a2b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "templating/yarte", "unix-socket", "websockets/autobahn", + "websockets/chat-actorless", "websockets/chat-broker", "websockets/chat-tcp", "websockets/chat", diff --git a/websockets/chat-actorless/Cargo.toml b/websockets/chat-actorless/Cargo.toml new file mode 100644 index 00000000..28f22f56 --- /dev/null +++ b/websockets/chat-actorless/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "websocket-chat-actorless-example" +version = "1.0.0" +edition = "2021" + +[dependencies] +actix-files = "0.6" +actix-web = "4" +actix-ws = "0.2.5" +awc = "3" + +derive_more = "0.99.5" +env_logger = "0.9" +futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } +log = "0.4" +rand = "0.8" +tokio = { version = "1.13.1", features = ["full"] } diff --git a/websockets/chat-actorless/README.md b/websockets/chat-actorless/README.md new file mode 100644 index 00000000..afd6ebb5 --- /dev/null +++ b/websockets/chat-actorless/README.md @@ -0,0 +1,36 @@ +# WebSocket Chat (actor-less) + +> Multi-room WebSocket chat server using [`actix-ws`]. + +## Usage + +### Server + +```sh +cd websockets/echo-actorless +cargo run +# starting HTTP server at http://localhost:8080 +``` + +### Browser Client + +Go to in a browser. + +### CLI Client + +```sh +# using `websocat` (https://github.com/vi/websocat) +websocat -v --ping-interval=2 ws://127.0.0.1:8080/ws +``` + +## Chat Commands + +Once connected, the following slash commands can be sent: + +- `/list`: list all available rooms +- `/join name`: join room, if room does not exist, create new one +- `/name name`: set session name + +Sending a plain string will broadcast that message to all peers in same room. + +[`actix-ws`]: https://crates.io/crates/actix-ws diff --git a/websockets/chat-actorless/src/command.rs b/websockets/chat-actorless/src/command.rs new file mode 100644 index 00000000..5781f926 --- /dev/null +++ b/websockets/chat-actorless/src/command.rs @@ -0,0 +1,32 @@ +use tokio::sync::{mpsc, oneshot}; + +use crate::{ConnId, Msg, RoomId}; + +#[derive(Debug)] +pub enum Command { + Connect { + conn_tx: mpsc::UnboundedSender, + res_tx: oneshot::Sender, + }, + + Disconnect { + conn: ConnId, + }, + + List { + res_tx: oneshot::Sender>, + }, + + Join { + conn: ConnId, + room: RoomId, + res_tx: oneshot::Sender<()>, + }, + + Message { + room: RoomId, + msg: Msg, + skip: ConnId, + res_tx: oneshot::Sender<()>, + }, +} diff --git a/websockets/chat-actorless/src/handler.rs b/websockets/chat-actorless/src/handler.rs new file mode 100644 index 00000000..f362f67e --- /dev/null +++ b/websockets/chat-actorless/src/handler.rs @@ -0,0 +1,193 @@ +use std::time::{Duration, Instant}; + +use actix_web::rt; +use actix_ws::Message; +use futures_util::stream::StreamExt as _; +use tokio::{ + select, + sync::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, +}; + +use crate::{Command, ConnId, RoomId}; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Echo text & binary messages received from the client, respond to ping messages, and monitor +/// connection health to detect network issues and free up resources. +pub async fn chat_ws( + server_tx: UnboundedSender, + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, +) { + log::info!("connected"); + + let mut name = None; + let mut room = "main".to_owned(); + let mut last_heartbeat = Instant::now(); + let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); + + let (conn_tx, mut conn_rx) = mpsc::unbounded_channel(); + let (res_tx, res_rx) = oneshot::channel(); + + server_tx + .send(Command::Connect { conn_tx, res_tx }) + .unwrap(); + + let conn_id = res_rx.await.unwrap(); + + loop { + select! { + // commands & messages received from client + Some(Ok(msg)) = msg_stream.next() => { + log::debug!("msg: {msg:?}"); + + match msg { + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Text(text) => { + process_text_msg(&server_tx, &mut session, &text, conn_id, &mut room, &mut name).await; + } + + Message::Binary(_bin) => { + log::warn!("unexpected binary message"); + } + + Message::Close(reason) => { + let _ = session.close(reason).await; + break; + } + + _ => { + let _ = session.close(None).await; + break; + } + } + } + + // chat messages received from other room participants + Some(chat_msg) = conn_rx.recv() => { + session.text(chat_msg).await.unwrap(); + } + + // heartbeat + _ = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"); + let _ = session.close(None).await; + break; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + + // reset interval duration + interval.reset(); + } + }; + } +} + +async fn process_text_msg( + server_tx: &UnboundedSender, + session: &mut actix_ws::Session, + text: &str, + conn: ConnId, + room: &mut RoomId, + name: &mut Option, +) { + let msg = text.trim(); + + // we check for / type of messages + if msg.starts_with('/') { + let mut cmd_args = msg.splitn(2, ' '); + + match cmd_args.next().unwrap() { + "/list" => { + // Send ListRooms message to chat server and wait for + // response + log::info!("List rooms"); + + let (res_tx, res_rx) = oneshot::channel(); + server_tx.send(Command::List { res_tx }).unwrap(); + let rooms = res_rx.await.unwrap(); + + for room in rooms { + session.text(room).await.unwrap(); + } + } + + "/join" => match cmd_args.next() { + Some(room_id) => { + *room = room_id.to_owned(); + + let (res_tx, res_rx) = oneshot::channel(); + + server_tx + .send(Command::Join { + conn, + room: room.clone(), + res_tx, + }) + .unwrap(); + + res_rx.await.unwrap(); + + session.text(format!("joined {room_id}")).await.unwrap(); + } + None => { + session.text("!!! room name is required").await.unwrap(); + } + }, + + "/name" => match cmd_args.next() { + Some(new_name) => { + name.replace(new_name.to_owned()); + } + None => { + session.text("!!! name is required").await.unwrap(); + } + }, + + _ => { + session + .text(format!("!!! unknown command: {msg}")) + .await + .unwrap(); + } + } + } else { + let msg = match name { + Some(ref name) => format!("{name}: {msg}"), + None => msg.to_owned(), + }; + + let (res_tx, res_rx) = oneshot::channel(); + + // send message to chat server + server_tx + .send(Command::Message { + msg, + room: room.clone(), + skip: conn, + res_tx, + }) + .unwrap(); + + res_rx.await.unwrap(); + } +} diff --git a/websockets/chat-actorless/src/main.rs b/websockets/chat-actorless/src/main.rs new file mode 100644 index 00000000..345eadd8 --- /dev/null +++ b/websockets/chat-actorless/src/main.rs @@ -0,0 +1,69 @@ +//! Multi-room WebSocket chat server. +//! +//! Open `http://localhost:8080/` in browser to test. + +use actix_files::NamedFile; +use actix_web::{ + middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, +}; +use tokio::sync::mpsc::UnboundedSender; + +mod command; +mod handler; +mod server; + +pub use self::command::Command; +pub use self::server::ChatServer; + +/// Connection ID. +pub type ConnId = usize; + +/// Room ID. +pub type RoomId = String; + +/// Message sent to a room/client. +pub type Msg = String; + +async fn index() -> impl Responder { + NamedFile::open_async("./static/index.html").await.unwrap() +} + +/// Handshake and start WebSocket handler with heartbeats. +async fn chat_ws( + req: HttpRequest, + stream: web::Payload, + server_tx: web::Data>, +) -> Result { + let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + + // spawn websocket handler (and don't await it) so that the response is returned immediately + rt::spawn(handler::chat_ws((**server_tx).clone(), session, msg_stream)); + + Ok(res) +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + log::info!("starting HTTP server at http://localhost:8080"); + + let (chat_server, server_tx) = ChatServer::new(); + + rt::spawn(chat_server.run()); + + HttpServer::new(move || { + App::new() + .app_data(web::Data::new(server_tx.clone())) + // WebSocket UI HTML file + .service(web::resource("/").to(index)) + // websocket routes + .service(web::resource("/ws").route(web::get().to(chat_ws))) + // enable logger + .wrap(middleware::Logger::default()) + }) + .workers(2) + .bind(("127.0.0.1", 8080))? + .run() + .await +} diff --git a/websockets/chat-actorless/src/server.rs b/websockets/chat-actorless/src/server.rs new file mode 100644 index 00000000..10a9e350 --- /dev/null +++ b/websockets/chat-actorless/src/server.rs @@ -0,0 +1,188 @@ +//! A multi-room chat server. + +use std::{ + collections::{HashMap, HashSet}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +use rand::{thread_rng, Rng as _}; +use tokio::sync::{mpsc, oneshot}; + +use crate::{Command, ConnId, Msg, RoomId}; + +/// A multi-room chat server. +#[derive(Debug)] +pub struct ChatServer { + /// Map of connection IDs to their message receivers. + sessions: HashMap>, + + /// Map of room name to participant IDs in that room. + rooms: HashMap>, + + /// Tracks total number of historical connections established. + visitor_count: Arc, + + /// Command receiver. + rx: mpsc::UnboundedReceiver, +} + +impl ChatServer { + pub fn new() -> (Self, mpsc::UnboundedSender) { + // create empty server + let mut rooms = HashMap::with_capacity(4); + + // create default room + rooms.insert("main".to_owned(), HashSet::new()); + + let (tx, rx) = mpsc::unbounded_channel(); + + ( + Self { + sessions: HashMap::new(), + rooms, + visitor_count: Arc::new(AtomicUsize::new(0)), + rx, + }, + tx, + ) + } +} + +impl ChatServer { + /// Send message to all users in the room. + /// + /// `skip_id` is used to prevent messages send by a connection also being received by it. + async fn send_message(&self, room: &str, msg: impl Into, skip_id: ConnId) { + if let Some(sessions) = self.rooms.get(room) { + let msg = msg.into(); + + for conn_id in sessions { + if *conn_id != skip_id { + if let Some(tx) = self.sessions.get(conn_id) { + tx.send(msg.clone()).unwrap(); + } + } + } + } + } + + /// Handler for Connect message. + /// + /// Register new session and assign unique id to this session + async fn connect(&mut self, tx: mpsc::UnboundedSender) -> ConnId { + log::info!("Someone joined"); + + // notify all users in same room + self.send_message("main", "Someone joined", 0).await; + + // register session with random connection ID + let id = thread_rng().gen::(); + self.sessions.insert(id, tx); + + // auto join session to main room + self.rooms + .entry("main".to_owned()) + .or_insert_with(HashSet::new) + .insert(id); + + let count = self.visitor_count.fetch_add(1, Ordering::SeqCst); + self.send_message("main", format!("Total visitors {count}"), 0) + .await; + + // send id back + id + } + + /// Handler for Disconnect message. + async fn disconnect(&mut self, conn_id: ConnId) { + println!("Someone disconnected"); + + let mut rooms: Vec = Vec::new(); + + // remove sender + if self.sessions.remove(&conn_id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&conn_id) { + rooms.push(name.to_owned()); + } + } + } + + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0).await; + } + } + + /// Handler for `ListRooms` message. + fn list_rooms(&mut self) -> Vec { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + rooms + } + + /// Join room, send disconnect message to old room send join message to new room. + async fn join_room(&mut self, conn_id: ConnId, room: String) { + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&conn_id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0).await; + } + + self.rooms + .entry(room.clone()) + .or_insert_with(HashSet::new) + .insert(conn_id); + + self.send_message(&room, "Someone connected", conn_id).await; + } + + pub async fn run(mut self) { + loop { + match self.rx.recv().await.unwrap() { + Command::Connect { conn_tx, res_tx } => { + let conn_id = self.connect(conn_tx).await; + res_tx.send(conn_id).unwrap(); + } + + Command::Disconnect { conn } => { + self.disconnect(conn).await; + } + + Command::List { res_tx } => { + res_tx.send(self.list_rooms()).unwrap(); + } + + Command::Join { conn, room, res_tx } => { + self.join_room(conn, room).await; + res_tx.send(()).unwrap(); + } + + Command::Message { + room, + msg, + skip, + res_tx, + } => { + self.send_message(&room, msg, skip).await; + res_tx.send(()).unwrap(); + } + } + } + } +} diff --git a/websockets/chat-actorless/static/index.html b/websockets/chat-actorless/static/index.html new file mode 120000 index 00000000..2c6d9def --- /dev/null +++ b/websockets/chat-actorless/static/index.html @@ -0,0 +1 @@ +../../chat/static/index.html \ No newline at end of file