diff --git a/other/server-sent-events/Cargo.toml b/other/server-sent-events/Cargo.toml index 8b9f0e91..3e509e79 100644 --- a/other/server-sent-events/Cargo.toml +++ b/other/server-sent-events/Cargo.toml @@ -4,7 +4,8 @@ version = "1.0.0" edition = "2021" [dependencies] -actix-web = "3" -env_logger = "0.8" +actix-web = "4.0.0-rc.3" +env_logger = "0.9" futures = "0.3.1" -tokio = { version = "0.2", features = ["sync"] } +tokio = { version = "1.16.1", features = ["sync"] } +tokio-stream = { version = "0.1.8", features = ["time"] } diff --git a/other/server-sent-events/README.md b/other/server-sent-events/README.md index 219d2db0..5ad4fa9d 100644 --- a/other/server-sent-events/README.md +++ b/other/server-sent-events/README.md @@ -6,22 +6,22 @@ cd other/server-sent-events cargo run ``` -Open http://localhost:8080/ with a browser, then send events with another HTTP client: +Open http://127.0.0.1:8080/ with a browser, then send events with another HTTP client: ```sh -curl localhost:8080/broadcast/my_message +curl 127.0.0.1:8080/broadcast/my_message ``` *my_message* should appear in the browser with a timestamp. ## Performance -This implementation serve thousands of clients on a 2013 macbook air without problems. +This implementation serve thousands of clients on a 2021 macbook with no problems. Run [benchmark.js](benchmark.js) to benchmark your own system: ```sh $ node benchmark.js -Connected: 1000, connection time: 867 ms, total broadcast time: 23 ms^C⏎ +Connected: 1000, connection time: 201 ms, total broadcast time: 20 ms^C⏎ ``` ### Error *Too many open files* @@ -35,7 +35,7 @@ Test maximum number of open connections with [drain.js](drain.js): ```sh $ node drain.js -Connections dropped: 5957, accepting connections: false^C⏎ +Connections dropped: 10450, accepting connections: false^C⏎ ``` _Accepting connections_ indicates whether resources for the server have been exhausted. \ No newline at end of file diff --git a/other/server-sent-events/benchmark.js b/other/server-sent-events/benchmark.js index 68c4ad51..4bf3730f 100644 --- a/other/server-sent-events/benchmark.js +++ b/other/server-sent-events/benchmark.js @@ -1,6 +1,6 @@ const http = require('http') -const n = 100; +const n = 1000; let connected = 0; let messages = 0; let start = Date.now(); @@ -13,7 +13,7 @@ let expected_data = "data: " + message; for (let i = 0; i < n; i++) { http.get({ - host: 'localhost', + host: '127.0.0.1', port: 8080, path: '/events' }, response => { @@ -39,7 +39,7 @@ setInterval(() => { start = Date.now(); http.get({ - host: 'localhost', + host: '127.0.0.1', port: 8080, path: '/broadcast/' + message }, response => { diff --git a/other/server-sent-events/drain.js b/other/server-sent-events/drain.js index ae803c07..6244c367 100644 --- a/other/server-sent-events/drain.js +++ b/other/server-sent-events/drain.js @@ -1,10 +1,10 @@ const http = require('http') -let drop_goal = 10_000; +let drop_goal = 100_000; let dropped = 0; let query = { - host: 'localhost', + host: '127.0.0.1', port: 8080, path: '/events' } @@ -25,7 +25,7 @@ setInterval(() => { }, 1) setInterval(() => { - http.get('http://localhost:8080/', () => print_status(true)) + http.get('http://127.0.0.1:8080/', () => print_status(true)) .setTimeout(100, () => print_status(false)) .on('error', () => {}) }, 20) diff --git a/other/server-sent-events/src/index.html b/other/server-sent-events/src/index.html index 66c601c1..ef87ab39 100644 --- a/other/server-sent-events/src/index.html +++ b/other/server-sent-events/src/index.html @@ -1,3 +1,4 @@ + diff --git a/other/server-sent-events/src/main.rs b/other/server-sent-events/src/main.rs index fbd3bebb..72f7eccf 100644 --- a/other/server-sent-events/src/main.rs +++ b/other/server-sent-events/src/main.rs @@ -1,27 +1,29 @@ +use actix_web::rt::time::{interval_at, Instant}; +use actix_web::web::{Bytes, Data, Path}; +use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer, Responder}; +use futures::{Stream, StreamExt}; use std::pin::Pin; use std::sync::Mutex; use std::task::{Context, Poll}; use std::time::Duration; - -use actix_web::rt::time::{interval_at, Instant}; -use actix_web::web::{Bytes, Data, Path}; -use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder}; -use futures::{Stream, StreamExt}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Sender}; +use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; #[actix_web::main] async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=trace"); env_logger::init(); let data = Broadcaster::create(); HttpServer::new(move || { App::new() .app_data(data.clone()) + .wrap(middleware::Logger::default()) .route("/", web::get().to(index)) .route("/events", web::get().to(new_client)) .route("/broadcast/{msg}", web::get().to(broadcast)) }) - .bind("127.0.0.1:8080")? + .bind("0.0.0.0:8080")? .run() .await } @@ -30,7 +32,7 @@ async fn index() -> impl Responder { let content = include_str!("index.html"); HttpResponse::Ok() - .header("content-type", "text/html") + .append_header(("content-type", "text/html")) .body(content) } @@ -38,7 +40,7 @@ async fn new_client(broadcaster: Data>) -> impl Responder { let rx = broadcaster.lock().unwrap().new_client(); HttpResponse::Ok() - .header("content-type", "text/event-stream") + .append_header(("content-type", "text/event-stream")) .streaming(rx) } @@ -74,11 +76,14 @@ impl Broadcaster { fn spawn_ping(me: Data>) { actix_web::rt::spawn(async move { - let mut task = interval_at(Instant::now(), Duration::from_secs(10)); + let mut task = IntervalStream::new(interval_at( + Instant::now(), + Duration::from_secs(10), + )); while task.next().await.is_some() { me.lock().unwrap().remove_stale_clients(); } - }) + }); } fn remove_stale_clients(&mut self) { @@ -95,10 +100,9 @@ impl Broadcaster { fn new_client(&mut self) -> Client { let (tx, rx) = channel(100); + let rx = ReceiverStream::new(rx); - tx.clone() - .try_send(Bytes::from("data: connected\n\n")) - .unwrap(); + tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); self.clients.push(tx); Client(rx) @@ -114,7 +118,7 @@ impl Broadcaster { } // wrap Receiver in own type, with correct error type -struct Client(Receiver); +struct Client(ReceiverStream); impl Stream for Client { type Item = Result;