From 7ea3f7f54a94f1ce9ccbb253a5ce60dc2b93338d Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 14 Feb 2022 01:37:51 +0000 Subject: [PATCH] clean up sse example --- Cargo.lock | 231 ++++++++++++++-------- other/server-sent-events/Cargo.toml | 6 +- other/server-sent-events/README.md | 4 +- other/server-sent-events/benchmark.js | 4 +- other/server-sent-events/drain.js | 2 +- other/server-sent-events/src/broadcast.rs | 98 +++++++++ other/server-sent-events/src/main.rs | 123 ++---------- 7 files changed, 278 insertions(+), 190 deletions(-) create mode 100644 other/server-sent-events/src/broadcast.rs diff --git a/Cargo.lock b/Cargo.lock index 6e6f7d5..5275ff0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,7 +45,7 @@ dependencies = [ "futures-util", "log", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "pin-project 0.4.29", "smallvec", "tokio 0.2.25", @@ -70,7 +70,7 @@ dependencies = [ "futures-util", "log", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "pin-project-lite 0.2.8", "smallvec", "tokio 1.16.1", @@ -92,7 +92,7 @@ dependencies = [ name = "actix-casbin-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "casbin", "loge", "tokio 1.16.1", @@ -170,7 +170,7 @@ checksum = "d4f1bd0e31c745df129f0e94efd374d21f2a455bcc386c15d78ed9a9e7d4dd50" dependencies = [ "actix-service 2.0.2", "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "derive_more", "futures-util", "log", @@ -225,10 +225,10 @@ version = "0.6.0-beta.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b49f1b48724a52605ba40b67ede24f5a6cbc246817f9278d280d393a28e8b0e" dependencies = [ - "actix-http 3.0.0-rc.1", + "actix-http 3.0.0-rc.2", "actix-service 2.0.2", "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "askama_escape", "bitflags", "bytes 1.1.0", @@ -292,9 +292,9 @@ dependencies = [ [[package]] name = "actix-http" -version = "3.0.0-rc.1" +version = "3.0.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08aac516b88cb8cfbfa834c76b58607ffac75946d947dcb6a9ffc5673e1e875d" +checksum = "fb0185d65352deeea60d92231708068c04dc64f1ab307a1a307206a47d5a45d3" dependencies = [ "actix-codec 0.4.2", "actix-rt 2.6.0", @@ -362,7 +362,7 @@ checksum = "7f084963856cf7990b1f21d6298626de4ae6178385cadece312e12c9f7a9f432" dependencies = [ "actix-service 2.0.2", "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "futures-util", "serde 1.0.136", "serde_json", @@ -396,7 +396,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59b1f14a8b2bc14df9be544d173f5390da5b62d531e406fd0f0ce9b825fea5a" dependencies = [ "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "bytes 1.1.0", "derive_more", "futures-core", @@ -458,7 +458,7 @@ dependencies = [ "actix-service 2.0.2", "actix-session 0.5.0-beta.7", "actix-tls 3.0.2", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "backoff", "derive_more", "futures-core", @@ -608,7 +608,7 @@ checksum = "8984fa6256873b5075143ef662898ed6699bcd0ca094d8a6e40048c12af8d591" dependencies = [ "actix-service 2.0.2", "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "derive_more", "futures-util", "log", @@ -624,12 +624,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "386598d02615bd976b19f05bbfad0d3a559bfdb6af658131889114672d689858" dependencies = [ "actix-codec 0.4.2", - "actix-http 3.0.0-rc.1", + "actix-http 3.0.0-rc.2", "actix-http-test", "actix-rt 2.6.0", "actix-service 2.0.2", "actix-utils 3.0.0", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "awc 3.0.0-beta.20", "futures-core", "futures-util", @@ -665,7 +665,7 @@ dependencies = [ "lazy_static", "log", "num_cpus", - "parking_lot", + "parking_lot 0.11.2", "threadpool", ] @@ -773,12 +773,12 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.0.0-rc.2" +version = "4.0.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73170d019de2d82c0d826c1f315c3106134bd764e9247505ba6f0d78d22dfe9e" +checksum = "83e3c85bc4116b69913b03f16cff8cade1212508fcd321847d9cfe3d3e41f991" dependencies = [ "actix-codec 0.4.2", - "actix-http 3.0.0-rc.1", + "actix-http 3.0.0-rc.2", "actix-macros 0.2.3", "actix-router 0.5.0-rc.3", "actix-rt 2.6.0", @@ -855,7 +855,7 @@ name = "actix-web-cors" version = "1.0.0" dependencies = [ "actix-cors 0.6.0-beta.8", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "futures", "serde 1.0.136", @@ -1263,7 +1263,7 @@ dependencies = [ name = "async_data_factory" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "num_cpus", "redis", "redis_tang", @@ -1273,7 +1273,7 @@ dependencies = [ name = "async_db" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "futures-util", "log", @@ -1288,7 +1288,7 @@ dependencies = [ name = "async_ex2" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "serde 1.0.136", "serde_json", @@ -1298,7 +1298,7 @@ dependencies = [ name = "async_pg" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "config", "deadpool-postgres", "derive_more", @@ -1366,7 +1366,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a64a95bbf4905fd057ea45b70fdfeeb9c7ae09f03f091642c4e66a16ebb3ad5" dependencies = [ "actix-codec 0.4.2", - "actix-http 3.0.0-rc.1", + "actix-http 3.0.0-rc.2", "actix-rt 2.6.0", "actix-service 2.0.2", "actix-tls 3.0.2", @@ -1398,7 +1398,7 @@ dependencies = [ name = "awc_https" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "awc 3.0.0-beta.20", "env_logger 0.9.0", "log", @@ -1456,7 +1456,7 @@ version = "1.0.0" dependencies = [ "actix-files 0.6.0-beta.16", "actix-session 0.5.0-beta.7", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "async-stream", "env_logger 0.9.0", "log", @@ -1701,7 +1701,7 @@ checksum = "e3be2866a3a3174b9d31a11d9f9ef219ab84a444d107981c51bf79d7be221adc" dependencies = [ "async-trait", "lazy_static", - "parking_lot", + "parking_lot 0.11.2", "regex", "rhai", "ritelinked", @@ -1942,7 +1942,7 @@ name = "cookie-auth" version = "1.0.0" dependencies = [ "actix-identity", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "rand 0.8.4", ] @@ -1952,7 +1952,7 @@ name = "cookie-session" version = "1.0.0" dependencies = [ "actix-session 0.5.0-beta.7", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "log", ] @@ -2371,7 +2371,7 @@ checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" name = "docker_sample" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "log", ] @@ -2448,7 +2448,7 @@ dependencies = [ name = "error_handling" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "derive_more", "env_logger 0.9.0", "rand 0.8.4", @@ -2565,7 +2565,7 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" name = "form-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "serde 1.0.136", ] @@ -2666,7 +2666,7 @@ checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e" dependencies = [ "futures-core", "lock_api", - "parking_lot", + "parking_lot 0.11.2", ] [[package]] @@ -2917,7 +2917,7 @@ checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" name = "hello-world" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", ] @@ -3031,7 +3031,7 @@ dependencies = [ name = "http-proxy" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "awc 3.0.0-beta.20", "clap", "env_logger 0.9.0", @@ -3307,7 +3307,7 @@ checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" name = "json-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "futures", "json", @@ -3319,7 +3319,7 @@ dependencies = [ name = "json-validation" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "awc 3.0.0-beta.20", "env_logger 0.9.0", "futures", @@ -3334,7 +3334,7 @@ dependencies = [ name = "json_decode_error" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "serde 1.0.136", ] @@ -3342,7 +3342,7 @@ dependencies = [ name = "json_error" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "failure", "serde 1.0.136", "serde_json", @@ -3352,7 +3352,7 @@ dependencies = [ name = "jsonrpc-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "bytes 1.1.0", "env_logger 0.9.0", "futures-util", @@ -3657,7 +3657,7 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" name = "middleware-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "futures", "pin-project 1.0.10", @@ -3667,7 +3667,7 @@ dependencies = [ name = "middleware-ext-mut" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "log", ] @@ -3676,7 +3676,7 @@ dependencies = [ name = "middleware-http-to-https" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "futures", "rustls 0.20.2", "rustls-pemfile", @@ -3795,7 +3795,7 @@ dependencies = [ name = "mongodb" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "futures-util", "mongodb 2.1.0", "serde 1.0.136", @@ -3872,7 +3872,7 @@ name = "multipart-example" version = "1.0.0" dependencies = [ "actix-multipart", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "futures-util", "sanitize-filename", "uuid", @@ -3883,7 +3883,7 @@ name = "multipart-s3" version = "1.0.0" dependencies = [ "actix-multipart", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "dotenv", "futures", "rusoto_core", @@ -4155,7 +4155,7 @@ version = "1.0.0" dependencies = [ "acme-micro", "actix-files 0.6.0-beta.16", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "anyhow", "env_logger 0.9.0", "futures-util", @@ -4168,7 +4168,7 @@ dependencies = [ name = "openssl-example" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "openssl", ] @@ -4219,7 +4219,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.1", ] [[package]] @@ -4236,6 +4246,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "parse-zoneinfo" version = "0.3.0" @@ -4665,7 +4688,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f" dependencies = [ "log", - "parking_lot", + "parking_lot 0.11.2", "scheduled-thread-pool", ] @@ -4839,7 +4862,7 @@ dependencies = [ "actix-redis 0.10.0-beta.5", "actix-session 0.5.0-beta.7", "actix-test", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "serde 1.0.136", "serde_json", @@ -5039,7 +5062,7 @@ dependencies = [ name = "run-in-thread" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "log", ] @@ -5224,7 +5247,7 @@ name = "rustls-client-cert" version = "1.0.0" dependencies = [ "actix-tls 3.0.2", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "log", "rustls 0.20.2", @@ -5236,7 +5259,7 @@ name = "rustls-example" version = "1.0.0" dependencies = [ "actix-files 0.6.0-beta.16", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "rustls 0.20.2", "rustls-pemfile", @@ -5298,7 +5321,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" dependencies = [ - "parking_lot", + "parking_lot 0.11.2", ] [[package]] @@ -5469,10 +5492,13 @@ dependencies = [ name = "server-sent-events" version = "1.0.0" dependencies = [ - "actix-web 3.3.3", - "env_logger 0.8.4", - "futures", - "tokio 0.2.25", + "actix-web 4.0.0-rc.3", + "env_logger 0.9.0", + "futures-util", + "log", + "parking_lot 0.12.0", + "tokio 1.16.1", + "tokio-stream", ] [[package]] @@ -5572,7 +5598,7 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" name = "shutdown-server" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "futures", "tokio 1.16.1", @@ -5592,7 +5618,7 @@ name = "simple-auth-server" version = "1.0.0" dependencies = [ "actix-identity", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "chrono", "derive_more", "diesel", @@ -5746,7 +5772,7 @@ dependencies = [ "log", "memchr", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "percent-encoding", "rustls 0.19.1", "serde 1.0.136", @@ -5808,7 +5834,7 @@ dependencies = [ name = "state" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", ] @@ -5829,7 +5855,7 @@ name = "static_index" version = "1.0.0" dependencies = [ "actix-files 0.6.0-beta.16", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", ] @@ -5890,7 +5916,7 @@ checksum = "923f0f39b6267d37d23ce71ae7235602134b250ace715dd2c90421998ddac0c6" dependencies = [ "lazy_static", "new_debug_unreachable", - "parking_lot", + "parking_lot 0.11.2", "phf_shared 0.8.0", "precomputed-hash", "serde 1.0.136", @@ -5983,7 +6009,7 @@ dependencies = [ name = "template-askama" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "askama", "env_logger 0.9.0", ] @@ -5992,7 +6018,7 @@ dependencies = [ name = "template-tera" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "tera", ] @@ -6001,7 +6027,7 @@ dependencies = [ name = "template-tinytemplate" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", "serde_json", "tinytemplate", @@ -6011,7 +6037,7 @@ dependencies = [ name = "template_handlebars" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "handlebars", "serde_json", ] @@ -6020,7 +6046,7 @@ dependencies = [ name = "template_yarte" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "derive_more", "env_logger 0.9.0", "yarte", @@ -6210,7 +6236,7 @@ version = "1.0.0" dependencies = [ "actix-files 0.6.0-beta.16", "actix-session 0.5.0-beta.7", - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "dotenv", "env_logger 0.9.0", "futures-util", @@ -6255,7 +6281,7 @@ dependencies = [ "mio 0.7.14", "num_cpus", "once_cell", - "parking_lot", + "parking_lot 0.11.2", "pin-project-lite 0.2.8", "signal-hook-registry", "tokio-macros", @@ -6327,7 +6353,7 @@ dependencies = [ "fallible-iterator", "futures", "log", - "parking_lot", + "parking_lot 0.11.2", "percent-encoding", "phf 0.10.1", "pin-project-lite 0.2.8", @@ -6532,7 +6558,7 @@ dependencies = [ "lazy_static", "log", "lru-cache", - "parking_lot", + "parking_lot 0.11.2", "resolv-conf", "smallvec", "thiserror", @@ -6717,7 +6743,7 @@ dependencies = [ name = "unix-socket" version = "1.0.0" dependencies = [ - "actix-web 4.0.0-rc.2", + "actix-web 4.0.0-rc.3", "env_logger 0.9.0", ] @@ -7216,6 +7242,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "winreg" version = "0.6.2" @@ -7395,18 +7464,18 @@ checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006" [[package]] name = "zstd" -version = "0.9.2+zstd.1.5.1" +version = "0.10.0+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54" +checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.3+zstd.1.5.1" +version = "4.1.4+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79" +checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" dependencies = [ "libc", "zstd-sys", @@ -7414,9 +7483,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.2+zstd.1.5.1" +version = "1.6.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f" +checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" dependencies = [ "cc", "libc", diff --git a/other/server-sent-events/Cargo.toml b/other/server-sent-events/Cargo.toml index 3e509e7..a292df5 100644 --- a/other/server-sent-events/Cargo.toml +++ b/other/server-sent-events/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] actix-web = "4.0.0-rc.3" env_logger = "0.9" -futures = "0.3.1" -tokio = { version = "1.16.1", features = ["sync"] } +futures-util = { version = "0.3.7", default-features = false, features = ["std"] } +log = "0.4" +parking_lot = "0.12" +tokio = { version = "1.16", features = ["sync"] } tokio-stream = { version = "0.1.8", features = ["time"] } diff --git a/other/server-sent-events/README.md b/other/server-sent-events/README.md index 5ad4fa9..aa627ae 100644 --- a/other/server-sent-events/README.md +++ b/other/server-sent-events/README.md @@ -15,7 +15,7 @@ curl 127.0.0.1:8080/broadcast/my_message *my_message* should appear in the browser with a timestamp. ## Performance -This implementation serve thousands of clients on a 2021 macbook with no problems. +This implementation can serve thousands of clients on a 2021 MacBook with no problems. Run [benchmark.js](benchmark.js) to benchmark your own system: @@ -38,4 +38,4 @@ $ node drain.js Connections dropped: 10450, accepting connections: false^CāŽ ``` -_Accepting connections_ indicates whether resources for the server have been exhausted. \ No newline at end of file +_Accepting connections_ indicates whether resources for the server have been exhausted. diff --git a/other/server-sent-events/benchmark.js b/other/server-sent-events/benchmark.js index 4bf3730..d91da58 100644 --- a/other/server-sent-events/benchmark.js +++ b/other/server-sent-events/benchmark.js @@ -1,6 +1,6 @@ const http = require('http') -const n = 1000; +const n = 120; let connected = 0; let messages = 0; let start = Date.now(); @@ -8,7 +8,7 @@ let phase = 'connecting'; let connection_time; let broadcast_time; -let message = process.argv[2] ||Ā 'msg'; +let message = process.argv[2] || 'msg'; let expected_data = "data: " + message; for (let i = 0; i < n; i++) { diff --git a/other/server-sent-events/drain.js b/other/server-sent-events/drain.js index 6244c36..1a4651b 100644 --- a/other/server-sent-events/drain.js +++ b/other/server-sent-events/drain.js @@ -1,6 +1,6 @@ const http = require('http') -let drop_goal = 100_000; +let drop_goal = 5_000; let dropped = 0; let query = { diff --git a/other/server-sent-events/src/broadcast.rs b/other/server-sent-events/src/broadcast.rs new file mode 100644 index 0000000..c0e41d5 --- /dev/null +++ b/other/server-sent-events/src/broadcast.rs @@ -0,0 +1,98 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use actix_web::{ + rt::time::{interval_at, Instant}, + web::{Bytes, Data}, + Error, +}; +use futures_util::{Stream, StreamExt as _}; +use parking_lot::Mutex; +use tokio::sync::mpsc::{channel, Sender}; +use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; + +pub struct Broadcaster { + clients: Vec>, +} + +impl Broadcaster { + pub fn create() -> Data> { + // Data ~ā‰ƒ Arc + let me = Data::new(Mutex::new(Broadcaster::new())); + + // ping clients every 10 seconds to see if they are alive + Broadcaster::spawn_ping(me.clone()); + + me + } + + fn new() -> Self { + Broadcaster { + clients: Vec::new(), + } + } + + fn spawn_ping(me: Data>) { + actix_web::rt::spawn(async move { + let mut task = IntervalStream::new(interval_at( + Instant::now(), + Duration::from_secs(10), + )); + + while task.next().await.is_some() { + me.lock().remove_stale_clients(); + } + }); + } + + fn remove_stale_clients(&mut self) { + let mut ok_clients = Vec::new(); + for client in self.clients.iter() { + let result = client.clone().try_send(Bytes::from("data: ping\n\n")); + + if let Ok(()) = result { + ok_clients.push(client.clone()); + } + } + self.clients = ok_clients; + } + + pub fn new_client(&mut self) -> Client { + let (tx, rx) = channel(100); + let rx = ReceiverStream::new(rx); + + tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); + + self.clients.push(tx); + Client(rx) + } + + pub fn send(&self, msg: &str) { + let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); + + for client in self.clients.iter() { + client.clone().try_send(msg.clone()).unwrap_or(()); + } + } +} + +// wrap Receiver in own type, with correct error type +pub struct Client(ReceiverStream); + +impl Stream for Client { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/other/server-sent-events/src/main.rs b/other/server-sent-events/src/main.rs index 72f7ecc..04d6821 100644 --- a/other/server-sent-events/src/main.rs +++ b/other/server-sent-events/src/main.rs @@ -1,20 +1,22 @@ -use actix_web::rt::time::{interval_at, Instant}; -use actix_web::web::{Bytes, Data, Path}; -use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer, Responder}; -use futures::{Stream, StreamExt}; -use std::pin::Pin; -use std::sync::Mutex; -use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::sync::mpsc::{channel, Sender}; -use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; +use actix_web::{ + http::header::{self, ContentType}, + middleware, + web::{self, Data, Path}, + App, HttpResponse, HttpServer, Responder, +}; +use parking_lot::Mutex; + +mod broadcast; +use broadcast::Broadcaster; #[actix_web::main] async fn main() -> std::io::Result<()> { - std::env::set_var("RUST_LOG", "actix_web=trace"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + let data = Broadcaster::create(); + log::info!("starting HTTP server at http://localhost:8080"); + HttpServer::new(move || { App::new() .app_data(data.clone()) @@ -23,24 +25,24 @@ async fn main() -> std::io::Result<()> { .route("/events", web::get().to(new_client)) .route("/broadcast/{msg}", web::get().to(broadcast)) }) - .bind("0.0.0.0:8080")? + .bind(("127.0.0.1", 8080))? .run() .await } async fn index() -> impl Responder { - let content = include_str!("index.html"); + let index_html = include_str!("index.html"); HttpResponse::Ok() - .append_header(("content-type", "text/html")) - .body(content) + .append_header(ContentType::html()) + .body(index_html) } async fn new_client(broadcaster: Data>) -> impl Responder { - let rx = broadcaster.lock().unwrap().new_client(); + let rx = broadcaster.lock().new_client(); HttpResponse::Ok() - .append_header(("content-type", "text/event-stream")) + .append_header((header::CONTENT_TYPE, "text/event-stream")) .streaming(rx) } @@ -48,89 +50,6 @@ async fn broadcast( msg: Path, broadcaster: Data>, ) -> impl Responder { - broadcaster.lock().unwrap().send(&msg.into_inner()); - + broadcaster.lock().send(&msg.into_inner()); HttpResponse::Ok().body("msg sent") } - -struct Broadcaster { - clients: Vec>, -} - -impl Broadcaster { - fn create() -> Data> { - // Data ā‰ƒ Arc - let me = Data::new(Mutex::new(Broadcaster::new())); - - // ping clients every 10 seconds to see if they are alive - Broadcaster::spawn_ping(me.clone()); - - me - } - - fn new() -> Self { - Broadcaster { - clients: Vec::new(), - } - } - - fn spawn_ping(me: Data>) { - actix_web::rt::spawn(async move { - let mut task = IntervalStream::new(interval_at( - Instant::now(), - Duration::from_secs(10), - )); - while task.next().await.is_some() { - me.lock().unwrap().remove_stale_clients(); - } - }); - } - - fn remove_stale_clients(&mut self) { - let mut ok_clients = Vec::new(); - for client in self.clients.iter() { - let result = client.clone().try_send(Bytes::from("data: ping\n\n")); - - if let Ok(()) = result { - ok_clients.push(client.clone()); - } - } - self.clients = ok_clients; - } - - fn new_client(&mut self) -> Client { - let (tx, rx) = channel(100); - let rx = ReceiverStream::new(rx); - - tx.try_send(Bytes::from("data: connected\n\n")).unwrap(); - - self.clients.push(tx); - Client(rx) - } - - fn send(&self, msg: &str) { - let msg = Bytes::from(["data: ", msg, "\n\n"].concat()); - - for client in self.clients.iter() { - client.clone().try_send(msg.clone()).unwrap_or(()); - } - } -} - -// wrap Receiver in own type, with correct error type -struct Client(ReceiverStream); - -impl Stream for Client { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.0).poll_next(cx) { - Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -}