mirror of
https://github.com/actix/examples
synced 2025-06-26 17:17:42 +02:00
chore: update actix-ws to v0.3
This commit is contained in:
@ -7,7 +7,6 @@ edition = "2021"
|
||||
actix-files.workspace = true
|
||||
actix-web.workspace = true
|
||||
actix-ws.workspace = true
|
||||
|
||||
env_logger.workspace = true
|
||||
futures-util.workspace = true
|
||||
log.workspace = true
|
||||
|
@ -1,11 +1,14 @@
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{
|
||||
pin::pin,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_ws::Message;
|
||||
use actix_ws::AggregatedMessage;
|
||||
use futures_util::{
|
||||
future::{select, Either},
|
||||
StreamExt as _,
|
||||
};
|
||||
use tokio::{pin, sync::mpsc, time::interval};
|
||||
use tokio::{sync::mpsc, time::interval};
|
||||
|
||||
use crate::{ChatServerHandle, ConnId};
|
||||
|
||||
@ -20,7 +23,7 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
pub async fn chat_ws(
|
||||
chat_server: ChatServerHandle,
|
||||
mut session: actix_ws::Session,
|
||||
mut msg_stream: actix_ws::MessageStream,
|
||||
msg_stream: actix_ws::MessageStream,
|
||||
) {
|
||||
log::info!("connected");
|
||||
|
||||
@ -33,18 +36,21 @@ pub async fn chat_ws(
|
||||
// unwrap: chat server is not dropped before the HTTP server
|
||||
let conn_id = chat_server.connect(conn_tx).await;
|
||||
|
||||
let msg_stream = msg_stream
|
||||
.max_frame_size(128 * 1024)
|
||||
.aggregate_continuations()
|
||||
.max_continuation_size(2 * 1024 * 1024);
|
||||
|
||||
let mut msg_stream = pin!(msg_stream);
|
||||
|
||||
let close_reason = loop {
|
||||
// most of the futures we process need to be stack-pinned to work with select()
|
||||
|
||||
let tick = interval.tick();
|
||||
pin!(tick);
|
||||
|
||||
let msg_rx = conn_rx.recv();
|
||||
pin!(msg_rx);
|
||||
let tick = pin!(interval.tick());
|
||||
let msg_rx = pin!(conn_rx.recv());
|
||||
|
||||
// TODO: nested select is pretty gross for readability on the match
|
||||
let messages = select(msg_stream.next(), msg_rx);
|
||||
pin!(messages);
|
||||
let messages = pin!(select(msg_stream.next(), msg_rx));
|
||||
|
||||
match select(messages, tick).await {
|
||||
// commands & messages received from client
|
||||
@ -52,30 +58,26 @@ pub async fn chat_ws(
|
||||
log::debug!("msg: {msg:?}");
|
||||
|
||||
match msg {
|
||||
Message::Ping(bytes) => {
|
||||
AggregatedMessage::Ping(bytes) => {
|
||||
last_heartbeat = Instant::now();
|
||||
// unwrap:
|
||||
session.pong(&bytes).await.unwrap();
|
||||
}
|
||||
|
||||
Message::Pong(_) => {
|
||||
AggregatedMessage::Pong(_) => {
|
||||
last_heartbeat = Instant::now();
|
||||
}
|
||||
|
||||
Message::Text(text) => {
|
||||
AggregatedMessage::Text(text) => {
|
||||
process_text_msg(&chat_server, &mut session, &text, conn_id, &mut name)
|
||||
.await;
|
||||
}
|
||||
|
||||
Message::Binary(_bin) => {
|
||||
AggregatedMessage::Binary(_bin) => {
|
||||
log::warn!("unexpected binary message");
|
||||
}
|
||||
|
||||
Message::Close(reason) => break reason,
|
||||
|
||||
_ => {
|
||||
break None;
|
||||
}
|
||||
AggregatedMessage::Close(reason) => break reason,
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user