1
0
mirror of https://github.com/actix/examples synced 2024-11-23 22:41:07 +01:00

stateful websockets using actors (#384)

Co-authored-by: Yuki Okushi <huyuumi.dev@gmail.com>
This commit is contained in:
Aravinth Manivannan 2020-11-30 18:11:33 +05:30 committed by GitHub
parent c6f9ce071a
commit 876da8cf70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 6 deletions

17
Cargo.lock generated
View File

@ -6274,6 +6274,23 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "websocket-state"
version = "2.0.0"
dependencies = [
"actix 0.10.0",
"actix-files 0.3.0",
"actix-web 3.2.0",
"actix-web-actors 3.0.0",
"byteorder",
"bytes 0.5.6",
"env_logger 0.7.1",
"futures 0.3.6",
"rand",
"serde 1.0.116",
"serde_json",
]
[[package]] [[package]]
name = "websocket-tcp-example" name = "websocket-tcp-example"
version = "2.0.0" version = "2.0.0"

View File

@ -8,10 +8,11 @@ Added features:
* Browser WebSocket client * Browser WebSocket client
* Chat server runs in separate thread * Chat server runs in separate thread
* Tcp listener runs in separate thread * Tcp listener runs in separate thread
* Application state is shared with the websocket server and a resource at `/count/`
## Server ## Server
Chat server listens for incoming tcp connections. Server can access several types of message: 1. Chat server listens for incoming tcp connections. Server can access several types of message:
* `/list` - list all available rooms * `/list` - list all available rooms
* `/join name` - join room, if room does not exist, create new one * `/join name` - join room, if room does not exist, create new one
@ -19,6 +20,9 @@ Chat server listens for incoming tcp connections. Server can access several type
* `some message` - just string, send message to all peers in same room * `some message` - just string, send message to all peers in same room
* client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat message for 10 seconds connection gets dropped
2. [http://localhost:8080/count/](http://localhost:8080/count/) is a
non-websocket endpoint and will affect and display state.
To start server use command: `cargo run --bin websocket-chat-server` To start server use command: `cargo run --bin websocket-chat-server`
## Client ## Client

View File

@ -1,8 +1,12 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix::*; use actix::*;
use actix_files as fs; use actix_files as fs;
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web_actors::ws; use actix_web_actors::ws;
mod server; mod server;
@ -12,7 +16,7 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Entry point for our route /// Entry point for our websocket route
async fn chat_route( async fn chat_route(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
@ -31,6 +35,12 @@ async fn chat_route(
) )
} }
/// Displays and affects state
async fn get_count(count: web::Data<Arc<AtomicUsize>>) -> impl Responder {
let current_count = count.fetch_add(1, Ordering::SeqCst);
format!("Visitors: {}", current_count)
}
struct WsChatSession { struct WsChatSession {
/// unique session id /// unique session id
id: usize, id: usize,
@ -224,12 +234,17 @@ impl WsChatSession {
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
env_logger::init(); env_logger::init();
// App state
// We are keeping a count of the number of visitors
let app_state = Arc::new(AtomicUsize::new(0));
// Start chat server actor // Start chat server actor
let server = server::ChatServer::default().start(); let server = server::ChatServer::new(app_state.clone()).start();
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(app_state.clone())
.data(server.clone()) .data(server.clone())
// redirect to websocket.html // redirect to websocket.html
.service(web::resource("/").route(web::get().to(|| { .service(web::resource("/").route(web::get().to(|| {
@ -237,6 +252,7 @@ async fn main() -> std::io::Result<()> {
.header("LOCATION", "/static/websocket.html") .header("LOCATION", "/static/websocket.html")
.finish() .finish()
}))) })))
.route("/count/", web::get().to(get_count))
// websocket // websocket
.service(web::resource("/ws/").to(chat_route)) .service(web::resource("/ws/").to(chat_route))
// static resources // static resources

View File

@ -4,6 +4,12 @@
use actix::prelude::*; use actix::prelude::*;
use rand::{self, rngs::ThreadRng, Rng}; use rand::{self, rngs::ThreadRng, Rng};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
/// Chat server sends this messages to session /// Chat server sends this messages to session
@ -62,10 +68,11 @@ pub struct ChatServer {
sessions: HashMap<usize, Recipient<Message>>, sessions: HashMap<usize, Recipient<Message>>,
rooms: HashMap<String, HashSet<usize>>, rooms: HashMap<String, HashSet<usize>>,
rng: ThreadRng, rng: ThreadRng,
visitor_count: Arc<AtomicUsize>,
} }
impl Default for ChatServer { impl ChatServer {
fn default() -> ChatServer { pub fn new(visitor_count: Arc<AtomicUsize>) -> ChatServer {
// default room // default room
let mut rooms = HashMap::new(); let mut rooms = HashMap::new();
rooms.insert("Main".to_owned(), HashSet::new()); rooms.insert("Main".to_owned(), HashSet::new());
@ -74,6 +81,7 @@ impl Default for ChatServer {
sessions: HashMap::new(), sessions: HashMap::new(),
rooms, rooms,
rng: rand::thread_rng(), rng: rand::thread_rng(),
visitor_count,
} }
} }
} }
@ -122,6 +130,9 @@ impl Handler<Connect> for ChatServer {
.or_insert_with(HashSet::new) .or_insert_with(HashSet::new)
.insert(id); .insert(id);
let count = self.visitor_count.fetch_add(1, Ordering::SeqCst);
self.send_message("Main", &format!("Total visitors {}", count), 0);
// send id back // send id back
id id
} }