diff --git a/.gitignore b/.gitignore index dbdafba..72bdb47 100644 --- a/.gitignore +++ b/.gitignore @@ -16,5 +16,8 @@ Cargo.lock .history/ +# VS Code workspace config +.vscode + # For multipart example upload.png diff --git a/Cargo.toml b/Cargo.toml index 0319017..642ea3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ members = [ "web-cors/backend", "websocket", "websocket-chat", -# "websocket-chat-broker", + "websocket-chat-broker", "websocket-tcp-chat", "docker_sample", ] diff --git a/websocket-chat-broker/Cargo.toml b/websocket-chat-broker/Cargo.toml index ccac402..cf7fcbd 100644 --- a/websocket-chat-broker/Cargo.toml +++ b/websocket-chat-broker/Cargo.toml @@ -1,7 +1,10 @@ [package] -name = "websocket-broker-example" +name = "websocket-chat-broker" version = "0.1.0" -authors = ["Chris Ricketts "] +authors = [ + "Chris Ricketts ", + "Rob Ede ", +] edition = "2018" [[bin]] @@ -9,12 +12,13 @@ name = "server" path = "src/main.rs" [dependencies] -rand = "0.6" -futures = "0.1.24" -actix = "0.8.2" -actix-web = "1.0" -actix-files = "0.1" -actix-web-actors = "1.0" -actix-broker = "0.2.0" -log = "0.4.5" -simple_logger = "0.5.0" +actix = "0.9" +actix-broker = "0.3.0" +actix-files = "0.2" +actix-rt = "1.1" +actix-web = "2.0" +actix-web-actors = "2.0" +env_logger = "0.7" +futures = "0.3" +log = "0.4" +rand = "0.7" diff --git a/websocket-chat-broker/src/main.rs b/websocket-chat-broker/src/main.rs index 4509bad..094b031 100644 --- a/websocket-chat-broker/src/main.rs +++ b/websocket-chat-broker/src/main.rs @@ -1,159 +1,36 @@ -#[macro_use] -extern crate log; +use log::info; -use actix::fut; -use actix::prelude::*; -use actix_broker::BrokerIssue; use actix_files::Files; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; +mod message; mod server; -use server::*; +mod session; -fn chat_route(req: HttpRequest, stream: web::Payload) -> Result { +use session::WsChatSession; + +async fn chat_route( + req: HttpRequest, + stream: web::Payload, +) -> Result { ws::start(WsChatSession::default(), &req, stream) } -#[derive(Default)] -struct WsChatSession { - id: usize, - room: String, - name: Option, -} +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + env_logger::from_env(env_logger::Env::default().default_filter_or("info")).init(); -impl WsChatSession { - fn join_room(&mut self, room_name: &str, ctx: &mut ws::WebsocketContext) { - let room_name = room_name.to_owned(); - // First send a leave message for the current room - let leave_msg = LeaveRoom(self.room.clone(), self.id); - // issue_sync comes from having the `BrokerIssue` trait in scope. - self.issue_system_sync(leave_msg, ctx); - // Then send a join message for the new room - let join_msg = JoinRoom( - room_name.to_owned(), - self.name.clone(), - ctx.address().recipient(), - ); + let addr = "127.0.0.1:8080"; - WsChatServer::from_registry() - .send(join_msg) - .into_actor(self) - .then(|id, act, _ctx| { - if let Ok(id) = id { - act.id = id; - act.room = room_name; - } - - fut::ok(()) - }) - .spawn(ctx); - } - - fn list_rooms(&mut self, ctx: &mut ws::WebsocketContext) { - WsChatServer::from_registry() - .send(ListRooms) - .into_actor(self) - .then(|res, _, ctx| { - if let Ok(rooms) = res { - for room in rooms { - ctx.text(room); - } - } - fut::ok(()) - }) - .spawn(ctx); - } - - fn send_msg(&self, msg: &str) { - let content = format!( - "{}: {}", - self.name.clone().unwrap_or_else(|| "anon".to_string()), - msg - ); - let msg = SendMessage(self.room.clone(), self.id, content); - // issue_async comes from having the `BrokerIssue` trait in scope. - self.issue_system_async(msg); - } -} - -impl Actor for WsChatSession { - type Context = ws::WebsocketContext; - - fn started(&mut self, ctx: &mut Self::Context) { - self.join_room("Main", ctx); - } - - fn stopped(&mut self, _ctx: &mut Self::Context) { - info!( - "WsChatSession closed for {}({}) in room {}", - self.name.clone().unwrap_or_else(|| "anon".to_string()), - self.id, - self.room - ); - } -} - -impl Handler for WsChatSession { - type Result = (); - - fn handle(&mut self, msg: ChatMessage, ctx: &mut Self::Context) { - ctx.text(msg.0); - } -} - -impl StreamHandler for WsChatSession { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - debug!("WEBSOCKET MESSAGE: {:?}", msg); - match msg { - ws::Message::Text(text) => { - let msg = text.trim(); - if msg.starts_with('/') { - let mut command = msg.splitn(2, ' '); - match command.next() { - Some("/list") => self.list_rooms(ctx), - Some("/join") => { - if let Some(room_name) = command.next() { - self.join_room(room_name, ctx); - } else { - ctx.text("!!! room name is required"); - } - } - Some("/name") => { - if let Some(name) = command.next() { - self.name = Some(name.to_owned()); - ctx.text(format!("name changed to: {}", name)); - } else { - ctx.text("!!! name is required"); - } - } - _ => ctx.text(format!("!!! unknown command: {:?}", msg)), - } - return; - } - self.send_msg(msg); - } - ws::Message::Close(_) => { - ctx.stop(); - } - _ => {} - } - } -} - -fn main() -> std::io::Result<()> { - let sys = actix::System::new("websocket-broker-example"); - simple_logger::init_with_level(log::Level::Info).unwrap(); - - HttpServer::new(move || { + let srv = HttpServer::new(move || { App::new() .service(web::resource("/ws/").to(chat_route)) .service(Files::new("/", "./static/").index_file("index.html")) }) - .bind("127.0.0.1:8080") - .unwrap() - .start(); + .bind(&addr)?; - info!("Started http server: 127.0.0.1:8080"); - sys.run() + info!("Starting http server: {}", &addr); + + srv.run().await } diff --git a/websocket-chat-broker/src/message.rs b/websocket-chat-broker/src/message.rs new file mode 100644 index 0000000..4fb4e91 --- /dev/null +++ b/websocket-chat-broker/src/message.rs @@ -0,0 +1,21 @@ +use actix::prelude::*; + +#[derive(Clone, Message)] +#[rtype(result = "()")] +pub struct ChatMessage(pub String); + +#[derive(Clone, Message)] +#[rtype(result = "usize")] +pub struct JoinRoom(pub String, pub Option, pub Recipient); + +#[derive(Clone, Message)] +#[rtype(result = "()")] +pub struct LeaveRoom(pub String, pub usize); + +#[derive(Clone, Message)] +#[rtype(result = "Vec")] +pub struct ListRooms; + +#[derive(Clone, Message)] +#[rtype(result = "()")] +pub struct SendMessage(pub String, pub usize, pub String); diff --git a/websocket-chat-broker/src/server.rs b/websocket-chat-broker/src/server.rs index 946f670..03cd252 100644 --- a/websocket-chat-broker/src/server.rs +++ b/websocket-chat-broker/src/server.rs @@ -1,29 +1,14 @@ use actix::prelude::*; use actix_broker::BrokerSubscribe; -use rand; use std::collections::HashMap; use std::mem; -#[derive(Clone, Message)] -pub struct ChatMessage(pub String); - -#[derive(Clone, Message)] -#[rtype(result = "usize")] -pub struct JoinRoom(pub String, pub Option, pub Recipient); - -#[derive(Clone, Message)] -pub struct LeaveRoom(pub String, pub usize); - -#[derive(Clone, Message)] -#[rtype(result = "Vec")] -pub struct ListRooms; - -#[derive(Clone, Message)] -pub struct SendMessage(pub String, pub usize, pub String); +use crate::message::{ChatMessage, JoinRoom, LeaveRoom, ListRooms, SendMessage}; type Client = Recipient; type Room = HashMap; + #[derive(Default)] pub struct WsChatServer { rooms: HashMap, @@ -43,6 +28,7 @@ impl WsChatServer { client: Client, ) -> usize { let mut id = id.unwrap_or_else(rand::random::); + if let Some(room) = self.rooms.get_mut(room_name) { loop { if room.contains_key(&id) { @@ -51,13 +37,17 @@ impl WsChatServer { break; } } + room.insert(id, client); return id; } + // Create a new room for the first client let mut room: Room = HashMap::new(); + room.insert(id, client); self.rooms.insert(room_name.to_owned(), room); + id } @@ -68,11 +58,13 @@ impl WsChatServer { _src: usize, ) -> Option<()> { let mut room = self.take_room(room_name)?; + for (id, client) in room.drain() { if client.do_send(ChatMessage(msg.to_owned())).is_ok() { self.add_client_to_room(room_name, Some(id), client); } } + Some(()) } } @@ -91,12 +83,14 @@ impl Handler for WsChatServer { fn handle(&mut self, msg: JoinRoom, _ctx: &mut Self::Context) -> Self::Result { let JoinRoom(room_name, client_name, client) = msg; + let id = self.add_client_to_room(&room_name, None, client); let join_msg = format!( "{} joined {}", client_name.unwrap_or_else(|| "anon".to_string()), room_name ); + self.send_chat_message(&room_name, &join_msg, id); MessageResult(id) } diff --git a/websocket-chat-broker/src/session.rs b/websocket-chat-broker/src/session.rs new file mode 100644 index 0000000..86b5783 --- /dev/null +++ b/websocket-chat-broker/src/session.rs @@ -0,0 +1,162 @@ +use log::{debug, info}; + +use actix::fut; +use actix::prelude::*; +use actix_broker::BrokerIssue; +use actix_web_actors::ws; + +use crate::message::{ChatMessage, JoinRoom, LeaveRoom, ListRooms, SendMessage}; +use crate::server::WsChatServer; + +#[derive(Default)] +pub struct WsChatSession { + id: usize, + room: String, + name: Option, +} + +impl WsChatSession { + pub fn join_room(&mut self, room_name: &str, ctx: &mut ws::WebsocketContext) { + let room_name = room_name.to_owned(); + + // First send a leave message for the current room + let leave_msg = LeaveRoom(self.room.clone(), self.id); + + // issue_sync comes from having the `BrokerIssue` trait in scope. + self.issue_system_sync(leave_msg, ctx); + + // Then send a join message for the new room + let join_msg = JoinRoom( + room_name.to_owned(), + self.name.clone(), + ctx.address().recipient(), + ); + + WsChatServer::from_registry() + .send(join_msg) + .into_actor(self) + .then(|id, act, _ctx| { + if let Ok(id) = id { + act.id = id; + act.room = room_name; + } + + fut::ready(()) + }) + .wait(ctx); + } + + pub fn list_rooms(&mut self, ctx: &mut ws::WebsocketContext) { + WsChatServer::from_registry() + .send(ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + if let Ok(rooms) = res { + for room in rooms { + ctx.text(room); + } + } + + fut::ready(()) + }) + .wait(ctx); + } + + pub fn send_msg(&self, msg: &str) { + let content = format!( + "{}: {}", + self.name.clone().unwrap_or_else(|| "anon".to_string()), + msg + ); + + let msg = SendMessage(self.room.clone(), self.id, content); + + // issue_async comes from having the `BrokerIssue` trait in scope. + self.issue_system_async(msg); + } +} + +impl Actor for WsChatSession { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.join_room("Main", ctx); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + info!( + "WsChatSession closed for {}({}) in room {}", + self.name.clone().unwrap_or_else(|| "anon".to_string()), + self.id, + self.room + ); + } +} + +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: ChatMessage, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +impl StreamHandler> for WsChatSession { + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + + debug!("WEBSOCKET MESSAGE: {:?}", msg); + + match msg { + ws::Message::Text(text) => { + let msg = text.trim(); + + if msg.starts_with('/') { + let mut command = msg.splitn(2, ' '); + + match command.next() { + Some("/list") => self.list_rooms(ctx), + + Some("/join") => { + if let Some(room_name) = command.next() { + self.join_room(room_name, ctx); + } else { + ctx.text("!!! room name is required"); + } + } + + Some("/name") => { + if let Some(name) = command.next() { + self.name = Some(name.to_owned()); + ctx.text(format!("name changed to: {}", name)); + } else { + ctx.text("!!! name is required"); + } + } + + _ => ctx.text(format!("!!! unknown command: {:?}", msg)), + } + + return; + } + self.send_msg(msg); + } + + ws::Message::Close(_) => { + ctx.stop(); + } + + _ => {} + } + } +}