diff --git a/Cargo.lock b/Cargo.lock index 9b93e57..47dac77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,13 +543,14 @@ dependencies = [ [[package]] name = "actix-ws" -version = "0.2.5" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3" +checksum = "a3a1fb4f9f2794b0aadaf2ba5f14a6f034c7e86957b458c506a8cb75953f2d99" dependencies = [ "actix-codec", "actix-http", "actix-web", + "bytestring", "futures-core", "tokio", ] @@ -8890,19 +8891,6 @@ dependencies = [ "rand", ] -[[package]] -name = "websocket-echo-actorless-example" -version = "1.0.0" -dependencies = [ - "actix-files", - "actix-web", - "actix-ws", - "env_logger", - "futures-util", - "log", - "tokio", -] - [[package]] name = "websocket-example" version = "1.0.0" @@ -8937,6 +8925,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "websockets-echo-actorless-example" +version = "1.0.0" +dependencies = [ + "actix-files", + "actix-web", + "actix-ws", + "env_logger", + "futures-util", + "log", + "tokio", +] + [[package]] name = "which" version = "4.4.2" diff --git a/Cargo.toml b/Cargo.toml index 4cc7040..5606129 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ actix-utils = "3" actix-web = "4.7" actix-web-actors = "4.1" actix-web-lab = "0.20" -actix-ws = "0.2.5" +actix-ws = "0.3" awc = "3.2" chrono = { version = "0.4.30", features = ["serde"] } diff --git a/websockets/chat-actorless/Cargo.toml b/websockets/chat-actorless/Cargo.toml index e4895a9..795ec39 100644 --- a/websockets/chat-actorless/Cargo.toml +++ b/websockets/chat-actorless/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" actix-files.workspace = true actix-web.workspace = true actix-ws.workspace = true - env_logger.workspace = true futures-util.workspace = true log.workspace = true diff --git a/websockets/chat-actorless/src/handler.rs b/websockets/chat-actorless/src/handler.rs index d9e7104..864e6fd 100644 --- a/websockets/chat-actorless/src/handler.rs +++ b/websockets/chat-actorless/src/handler.rs @@ -1,11 +1,14 @@ -use std::time::{Duration, Instant}; +use std::{ + pin::pin, + time::{Duration, Instant}, +}; -use actix_ws::Message; +use actix_ws::AggregatedMessage; use futures_util::{ future::{select, Either}, StreamExt as _, }; -use tokio::{pin, sync::mpsc, time::interval}; +use tokio::{sync::mpsc, time::interval}; use crate::{ChatServerHandle, ConnId}; @@ -20,7 +23,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); pub async fn chat_ws( chat_server: ChatServerHandle, mut session: actix_ws::Session, - mut msg_stream: actix_ws::MessageStream, + msg_stream: actix_ws::MessageStream, ) { log::info!("connected"); @@ -33,18 +36,21 @@ pub async fn chat_ws( // unwrap: chat server is not dropped before the HTTP server let conn_id = chat_server.connect(conn_tx).await; + let msg_stream = msg_stream + .max_frame_size(128 * 1024) + .aggregate_continuations() + .max_continuation_size(2 * 1024 * 1024); + + let mut msg_stream = pin!(msg_stream); + let close_reason = loop { // most of the futures we process need to be stack-pinned to work with select() - let tick = interval.tick(); - pin!(tick); - - let msg_rx = conn_rx.recv(); - pin!(msg_rx); + let tick = pin!(interval.tick()); + let msg_rx = pin!(conn_rx.recv()); // TODO: nested select is pretty gross for readability on the match - let messages = select(msg_stream.next(), msg_rx); - pin!(messages); + let messages = pin!(select(msg_stream.next(), msg_rx)); match select(messages, tick).await { // commands & messages received from client @@ -52,30 +58,26 @@ pub async fn chat_ws( log::debug!("msg: {msg:?}"); match msg { - Message::Ping(bytes) => { + AggregatedMessage::Ping(bytes) => { last_heartbeat = Instant::now(); // unwrap: session.pong(&bytes).await.unwrap(); } - Message::Pong(_) => { + AggregatedMessage::Pong(_) => { last_heartbeat = Instant::now(); } - Message::Text(text) => { + AggregatedMessage::Text(text) => { process_text_msg(&chat_server, &mut session, &text, conn_id, &mut name) .await; } - Message::Binary(_bin) => { + AggregatedMessage::Binary(_bin) => { log::warn!("unexpected binary message"); } - Message::Close(reason) => break reason, - - _ => { - break None; - } + AggregatedMessage::Close(reason) => break reason, } } diff --git a/websockets/chat/static/index.html b/websockets/chat/static/index.html index c41adb4..fd30d0e 100644 --- a/websockets/chat/static/index.html +++ b/websockets/chat/static/index.html @@ -66,7 +66,7 @@
- +
@@ -133,7 +133,7 @@ updateConnectionStatus() } - socket.onmessage = (ev) => { + socket.onmessage = ev => { log('Received: ' + ev.data, 'message') } @@ -179,7 +179,7 @@ updateConnectionStatus() }) - $form.addEventListener('submit', (ev) => { + $form.addEventListener('submit', ev => { ev.preventDefault() const text = $input.value @@ -192,7 +192,6 @@ }) updateConnectionStatus() - diff --git a/websockets/echo-actorless/Cargo.toml b/websockets/echo-actorless/Cargo.toml index 0e4b144..2edfdd9 100644 --- a/websockets/echo-actorless/Cargo.toml +++ b/websockets/echo-actorless/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "websocket-echo-actorless-example" +name = "websockets-echo-actorless-example" version = "1.0.0" edition = "2021" @@ -7,7 +7,6 @@ edition = "2021" actix-files.workspace = true actix-web.workspace = true actix-ws.workspace = true - env_logger.workspace = true futures-util.workspace = true log.workspace = true