diff --git a/server-sent-events/src/broadcast.rs b/server-sent-events/src/broadcast.rs index 7136e2fc..7da50748 100644 --- a/server-sent-events/src/broadcast.rs +++ b/server-sent-events/src/broadcast.rs @@ -9,19 +9,26 @@ use actix_web::{ web::{Bytes, Data}, Error, }; -use futures_util::{Stream, StreamExt as _}; +use futures_util::Stream; use parking_lot::Mutex; -use tokio::sync::mpsc::{channel, Sender}; -use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; pub struct Broadcaster { + inner: Mutex, +} + +struct BroadcasterInner { clients: Vec>, } impl Broadcaster { - pub fn create() -> Data> { + pub fn create() -> Data { // Data ~≃ Arc - let me = Data::new(Mutex::new(Broadcaster::new())); + let me = Data::new(Broadcaster { + inner: Mutex::new(BroadcasterInner { + clients: Vec::new(), + }), + }); // ping clients every 10 seconds to see if they are alive Broadcaster::spawn_ping(me.clone()); @@ -29,62 +36,60 @@ impl Broadcaster { me } - fn new() -> Self { - Broadcaster { - clients: Vec::new(), - } - } - - fn spawn_ping(me: Data>) { + fn spawn_ping(me: Data) { actix_web::rt::spawn(async move { - let mut task = - IntervalStream::new(interval_at(Instant::now(), Duration::from_secs(10))); + let mut interval = interval_at(Instant::now(), Duration::from_secs(10)); - while task.next().await.is_some() { - me.lock().remove_stale_clients(); + loop { + interval.tick().await; + me.remove_stale_clients(); } }); } - fn remove_stale_clients(&mut self) { + fn remove_stale_clients(&self) { + let mut inner = self.inner.lock(); + let mut ok_clients = Vec::new(); - for client in self.clients.iter() { + for client in inner.clients.iter() { let result = client.clone().try_send(Bytes::from("data: ping\n\n")); if let Ok(()) = result { ok_clients.push(client.clone()); } } - self.clients = ok_clients; + inner.clients = ok_clients; } - pub fn new_client(&mut self) -> Client { + pub fn new_client(&self) -> Client { let (tx, rx) = channel(100); - let rx = ReceiverStream::new(rx); tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); - self.clients.push(tx); + let mut inner = self.inner.lock(); + inner.clients.push(tx); + Client(rx) } pub fn send(&self, msg: &str) { let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); - for client in self.clients.iter() { + let inner = self.inner.lock(); + for client in inner.clients.iter() { client.clone().try_send(msg.clone()).unwrap_or(()); } } } // wrap Receiver in own type, with correct error type -pub struct Client(ReceiverStream); +pub struct Client(Receiver); impl Stream for Client { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.0).poll_next(cx) { + match Pin::new(&mut self.0).poll_recv(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, diff --git a/server-sent-events/src/main.rs b/server-sent-events/src/main.rs index 4e5302dc..fe090fd3 100644 --- a/server-sent-events/src/main.rs +++ b/server-sent-events/src/main.rs @@ -4,7 +4,6 @@ use actix_web::{ web::{self, Data, Path}, App, HttpResponse, HttpServer, Responder, }; -use parking_lot::Mutex; mod broadcast; use broadcast::Broadcaster; @@ -38,15 +37,15 @@ async fn index() -> impl Responder { .body(index_html) } -async fn new_client(broadcaster: Data>) -> impl Responder { - let rx = broadcaster.lock().new_client(); +async fn new_client(broadcaster: Data) -> impl Responder { + let rx = broadcaster.new_client(); HttpResponse::Ok() .append_header((header::CONTENT_TYPE, "text/event-stream")) .streaming(rx) } -async fn broadcast(msg: Path, broadcaster: Data>) -> impl Responder { - broadcaster.lock().send(&msg.into_inner()); +async fn broadcast(msg: Path, broadcaster: Data) -> impl Responder { + broadcaster.send(&msg.into_inner()); HttpResponse::Ok().body("msg sent") }