mirror of
https://github.com/actix/examples
synced 2024-11-23 14:31:07 +01:00
refactor: use ractor
This commit is contained in:
parent
2c68b99c0b
commit
f167545ff3
44
Cargo.lock
generated
44
Cargo.lock
generated
@ -1008,7 +1008,7 @@ dependencies = [
|
|||||||
"proc-macro-crate 1.3.1",
|
"proc-macro-crate 1.3.1",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"strum",
|
"strum 0.25.0",
|
||||||
"syn 2.0.77",
|
"syn 2.0.77",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
]
|
]
|
||||||
@ -6070,6 +6070,20 @@ dependencies = [
|
|||||||
"r2d2",
|
"r2d2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ractor"
|
||||||
|
version = "0.10.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "11e9a53fcabb680bb70a3ce495e744676ee7e1800a188e01a68d18916a1b48e1"
|
||||||
|
dependencies = [
|
||||||
|
"dashmap",
|
||||||
|
"futures",
|
||||||
|
"once_cell",
|
||||||
|
"strum 0.26.3",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "radium"
|
name = "radium"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
@ -7484,7 +7498,16 @@ version = "0.25.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
|
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"strum_macros",
|
"strum_macros 0.25.3",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "strum"
|
||||||
|
version = "0.26.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
|
||||||
|
dependencies = [
|
||||||
|
"strum_macros 0.26.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -7500,6 +7523,19 @@ dependencies = [
|
|||||||
"syn 2.0.77",
|
"syn 2.0.77",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "strum_macros"
|
||||||
|
version = "0.26.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
|
||||||
|
dependencies = [
|
||||||
|
"heck 0.5.0",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"rustversion",
|
||||||
|
"syn 2.0.77",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "subprocess"
|
name = "subprocess"
|
||||||
version = "0.2.9"
|
version = "0.2.9"
|
||||||
@ -8884,14 +8920,14 @@ dependencies = [
|
|||||||
name = "websocket"
|
name = "websocket"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix",
|
|
||||||
"actix-files",
|
"actix-files",
|
||||||
"actix-web",
|
"actix-web",
|
||||||
"actix-web-actors",
|
"actix-ws",
|
||||||
"awc",
|
"awc",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"log",
|
"log",
|
||||||
|
"ractor",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
]
|
]
|
||||||
|
@ -12,14 +12,14 @@ name = "websocket-client"
|
|||||||
path = "src/client.rs"
|
path = "src/client.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix.workspace = true
|
|
||||||
actix-files.workspace = true
|
actix-files.workspace = true
|
||||||
actix-web.workspace = true
|
actix-web.workspace = true
|
||||||
actix-web-actors.workspace = true
|
actix-ws.workspace = true
|
||||||
awc.workspace = true
|
awc.workspace = true
|
||||||
|
|
||||||
env_logger.workspace = true
|
env_logger.workspace = true
|
||||||
futures-util = { workspace = true, features = ["sink"] }
|
futures-util = { workspace = true, features = ["sink"] }
|
||||||
log.workspace = true
|
log.workspace = true
|
||||||
|
ractor = { version = "0.10", default-features = false }
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
tokio-stream.workspace = true
|
tokio-stream.workspace = true
|
||||||
|
@ -4,10 +4,10 @@
|
|||||||
|
|
||||||
use actix_files::NamedFile;
|
use actix_files::NamedFile;
|
||||||
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
|
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
|
||||||
use actix_web_actors::ws;
|
use ractor::Actor;
|
||||||
|
|
||||||
mod server;
|
mod server;
|
||||||
use self::server::MyWebSocket;
|
use self::server::{AsMessage, MyWebSocket};
|
||||||
|
|
||||||
async fn index() -> impl Responder {
|
async fn index() -> impl Responder {
|
||||||
NamedFile::open_async("./static/index.html").await.unwrap()
|
NamedFile::open_async("./static/index.html").await.unwrap()
|
||||||
@ -15,7 +15,19 @@ async fn index() -> impl Responder {
|
|||||||
|
|
||||||
/// WebSocket handshake and start `MyWebSocket` actor.
|
/// WebSocket handshake and start `MyWebSocket` actor.
|
||||||
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
|
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
|
||||||
ws::start(MyWebSocket::new(), &req, stream)
|
let (res, session, stream) = actix_ws::handle(&req, stream)?;
|
||||||
|
|
||||||
|
let (actor, _handle) = Actor::spawn(None, MyWebSocket, session).await.unwrap();
|
||||||
|
|
||||||
|
actix_web::rt::spawn(async move {
|
||||||
|
let mut stream = stream.aggregate_continuations();
|
||||||
|
|
||||||
|
while let Some(Ok(msg)) = stream.recv().await {
|
||||||
|
actor.send_message(AsMessage::Ws(msg)).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support
|
// the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support
|
||||||
|
@ -1,78 +1,111 @@
|
|||||||
|
use actix_ws::AggregatedMessage;
|
||||||
|
use ractor::{ActorProcessingErr, ActorRef};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use actix::prelude::*;
|
|
||||||
use actix_web_actors::ws;
|
|
||||||
|
|
||||||
/// How often heartbeat pings are sent
|
/// How often heartbeat pings are sent
|
||||||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
/// How long before lack of client response causes a timeout
|
/// How long before lack of client response causes a timeout
|
||||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
|
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) enum AsMessage {
|
||||||
|
Ws(actix_ws::AggregatedMessage),
|
||||||
|
Hb,
|
||||||
|
}
|
||||||
|
|
||||||
/// websocket connection is long running connection, it easier
|
/// websocket connection is long running connection, it easier
|
||||||
/// to handle with an actor
|
/// to handle with an actor
|
||||||
pub struct MyWebSocket {
|
pub(crate) struct MyWebSocket;
|
||||||
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
|
|
||||||
/// otherwise we drop connection.
|
|
||||||
hb: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MyWebSocket {
|
impl MyWebSocket {
|
||||||
pub fn new() -> Self {
|
async fn handle_hb(
|
||||||
Self { hb: Instant::now() }
|
&self,
|
||||||
|
state: &mut (Instant, actix_ws::Session),
|
||||||
|
myself: &ActorRef<AsMessage>,
|
||||||
|
) -> Result<(), ActorProcessingErr> {
|
||||||
|
if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT {
|
||||||
|
// heartbeat timed out
|
||||||
|
println!("Websocket Client heartbeat failed, disconnecting!");
|
||||||
|
|
||||||
|
let _ = state.1.clone().close(None).await;
|
||||||
|
myself.stop(None);
|
||||||
|
|
||||||
|
// don't try to send a ping
|
||||||
|
} else {
|
||||||
|
state.1.ping(b"").await?;
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL).
|
async fn handle_ws_msg(
|
||||||
///
|
&self,
|
||||||
/// also this method checks heartbeats from client
|
msg: AggregatedMessage,
|
||||||
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
|
state: &mut (Instant, actix_ws::Session),
|
||||||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
myself: ActorRef<AsMessage>,
|
||||||
// check client heartbeats
|
) -> Result<(), ActorProcessingErr> {
|
||||||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
|
||||||
// heartbeat timed out
|
|
||||||
println!("Websocket Client heartbeat failed, disconnecting!");
|
|
||||||
|
|
||||||
// stop actor
|
|
||||||
ctx.stop();
|
|
||||||
|
|
||||||
// don't try to send a ping
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.ping(b"");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Actor for MyWebSocket {
|
|
||||||
type Context = ws::WebsocketContext<Self>;
|
|
||||||
|
|
||||||
/// Method is called on actor start. We start the heartbeat process here.
|
|
||||||
fn started(&mut self, ctx: &mut Self::Context) {
|
|
||||||
self.hb(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handler for `ws::Message`
|
|
||||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
|
|
||||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
|
||||||
// process websocket messages
|
|
||||||
println!("WS: {msg:?}");
|
println!("WS: {msg:?}");
|
||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Ok(ws::Message::Ping(msg)) => {
|
AggregatedMessage::Ping(msg) => {
|
||||||
self.hb = Instant::now();
|
state.0 = Instant::now();
|
||||||
ctx.pong(&msg);
|
state.1.pong(&msg).await?;
|
||||||
}
|
}
|
||||||
Ok(ws::Message::Pong(_)) => {
|
|
||||||
self.hb = Instant::now();
|
AggregatedMessage::Pong(_) => {
|
||||||
|
state.0 = Instant::now();
|
||||||
}
|
}
|
||||||
Ok(ws::Message::Text(text)) => ctx.text(text),
|
|
||||||
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
|
AggregatedMessage::Text(text) => {
|
||||||
Ok(ws::Message::Close(reason)) => {
|
state.1.text(text).await?;
|
||||||
ctx.close(reason);
|
|
||||||
ctx.stop();
|
|
||||||
}
|
}
|
||||||
_ => ctx.stop(),
|
|
||||||
}
|
AggregatedMessage::Binary(bin) => {
|
||||||
|
state.1.binary(bin).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
AggregatedMessage::Close(reason) => {
|
||||||
|
let _ = state.1.clone().close(reason).await;
|
||||||
|
myself.stop(None);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ractor::Actor for MyWebSocket {
|
||||||
|
type Msg = AsMessage;
|
||||||
|
type State = (Instant, actix_ws::Session);
|
||||||
|
type Arguments = actix_ws::Session;
|
||||||
|
|
||||||
|
async fn pre_start(
|
||||||
|
&self,
|
||||||
|
myself: ActorRef<Self::Msg>,
|
||||||
|
session: Self::Arguments,
|
||||||
|
) -> Result<Self::State, ActorProcessingErr> {
|
||||||
|
myself.send_interval(HEARTBEAT_INTERVAL, || AsMessage::Hb);
|
||||||
|
|
||||||
|
Ok((Instant::now(), session))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
&self,
|
||||||
|
myself: ActorRef<Self::Msg>,
|
||||||
|
message: Self::Msg,
|
||||||
|
state: &mut Self::State,
|
||||||
|
) -> Result<(), ActorProcessingErr> {
|
||||||
|
match message {
|
||||||
|
AsMessage::Hb => {
|
||||||
|
self.handle_hb(state, &myself).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
AsMessage::Ws(msg) => {
|
||||||
|
self.handle_ws_msg(msg, state, myself).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user