diff --git a/Cargo.lock b/Cargo.lock index 3192bf7f..841aa653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,9 +548,9 @@ dependencies = [ [[package]] name = "actix-web-lab" -version = "0.16.7" +version = "0.16.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d29deb0fe8063f205804af50329301cfa45eee64ede25e436f75839ff79af7" +checksum = "4caa655398cbfeecd57e639f997b62ee3090a4e88b974130035c2de41393363e" dependencies = [ "actix-files", "actix-http", @@ -563,6 +563,7 @@ dependencies = [ "arc-swap", "async-trait", "bytes 1.2.1", + "bytestring", "csv", "derive_more", "digest 0.10.3", @@ -730,9 +731,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9" +checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142" [[package]] name = "arc-swap" @@ -5224,9 +5225,9 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.26.0" +version = "1.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc129ab6000ab4037e7718703cdeab82a12c4ee23a238658f55372d80ef2b05" +checksum = "ee9164faf726e4f3ece4978b25ca877ddc6802fa77f38cdccb32c7f805ecd70c" dependencies = [ "arrayvec 0.7.2", "num-traits", @@ -5604,6 +5605,7 @@ name = "server-sent-events" version = "1.0.0" dependencies = [ "actix-web", + "actix-web-lab", "env_logger 0.9.0", "futures-util", "log", diff --git a/server-sent-events/Cargo.toml b/server-sent-events/Cargo.toml index d06aa0b3..cd0a9276 100644 --- a/server-sent-events/Cargo.toml +++ b/server-sent-events/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] actix-web = "4" +actix-web-lab = "0.16.8" env_logger = "0.9" futures-util = { version = "0.3.17", default-features = false, features = ["std"] } log = "0.4" diff --git a/server-sent-events/benchmark.js b/server-sent-events/benchmark.js index d91da584..85f0854f 100644 --- a/server-sent-events/benchmark.js +++ b/server-sent-events/benchmark.js @@ -1,60 +1,73 @@ const http = require('http') -const n = 120; -let connected = 0; -let messages = 0; -let start = Date.now(); -let phase = 'connecting'; -let connection_time; -let broadcast_time; +const n = 120 +let connected = 0 +let messages = 0 +let start = Date.now() +let phase = 'connecting' +let connection_time +let broadcast_time -let message = process.argv[2] || 'msg'; -let expected_data = "data: " + message; +let message = process.argv[2] || 'msg' +let expected_data = 'data: ' + message for (let i = 0; i < n; i++) { - http.get({ + http + .get( + { host: '127.0.0.1', port: 8080, - path: '/events' - }, response => { - response.on('data', data => { - if (data.includes(expected_data)) { - messages += 1; - } else if (data.includes("data: connected\n")) { - connected += 1; - } + path: '/events', + }, + (response) => { + response.on('data', (data) => { + if (data.includes(expected_data)) { + messages += 1 + } else if (data.includes('data: connected\n')) { + connected += 1 + } }) - }).on('error', (_) => {}); + } + ) + .on('error', (_) => {}) } setInterval(() => { - if (phase === 'connecting' && connected === n) { - // done connecting - phase = 'messaging'; - connection_time = Date.now() - start; - } + if (phase === 'connecting' && connected === n) { + // done connecting + phase = 'messaging' + connection_time = Date.now() - start + } - if (phase === 'messaging') { - phase = 'waiting'; - start = Date.now(); + if (phase === 'messaging') { + phase = 'waiting' + start = Date.now() - http.get({ - host: '127.0.0.1', - port: 8080, - path: '/broadcast/' + message - }, response => { - response.on('data', _ => {}) - }) - } + http + .request( + { + method: 'POST', + host: '127.0.0.1', + port: 8080, + path: '/broadcast/' + message, + }, + (response) => { + response.on('data', (_) => {}) + } + ) + .end() + } - if (phase === 'waiting' && messages >= n) { - // all messages received - broadcast_time = Date.now() - start; - phase = 'paused'; - messages = 0; - phase = 'messaging'; - } + if (phase === 'waiting' && messages >= n) { + // all messages received + broadcast_time = Date.now() - start + phase = 'paused' + messages = 0 + phase = 'messaging' + } - process.stdout.write("\r\x1b[K"); - process.stdout.write(`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`); + process.stdout.write('\r\x1b[K') + process.stdout.write( + `Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms` + ) }, 20) diff --git a/server-sent-events/drain.js b/server-sent-events/drain.js index 1a4651b4..c5019022 100644 --- a/server-sent-events/drain.js +++ b/server-sent-events/drain.js @@ -1,36 +1,42 @@ const http = require('http') -let drop_goal = 5_000; -let dropped = 0; +let drop_goal = 5_000 +let dropped = 0 let query = { - host: '127.0.0.1', - port: 8080, - path: '/events' + method: 'POST', + host: '127.0.0.1', + port: 8080, + path: '/events', } setInterval(() => { - if (dropped < drop_goal) { - let request = http.get(query, response => { - response.on('data', data => { - if (data.includes("data: connected\n")) { - // drop connection after welcome message - dropped += 1; - request.abort() - } - }) + if (dropped < drop_goal) { + let request = http + .request(query, (response) => { + response.on('data', (data) => { + if (data.includes('data: connected\n')) { + // drop connection after welcome message + dropped += 1 + request.abort() + } }) - .on('error', () => {}) - } + }) + .on('error', () => {}) + .end() + } }, 1) setInterval(() => { - http.get('http://127.0.0.1:8080/', () => print_status(true)) - .setTimeout(100, () => print_status(false)) - .on('error', () => {}) + http + .post('http://127.0.0.1:8080/', () => print_status(true)) + .setTimeout(100, () => print_status(false)) + .on('error', () => {}) }, 20) function print_status(accepting_connections) { - process.stdout.write("\r\x1b[K"); - process.stdout.write(`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`); + process.stdout.write('\r\x1b[K') + process.stdout.write( + `Connections dropped: ${dropped}, accepting connections: ${accepting_connections}` + ) } diff --git a/server-sent-events/src/broadcast.rs b/server-sent-events/src/broadcast.rs index 7da50748..b0b196ef 100644 --- a/server-sent-events/src/broadcast.rs +++ b/server-sent-events/src/broadcast.rs @@ -1,98 +1,78 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use actix_web::{ - rt::time::{interval_at, Instant}, - web::{Bytes, Data}, - Error, -}; -use futures_util::Stream; +use actix_web::rt::time::interval; +use actix_web_lab::sse::{sse, Sse, SseSender}; +use futures_util::future; use parking_lot::Mutex; -use tokio::sync::mpsc::{channel, Receiver, Sender}; pub struct Broadcaster { inner: Mutex, } +#[derive(Debug, Clone, Default)] struct BroadcasterInner { - clients: Vec>, + clients: Vec, } impl Broadcaster { - pub fn create() -> Data { - // Data ~≃ Arc - let me = Data::new(Broadcaster { - inner: Mutex::new(BroadcasterInner { - clients: Vec::new(), - }), + /// Constructs new broadcaster and spawns ping loop. + pub fn create() -> Arc { + let this = Arc::new(Broadcaster { + inner: Mutex::new(BroadcasterInner::default()), }); - // ping clients every 10 seconds to see if they are alive - Broadcaster::spawn_ping(me.clone()); + Broadcaster::spawn_ping(Arc::clone(&this)); - me + this } - fn spawn_ping(me: Data) { + /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast + /// list if not. + fn spawn_ping(this: Arc) { actix_web::rt::spawn(async move { - let mut interval = interval_at(Instant::now(), Duration::from_secs(10)); + let mut interval = interval(Duration::from_secs(10)); loop { interval.tick().await; - me.remove_stale_clients(); + this.remove_stale_clients().await; } }); } - fn remove_stale_clients(&self) { - let mut inner = self.inner.lock(); + /// Removes all non-responsive clients from broadcast list. + async fn remove_stale_clients(&self) { + let clients = self.inner.lock().clients.clone(); let mut ok_clients = Vec::new(); - for client in inner.clients.iter() { - let result = client.clone().try_send(Bytes::from("data: ping\n\n")); - if let Ok(()) = result { + for client in clients { + if client.comment("ping").await.is_ok() { ok_clients.push(client.clone()); } } - inner.clients = ok_clients; + + self.inner.lock().clients = ok_clients; } - pub fn new_client(&self) -> Client { - let (tx, rx) = channel(100); + /// Registers client with broadcaster, returning an SSE response body. + pub async fn new_client(&self) -> Sse { + let (tx, rx) = sse(10); - tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); + tx.data("connected").await.unwrap(); - let mut inner = self.inner.lock(); - inner.clients.push(tx); + self.inner.lock().clients.push(tx); - Client(rx) + rx } - pub fn send(&self, msg: &str) { - let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); + /// Broadcasts `msg` to all clients. + pub async fn broadcast(&self, msg: &str) { + let clients = self.inner.lock().clients.clone(); - let inner = self.inner.lock(); - for client in inner.clients.iter() { - client.clone().try_send(msg.clone()).unwrap_or(()); - } - } -} - -// wrap Receiver in own type, with correct error type -pub struct Client(Receiver); - -impl Stream for Client { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.0).poll_recv(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } + let send_futures = clients.iter().map(|client| client.data(msg)); + + // try to send to all clients, ignoring failures + // disconnected clients will get swept up by `remove_stale_clients` + let _ = future::join_all(send_futures).await; } } diff --git a/server-sent-events/src/main.rs b/server-sent-events/src/main.rs index fe090fd3..e3c4f49e 100644 --- a/server-sent-events/src/main.rs +++ b/server-sent-events/src/main.rs @@ -1,15 +1,13 @@ -use actix_web::{ - http::header::{self, ContentType}, - middleware, - web::{self, Data, Path}, - App, HttpResponse, HttpServer, Responder, -}; +use std::{io, sync::Arc}; + +use actix_web::{get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder}; +use actix_web_lab::{extract::Path, respond::Html}; mod broadcast; -use broadcast::Broadcaster; +use self::broadcast::Broadcaster; #[actix_web::main] -async fn main() -> std::io::Result<()> { +async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); let data = Broadcaster::create(); @@ -18,34 +16,33 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() - .app_data(data.clone()) - .wrap(middleware::Logger::default()) - .route("/", web::get().to(index)) - .route("/events", web::get().to(new_client)) - .route("/broadcast/{msg}", web::get().to(broadcast)) + .app_data(web::Data::from(Arc::clone(&data))) + .service(index) + .service(event_stream) + .service(broadcast_msg) + .wrap(Logger::default()) }) .bind(("127.0.0.1", 8080))? + .workers(2) .run() .await } +#[get("/")] async fn index() -> impl Responder { - let index_html = include_str!("index.html"); - - HttpResponse::Ok() - .append_header(ContentType::html()) - .body(index_html) + Html(include_str!("index.html").to_string()) } -async fn new_client(broadcaster: Data) -> impl Responder { - let rx = broadcaster.new_client(); - - HttpResponse::Ok() - .append_header((header::CONTENT_TYPE, "text/event-stream")) - .streaming(rx) +#[get("/events")] +async fn event_stream(broadcaster: web::Data) -> impl Responder { + broadcaster.new_client().await } -async fn broadcast(msg: Path, broadcaster: Data) -> impl Responder { - broadcaster.send(&msg.into_inner()); +#[post("/broadcast/{msg}")] +async fn broadcast_msg( + broadcaster: web::Data, + Path((msg,)): Path<(String,)>, +) -> impl Responder { + broadcaster.broadcast(&msg).await; HttpResponse::Ok().body("msg sent") }