mirror of
https://github.com/actix/examples
synced 2025-08-30 17:50:21 +02:00
restructure folders
This commit is contained in:
13
server-sent-events/Cargo.toml
Normal file
13
server-sent-events/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "server-sent-events"
|
||||
version = "1.0.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
actix-web = "4.0.0-rc.3"
|
||||
env_logger = "0.9"
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["std"] }
|
||||
log = "0.4"
|
||||
parking_lot = "0.12"
|
||||
tokio = { version = "1.16", features = ["sync"] }
|
||||
tokio-stream = { version = "0.1.8", features = ["time"] }
|
41
server-sent-events/README.md
Normal file
41
server-sent-events/README.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# actix-sse
|
||||
Example of server-sent events, aka `EventSource`, with actix web.
|
||||
|
||||
```sh
|
||||
cd other/server-sent-events
|
||||
cargo run
|
||||
```
|
||||
|
||||
Open http://127.0.0.1:8080/ with a browser, then send events with another HTTP client:
|
||||
|
||||
```sh
|
||||
curl 127.0.0.1:8080/broadcast/my_message
|
||||
```
|
||||
|
||||
*my_message* should appear in the browser with a timestamp.
|
||||
|
||||
## Performance
|
||||
This implementation can serve thousands of clients on a 2021 MacBook with no problems.
|
||||
|
||||
Run [benchmark.js](benchmark.js) to benchmark your own system:
|
||||
|
||||
```sh
|
||||
$ node benchmark.js
|
||||
Connected: 1000, connection time: 201 ms, total broadcast time: 20 ms^C⏎
|
||||
```
|
||||
|
||||
### Error *Too many open files*
|
||||
You may be limited to a maximal number of connections (open file descriptors). Setting maximum number of open file descriptors to 2048:
|
||||
|
||||
```sh
|
||||
ulimit -n 2048
|
||||
```
|
||||
|
||||
Test maximum number of open connections with [drain.js](drain.js):
|
||||
|
||||
```sh
|
||||
$ node drain.js
|
||||
Connections dropped: 10450, accepting connections: false^C⏎
|
||||
```
|
||||
|
||||
_Accepting connections_ indicates whether resources for the server have been exhausted.
|
60
server-sent-events/benchmark.js
Normal file
60
server-sent-events/benchmark.js
Normal file
@@ -0,0 +1,60 @@
|
||||
const http = require('http')
|
||||
|
||||
const n = 120;
|
||||
let connected = 0;
|
||||
let messages = 0;
|
||||
let start = Date.now();
|
||||
let phase = 'connecting';
|
||||
let connection_time;
|
||||
let broadcast_time;
|
||||
|
||||
let message = process.argv[2] || 'msg';
|
||||
let expected_data = "data: " + message;
|
||||
|
||||
for (let i = 0; i < n; i++) {
|
||||
http.get({
|
||||
host: '127.0.0.1',
|
||||
port: 8080,
|
||||
path: '/events'
|
||||
}, response => {
|
||||
response.on('data', data => {
|
||||
if (data.includes(expected_data)) {
|
||||
messages += 1;
|
||||
} else if (data.includes("data: connected\n")) {
|
||||
connected += 1;
|
||||
}
|
||||
})
|
||||
}).on('error', (_) => {});
|
||||
}
|
||||
|
||||
setInterval(() => {
|
||||
if (phase === 'connecting' && connected === n) {
|
||||
// done connecting
|
||||
phase = 'messaging';
|
||||
connection_time = Date.now() - start;
|
||||
}
|
||||
|
||||
if (phase === 'messaging') {
|
||||
phase = 'waiting';
|
||||
start = Date.now();
|
||||
|
||||
http.get({
|
||||
host: '127.0.0.1',
|
||||
port: 8080,
|
||||
path: '/broadcast/' + message
|
||||
}, response => {
|
||||
response.on('data', _ => {})
|
||||
})
|
||||
}
|
||||
|
||||
if (phase === 'waiting' && messages >= n) {
|
||||
// all messages received
|
||||
broadcast_time = Date.now() - start;
|
||||
phase = 'paused';
|
||||
messages = 0;
|
||||
phase = 'messaging';
|
||||
}
|
||||
|
||||
process.stdout.write("\r\x1b[K");
|
||||
process.stdout.write(`Connected: ${connected}, connection time: ${connection_time} ms, total broadcast time: ${broadcast_time} ms`);
|
||||
}, 20)
|
36
server-sent-events/drain.js
Normal file
36
server-sent-events/drain.js
Normal file
@@ -0,0 +1,36 @@
|
||||
const http = require('http')
|
||||
|
||||
let drop_goal = 5_000;
|
||||
let dropped = 0;
|
||||
|
||||
let query = {
|
||||
host: '127.0.0.1',
|
||||
port: 8080,
|
||||
path: '/events'
|
||||
}
|
||||
|
||||
setInterval(() => {
|
||||
if (dropped < drop_goal) {
|
||||
let request = http.get(query, response => {
|
||||
response.on('data', data => {
|
||||
if (data.includes("data: connected\n")) {
|
||||
// drop connection after welcome message
|
||||
dropped += 1;
|
||||
request.abort()
|
||||
}
|
||||
})
|
||||
})
|
||||
.on('error', () => {})
|
||||
}
|
||||
}, 1)
|
||||
|
||||
setInterval(() => {
|
||||
http.get('http://127.0.0.1:8080/', () => print_status(true))
|
||||
.setTimeout(100, () => print_status(false))
|
||||
.on('error', () => {})
|
||||
}, 20)
|
||||
|
||||
function print_status(accepting_connections) {
|
||||
process.stdout.write("\r\x1b[K");
|
||||
process.stdout.write(`Connections dropped: ${dropped}, accepting connections: ${accepting_connections}`);
|
||||
}
|
98
server-sent-events/src/broadcast.rs
Normal file
98
server-sent-events/src/broadcast.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use actix_web::{
|
||||
rt::time::{interval_at, Instant},
|
||||
web::{Bytes, Data},
|
||||
Error,
|
||||
};
|
||||
use futures_util::{Stream, StreamExt as _};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::mpsc::{channel, Sender};
|
||||
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
|
||||
|
||||
pub struct Broadcaster {
|
||||
clients: Vec<Sender<Bytes>>,
|
||||
}
|
||||
|
||||
impl Broadcaster {
|
||||
pub fn create() -> Data<Mutex<Self>> {
|
||||
// Data ~≃ Arc
|
||||
let me = Data::new(Mutex::new(Broadcaster::new()));
|
||||
|
||||
// ping clients every 10 seconds to see if they are alive
|
||||
Broadcaster::spawn_ping(me.clone());
|
||||
|
||||
me
|
||||
}
|
||||
|
||||
fn new() -> Self {
|
||||
Broadcaster {
|
||||
clients: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_ping(me: Data<Mutex<Self>>) {
|
||||
actix_web::rt::spawn(async move {
|
||||
let mut task = IntervalStream::new(interval_at(
|
||||
Instant::now(),
|
||||
Duration::from_secs(10),
|
||||
));
|
||||
|
||||
while task.next().await.is_some() {
|
||||
me.lock().remove_stale_clients();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn remove_stale_clients(&mut self) {
|
||||
let mut ok_clients = Vec::new();
|
||||
for client in self.clients.iter() {
|
||||
let result = client.clone().try_send(Bytes::from("data: ping\n\n"));
|
||||
|
||||
if let Ok(()) = result {
|
||||
ok_clients.push(client.clone());
|
||||
}
|
||||
}
|
||||
self.clients = ok_clients;
|
||||
}
|
||||
|
||||
pub fn new_client(&mut self) -> Client {
|
||||
let (tx, rx) = channel(100);
|
||||
let rx = ReceiverStream::new(rx);
|
||||
|
||||
tx.try_send(Bytes::from("data: connected\n\n")).unwrap();
|
||||
|
||||
self.clients.push(tx);
|
||||
Client(rx)
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: &str) {
|
||||
let msg = Bytes::from(["data: ", msg, "\n\n"].concat());
|
||||
|
||||
for client in self.clients.iter() {
|
||||
client.clone().try_send(msg.clone()).unwrap_or(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wrap Receiver in own type, with correct error type
|
||||
pub struct Client(ReceiverStream<Bytes>);
|
||||
|
||||
impl Stream for Client {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
match Pin::new(&mut self.0).poll_next(cx) {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
28
server-sent-events/src/index.html
Normal file
28
server-sent-events/src/index.html
Normal file
@@ -0,0 +1,28 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<meta http-equiv="X-UA-Compatible" content="ie=edge">
|
||||
<title>Server-sent events</title>
|
||||
<style>
|
||||
p {
|
||||
margin-top: 0.5em;
|
||||
margin-bottom: 0.5em;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div id="root"></div>
|
||||
<script>
|
||||
let root = document.getElementById("root");
|
||||
let events = new EventSource("/events");
|
||||
events.onmessage = (event) => {
|
||||
let data = document.createElement("p");
|
||||
let time = new Date().toLocaleTimeString();
|
||||
data.innerText = time + ": " + event.data;
|
||||
root.appendChild(data);
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
55
server-sent-events/src/main.rs
Normal file
55
server-sent-events/src/main.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use actix_web::{
|
||||
http::header::{self, ContentType},
|
||||
middleware,
|
||||
web::{self, Data, Path},
|
||||
App, HttpResponse, HttpServer, Responder,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
mod broadcast;
|
||||
use broadcast::Broadcaster;
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||
|
||||
let data = Broadcaster::create();
|
||||
|
||||
log::info!("starting HTTP server at http://localhost:8080");
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(data.clone())
|
||||
.wrap(middleware::Logger::default())
|
||||
.route("/", web::get().to(index))
|
||||
.route("/events", web::get().to(new_client))
|
||||
.route("/broadcast/{msg}", web::get().to(broadcast))
|
||||
})
|
||||
.bind(("127.0.0.1", 8080))?
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn index() -> impl Responder {
|
||||
let index_html = include_str!("index.html");
|
||||
|
||||
HttpResponse::Ok()
|
||||
.append_header(ContentType::html())
|
||||
.body(index_html)
|
||||
}
|
||||
|
||||
async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
|
||||
let rx = broadcaster.lock().new_client();
|
||||
|
||||
HttpResponse::Ok()
|
||||
.append_header((header::CONTENT_TYPE, "text/event-stream"))
|
||||
.streaming(rx)
|
||||
}
|
||||
|
||||
async fn broadcast(
|
||||
msg: Path<String>,
|
||||
broadcaster: Data<Mutex<Broadcaster>>,
|
||||
) -> impl Responder {
|
||||
broadcaster.lock().send(&msg.into_inner());
|
||||
HttpResponse::Ok().body("msg sent")
|
||||
}
|
Reference in New Issue
Block a user