1
0
mirror of https://github.com/actix/examples synced 2025-06-28 18:00:37 +02:00

clean up sse example

This commit is contained in:
Rob Ede
2022-02-14 01:37:51 +00:00
parent c84c16e8b7
commit 7ea3f7f54a
7 changed files with 278 additions and 190 deletions

View File

@ -6,6 +6,8 @@ edition = "2021"
[dependencies]
actix-web = "4.0.0-rc.3"
env_logger = "0.9"
futures = "0.3.1"
tokio = { version = "1.16.1", features = ["sync"] }
futures-util = { version = "0.3.7", default-features = false, features = ["std"] }
log = "0.4"
parking_lot = "0.12"
tokio = { version = "1.16", features = ["sync"] }
tokio-stream = { version = "0.1.8", features = ["time"] }

View File

@ -15,7 +15,7 @@ curl 127.0.0.1:8080/broadcast/my_message
*my_message* should appear in the browser with a timestamp.
## Performance
This implementation serve thousands of clients on a 2021 macbook with no problems.
This implementation can serve thousands of clients on a 2021 MacBook with no problems.
Run [benchmark.js](benchmark.js) to benchmark your own system:
@ -38,4 +38,4 @@ $ node drain.js
Connections dropped: 10450, accepting connections: false^C⏎
```
_Accepting connections_ indicates whether resources for the server have been exhausted.
_Accepting connections_ indicates whether resources for the server have been exhausted.

View File

@ -1,6 +1,6 @@
const http = require('http')
const n = 1000;
const n = 120;
let connected = 0;
let messages = 0;
let start = Date.now();
@ -8,7 +8,7 @@ let phase = 'connecting';
let connection_time;
let broadcast_time;
let message = process.argv[2] || 'msg';
let message = process.argv[2] || 'msg';
let expected_data = "data: " + message;
for (let i = 0; i < n; i++) {

View File

@ -1,6 +1,6 @@
const http = require('http')
let drop_goal = 100_000;
let drop_goal = 5_000;
let dropped = 0;
let query = {

View File

@ -0,0 +1,98 @@
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use actix_web::{
rt::time::{interval_at, Instant},
web::{Bytes, Data},
Error,
};
use futures_util::{Stream, StreamExt as _};
use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
pub struct Broadcaster {
clients: Vec<Sender<Bytes>>,
}
impl Broadcaster {
pub fn create() -> Data<Mutex<Self>> {
// Data ~≃ Arc
let me = Data::new(Mutex::new(Broadcaster::new()));
// ping clients every 10 seconds to see if they are alive
Broadcaster::spawn_ping(me.clone());
me
}
fn new() -> Self {
Broadcaster {
clients: Vec::new(),
}
}
fn spawn_ping(me: Data<Mutex<Self>>) {
actix_web::rt::spawn(async move {
let mut task = IntervalStream::new(interval_at(
Instant::now(),
Duration::from_secs(10),
));
while task.next().await.is_some() {
me.lock().remove_stale_clients();
}
});
}
fn remove_stale_clients(&mut self) {
let mut ok_clients = Vec::new();
for client in self.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result {
ok_clients.push(client.clone());
}
}
self.clients = ok_clients;
}
pub fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100);
let rx = ReceiverStream::new(rx);
tx.try_send(Bytes::from("data: connected\n\n")).unwrap();
self.clients.push(tx);
Client(rx)
}
pub fn send(&self, msg: &str) {
let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
for client in self.clients.iter() {
client.clone().try_send(msg.clone()).unwrap_or(());
}
}
}
// wrap Receiver in own type, with correct error type
pub struct Client(ReceiverStream<Bytes>);
impl Stream for Client {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -1,20 +1,22 @@
use actix_web::rt::time::{interval_at, Instant};
use actix_web::web::{Bytes, Data, Path};
use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer, Responder};
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
use actix_web::{
http::header::{self, ContentType},
middleware,
web::{self, Data, Path},
App, HttpResponse, HttpServer, Responder,
};
use parking_lot::Mutex;
mod broadcast;
use broadcast::Broadcaster;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_web=trace");
env_logger::init();
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let data = Broadcaster::create();
log::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(move || {
App::new()
.app_data(data.clone())
@ -23,24 +25,24 @@ async fn main() -> std::io::Result<()> {
.route("/events", web::get().to(new_client))
.route("/broadcast/{msg}", web::get().to(broadcast))
})
.bind("0.0.0.0:8080")?
.bind(("127.0.0.1", 8080))?
.run()
.await
}
async fn index() -> impl Responder {
let content = include_str!("index.html");
let index_html = include_str!("index.html");
HttpResponse::Ok()
.append_header(("content-type", "text/html"))
.body(content)
.append_header(ContentType::html())
.body(index_html)
}
async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
let rx = broadcaster.lock().unwrap().new_client();
let rx = broadcaster.lock().new_client();
HttpResponse::Ok()
.append_header(("content-type", "text/event-stream"))
.append_header((header::CONTENT_TYPE, "text/event-stream"))
.streaming(rx)
}
@ -48,89 +50,6 @@ async fn broadcast(
msg: Path<String>,
broadcaster: Data<Mutex<Broadcaster>>,
) -> impl Responder {
broadcaster.lock().unwrap().send(&msg.into_inner());
broadcaster.lock().send(&msg.into_inner());
HttpResponse::Ok().body("msg sent")
}
struct Broadcaster {
clients: Vec<Sender<Bytes>>,
}
impl Broadcaster {
fn create() -> Data<Mutex<Self>> {
// Data ≃ Arc
let me = Data::new(Mutex::new(Broadcaster::new()));
// ping clients every 10 seconds to see if they are alive
Broadcaster::spawn_ping(me.clone());
me
}
fn new() -> Self {
Broadcaster {
clients: Vec::new(),
}
}
fn spawn_ping(me: Data<Mutex<Self>>) {
actix_web::rt::spawn(async move {
let mut task = IntervalStream::new(interval_at(
Instant::now(),
Duration::from_secs(10),
));
while task.next().await.is_some() {
me.lock().unwrap().remove_stale_clients();
}
});
}
fn remove_stale_clients(&mut self) {
let mut ok_clients = Vec::new();
for client in self.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result {
ok_clients.push(client.clone());
}
}
self.clients = ok_clients;
}
fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100);
let rx = ReceiverStream::new(rx);
tx.try_send(Bytes::from("data: connected\n\n")).unwrap();
self.clients.push(tx);
Client(rx)
}
fn send(&self, msg: &str) {
let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
for client in self.clients.iter() {
client.clone().try_send(msg.clone()).unwrap_or(());
}
}
}
// wrap Receiver in own type, with correct error type
struct Client(ReceiverStream<Bytes>);
impl Stream for Client {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}