diff --git a/Cargo.lock b/Cargo.lock index ea49972..3ee79b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,15 +206,15 @@ dependencies = [ [[package]] name = "actix-identity" -version = "0.5.2" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1224c9f9593dc27c9077b233ce04adedc1d7febcfc35ee9f53ea3c24df180bec" +checksum = "36e1cc6f95e245b2f3c6995df4e1c0c697704c48c28ec325d135a3ca039d4952" dependencies = [ "actix-service", "actix-session", "actix-utils", "actix-web", - "anyhow", + "derive_more", "futures-core", "serde", "tracing", @@ -355,9 +355,9 @@ dependencies = [ [[package]] name = "actix-session" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43da8b818ae1f11049a4d218975345fe8e56ce5a5f92c11f972abcff5ff80e87" +checksum = "2e6a28f813a6671e1847d005cad0be36ae4d016287690f765c303379837c13d6" dependencies = [ "actix", "actix-redis", @@ -506,9 +506,9 @@ dependencies = [ [[package]] name = "actix-web-lab" -version = "0.19.2" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9b7c50a90657ef1868db9dd85f74e82c4d9858ce583d4639ae3583f0bb97775" +checksum = "e15f180c2bf7abacfda7d8d9ee4169e7f792ec8983313dc38809e902f61c79d0" dependencies = [ "actix-http", "actix-router", @@ -538,18 +538,19 @@ dependencies = [ "serde_html_form", "serde_json", "tokio 1.33.0", + "tokio-stream", "tracing", ] [[package]] name = "actix-web-lab-derive" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16294584c7794939b1e5711f28e7cae84ef30e62a520db3f9af425f85269bcd2" +checksum = "9aa0b287c8de4a76b691f29dbb5451e8dd5b79d777eaf87350c9b0cbfdb5e968" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] @@ -6759,6 +6760,8 @@ dependencies = [ "futures-util", "log", "parking_lot 0.12.1", + "tokio 1.33.0", + "tokio-stream", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 289ea64..46fc676 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,17 +77,17 @@ actix-codec = "0.5" actix-cors = "0.6" actix-files = "0.6" actix-http = "3.4" -actix-identity = "0.5" +actix-identity = "0.6" actix-multipart = "0.6" actix-multipart-derive = "0.6" actix-protobuf = "0.10" -actix-session = "0.7" +actix-session = "0.8" actix-test = "0.1" actix-tls = "3.1.1" actix-utils = "3" actix-web = "4.4" actix-web-actors = "4.1" -actix-web-lab = "0.19" +actix-web-lab = "0.20" actix-ws = "0.2.5" awc = "3.2" @@ -104,3 +104,4 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1.24.2", features = ["sync", "io-util"] } tokio-util = "0.7.4" +tokio-stream = "0.1.1" diff --git a/server-sent-events/Cargo.toml b/server-sent-events/Cargo.toml index 4989d75..e574f2d 100644 --- a/server-sent-events/Cargo.toml +++ b/server-sent-events/Cargo.toml @@ -10,3 +10,5 @@ env_logger.workspace = true futures-util.workspace = true log.workspace = true parking_lot = "0.12" +tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true diff --git a/server-sent-events/src/broadcast.rs b/server-sent-events/src/broadcast.rs index 89a3dd1..25f216f 100644 --- a/server-sent-events/src/broadcast.rs +++ b/server-sent-events/src/broadcast.rs @@ -1,9 +1,14 @@ use std::{sync::Arc, time::Duration}; use actix_web::rt::time::interval; -use actix_web_lab::sse::{self, ChannelStream, Sse}; +use actix_web_lab::{ + sse::{self, Sse}, + util::InfallibleStream, +}; use futures_util::future; use parking_lot::Mutex; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; pub struct Broadcaster { inner: Mutex, @@ -11,7 +16,7 @@ pub struct Broadcaster { #[derive(Debug, Clone, Default)] struct BroadcasterInner { - clients: Vec, + clients: Vec>, } impl Broadcaster { @@ -59,14 +64,14 @@ impl Broadcaster { } /// Registers client with broadcaster, returning an SSE response body. - pub async fn new_client(&self) -> Sse { - let (tx, rx) = sse::channel(10); + pub async fn new_client(&self) -> Sse>> { + let (tx, rx) = mpsc::channel(10); - tx.send(sse::Data::new("connected")).await.unwrap(); + tx.send(sse::Data::new("connected").into()).await.unwrap(); self.inner.lock().clients.push(tx); - rx + Sse::from_infallible_receiver(rx) } /// Broadcasts `msg` to all clients. @@ -75,7 +80,7 @@ impl Broadcaster { let send_futures = clients .iter() - .map(|client| client.send(sse::Data::new(msg))); + .map(|client| client.send(sse::Data::new(msg).into())); // try to send to all clients, ignoring failures // disconnected clients will get swept up by `remove_stale_clients`