1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

Don't use mutex directly in async code (#534)

This commit is contained in:
Alice Ryhl 2022-03-05 20:35:22 +01:00 committed by GitHub
parent b8fc58bec6
commit c5f2a8fcc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 30 deletions

View File

@ -9,19 +9,26 @@ use actix_web::{
web::{Bytes, Data}, web::{Bytes, Data},
Error, Error,
}; };
use futures_util::{Stream, StreamExt as _}; use futures_util::Stream;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
pub struct Broadcaster { pub struct Broadcaster {
inner: Mutex<BroadcasterInner>,
}
struct BroadcasterInner {
clients: Vec<Sender<Bytes>>, clients: Vec<Sender<Bytes>>,
} }
impl Broadcaster { impl Broadcaster {
pub fn create() -> Data<Mutex<Self>> { pub fn create() -> Data<Self> {
// Data ~≃ Arc // Data ~≃ Arc
let me = Data::new(Mutex::new(Broadcaster::new())); let me = Data::new(Broadcaster {
inner: Mutex::new(BroadcasterInner {
clients: Vec::new(),
}),
});
// ping clients every 10 seconds to see if they are alive // ping clients every 10 seconds to see if they are alive
Broadcaster::spawn_ping(me.clone()); Broadcaster::spawn_ping(me.clone());
@ -29,62 +36,60 @@ impl Broadcaster {
me me
} }
fn new() -> Self { fn spawn_ping(me: Data<Self>) {
Broadcaster {
clients: Vec::new(),
}
}
fn spawn_ping(me: Data<Mutex<Self>>) {
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
let mut task = let mut interval = interval_at(Instant::now(), Duration::from_secs(10));
IntervalStream::new(interval_at(Instant::now(), Duration::from_secs(10)));
while task.next().await.is_some() { loop {
me.lock().remove_stale_clients(); interval.tick().await;
me.remove_stale_clients();
} }
}); });
} }
fn remove_stale_clients(&mut self) { fn remove_stale_clients(&self) {
let mut inner = self.inner.lock();
let mut ok_clients = Vec::new(); let mut ok_clients = Vec::new();
for client in self.clients.iter() { for client in inner.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n")); let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result { if let Ok(()) = result {
ok_clients.push(client.clone()); ok_clients.push(client.clone());
} }
} }
self.clients = ok_clients; inner.clients = ok_clients;
} }
pub fn new_client(&mut self) -> Client { pub fn new_client(&self) -> Client {
let (tx, rx) = channel(100); let (tx, rx) = channel(100);
let rx = ReceiverStream::new(rx);
tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); tx.try_send(Bytes::from("data: connected\n\n")).unwrap();
self.clients.push(tx); let mut inner = self.inner.lock();
inner.clients.push(tx);
Client(rx) Client(rx)
} }
pub fn send(&self, msg: &str) { pub fn send(&self, msg: &str) {
let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
for client in self.clients.iter() { let inner = self.inner.lock();
for client in inner.clients.iter() {
client.clone().try_send(msg.clone()).unwrap_or(()); client.clone().try_send(msg.clone()).unwrap_or(());
} }
} }
} }
// wrap Receiver in own type, with correct error type // wrap Receiver in own type, with correct error type
pub struct Client(ReceiverStream<Bytes>); pub struct Client(Receiver<Bytes>);
impl Stream for Client { impl Stream for Client {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) { match Pin::new(&mut self.0).poll_recv(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,

View File

@ -4,7 +4,6 @@ use actix_web::{
web::{self, Data, Path}, web::{self, Data, Path},
App, HttpResponse, HttpServer, Responder, App, HttpResponse, HttpServer, Responder,
}; };
use parking_lot::Mutex;
mod broadcast; mod broadcast;
use broadcast::Broadcaster; use broadcast::Broadcaster;
@ -38,15 +37,15 @@ async fn index() -> impl Responder {
.body(index_html) .body(index_html)
} }
async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder { async fn new_client(broadcaster: Data<Broadcaster>) -> impl Responder {
let rx = broadcaster.lock().new_client(); let rx = broadcaster.new_client();
HttpResponse::Ok() HttpResponse::Ok()
.append_header((header::CONTENT_TYPE, "text/event-stream")) .append_header((header::CONTENT_TYPE, "text/event-stream"))
.streaming(rx) .streaming(rx)
} }
async fn broadcast(msg: Path<String>, broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder { async fn broadcast(msg: Path<String>, broadcaster: Data<Broadcaster>) -> impl Responder {
broadcaster.lock().send(&msg.into_inner()); broadcaster.send(&msg.into_inner());
HttpResponse::Ok().body("msg sent") HttpResponse::Ok().body("msg sent")
} }