1
0
mirror of https://github.com/actix/examples synced 2025-02-22 17:23:18 +01:00

Merge pull request #912 from actix/ractor

Ractor WebSockets
This commit is contained in:
Rob Ede 2025-02-21 13:23:56 +00:00 committed by GitHub
commit 3a237844ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 192 additions and 66 deletions

View File

@ -20,6 +20,7 @@
"pemfile", "pemfile",
"prost", "prost",
"protobuf", "protobuf",
"ractor",
"reqwest", "reqwest",
"rustls", "rustls",
"rustup", "rustup",

84
Cargo.lock generated
View File

@ -1003,7 +1003,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"strum", "strum 0.25.0",
"syn 2.0.93", "syn 2.0.93",
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
@ -1807,6 +1807,29 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "bon"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97493a391b4b18ee918675fb8663e53646fd09321c58b46afa04e8ce2499c869"
dependencies = [
"bon-macros",
"rustversion",
]
[[package]]
name = "bon-macros"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2af3eac944c12cdf4423eab70d310da0a8e5851a18ffb192c0a5e3f7ae1663"
dependencies = [
"darling",
"ident_case",
"proc-macro2",
"quote",
"syn 2.0.93",
]
[[package]] [[package]]
name = "borsh" name = "borsh"
version = "1.5.3" version = "1.5.3"
@ -2534,6 +2557,20 @@ dependencies = [
"parking_lot_core", "parking_lot_core",
] ]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.6.0" version = "2.6.0"
@ -3522,7 +3559,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"dashmap", "dashmap 5.5.3",
"futures", "futures",
"futures-timer", "futures-timer",
"no-std-compat", "no-std-compat",
@ -6292,6 +6329,21 @@ dependencies = [
"r2d2", "r2d2",
] ]
[[package]]
name = "ractor"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc03eb43cae6ef79bae037e58e645664a480a1b17232e5a9cc77b37a99aa7b1f"
dependencies = [
"bon",
"dashmap 6.1.0",
"futures",
"once_cell",
"strum 0.26.3",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "radium" name = "radium"
version = "0.7.0" version = "0.7.0"
@ -7724,7 +7776,16 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [ dependencies = [
"strum_macros", "strum_macros 0.25.3",
]
[[package]]
name = "strum"
version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
dependencies = [
"strum_macros 0.26.4",
] ]
[[package]] [[package]]
@ -7740,6 +7801,19 @@ dependencies = [
"syn 2.0.93", "syn 2.0.93",
] ]
[[package]]
name = "strum_macros"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.93",
]
[[package]] [[package]]
name = "subprocess" name = "subprocess"
version = "0.2.9" version = "0.2.9"
@ -9168,14 +9242,14 @@ dependencies = [
name = "websocket" name = "websocket"
version = "1.0.0" version = "1.0.0"
dependencies = [ dependencies = [
"actix",
"actix-files", "actix-files",
"actix-web", "actix-web",
"actix-web-actors", "actix-ws",
"awc", "awc",
"env_logger", "env_logger",
"futures-util", "futures-util",
"log", "log",
"ractor",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
] ]

View File

@ -7,6 +7,11 @@ fmt:
cargo +nightly fmt cargo +nightly fmt
fd --type=file --hidden --extension=yml --extension=md --extension=js --exec-batch npx -y prettier --write fd --type=file --hidden --extension=yml --extension=md --extension=js --exec-batch npx -y prettier --write
# Check project.
[group("lint")]
check: clippy
fd --type=file --hidden --extension=yml --extension=md --extension=js --exec-batch npx -y prettier --check
# Run Clippy over workspace. # Run Clippy over workspace.
[group("lint")] [group("lint")]
clippy: clippy:

View File

@ -12,14 +12,14 @@ name = "websocket-client"
path = "src/client.rs" path = "src/client.rs"
[dependencies] [dependencies]
actix.workspace = true
actix-files.workspace = true actix-files.workspace = true
actix-web.workspace = true actix-web.workspace = true
actix-web-actors.workspace = true actix-ws.workspace = true
awc.workspace = true awc.workspace = true
env_logger.workspace = true env_logger.workspace = true
futures-util = { workspace = true, features = ["sink"] } futures-util = { workspace = true, features = ["sink"] }
log.workspace = true log.workspace = true
ractor = { version = "0.15", default-features = false }
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true tokio-stream.workspace = true

View File

@ -4,10 +4,10 @@
use actix_files::NamedFile; use actix_files::NamedFile;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web_actors::ws; use ractor::Actor;
mod server; mod server;
use self::server::MyWebSocket; use self::server::{MyWebSocket, WsMessage};
async fn index() -> impl Responder { async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap() NamedFile::open_async("./static/index.html").await.unwrap()
@ -15,7 +15,19 @@ async fn index() -> impl Responder {
/// WebSocket handshake and start `MyWebSocket` actor. /// WebSocket handshake and start `MyWebSocket` actor.
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> { async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
ws::start(MyWebSocket::new(), &req, stream) let (res, session, stream) = actix_ws::handle(&req, stream)?;
let (actor, _handle) = Actor::spawn(None, MyWebSocket, session).await.unwrap();
actix_web::rt::spawn(async move {
let mut stream = stream.aggregate_continuations();
while let Some(Ok(msg)) = stream.recv().await {
actor.send_message(WsMessage::Ws(msg)).unwrap();
}
});
Ok(res)
} }
// the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support // the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support

View File

@ -1,7 +1,7 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix::prelude::*; use actix_ws::AggregatedMessage;
use actix_web_actors::ws; use ractor::{ActorProcessingErr, ActorRef};
/// How often heartbeat pings are sent /// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@ -9,70 +9,104 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug)]
pub(crate) enum WsMessage {
Ws(actix_ws::AggregatedMessage),
Hb,
}
/// websocket connection is long running connection, it easier /// websocket connection is long running connection, it easier
/// to handle with an actor /// to handle with an actor
pub struct MyWebSocket { pub(crate) struct MyWebSocket;
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
}
impl MyWebSocket { impl MyWebSocket {
pub fn new() -> Self { async fn handle_hb(
Self { hb: Instant::now() } &self,
state: &mut (Instant, actix_ws::Session),
myself: &ActorRef<WsMessage>,
) -> Result<(), ActorProcessingErr> {
if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");
let _ = state.1.clone().close(None).await;
myself.stop(None);
// don't try to send a ping
} else {
state.1.ping(b"").await?;
};
Ok(())
} }
/// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL). async fn handle_ws_msg(
/// &self,
/// also this method checks heartbeats from client msg: AggregatedMessage,
fn hb(&self, ctx: &mut <Self as Actor>::Context) { state: &mut (Instant, actix_ws::Session),
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { myself: ActorRef<WsMessage>,
// check client heartbeats ) -> Result<(), ActorProcessingErr> {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
}
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
/// Method is called on actor start. We start the heartbeat process here.
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
}
}
/// Handler for `ws::Message`
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
// process websocket messages
println!("WS: {msg:?}"); println!("WS: {msg:?}");
match msg { match msg {
Ok(ws::Message::Ping(msg)) => { AggregatedMessage::Ping(msg) => {
self.hb = Instant::now(); state.0 = Instant::now();
ctx.pong(&msg); state.1.pong(&msg).await?;
} }
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now(); AggregatedMessage::Pong(_) => {
state.0 = Instant::now();
} }
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin), AggregatedMessage::Text(text) => {
Ok(ws::Message::Close(reason)) => { state.1.text(text).await?;
ctx.close(reason);
ctx.stop();
} }
_ => ctx.stop(),
} AggregatedMessage::Binary(bin) => {
state.1.binary(bin).await?;
}
AggregatedMessage::Close(reason) => {
let _ = state.1.clone().close(reason).await;
myself.stop(None);
}
};
Ok(())
}
}
impl ractor::Actor for MyWebSocket {
type Msg = WsMessage;
type State = (Instant, actix_ws::Session);
type Arguments = actix_ws::Session;
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
session: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
myself.send_interval(HEARTBEAT_INTERVAL, || WsMessage::Hb);
Ok((Instant::now(), session))
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WsMessage::Hb => {
self.handle_hb(state, &myself).await?;
}
WsMessage::Ws(msg) => {
self.handle_ws_msg(msg, state, myself).await?;
}
}
Ok(())
} }
} }