1
0
mirror of https://github.com/actix/examples synced 2025-02-17 07:23:29 +01:00

Updated others/server-sent-events to v4. (#525)

This commit is contained in:
Christopher Gubbin 2022-02-14 01:16:18 +00:00 committed by GitHub
parent 568a8e9dfa
commit c84c16e8b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 35 additions and 29 deletions

View File

@ -4,7 +4,8 @@ version = "1.0.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
actix-web = "3" actix-web = "4.0.0-rc.3"
env_logger = "0.8" env_logger = "0.9"
futures = "0.3.1" futures = "0.3.1"
tokio = { version = "0.2", features = ["sync"] } tokio = { version = "1.16.1", features = ["sync"] }
tokio-stream = { version = "0.1.8", features = ["time"] }

View File

@ -6,22 +6,22 @@ cd other/server-sent-events
cargo run cargo run
``` ```
Open http://localhost:8080/ with a browser, then send events with another HTTP client: Open http://127.0.0.1:8080/ with a browser, then send events with another HTTP client:
```sh ```sh
curl localhost:8080/broadcast/my_message curl 127.0.0.1:8080/broadcast/my_message
``` ```
*my_message* should appear in the browser with a timestamp. *my_message* should appear in the browser with a timestamp.
## Performance ## Performance
This implementation serve thousands of clients on a 2013 macbook air without problems. This implementation serve thousands of clients on a 2021 macbook with no problems.
Run [benchmark.js](benchmark.js) to benchmark your own system: Run [benchmark.js](benchmark.js) to benchmark your own system:
```sh ```sh
$ node benchmark.js $ node benchmark.js
Connected: 1000, connection time: 867 ms, total broadcast time: 23 ms^C⏎ Connected: 1000, connection time: 201 ms, total broadcast time: 20 ms^C⏎
``` ```
### Error *Too many open files* ### Error *Too many open files*
@ -35,7 +35,7 @@ Test maximum number of open connections with [drain.js](drain.js):
```sh ```sh
$ node drain.js $ node drain.js
Connections dropped: 5957, accepting connections: false^C⏎ Connections dropped: 10450, accepting connections: false^C⏎
``` ```
_Accepting connections_ indicates whether resources for the server have been exhausted. _Accepting connections_ indicates whether resources for the server have been exhausted.

View File

@ -1,6 +1,6 @@
const http = require('http') const http = require('http')
const n = 100; const n = 1000;
let connected = 0; let connected = 0;
let messages = 0; let messages = 0;
let start = Date.now(); let start = Date.now();
@ -13,7 +13,7 @@ let expected_data = "data: " + message;
for (let i = 0; i < n; i++) { for (let i = 0; i < n; i++) {
http.get({ http.get({
host: 'localhost', host: '127.0.0.1',
port: 8080, port: 8080,
path: '/events' path: '/events'
}, response => { }, response => {
@ -39,7 +39,7 @@ setInterval(() => {
start = Date.now(); start = Date.now();
http.get({ http.get({
host: 'localhost', host: '127.0.0.1',
port: 8080, port: 8080,
path: '/broadcast/' + message path: '/broadcast/' + message
}, response => { }, response => {

View File

@ -1,10 +1,10 @@
const http = require('http') const http = require('http')
let drop_goal = 10_000; let drop_goal = 100_000;
let dropped = 0; let dropped = 0;
let query = { let query = {
host: 'localhost', host: '127.0.0.1',
port: 8080, port: 8080,
path: '/events' path: '/events'
} }
@ -25,7 +25,7 @@ setInterval(() => {
}, 1) }, 1)
setInterval(() => { setInterval(() => {
http.get('http://localhost:8080/', () => print_status(true)) http.get('http://127.0.0.1:8080/', () => print_status(true))
.setTimeout(100, () => print_status(false)) .setTimeout(100, () => print_status(false))
.on('error', () => {}) .on('error', () => {})
}, 20) }, 20)

View File

@ -1,3 +1,4 @@
<!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">

View File

@ -1,27 +1,29 @@
use actix_web::rt::time::{interval_at, Instant};
use actix_web::web::{Bytes, Data, Path};
use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer, Responder};
use futures::{Stream, StreamExt};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc::{channel, Sender};
use actix_web::rt::time::{interval_at, Instant}; use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
use actix_web::web::{Bytes, Data, Path};
use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder};
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::{channel, Receiver, Sender};
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_web=trace");
env_logger::init(); env_logger::init();
let data = Broadcaster::create(); let data = Broadcaster::create();
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(data.clone()) .app_data(data.clone())
.wrap(middleware::Logger::default())
.route("/", web::get().to(index)) .route("/", web::get().to(index))
.route("/events", web::get().to(new_client)) .route("/events", web::get().to(new_client))
.route("/broadcast/{msg}", web::get().to(broadcast)) .route("/broadcast/{msg}", web::get().to(broadcast))
}) })
.bind("127.0.0.1:8080")? .bind("0.0.0.0:8080")?
.run() .run()
.await .await
} }
@ -30,7 +32,7 @@ async fn index() -> impl Responder {
let content = include_str!("index.html"); let content = include_str!("index.html");
HttpResponse::Ok() HttpResponse::Ok()
.header("content-type", "text/html") .append_header(("content-type", "text/html"))
.body(content) .body(content)
} }
@ -38,7 +40,7 @@ async fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
let rx = broadcaster.lock().unwrap().new_client(); let rx = broadcaster.lock().unwrap().new_client();
HttpResponse::Ok() HttpResponse::Ok()
.header("content-type", "text/event-stream") .append_header(("content-type", "text/event-stream"))
.streaming(rx) .streaming(rx)
} }
@ -74,11 +76,14 @@ impl Broadcaster {
fn spawn_ping(me: Data<Mutex<Self>>) { fn spawn_ping(me: Data<Mutex<Self>>) {
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
let mut task = interval_at(Instant::now(), Duration::from_secs(10)); let mut task = IntervalStream::new(interval_at(
Instant::now(),
Duration::from_secs(10),
));
while task.next().await.is_some() { while task.next().await.is_some() {
me.lock().unwrap().remove_stale_clients(); me.lock().unwrap().remove_stale_clients();
} }
}) });
} }
fn remove_stale_clients(&mut self) { fn remove_stale_clients(&mut self) {
@ -95,10 +100,9 @@ impl Broadcaster {
fn new_client(&mut self) -> Client { fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100); let (tx, rx) = channel(100);
let rx = ReceiverStream::new(rx);
tx.clone() tx.try_send(Bytes::from("data: connected\n\n")).unwrap();
.try_send(Bytes::from("data: connected\n\n"))
.unwrap();
self.clients.push(tx); self.clients.push(tx);
Client(rx) Client(rx)
@ -114,7 +118,7 @@ impl Broadcaster {
} }
// wrap Receiver in own type, with correct error type // wrap Receiver in own type, with correct error type
struct Client(Receiver<Bytes>); struct Client(ReceiverStream<Bytes>);
impl Stream for Client { impl Stream for Client {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;