1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

add actorless chat example

This commit is contained in:
Rob Ede 2022-07-11 20:19:29 +01:00
parent fd17252725
commit f3ff729a69
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
9 changed files with 581 additions and 1 deletions

45
Cargo.lock generated
View File

@ -2446,6 +2446,21 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite 0.2.9",
"waker-fn",
]
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.21" version = "0.3.21"
@ -3985,6 +4000,12 @@ version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -6849,6 +6870,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.3.2"
@ -7033,6 +7060,22 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "websocket-chat-actorless-example"
version = "1.0.0"
dependencies = [
"actix-files",
"actix-web",
"actix-ws",
"awc",
"derive_more",
"env_logger 0.9.0",
"futures-util",
"log",
"rand 0.8.5",
"tokio 1.19.2",
]
[[package]] [[package]]
name = "websocket-chat-broker" name = "websocket-chat-broker"
version = "1.0.0" version = "1.0.0"
@ -7056,10 +7099,10 @@ dependencies = [
"actix-ws", "actix-ws",
"awc", "awc",
"env_logger 0.9.0", "env_logger 0.9.0",
"futures-lite",
"futures-util", "futures-util",
"log", "log",
"tokio 1.19.2", "tokio 1.19.2",
"tokio-stream",
] ]
[[package]] [[package]]

View File

@ -53,6 +53,7 @@ members = [
"templating/yarte", "templating/yarte",
"unix-socket", "unix-socket",
"websockets/autobahn", "websockets/autobahn",
"websockets/chat-actorless",
"websockets/chat-broker", "websockets/chat-broker",
"websockets/chat-tcp", "websockets/chat-tcp",
"websockets/chat", "websockets/chat",

View File

@ -0,0 +1,17 @@
[package]
name = "websocket-chat-actorless-example"
version = "1.0.0"
edition = "2021"
[dependencies]
actix-files = "0.6"
actix-web = "4"
actix-ws = "0.2.5"
awc = "3"
derive_more = "0.99.5"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] }
log = "0.4"
rand = "0.8"
tokio = { version = "1.13.1", features = ["full"] }

View File

@ -0,0 +1,36 @@
# WebSocket Chat (actor-less)
> Multi-room WebSocket chat server using [`actix-ws`].
## Usage
### Server
```sh
cd websockets/echo-actorless
cargo run
# starting HTTP server at http://localhost:8080
```
### Browser Client
Go to <http://localhost:8080/> in a browser.
### CLI Client
```sh
# using `websocat` (https://github.com/vi/websocat)
websocat -v --ping-interval=2 ws://127.0.0.1:8080/ws
```
## Chat Commands
Once connected, the following slash commands can be sent:
- `/list`: list all available rooms
- `/join name`: join room, if room does not exist, create new one
- `/name name`: set session name
Sending a plain string will broadcast that message to all peers in same room.
[`actix-ws`]: https://crates.io/crates/actix-ws

View File

@ -0,0 +1,32 @@
use tokio::sync::{mpsc, oneshot};
use crate::{ConnId, Msg, RoomId};
#[derive(Debug)]
pub enum Command {
Connect {
conn_tx: mpsc::UnboundedSender<Msg>,
res_tx: oneshot::Sender<ConnId>,
},
Disconnect {
conn: ConnId,
},
List {
res_tx: oneshot::Sender<Vec<RoomId>>,
},
Join {
conn: ConnId,
room: RoomId,
res_tx: oneshot::Sender<()>,
},
Message {
room: RoomId,
msg: Msg,
skip: ConnId,
res_tx: oneshot::Sender<()>,
},
}

View File

