1
0
mirror of https://github.com/actix/examples synced 2024-12-03 02:02:22 +01:00
examples/server-sent-events/src/broadcast.rs

90 lines
2.4 KiB
Rust
Raw Normal View History

2022-08-07 02:24:40 +02:00
use std::{sync::Arc, time::Duration};
use actix_web::rt::time::interval;
2023-10-29 02:06:58 +02:00
use actix_web_lab::{
sse::{self, Sse},
util::InfallibleStream,
};
2022-08-07 02:24:40 +02:00
use futures_util::future;
2022-02-14 02:37:51 +01:00
use parking_lot::Mutex;
2023-10-29 02:06:58 +02:00
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
2022-02-14 02:37:51 +01:00
/// Fan-out hub for server-sent events.
///
/// Holds the sending halves of every registered client's channel; handles are
/// shared as `Arc<Broadcaster>` (see `Broadcaster::create`).
pub struct Broadcaster {
/// Mutable state (the client list), guarded by a mutex so it can be updated
/// through a shared `&self`.
inner: Mutex<BroadcasterInner>,
}
2022-08-07 02:24:40 +02:00
#[derive(Debug, Clone, Default)]
struct BroadcasterInner {
2023-10-29 02:06:58 +02:00
clients: Vec<mpsc::Sender<sse::Event>>,
2022-02-14 02:37:51 +01:00
}
impl Broadcaster {
2022-08-07 02:24:40 +02:00
/// Constructs new broadcaster and spawns ping loop.
pub fn create() -> Arc<Self> {
let this = Arc::new(Broadcaster {
inner: Mutex::new(BroadcasterInner::default()),
});
2022-02-14 02:37:51 +01:00
2022-08-07 02:24:40 +02:00
Broadcaster::spawn_ping(Arc::clone(&this));
2022-02-14 02:37:51 +01:00
2022-08-07 02:24:40 +02:00
this
2022-02-14 02:37:51 +01:00
}
2022-08-07 02:24:40 +02:00
/// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast
/// list if not.
fn spawn_ping(this: Arc<Self>) {
2022-02-14 02:37:51 +01:00
actix_web::rt::spawn(async move {
2022-08-07 02:24:40 +02:00
let mut interval = interval(Duration::from_secs(10));
2022-02-14 02:37:51 +01:00
loop {
interval.tick().await;
2022-08-07 02:24:40 +02:00
this.remove_stale_clients().await;
2022-02-14 02:37:51 +01:00
}
});
}
2022-08-07 02:24:40 +02:00
/// Removes all non-responsive clients from broadcast list.
async fn remove_stale_clients(&self) {
let clients = self.inner.lock().clients.clone();
2022-02-14 02:37:51 +01:00
let mut ok_clients = Vec::new();
2022-08-07 02:24:40 +02:00
for client in clients {
2022-08-08 00:12:52 +02:00
if client
2022-10-26 17:51:27 +02:00
.send(sse::Event::Comment("ping".into()))
2022-08-08 00:12:52 +02:00
.await
.is_ok()
{
2022-02-14 02:37:51 +01:00
ok_clients.push(client.clone());
}
}
2022-08-07 02:24:40 +02:00
self.inner.lock().clients = ok_clients;
}
2022-02-14 02:37:51 +01:00
2022-08-07 02:24:40 +02:00
/// Registers client with broadcaster, returning an SSE response body.
2023-10-29 02:06:58 +02:00
pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);
2023-10-29 02:06:58 +02:00
tx.send(sse::Data::new("connected").into()).await.unwrap();
2022-02-14 02:37:51 +01:00
2022-08-07 02:24:40 +02:00
self.inner.lock().clients.push(tx);
2022-02-14 02:37:51 +01:00
2023-10-29 02:06:58 +02:00
Sse::from_infallible_receiver(rx)
2022-02-14 02:37:51 +01:00
}
2022-08-07 02:24:40 +02:00
/// Broadcasts `msg` to all clients.
pub async fn broadcast(&self, msg: &str) {
let clients = self.inner.lock().clients.clone();
2022-02-14 02:37:51 +01:00
2022-10-26 17:51:27 +02:00
let send_futures = clients
.iter()
2023-10-29 02:06:58 +02:00
.map(|client| client.send(sse::Data::new(msg).into()));
2022-02-14 02:37:51 +01:00
2022-08-07 02:24:40 +02:00
// try to send to all clients, ignoring failures
// disconnected clients will get swept up by `remove_stale_clients`
let _ = future::join_all(send_futures).await;
2022-02-14 02:37:51 +01:00
}
}