1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

use semantic SSE types

This commit is contained in:
Rob Ede 2022-08-07 01:24:40 +01:00
parent 087ac5ef24
commit 05e19266ad
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
6 changed files with 154 additions and 155 deletions

14
Cargo.lock generated
View File

@ -548,9 +548,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-web-lab" name = "actix-web-lab"
version = "0.16.7" version = "0.16.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8d29deb0fe8063f205804af50329301cfa45eee64ede25e436f75839ff79af7" checksum = "4caa655398cbfeecd57e639f997b62ee3090a4e88b974130035c2de41393363e"
dependencies = [ dependencies = [
"actix-files", "actix-files",
"actix-http", "actix-http",
@ -563,6 +563,7 @@ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.2.1", "bytes 1.2.1",
"bytestring",
"csv", "csv",
"derive_more", "derive_more",
"digest 0.10.3", "digest 0.10.3",
@ -730,9 +731,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.59" version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9" checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142"
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
@ -5224,9 +5225,9 @@ dependencies = [
[[package]] [[package]]
name = "rust_decimal" name = "rust_decimal"
version = "1.26.0" version = "1.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc129ab6000ab4037e7718703cdeab82a12c4ee23a238658f55372d80ef2b05" checksum = "ee9164faf726e4f3ece4978b25ca877ddc6802fa77f38cdccb32c7f805ecd70c"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec 0.7.2",
"num-traits", "num-traits",
@ -5604,6 +5605,7 @@ name = "server-sent-events"
version = "1.0.0" version = "1.0.0"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"actix-web-lab",
"env_logger 0.9.0", "env_logger 0.9.0",
"futures-util", "futures-util",
"log", "log",

View File

@ -5,6 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
actix-web = "4" actix-web = "4"
actix-web-lab = "0.16.8"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.17", default-features = false, features = ["std"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
log = "0.4" log = "0.4"

View File

@ -1,60 +1,73 @@
const http = require('http') const http = require('http')
const n = 120; const n = 120
let connected = 0; let connected = 0
let messages = 0; let messages = 0
let start = Date.now(); let start = Date.now()
let phase = 'connecting'; let phase = 'connecting'
let connection_time; let connection_time
let broadcast_time; let broadcast_time
let message = process.argv[2] || 'msg'; let message = process.argv[2] || 'msg'
let expected_data = "data: " + message; let expected_data = 'data: ' + message
for (let i = 0; i < n; i++) { for (let i = 0; i < n; i++) {
http.get({ http
.get(
{
host: '127.0.0.1', host: '127.0.0.1',
port: 8080, port: 8080,
path: '/events' path: '/events',
}, response => { },
response.on('data', data => { (response) => {
if (data.includes(expected_data)) { response.on('data', (data) => {
messages += 1; if (data.includes(expected_data)) {
} else if (data.includes("data: connected\n")) { messages += 1
connected += 1; } else if (data.includes('data: connected\n')) {
} connected += 1
}
}) })
}).on('error', (_) => {}); }
)
.on('error', (_) => {})
} }
setInterval(() => { setInterval(() => {
if (phase === 'connecting' && connected === n) { if (phase === 'connecting' && connected === n) {
// done connecting // done connecting
phase = 'messaging'; phase = 'messaging'
connection_time = Date.now() - start; connection_time = Date.now() - start
} }
if (phase === 'messaging') { if (phase === 'messaging') {
phase = 'waiting'; phase = 'waiting'
start = Date.now(); start = Date.now()
http.get({ http
host: '127.0.0.1', .request(
port: 8080, {
path: '/broadcast/' + message method: 'POST',
}, response => { host: '127.0.0.1',
response.on('data', _ => {}) port: 8080,
}) path: '/broadcast/' + message,
} },
(response) => {
response.on('data', (_) => {})
}
)
.end()
}
if (phase === 'waiting' && messages >= n) { if (phase === 'waiting' && messages >= n) {
// all messages received // all messages received
broadcast_time = Date.now() - start; broadcast_time = Date.now() - start
phase = 'paused'; phase = 'paused'
messages = 0; messages = 0
phase = 'messaging'; phase = 'messaging'
} }
process.stdout.write("\r\x1b[K"); process.stdout.write('\r\x1b[K')
process.stdout.write(`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`); process.stdout.write(
`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`
)
}, 20) }, 20)

View File

@ -1,36 +1,42 @@
const http = require('http') const http = require('http')
let drop_goal = 5_000; let drop_goal = 5_000
let dropped = 0; let dropped = 0
let query = { let query = {
host: '127.0.0.1', method: 'POST',
port: 8080, host: '127.0.0.1',
path: '/events' port: 8080,
path: '/events',
} }
setInterval(() => { setInterval(() => {
if (dropped < drop_goal) { if (dropped < drop_goal) {
let request = http.get(query, response => { let request = http
response.on('data', data => { .request(query, (response) => {
if (data.includes("data: connected\n")) { response.on('data', (data) => {
// drop connection after welcome message if (data.includes('data: connected\n')) {
dropped += 1; // drop connection after welcome message
request.abort() dropped += 1
} request.abort()
}) }
}) })
.on('error', () => {}) })
} .on('error', () => {})
.end()
}
}, 1) }, 1)
setInterval(() => { setInterval(() => {
http.get('http://127.0.0.1:8080/', () => print_status(true)) http
.setTimeout(100, () => print_status(false)) .post('http://127.0.0.1:8080/', () => print_status(true))
.on('error', () => {}) .setTimeout(100, () => print_status(false))
.on('error', () => {})
}, 20) }, 20)
function print_status(accepting_connections) { function print_status(accepting_connections) {
process.stdout.write("\r\x1b[K"); process.stdout.write('\r\x1b[K')
process.stdout.write(`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`); process.stdout.write(
`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`
)
} }

View File

@ -1,98 +1,78 @@
use std::{ use std::{sync::Arc, time::Duration};
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use actix_web::{ use actix_web::rt::time::interval;
rt::time::{interval_at, Instant}, use actix_web_lab::sse::{sse, Sse, SseSender};
web::{Bytes, Data}, use futures_util::future;
Error,
};
use futures_util::Stream;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Receiver, Sender};
pub struct Broadcaster { pub struct Broadcaster {
inner: Mutex<BroadcasterInner>, inner: Mutex<BroadcasterInner>,
} }
#[derive(Debug, Clone, Default)]
struct BroadcasterInner { struct BroadcasterInner {
clients: Vec<Sender<Bytes>>, clients: Vec<SseSender>,
} }
impl Broadcaster { impl Broadcaster {
pub fn create() -> Data<Self> { /// Constructs new broadcaster and spawns ping loop.
// Data ~≃ Arc pub fn create() -> Arc<Self> {
let me = Data::new(Broadcaster { let this = Arc::new(Broadcaster {
inner: Mutex::new(BroadcasterInner { inner: Mutex::new(BroadcasterInner::default()),
clients: Vec::new(),
}),
}); });
// ping clients every 10 seconds to see if they are alive Broadcaster::spawn_ping(Arc::clone(&this));
Broadcaster::spawn_ping(me.clone());
me this
} }
fn spawn_ping(me: Data<Self>) { /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast
/// list if not.
fn spawn_ping(this: Arc<Self>) {
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(10)); let mut interval = interval(Duration::from_secs(10));
loop { loop {
interval.tick().await; interval.tick().await;
me.remove_stale_clients(); this.remove_stale_clients().await;
} }
}); });
} }
fn remove_stale_clients(&self) { /// Removes all non-responsive clients from broadcast list.
let mut inner = self.inner.lock(); async fn remove_stale_clients(&self) {
let clients = self.inner.lock().clients.clone();
let mut ok_clients = Vec::new(); let mut ok_clients = Vec::new();
for client in inner.clients.iter() {
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
if let Ok(()) = result { for client in clients {
if client.comment("ping").await.is_ok() {
ok_clients.push(client.clone()); ok_clients.push(client.clone());
} }
} }
inner.clients = ok_clients;
self.inner.lock().clients = ok_clients;
} }
pub fn new_client(&self) -> Client { /// Registers client with broadcaster, returning an SSE response body.
let (tx, rx) = channel(100); pub async fn new_client(&self) -> Sse {
let (tx, rx) = sse(10);
tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); tx.data("connected").await.unwrap();
let mut inner = self.inner.lock(); self.inner.lock().clients.push(tx);
inner.clients.push(tx);
Client(rx) rx
} }
pub fn send(&self, msg: &str) { /// Broadcasts `msg` to all clients.
let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); pub async fn broadcast(&self, msg: &str) {
let clients = self.inner.lock().clients.clone();
let inner = self.inner.lock(); let send_futures = clients.iter().map(|client| client.data(msg));
for client in inner.clients.iter() {
client.clone().try_send(msg.clone()).unwrap_or(()); // try to send to all clients, ignoring failures
} // disconnected clients will get swept up by `remove_stale_clients`
} let _ = future::join_all(send_futures).await;
}
// wrap Receiver in own type, with correct error type
pub struct Client(Receiver<Bytes>);
impl Stream for Client {
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_recv(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
} }
} }

