1
0
mirror of https://github.com/actix/examples synced 2024-11-23 22:41:07 +01:00

improve actorless echo example

This commit is contained in:
Rob Ede 2022-07-12 01:08:15 +01:00
parent f3ff729a69
commit b8a1449605
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
7 changed files with 104 additions and 99 deletions

28
Cargo.lock generated
View File

@ -2446,21 +2446,6 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite 0.2.9",
"waker-fn",
]
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.21" version = "0.3.21"
@ -4000,12 +3985,6 @@ version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.11.2" version = "0.11.2"
@ -6870,12 +6849,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.3.2"
@ -7099,7 +7072,6 @@ dependencies = [
"actix-ws", "actix-ws",
"awc", "awc",
"env_logger 0.9.0", "env_logger 0.9.0",
"futures-lite",
"futures-util", "futures-util",
"log", "log",
"tokio 1.19.2", "tokio 1.19.2",

View File

@ -11,7 +11,7 @@ awc = "3"
derive_more = "0.99.5" derive_more = "0.99.5"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } futures-util = { version = "0.3.17", default-features = false, features = ["std", "sink"] }
log = "0.4" log = "0.4"
rand = "0.8" rand = "0.8"
tokio = { version = "1.13.1", features = ["full"] } tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] }

View File

@ -42,7 +42,8 @@ async fn chat_ws(
Ok(res) Ok(res)
} }
#[actix_web::main] // note that the `actix` based WebSocket handling would NOT work under `tokio::main`
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

View File

@ -9,7 +9,7 @@ use std::{
}; };
use rand::{thread_rng, Rng as _}; use rand::{thread_rng, Rng as _};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use crate::{Command, ConnId, Msg, RoomId}; use crate::{Command, ConnId, Msg, RoomId};

View File

@ -10,7 +10,6 @@ actix-ws = "0.2.5"
awc = "3" awc = "3"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
futures-lite = "1.3"
log = "0.4" log = "0.4"
tokio = { version = "1.13.1", features = ["full"] } tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] }

View File

@ -1,14 +1,18 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix_web::rt;
use actix_ws::Message; use actix_ws::Message;
use futures_lite::future; use futures_util::{
use futures_util::{future::Either, stream::StreamExt as _, FutureExt as _}; future::{self, Either},
StreamExt as _,
};
use tokio::{pin, time::interval};
/// How often heartbeat pings are sent /// How often heartbeat pings are sent.
///
/// Should be half (or less) of the acceptable client timeout.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Echo text & binary messages received from the client, respond to ping messages, and monitor /// Echo text & binary messages received from the client, respond to ping messages, and monitor
@ -20,28 +24,22 @@ pub async fn echo_heartbeat_ws(
log::info!("connected"); log::info!("connected");
let mut last_heartbeat = Instant::now(); let mut last_heartbeat = Instant::now();
let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); let mut interval = interval(HEARTBEAT_INTERVAL);
loop { let reason = loop {
match future::or( // create "next client timeout check" future
msg_stream.next().map(Either::Left), let tick = interval.tick();
interval.tick().map(Either::Right), // required for select()
) pin!(tick);
.await
{ // waits for either `msg_stream` to receive a message from the client or the heartbeat
Either::Left(Some(Ok(msg))) => { // interval timer to tick, yielding the value of whichever one is ready first
match future::select(msg_stream.next(), tick).await {
// received message from WebSocket client
Either::Left((Some(Ok(msg)), _)) => {
log::debug!("msg: {msg:?}"); log::debug!("msg: {msg:?}");
match msg { match msg {
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Text(text) => { Message::Text(text) => {
session.text(text).await.unwrap(); session.text(text).await.unwrap();
} }
@ -51,37 +49,56 @@ pub async fn echo_heartbeat_ws(
} }
Message::Close(reason) => { Message::Close(reason) => {
let _ = session.close(reason).await; break reason;
break;
} }
_ => { Message::Ping(bytes) => {
let _ = session.close(None).await; last_heartbeat = Instant::now();
break; let _ = session.pong(&bytes).await;
} }
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}
// no-op; ignore
Message::Nop => {}
};
} }
Either::Left(_) => {} // client WebSocket stream error
Either::Left((Some(Err(err)), _)) => {
log::error!("{}", err);
}
Either::Right(_) => { // client WebSocket stream ended
Either::Left((None, _)) => break None,
// heartbeat interval ticked
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection // if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!( log::info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
); );
let _ = session.close(None).await;
break; break None;
} }
// send heartbeat ping // send heartbeat ping
let _ = session.ping(b"").await; let _ = session.ping(b"").await;
// reset interval duration
interval.reset();
} }
}; }
} };
// attempt to close connection gracefully
let _ = session.close(reason).await;
log::info!("disconnected");
} }
/// Echo text & binary messages received from the client and respond to ping messages. /// Echo text & binary messages received from the client and respond to ping messages.
@ -89,36 +106,51 @@ pub async fn echo_heartbeat_ws(
/// This example is just for demonstration of simplicity. In reality, you likely want to include /// This example is just for demonstration of simplicity. In reality, you likely want to include
/// some handling of heartbeats for connection health tracking to free up server resources when /// some handling of heartbeats for connection health tracking to free up server resources when
/// connections die or network issues arise. /// connections die or network issues arise.
///
/// See [`echo_heartbeat_ws`] for a more realistic implementation.
pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) { pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) {
log::info!("connected"); log::info!("connected");
while let Some(Ok(msg)) = msg_stream.next().await { let close_reason = loop {
log::debug!("msg: {msg:?}"); match msg_stream.next().await {
Some(Ok(msg)) => {
log::debug!("msg: {msg:?}");
match msg { match msg {
Message::Ping(bytes) => { Message::Text(text) => {
let _ = session.pong(&bytes).await; session.text(text).await.unwrap();
}
Message::Binary(bin) => {
session.binary(bin).await.unwrap();
}
Message::Close(reason) => {
break reason;
}
Message::Ping(bytes) => {
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {}
Message::Continuation(_) => {
log::warn!("no support for continuation frames");
}
// no-op; ignore
Message::Nop => {}
};
} }
Message::Pong(_) => {} // error or end of stream
_ => break None,
Message::Text(text) => {
session.text(text).await.unwrap();
}
Message::Binary(bin) => {
session.binary(bin).await.unwrap();
}
Message::Close(reason) => {
let _ = session.close(reason).await;
break;
}
_ => {
let _ = session.close(None).await;
break;
}
} }
} };
// attempt to close connection gracefully
let _ = session.close(close_reason).await;
log::info!("disconnected");
} }

View File

@ -37,7 +37,8 @@ async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse,
Ok(res) Ok(res)
} }
#[actix_web::main] // note that the `actix` based WebSocket handling would NOT work under `tokio::main`
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));