From 876da8cf703e0786a2aab52877fd8430799ec178 Mon Sep 17 00:00:00 2001 From: Aravinth Manivannan Date: Mon, 30 Nov 2020 18:11:33 +0530 Subject: [PATCH] stateful websockets using actors (#384) Co-authored-by: Yuki Okushi --- Cargo.lock | 17 +++++++++++++++++ websocket-chat/README.md | 6 +++++- websocket-chat/src/main.rs | 22 +++++++++++++++++++--- websocket-chat/src/server.rs | 15 +++++++++++++-- 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7cc2cdb3..2596ef2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6274,6 +6274,23 @@ dependencies = [ "serde_json", ] +[[package]] +name = "websocket-state" +version = "2.0.0" +dependencies = [ + "actix 0.10.0", + "actix-files 0.3.0", + "actix-web 3.2.0", + "actix-web-actors 3.0.0", + "byteorder", + "bytes 0.5.6", + "env_logger 0.7.1", + "futures 0.3.6", + "rand", + "serde 1.0.116", + "serde_json", +] + [[package]] name = "websocket-tcp-example" version = "2.0.0" diff --git a/websocket-chat/README.md b/websocket-chat/README.md index b8265ea1..cbf84668 100644 --- a/websocket-chat/README.md +++ b/websocket-chat/README.md @@ -8,10 +8,11 @@ Added features: * Browser WebSocket client * Chat server runs in separate thread * Tcp listener runs in separate thread +* Application state is shared with the websocket server and a resource at `/count/` ## Server -Chat server listens for incoming tcp connections. Server can access several types of message: +1. Chat server listens for incoming tcp connections. Server can access several types of message: * `/list` - list all available rooms * `/join name` - join room, if room does not exist, create new one @@ -19,6 +20,9 @@ Chat server listens for incoming tcp connections. Server can access several type * `some message` - just string, send message to all peers in same room * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped +2. [http://localhost:8080/count/](http://localhost:8080/count/) is a + non-websocket endpoint and will affect and display state. + To start server use command: `cargo run --bin websocket-chat-server` ## Client diff --git a/websocket-chat/src/main.rs b/websocket-chat/src/main.rs index ef4aee3d..8fcf9b07 100644 --- a/websocket-chat/src/main.rs +++ b/websocket-chat/src/main.rs @@ -1,8 +1,12 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; use std::time::{Duration, Instant}; use actix::*; use actix_files as fs; -use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_web_actors::ws; mod server; @@ -12,7 +16,7 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); -/// Entry point for our route +/// Entry point for our websocket route async fn chat_route( req: HttpRequest, stream: web::Payload, @@ -31,6 +35,12 @@ async fn chat_route( ) } +/// Displays and affects state +async fn get_count(count: web::Data>) -> impl Responder { + let current_count = count.fetch_add(1, Ordering::SeqCst); + format!("Visitors: {}", current_count) +} + struct WsChatSession { /// unique session id id: usize, @@ -224,12 +234,17 @@ impl WsChatSession { async fn main() -> std::io::Result<()> { env_logger::init(); + // App state + // We are keeping a count of the number of visitors + let app_state = Arc::new(AtomicUsize::new(0)); + // Start chat server actor - let server = server::ChatServer::default().start(); + let server = server::ChatServer::new(app_state.clone()).start(); // Create Http server with websocket support HttpServer::new(move || { App::new() + .data(app_state.clone()) .data(server.clone()) // redirect to websocket.html .service(web::resource("/").route(web::get().to(|| { @@ -237,6 +252,7 @@ async fn main() -> std::io::Result<()> { .header("LOCATION", "/static/websocket.html") .finish() }))) + .route("/count/", web::get().to(get_count)) // websocket .service(web::resource("/ws/").to(chat_route)) // static resources diff --git a/websocket-chat/src/server.rs b/websocket-chat/src/server.rs index 14013214..d78fd29c 100644 --- a/websocket-chat/src/server.rs +++ b/websocket-chat/src/server.rs @@ -4,6 +4,12 @@ use actix::prelude::*; use rand::{self, rngs::ThreadRng, Rng}; + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + use std::collections::{HashMap, HashSet}; /// Chat server sends this messages to session @@ -62,10 +68,11 @@ pub struct ChatServer { sessions: HashMap>, rooms: HashMap>, rng: ThreadRng, + visitor_count: Arc, } -impl Default for ChatServer { - fn default() -> ChatServer { +impl ChatServer { + pub fn new(visitor_count: Arc) -> ChatServer { // default room let mut rooms = HashMap::new(); rooms.insert("Main".to_owned(), HashSet::new()); @@ -74,6 +81,7 @@ impl Default for ChatServer { sessions: HashMap::new(), rooms, rng: rand::thread_rng(), + visitor_count, } } } @@ -122,6 +130,9 @@ impl Handler for ChatServer { .or_insert_with(HashSet::new) .insert(id); + let count = self.visitor_count.fetch_add(1, Ordering::SeqCst); + self.send_message("Main", &format!("Total visitors {}", count), 0); + // send id back id }