View File

@ -1,15 +1,13 @@
use actix_web::{ use std::{io, sync::Arc};
http::header::{self, ContentType},
middleware, use actix_web::{get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder};
web::{self, Data, Path}, use actix_web_lab::{extract::Path, respond::Html};
App, HttpResponse, HttpServer, Responder,
};
mod broadcast; mod broadcast;
use broadcast::Broadcaster; use self::broadcast::Broadcaster;
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let data = Broadcaster::create(); let data = Broadcaster::create();
@ -18,34 +16,33 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(data.clone()) .app_data(web::Data::from(Arc::clone(&data)))
.wrap(middleware::Logger::default()) .service(index)
.route("/", web::get().to(index)) .service(event_stream)
.route("/events", web::get().to(new_client)) .service(broadcast_msg)
.route("/broadcast/{msg}", web::get().to(broadcast)) .wrap(Logger::default())
}) })
.bind(("127.0.0.1", 8080))? .bind(("127.0.0.1", 8080))?
.workers(2)
.run() .run()
.await .await
} }
#[get("/")]
async fn index() -> impl Responder { async fn index() -> impl Responder {
let index_html = include_str!("index.html"); Html(include_str!("index.html").to_string())
HttpResponse::Ok()
.append_header(ContentType::html())
.body(index_html)
} }
async fn new_client(broadcaster: Data<Broadcaster>) -> impl Responder { #[get("/events")]
let rx = broadcaster.new_client(); async fn event_stream(broadcaster: web::Data<Broadcaster>) -> impl Responder {
broadcaster.new_client().await
HttpResponse::Ok()
.append_header((header::CONTENT_TYPE, "text/event-stream"))
.streaming(rx)
} }
async fn broadcast(msg: Path<String>, broadcaster: Data<Broadcaster>) -> impl Responder { #[post("/broadcast/{msg}")]
broadcaster.send(&msg.into_inner()); async fn broadcast_msg(
broadcaster: web::Data<Broadcaster>,
Path((msg,)): Path<(String,)>,
) -> impl Responder {
broadcaster.broadcast(&msg).await;
HttpResponse::Ok().body("msg sent") HttpResponse::Ok().body("msg sent")
} }