diff --git a/Cargo.toml b/Cargo.toml index ed798f7..e1db836 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "r2d2", "redis-session", "rustls", + "server-sent-events", "simple-auth-server", "state", "static_index", diff --git a/server-sent-events/Cargo.toml b/server-sent-events/Cargo.toml new file mode 100644 index 0000000..a21366d --- /dev/null +++ b/server-sent-events/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "server-sent-events" +version = "0.1.0" +authors = ["Arve Seljebu"] +workspace = ".." +edition = "2018" + +[dependencies] +actix-rt = "0.2" +actix-web = "1.0" +env_logger = "0.6" +futures = "0.1" +tokio = "0.1" diff --git a/server-sent-events/README.md b/server-sent-events/README.md new file mode 100644 index 0000000..804c5c9 --- /dev/null +++ b/server-sent-events/README.md @@ -0,0 +1,40 @@ +# actix-sse +Example of server-sent events, aka `EventSource`, with actix web. + +```sh +cargo run +``` + +Open http://localhost:8080/ with a browser, then send events with another HTTP client: + +```sh +curl localhost:8080/broadcast/my_message +``` + +*my_message* should appear in the browser with a timestamp. + +## Performance +This implementation serve thousand of clients on a 2013 macbook air without problems. + +Run [benchmark.js](benchmark.js) to benchmark your own system: + +```sh +$ node benchmark.js +Connected: 1000, connection time: 867 ms, total broadcast time: 23 ms^C⏎ +``` + +### Error *Too many open files* +You may be limited to a maximal number of connections (open file descriptors). Setting maximum number of open file descriptors to 2048: + +```sh +ulimit -n 2048 +``` + +Test maximum number of open connections with [drain.js](drain.js): + +```sh +$ node drain.js +Connections dropped: 5957, accepting connections: false^C⏎ +``` + +_Accepting connections_ indicates wheter resources for the server have been exhausted. \ No newline at end of file diff --git a/server-sent-events/benchmark.js b/server-sent-events/benchmark.js new file mode 100644 index 0000000..452d75d --- /dev/null +++ b/server-sent-events/benchmark.js @@ -0,0 +1,60 @@ +const http = require('http') + +const n = 1000; +let connected = 0; +let messages = 0; +let start = Date.now(); +let phase = 'connecting'; +let connection_time; +let broadcast_time; + +let message = process.argv[2] || 'msg'; +let expected_data = "data: " + message; + +for (let i = 0; i < n; i++) { + http.get({ + host: 'localhost', + port: 8080, + path: '/events' + }, response => { + response.on('data', data => { + if (data.includes(expected_data)) { + messages += 1; + } else if (data.includes("data: connected\n")) { + connected += 1; + } + }) + }).on('error', (_) => {}); +} + +setInterval(() => { + if (phase === 'connecting' && connected === n) { + // done connecting + phase = 'messaging'; + connection_time = Date.now() - start; + } + + if (phase === 'messaging') { + phase = 'waiting'; + start = Date.now(); + + http.get({ + host: 'localhost', + port: 8080, + path: '/broadcast/' + message + }, response => { + response.on('data', _ => {}) + }) + } + + if (phase === 'waiting' && messages >= n) { + // all messages received + broadcast_time = Date.now() - start; + phase = 'paused'; + messages = 0; + phase = 'messaging'; + } + + process.stdout.write("\r\x1b[K"); + process.stdout.write(`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`); +}, 20) diff --git a/server-sent-events/drain.js b/server-sent-events/drain.js new file mode 100644 index 0000000..ae803c0 --- /dev/null +++ b/server-sent-events/drain.js @@ -0,0 +1,36 @@ +const http = require('http') + +let drop_goal = 10_000; +let dropped = 0; + +let query = { + host: 'localhost', + port: 8080, + path: '/events' +} + +setInterval(() => { + if (dropped < drop_goal) { + let request = http.get(query, response => { + response.on('data', data => { + if (data.includes("data: connected\n")) { + // drop connection after welcome message + dropped += 1; + request.abort() + } + }) + }) + .on('error', () => {}) + } +}, 1) + +setInterval(() => { + http.get('http://localhost:8080/', () => print_status(true)) + .setTimeout(100, () => print_status(false)) + .on('error', () => {}) +}, 20) + +function print_status(accepting_connections) { + process.stdout.write("\r\x1b[K"); + process.stdout.write(`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`); +} diff --git a/server-sent-events/src/index.html b/server-sent-events/src/index.html new file mode 100644 index 0000000..66c601c --- /dev/null +++ b/server-sent-events/src/index.html @@ -0,0 +1,27 @@ + + + + + + Server-sent events + + + +
+ + + diff --git a/server-sent-events/src/main.rs b/server-sent-events/src/main.rs new file mode 100644 index 0000000..aea38de --- /dev/null +++ b/server-sent-events/src/main.rs @@ -0,0 +1,128 @@ +use actix_rt::Arbiter; +use actix_web::error::ErrorInternalServerError; +use actix_web::web::{Bytes, Data, Path}; +use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder}; + +use env_logger; +use tokio::prelude::*; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::timer::Interval; + +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +fn main() { + env_logger::init(); + let data = Broadcaster::create(); + + HttpServer::new(move || { + App::new() + .register_data(data.clone()) + .route("/", web::get().to(index)) + .route("/events", web::get().to(new_client)) + .route("/broadcast/{msg}", web::get().to(broadcast)) + }) + .bind("127.0.0.1:8080") + .expect("Unable to bind port") + .run() + .unwrap(); +} + +fn index() -> impl Responder { + let content = include_str!("index.html"); + + HttpResponse::Ok() + .header("content-type", "text/html") + .body(content) +} + +fn new_client(broadcaster: Data>) -> impl Responder { + let rx = broadcaster.lock().unwrap().new_client(); + + HttpResponse::Ok() + .header("content-type", "text/event-stream") + .no_chunking() + .streaming(rx) +} + +fn broadcast(msg: Path, broadcaster: Data>) -> impl Responder { + broadcaster.lock().unwrap().send(&msg.into_inner()); + + HttpResponse::Ok().body("msg sent") +} + +struct Broadcaster { + clients: Vec>, +} + +impl Broadcaster { + fn create() -> Data> { + // Data ≃ Arc + let me = Data::new(Mutex::new(Broadcaster::new())); + + // ping clients every 10 seconds to see if they are alive + Broadcaster::spawn_ping(me.clone()); + + me + } + + fn new() -> Self { + Broadcaster { + clients: Vec::new(), + } + } + + fn spawn_ping(me: Data>) { + let task = Interval::new(Instant::now(), Duration::from_secs(10)) + .for_each(move |_| { + me.lock().unwrap().remove_stale_clients(); + Ok(()) + }) + .map_err(|e| panic!("interval errored; err={:?}", e)); + + Arbiter::spawn(task); + } + + fn remove_stale_clients(&mut self) { + let mut ok_clients = Vec::new(); + for client in self.clients.iter() { + let result = client.clone().try_send(Bytes::from("data: ping\n\n")); + + if let Ok(()) = result { + ok_clients.push(client.clone()); + } + } + self.clients = ok_clients; + } + + fn new_client(&mut self) -> Client { + let (tx, rx) = channel(100); + + tx.clone() + .try_send(Bytes::from("data: connected\n\n")) + .unwrap(); + + self.clients.push(tx); + Client(rx) + } + + fn send(&self, msg: &str) { + let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); + + for client in self.clients.iter() { + client.clone().try_send(msg.clone()).unwrap_or(()); + } + } +} + +// wrap Receiver in own type, with correct error type +struct Client(Receiver); + +impl Stream for Client { + type Item = Bytes; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll().map_err(ErrorInternalServerError) + } +}