diff --git a/.travis.yml b/.travis.yml index d1a7723b..b8199db1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -64,4 +64,5 @@ script: cd web-cors/backend && cargo check && cd ../.. cd websocket && cargo check && cd .. cd websocket-chat && cargo check && cd .. + cd websocket-chat-broker && cargo check && cd .. cd websocket-tcp-chat && cargo check && cd .. diff --git a/Cargo.toml b/Cargo.toml index 6d64bc49..7bf46b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,5 +31,6 @@ members = [ "web-cors/backend", "websocket", "websocket-chat", + "websocket-chat-broker", "websocket-tcp-chat", ] diff --git a/websocket-chat-broker/Cargo.toml b/websocket-chat-broker/Cargo.toml new file mode 100644 index 00000000..3b61d476 --- /dev/null +++ b/websocket-chat-broker/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "websocket-broker-example" +version = "0.1.0" +authors = ["Chris Ricketts "] +workspace = "../" + +[[bin]] +name = "server" +path = "src/main.rs" + +[dependencies] +rand = "*" +futures = "0.1.24" +actix = "0.7" +actix-web = "0.7" +actix-broker = "0.1.4" +log = "0.4.5" +simple_logger = "0.5.0" diff --git a/websocket-chat-broker/README.md b/websocket-chat-broker/README.md new file mode 100644 index 00000000..6b5ac57a --- /dev/null +++ b/websocket-chat-broker/README.md @@ -0,0 +1,29 @@ +# Websocket chat broker example + +This is a different implementation of the +[websocket chat example](https://github.com/actix/examples/tree/master/websocket-chat) + +Differences: + +* Chat Server Actor is a System Service that runs in the same thread as the HttpServer/WS Listener. +* The [actix-broker](https://github.com/Chris-Ricketts/actix-broker) crate is used to facilitate the sending of some messages between the Chat Session and Server Actors where the session does not require a response. +* The Client is not required to send Ping messages. The Chat Server Actor auto-clears dead sessions. + +Possible Improvements: + +* Could the Chat Server Actor be simultaneously a System Service (accessible to the Chat Session via the System Registry) and also run in a seperate thread? + +## Server + +Chat server listens for incoming tcp connections. Server can access several types of message: + +* `/list` - list all available rooms +* `/join name` - join room, if room does not exist, create new one +* `/name name` - set session name +* `some message` - just string, send message to all peers in same room + +To start server use command: `cargo run` + +## WebSocket Browser Client + +Open url: [http://localhost:8080/](http://localhost:8080/) diff --git a/websocket-chat-broker/client.py b/websocket-chat-broker/client.py new file mode 100755 index 00000000..8a1bd9ae --- /dev/null +++ b/websocket-chat-broker/client.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""websocket cmd client for wssrv.py example.""" +import argparse +import asyncio +import signal +import sys + +import aiohttp + + +def start_client(loop, url): + name = input('Please enter your name: ') + + # send request + ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + + # input reader + def stdin_callback(): + line = sys.stdin.buffer.readline().decode('utf-8') + if not line: + loop.stop() + else: + ws.send_str(name + ': ' + line) + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + @asyncio.coroutine + def dispatch(): + while True: + msg = yield from ws.receive() + + if msg.type == aiohttp.WSMsgType.TEXT: + print('Text: ', msg.data.strip()) + elif msg.type == aiohttp.WSMsgType.BINARY: + print('Binary: ', msg.data) + elif msg.type == aiohttp.WSMsgType.PING: + ws.pong() + elif msg.type == aiohttp.WSMsgType.PONG: + print('Pong received') + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + yield from ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print('Error during receive %s' % ws.exception()) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + + break + + yield from dispatch() + + +ARGS = argparse.ArgumentParser( + description="websocket console client for wssrv.py example.") +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=8080, type=int, help='Port number') + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + url = 'http://{}:{}/ws/'.format(args.host, args.port) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + asyncio.Task(start_client(loop, url)) + loop.run_forever() diff --git a/websocket-chat-broker/src/main.rs b/websocket-chat-broker/src/main.rs new file mode 100644 index 00000000..24ae4929 --- /dev/null +++ b/websocket-chat-broker/src/main.rs @@ -0,0 +1,167 @@ +#[macro_use] +extern crate actix; +extern crate actix_broker; +extern crate actix_web; +extern crate futures; +extern crate rand; +#[macro_use] +extern crate log; +extern crate simple_logger; + +use actix::fut; +use actix::prelude::*; +use actix_broker::BrokerIssue; +use actix_web::server::HttpServer; +use actix_web::{fs, ws, App, Error, HttpRequest, HttpResponse}; + +mod server; +use server::*; + +fn chat_route(req: &HttpRequest<()>) -> Result { + ws::start(req, WsChatSession::default()) +} + +#[derive(Default)] +struct WsChatSession { + id: usize, + room: String, + name: Option, +} + +impl WsChatSession { + fn join_room(&mut self, room_name: &str, ctx: &mut ws::WebsocketContext) { + let room_name = room_name.to_owned(); + // First send a leave message for the current room + let leave_msg = LeaveRoom(self.room.clone(), self.id); + // issue_sync comes from having the `BrokerIssue` trait in scope. + self.issue_sync(leave_msg, ctx); + // Then send a join message for the new room + let join_msg = JoinRoom( + room_name.to_owned(), + self.name.clone(), + ctx.address().recipient(), + ); + + WsChatServer::from_registry() + .send(join_msg) + .into_actor(self) + .then(|id, act, _ctx| { + if let Ok(id) = id { + act.id = id; + act.room = room_name; + } + + fut::ok(()) + }).spawn(ctx); + } + + fn list_rooms(&mut self, ctx: &mut ws::WebsocketContext) { + WsChatServer::from_registry() + .send(ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + if let Ok(rooms) = res { + for room in rooms { + ctx.text(room); + } + } + fut::ok(()) + }).spawn(ctx); + } + + fn send_msg(&self, msg: &str) { + let content = format!( + "{}: {}", + self.name.clone().unwrap_or("anon".to_string()), + msg + ); + let msg = SendMessage(self.room.clone(), self.id, content); + // issue_async comes from having the `BrokerIssue` trait in scope. + self.issue_async(msg); + } +} + +impl Actor for WsChatSession { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + self.join_room("Main", ctx); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + info!( + "WsChatSession closed for {}({}) in room {}", + self.name.clone().unwrap_or("anon".to_string()), + self.id, + self.room + ); + } +} + +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: ChatMessage, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +impl StreamHandler for WsChatSession { + fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { + debug!("WEBSOCKET MESSAGE: {:?}", msg); + match msg { + ws::Message::Text(text) => { + let msg = text.trim(); + if msg.starts_with('/') { + let mut command = msg.splitn(2, ' '); + match command.next() { + Some("/list") => self.list_rooms(ctx), + Some("/join") => { + if let Some(room_name) = command.next() { + self.join_room(room_name, ctx); + } else { + ctx.text("!!! room name is required"); + } + } + Some("/name") => { + if let Some(name) = command.next() { + self.name = Some(name.to_owned()); + ctx.text(format!("name changed to: {}", name)); + } else { + ctx.text("!!! name is required"); + } + } + _ => ctx.text(format!("!!! unknown command: {:?}", msg)), + } + return; + } + self.send_msg(msg); + } + ws::Message::Close(_) => { + ctx.stop(); + } + _ => {} + } + } +} + +fn main() { + let sys = actix::System::new("websocket-broker-example"); + simple_logger::init_with_level(log::Level::Info).unwrap(); + + HttpServer::new(move || { + App::new() + .resource("/ws/", |r| r.route().f(chat_route)) + .handler( + "/", + fs::StaticFiles::new("./static/") + .unwrap() + .index_file("index.html"), + ) + }).bind("127.0.0.1:8080") + .unwrap() + .start(); + + info!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/websocket-chat-broker/src/server.rs b/websocket-chat-broker/src/server.rs new file mode 100644 index 00000000..87a6052b --- /dev/null +++ b/websocket-chat-broker/src/server.rs @@ -0,0 +1,133 @@ +use actix::prelude::*; +use actix_broker::BrokerSubscribe; +use rand; + +use std::collections::HashMap; +use std::mem; + +#[derive(Clone, Message)] +pub struct ChatMessage(pub String); + +#[derive(Clone, Message)] +#[rtype(result = "usize")] +pub struct JoinRoom(pub String, pub Option, pub Recipient); + +#[derive(Clone, Message)] +pub struct LeaveRoom(pub String, pub usize); + +#[derive(Clone, Message)] +#[rtype(result = "Vec")] +pub struct ListRooms; + +#[derive(Clone, Message)] +pub struct SendMessage(pub String, pub usize, pub String); + +type Client = Recipient; +type Room = HashMap; +#[derive(Default)] +pub struct WsChatServer { + rooms: HashMap, +} + +impl WsChatServer { + fn take_room(&mut self, room_name: &str) -> Option { + let room = self.rooms.get_mut(room_name)?; + let room = mem::replace(room, HashMap::new()); + Some(room) + } + + fn add_client_to_room( + &mut self, + room_name: &str, + id: Option, + client: Client, + ) -> usize { + let mut id = id.unwrap_or_else(|| rand::random::()); + if let Some(room) = self.rooms.get_mut(room_name) { + loop { + if room.contains_key(&id) { + id = rand::random::(); + } else { + break; + } + } + room.insert(id, client); + return id; + } + // Create a new room for the first client + let mut room: Room = HashMap::new(); + room.insert(id, client); + self.rooms.insert(room_name.to_owned(), room); + id + } + + fn send_chat_message( + &mut self, + room_name: &str, + msg: &str, + _src: usize, + ) -> Option<()> { + let mut room = self.take_room(room_name)?; + for (id, client) in room.drain() { + if client.do_send(ChatMessage(msg.to_owned())).is_ok() { + self.add_client_to_room(room_name, Some(id), client); + } + } + Some(()) + } +} + +impl Actor for WsChatServer { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.subscribe_async::(ctx); + self.subscribe_async::(ctx); + } +} + +impl Handler for WsChatServer { + type Result = MessageResult; + + fn handle(&mut self, msg: JoinRoom, _ctx: &mut Self::Context) -> Self::Result { + let JoinRoom(room_name, client_name, client) = msg; + let id = self.add_client_to_room(&room_name, None, client); + let join_msg = format!( + "{} joined {}", + client_name.unwrap_or("anon".to_string()), + room_name + ); + self.send_chat_message(&room_name, &join_msg, id); + MessageResult(id) + } +} + +impl Handler for WsChatServer { + type Result = (); + + fn handle(&mut self, msg: LeaveRoom, _ctx: &mut Self::Context) { + if let Some(room) = self.rooms.get_mut(&msg.0) { + room.remove(&msg.1); + } + } +} + +impl Handler for WsChatServer { + type Result = MessageResult; + + fn handle(&mut self, _: ListRooms, _ctx: &mut Self::Context) -> Self::Result { + MessageResult(self.rooms.keys().cloned().collect()) + } +} + +impl Handler for WsChatServer { + type Result = (); + + fn handle(&mut self, msg: SendMessage, _ctx: &mut Self::Context) { + let SendMessage(room_name, id, msg) = msg; + self.send_chat_message(&room_name, &msg, id); + } +} + +impl SystemService for WsChatServer {} +impl Supervised for WsChatServer {} diff --git a/websocket-chat-broker/static/index.html b/websocket-chat-broker/static/index.html new file mode 100644 index 00000000..e59e13f1 --- /dev/null +++ b/websocket-chat-broker/static/index.html @@ -0,0 +1,90 @@ + + + + + + + + +

Chat!

+
+  | Status: + disconnected +
+
+
+
+ + +
+ +