@ -0,0 +1,193 @@
use std::time::{Duration, Instant};
use actix_web::rt;
use actix_ws::Message;
use futures_util::stream::StreamExt as _;
use tokio::{
select,
sync::{
mpsc::{self, UnboundedSender},
oneshot,
},
};
use crate::{Command, ConnId, RoomId};
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Echo text & binary messages received from the client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn chat_ws(
server_tx: UnboundedSender<Command>,
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
) {
log::info!("connected");
let mut name = None;
let mut room = "main".to_owned();
let mut last_heartbeat = Instant::now();
let mut interval = rt::time::interval(HEARTBEAT_INTERVAL);
let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();
let (res_tx, res_rx) = oneshot::channel();
server_tx
.send(Command::Connect { conn_tx, res_tx })
.unwrap();
let conn_id = res_rx.await.unwrap();
loop {
select! {
// commands & messages received from client
Some(Ok(msg)) = msg_stream.next() => {
log::debug!("msg: {msg:?}");
match msg {
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Text(text) => {
process_text_msg(&server_tx, &mut session, &text, conn_id, &mut room, &mut name).await;
}
Message::Binary(_bin) => {
log::warn!("unexpected binary message");
}
Message::Close(reason) => {
let _ = session.close(reason).await;
break;
}
_ => {
let _ = session.close(None).await;
break;
}
}
}
// chat messages received from other room participants
Some(chat_msg) = conn_rx.recv() => {
session.text(chat_msg).await.unwrap();
}
// heartbeat
_ = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting");
let _ = session.close(None).await;
break;
}
// send heartbeat ping
let _ = session.ping(b"").await;
// reset interval duration
interval.reset();
}
};
}
}
async fn process_text_msg(
server_tx: &UnboundedSender<Command>,
session: &mut actix_ws::Session,
text: &str,
conn: ConnId,
room: &mut RoomId,
name: &mut Option<String>,
) {
let msg = text.trim();
// we check for /<cmd> type of messages
if msg.starts_with('/') {
let mut cmd_args = msg.splitn(2, ' ');
match cmd_args.next().unwrap() {
"/list" => {
// Send ListRooms message to chat server and wait for
// response
log::info!("List rooms");
let (res_tx, res_rx) = oneshot::channel();
server_tx.send(Command::List { res_tx }).unwrap();
let rooms = res_rx.await.unwrap();
for room in rooms {
session.text(room).await.unwrap();
}
}
"/join" => match cmd_args.next() {
Some(room_id) => {
*room = room_id.to_owned();
let (res_tx, res_rx) = oneshot::channel();
server_tx
.send(Command::Join {
conn,
room: room.clone(),
res_tx,
})
.unwrap();
res_rx.await.unwrap();
session.text(format!("joined {room_id}")).await.unwrap();
}
None => {
session.text("!!! room name is required").await.unwrap();
}
},
"/name" => match cmd_args.next() {
Some(new_name) => {
name.replace(new_name.to_owned());
}
None => {
session.text("!!! name is required").await.unwrap();
}
},
_ => {
session
.text(format!("!!! unknown command: {msg}"))
.await
.unwrap();
}
}
} else {
let msg = match name {
Some(ref name) => format!("{name}: {msg}"),
None => msg.to_owned(),
};
let (res_tx, res_rx) = oneshot::channel();
// send message to chat server
server_tx
.send(Command::Message {
msg,
room: room.clone(),
skip: conn,
res_tx,
})
.unwrap();
res_rx.await.unwrap();
}
}

View File

@ -0,0 +1,69 @@
//! Multi-room WebSocket chat server.
//!
//! Open `http://localhost:8080/` in browser to test.
use actix_files::NamedFile;
use actix_web::{
middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use tokio::sync::mpsc::UnboundedSender;
mod command;
mod handler;
mod server;
pub use self::command::Command;
pub use self::server::ChatServer;
/// Connection ID.
pub type ConnId = usize;
/// Room ID.
pub type RoomId = String;
/// Message sent to a room/client.
pub type Msg = String;
async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap()
}
/// Handshake and start WebSocket handler with heartbeats.
async fn chat_ws(
req: HttpRequest,
stream: web::Payload,
server_tx: web::Data<UnboundedSender<Command>>,
) -> Result<HttpResponse, Error> {
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately
rt::spawn(handler::chat_ws((**server_tx).clone(), session, msg_stream));
Ok(res)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
log::info!("starting HTTP server at http://localhost:8080");
let (chat_server, server_tx) = ChatServer::new();
rt::spawn(chat_server.run());
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(server_tx.clone()))
// WebSocket UI HTML file
.service(web::resource("/").to(index))
// websocket routes
.service(web::resource("/ws").route(web::get().to(chat_ws)))
// enable logger
.wrap(middleware::Logger::default())
})
.workers(2)
.bind(("127.0.0.1", 8080))?
.run()
.await
}

View File

