From b8a144960506d63b921652aa29275daa64a67612 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 12 Jul 2022 01:08:15 +0100 Subject: [PATCH] improve actorless echo example --- Cargo.lock | 28 ---- websockets/chat-actorless/Cargo.toml | 4 +- websockets/chat-actorless/src/main.rs | 3 +- websockets/chat-actorless/src/server.rs | 2 +- websockets/echo-actorless/Cargo.toml | 5 +- websockets/echo-actorless/src/handler.rs | 158 ++++++++++++++--------- websockets/echo-actorless/src/main.rs | 3 +- 7 files changed, 104 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff0d0739..421965b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2446,21 +2446,6 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite 0.2.9", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.21" @@ -4000,12 +3985,6 @@ version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" -[[package]] -name = "parking" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" - [[package]] name = "parking_lot" version = "0.11.2" @@ -6870,12 +6849,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.2" @@ -7099,7 +7072,6 @@ dependencies = [ "actix-ws", "awc", "env_logger 0.9.0", - "futures-lite", "futures-util", "log", "tokio 1.19.2", diff --git a/websockets/chat-actorless/Cargo.toml b/websockets/chat-actorless/Cargo.toml index 28f22f56..36fdacbf 100644 --- a/websockets/chat-actorless/Cargo.toml +++ b/websockets/chat-actorless/Cargo.toml @@ -11,7 +11,7 @@ awc = "3" derive_more = "0.99.5" env_logger = "0.9" -futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } +futures-util = { version = "0.3.17", default-features = false, features = ["std", "sink"] } log = "0.4" rand = "0.8" -tokio = { version = "1.13.1", features = ["full"] } +tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] } diff --git a/websockets/chat-actorless/src/main.rs b/websockets/chat-actorless/src/main.rs index 345eadd8..e427b156 100644 --- a/websockets/chat-actorless/src/main.rs +++ b/websockets/chat-actorless/src/main.rs @@ -42,7 +42,8 @@ async fn chat_ws( Ok(res) } -#[actix_web::main] +// note that the `actix` based WebSocket handling would NOT work under `tokio::main` +#[tokio::main(flavor = "current_thread")] async fn main() -> std::io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); diff --git a/websockets/chat-actorless/src/server.rs b/websockets/chat-actorless/src/server.rs index 10a9e350..c9670f24 100644 --- a/websockets/chat-actorless/src/server.rs +++ b/websockets/chat-actorless/src/server.rs @@ -9,7 +9,7 @@ use std::{ }; use rand::{thread_rng, Rng as _}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use crate::{Command, ConnId, Msg, RoomId}; diff --git a/websockets/echo-actorless/Cargo.toml b/websockets/echo-actorless/Cargo.toml index 1ef89f1d..4da163b1 100644 --- a/websockets/echo-actorless/Cargo.toml +++ b/websockets/echo-actorless/Cargo.toml @@ -10,7 +10,6 @@ actix-ws = "0.2.5" awc = "3" env_logger = "0.9" -futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } -futures-lite = "1.3" +futures-util = { version = "0.3.17", default-features = false, features = ["std"] } log = "0.4" -tokio = { version = "1.13.1", features = ["full"] } +tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] } diff --git a/websockets/echo-actorless/src/handler.rs b/websockets/echo-actorless/src/handler.rs index ea4c5e90..6eac3e63 100644 --- a/websockets/echo-actorless/src/handler.rs +++ b/websockets/echo-actorless/src/handler.rs @@ -1,14 +1,18 @@ use std::time::{Duration, Instant}; -use actix_web::rt; use actix_ws::Message; -use futures_lite::future; -use futures_util::{future::Either, stream::StreamExt as _, FutureExt as _}; +use futures_util::{ + future::{self, Either}, + StreamExt as _, +}; +use tokio::{pin, time::interval}; -/// How often heartbeat pings are sent +/// How often heartbeat pings are sent. +/// +/// Should be half (or less) of the acceptable client timeout. const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); -/// How long before lack of client response causes a timeout +/// How long before lack of client response causes a timeout. const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// Echo text & binary messages received from the client, respond to ping messages, and monitor @@ -20,28 +24,22 @@ pub async fn echo_heartbeat_ws( log::info!("connected"); let mut last_heartbeat = Instant::now(); - let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); + let mut interval = interval(HEARTBEAT_INTERVAL); - loop { - match future::or( - msg_stream.next().map(Either::Left), - interval.tick().map(Either::Right), - ) - .await - { - Either::Left(Some(Ok(msg))) => { + let reason = loop { + // create "next client timeout check" future + let tick = interval.tick(); + // required for select() + pin!(tick); + + // waits for either `msg_stream` to receive a message from the client or the heartbeat + // interval timer to tick, yielding the value of whichever one is ready first + match future::select(msg_stream.next(), tick).await { + // received message from WebSocket client + Either::Left((Some(Ok(msg)), _)) => { log::debug!("msg: {msg:?}"); match msg { - Message::Ping(bytes) => { - last_heartbeat = Instant::now(); - let _ = session.pong(&bytes).await; - } - - Message::Pong(_) => { - last_heartbeat = Instant::now(); - } - Message::Text(text) => { session.text(text).await.unwrap(); } @@ -51,37 +49,56 @@ pub async fn echo_heartbeat_ws( } Message::Close(reason) => { - let _ = session.close(reason).await; - break; + break reason; } - _ => { - let _ = session.close(None).await; - break; + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; } - } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Continuation(_) => { + log::warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; } - Either::Left(_) => {} + // client WebSocket stream error + Either::Left((Some(Err(err)), _)) => { + log::error!("{}", err); + } - Either::Right(_) => { + // client WebSocket stream ended + Either::Left((None, _)) => break None, + + // heartbeat interval ticked + Either::Right((_inst, _)) => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { log::info!( "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" ); - let _ = session.close(None).await; - break; + + break None; } // send heartbeat ping let _ = session.ping(b"").await; - - // reset interval duration - interval.reset(); } - }; - } + } + }; + + // attempt to close connection gracefully + let _ = session.close(reason).await; + + log::info!("disconnected"); } /// Echo text & binary messages received from the client and respond to ping messages. @@ -89,36 +106,51 @@ pub async fn echo_heartbeat_ws( /// This example is just for demonstration of simplicity. In reality, you likely want to include /// some handling of heartbeats for connection health tracking to free up server resources when /// connections die or network issues arise. +/// +/// See [`echo_heartbeat_ws`] for a more realistic implementation. pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) { log::info!("connected"); - while let Some(Ok(msg)) = msg_stream.next().await { - log::debug!("msg: {msg:?}"); + let close_reason = loop { + match msg_stream.next().await { + Some(Ok(msg)) => { + log::debug!("msg: {msg:?}"); - match msg { - Message::Ping(bytes) => { - let _ = session.pong(&bytes).await; + match msg { + Message::Text(text) => { + session.text(text).await.unwrap(); + } + + Message::Binary(bin) => { + session.binary(bin).await.unwrap(); + } + + Message::Close(reason) => { + break reason; + } + + Message::Ping(bytes) => { + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => {} + + Message::Continuation(_) => { + log::warn!("no support for continuation frames"); + } + + // no-op; ignore + Message::Nop => {} + }; } - Message::Pong(_) => {} - - Message::Text(text) => { - session.text(text).await.unwrap(); - } - - Message::Binary(bin) => { - session.binary(bin).await.unwrap(); - } - - Message::Close(reason) => { - let _ = session.close(reason).await; - break; - } - - _ => { - let _ = session.close(None).await; - break; - } + // error or end of stream + _ => break None, } - } + }; + + // attempt to close connection gracefully + let _ = session.close(close_reason).await; + + log::info!("disconnected"); } diff --git a/websockets/echo-actorless/src/main.rs b/websockets/echo-actorless/src/main.rs index fcb67604..034aca8a 100644 --- a/websockets/echo-actorless/src/main.rs +++ b/websockets/echo-actorless/src/main.rs @@ -37,7 +37,8 @@ async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result std::io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));