@ -0,0 +1,188 @@
//! A multi-room chat server.
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use rand::{thread_rng, Rng as _};
use tokio::sync::{mpsc, oneshot};
use crate::{Command, ConnId, Msg, RoomId};
/// A multi-room chat server.
#[derive(Debug)]
pub struct ChatServer {
/// Map of connection IDs to their message receivers.
sessions: HashMap<ConnId, mpsc::UnboundedSender<Msg>>,
/// Map of room name to participant IDs in that room.
rooms: HashMap<RoomId, HashSet<ConnId>>,
/// Tracks total number of historical connections established.
visitor_count: Arc<AtomicUsize>,
/// Command receiver.
rx: mpsc::UnboundedReceiver<Command>,
}
impl ChatServer {
pub fn new() -> (Self, mpsc::UnboundedSender<Command>) {
// create empty server
let mut rooms = HashMap::with_capacity(4);
// create default room
rooms.insert("main".to_owned(), HashSet::new());
let (tx, rx) = mpsc::unbounded_channel();
(
Self {
sessions: HashMap::new(),
rooms,
visitor_count: Arc::new(AtomicUsize::new(0)),
rx,
},
tx,
)
}
}
impl ChatServer {
/// Send message to all users in the room.
///
/// `skip_id` is used to prevent messages send by a connection also being received by it.
async fn send_message(&self, room: &str, msg: impl Into<String>, skip_id: ConnId) {
if let Some(sessions) = self.rooms.get(room) {
let msg = msg.into();
for conn_id in sessions {
if *conn_id != skip_id {
if let Some(tx) = self.sessions.get(conn_id) {
tx.send(msg.clone()).unwrap();
}
}
}
}
}
/// Handler for Connect message.
///
/// Register new session and assign unique id to this session
async fn connect(&mut self, tx: mpsc::UnboundedSender<Msg>) -> ConnId {
log::info!("Someone joined");
// notify all users in same room
self.send_message("main", "Someone joined", 0).await;
// register session with random connection ID
let id = thread_rng().gen::<usize>();
self.sessions.insert(id, tx);
// auto join session to main room
self.rooms
.entry("main".to_owned())
.or_insert_with(HashSet::new)
.insert(id);
let count = self.visitor_count.fetch_add(1, Ordering::SeqCst);
self.send_message("main", format!("Total visitors {count}"), 0)
.await;
// send id back
id
}
/// Handler for Disconnect message.
async fn disconnect(&mut self, conn_id: ConnId) {
println!("Someone disconnected");
let mut rooms: Vec<String> = Vec::new();
// remove sender
if self.sessions.remove(&conn_id).is_some() {
// remove session from all rooms
for (name, sessions) in &mut self.rooms {
if sessions.remove(&conn_id) {
rooms.push(name.to_owned());
}
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0).await;
}
}
/// Handler for `ListRooms` message.
fn list_rooms(&mut self) -> Vec<String> {
let mut rooms = Vec::new();
for key in self.rooms.keys() {
rooms.push(key.to_owned())
}
rooms
}
/// Join room, send disconnect message to old room send join message to new room.
async fn join_room(&mut self, conn_id: ConnId, room: String) {
let mut rooms = Vec::new();
// remove session from all rooms
for (n, sessions) in &mut self.rooms {
if sessions.remove(&conn_id) {
rooms.push(n.to_owned());
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0).await;
}
self.rooms
.entry(room.clone())
.or_insert_with(HashSet::new)
.insert(conn_id);
self.send_message(&room, "Someone connected", conn_id).await;
}
pub async fn run(mut self) {
loop {
match self.rx.recv().await.unwrap() {
Command::Connect { conn_tx, res_tx } => {
let conn_id = self.connect(conn_tx).await;
res_tx.send(conn_id).unwrap();
}
Command::Disconnect { conn } => {
self.disconnect(conn).await;
}
Command::List { res_tx } => {
res_tx.send(self.list_rooms()).unwrap();
}
Command::Join { conn, room, res_tx } => {
self.join_room(conn, room).await;
res_tx.send(()).unwrap();
}
Command::Message {
room,
msg,
skip,
res_tx,
} => {
self.send_message(&room, msg, skip).await;
res_tx.send(()).unwrap();
}
}
}
}
}

View File

@ -0,0 +1 @@
../../chat/static/index.html