From 061860743feb7ae6cbed877e98e6d6bb8dfbcb88 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 30 Dec 2022 16:23:24 +0000 Subject: [PATCH] add background-jobs example --- Cargo.lock | 527 +++++++++++++++++-- Cargo.toml | 3 + background-jobs/Cargo.toml | 18 + background-jobs/src/ephemeral_jobs.rs | 56 ++ background-jobs/src/main.rs | 52 ++ background-jobs/src/persistent_jobs.rs | 66 +++ background-jobs/src/routes.rs | 65 +++ middleware/middleware-http-to-https/cert.pem | 2 +- middleware/middleware-http-to-https/key.pem | 2 +- 9 files changed, 751 insertions(+), 40 deletions(-) create mode 100644 background-jobs/Cargo.toml create mode 100644 background-jobs/src/ephemeral_jobs.rs create mode 100644 background-jobs/src/main.rs create mode 100644 background-jobs/src/persistent_jobs.rs create mode 100644 background-jobs/src/routes.rs diff --git a/Cargo.lock b/Cargo.lock index e877e170..55e74f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,7 +48,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "pin-project-lite 0.2.9", - "smallvec", + "smallvec 1.10.0", "tokio 1.23.0", "tokio-util 0.7.4", ] @@ -103,7 +103,7 @@ dependencies = [ "futures-util", "log", "once_cell", - "smallvec", + "smallvec 1.10.0", ] [[package]] @@ -162,7 +162,7 @@ dependencies = [ "pin-project-lite 0.2.9", "rand 0.8.5", "sha1 0.10.5", - "smallvec", + "smallvec 1.10.0", "tracing", "zstd", ] @@ -434,7 +434,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "smallvec", + "smallvec 1.10.0", "socket2 0.4.7", "time 0.3.17", "url", @@ -682,6 +682,59 @@ version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" +[[package]] +name = "apalis" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "184a9431748203f466bc6885501a2fad9c2fd290483ab2be9fdfd7c0d23983c8" +dependencies = [ + "apalis-core", + "apalis-redis", +] + +[[package]] +name = "apalis-core" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e416500b525d65f282204b770863fde436c330a02ace9e61b04a93db930b69a" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "futures 0.3.25", + "http", + "log", + "pin-project-lite 0.2.9", + "serde", + "serde_json", + "strum", + "thiserror", + "tokio 1.23.0", + "tower", + "tracing", + "tracing-futures", + "uuid 0.8.2", +] + +[[package]] +name = "apalis-redis" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "169420e419898206f2d84b45b9c2839988f2892341d34b964e6d099e59667a9d" +dependencies = [ + "apalis-core", + "async-stream", + "async-trait", + "chrono", + "futures 0.3.25", + "log", + "redis 0.21.7", + "serde", + "serde_json", + "tokio 1.23.0", + "uuid 0.8.2", +] + [[package]] name = "arc-swap" version = "1.5.1" @@ -1349,6 +1402,23 @@ dependencies = [ "zeroize", ] +[[package]] +name = "background-jobs" +version = "1.0.0" +dependencies = [ + "actix-web", + "anyhow", + "apalis", + "chrono", + "dotenv", + "env_logger", + "log", + "rand 0.8.5", + "serde", + "tokio 1.23.0", + "tokio-util 0.7.4", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1617,6 +1687,16 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "0.5.6" @@ -1800,6 +1880,15 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + [[package]] name = "cmake" version = "0.1.49" @@ -1863,7 +1952,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.14", ] [[package]] @@ -2057,10 +2146,10 @@ checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" dependencies = [ "cfg-if 1.0.0", "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", + "crossbeam-deque 0.8.2", + "crossbeam-epoch 0.9.13", + "crossbeam-queue 0.3.8", + "crossbeam-utils 0.8.14", ] [[package]] @@ -2070,7 +2159,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils", + "crossbeam-utils 0.8.14", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" +dependencies = [ + "crossbeam-epoch 0.8.2", + "crossbeam-utils 0.7.2", + "maybe-uninit", ] [[package]] @@ -2080,8 +2180,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" dependencies = [ "cfg-if 1.0.0", - "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-epoch 0.9.13", + "crossbeam-utils 0.8.14", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", + "lazy_static", + "maybe-uninit", + "memoffset 0.5.6", + "scopeguard", ] [[package]] @@ -2092,11 +2207,22 @@ checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" dependencies = [ "autocfg", "cfg-if 1.0.0", - "crossbeam-utils", - "memoffset", + "crossbeam-utils 0.8.14", + "memoffset 0.7.1", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" +dependencies = [ + "cfg-if 0.1.10", + "crossbeam-utils 0.7.2", + "maybe-uninit", +] + [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -2104,7 +2230,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" dependencies = [ "cfg-if 1.0.0", - "crossbeam-utils", + "crossbeam-utils 0.8.14", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", ] [[package]] @@ -2292,7 +2429,7 @@ dependencies = [ "env_logger", "futures-util", "log", - "redis", + "redis 0.22.1", "serde", ] @@ -2613,7 +2750,7 @@ checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "windows-sys 0.42.0", ] @@ -2650,7 +2787,7 @@ dependencies = [ "intl_pluralrules", "rustc-hash", "self_cell", - "smallvec", + "smallvec 1.10.0", "unic-langid", ] @@ -2867,6 +3004,12 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.25" @@ -2927,7 +3070,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" dependencies = [ "futures-core", - "lock_api", + "lock_api 0.4.9", "parking_lot 0.11.2", ] @@ -3468,7 +3611,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "713f1b139373f96a2e0ce3ac931cd01ee973c3c5dd7c40c0c2efe96ad2b6751d" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.14", "globset", "lazy_static", "log", @@ -3747,7 +3890,7 @@ dependencies = [ "bson 1.2.4", "chrono", "fnv", - "futures", + "futures 0.3.25", "futures-enum", "graphql-parser", "indexmap", @@ -4002,6 +4145,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + [[package]] name = "lock_api" version = "0.4.9" @@ -4084,6 +4236,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "md-5" version = "0.10.5" @@ -4111,6 +4269,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aec276c09560ce4447087aaefc19eb0c18d97e31bd05ebac38881c4723400c40" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.7.1" @@ -4255,6 +4422,17 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio 0.6.23", +] + [[package]] name = "miow" version = "0.2.2" @@ -4426,7 +4604,7 @@ dependencies = [ "serde_json", "sha1 0.10.5", "sha2", - "smallvec", + "smallvec 1.10.0", "subprocess", "thiserror", "time 0.3.17", @@ -4660,6 +4838,17 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "parking_lot" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" +dependencies = [ + "lock_api 0.3.4", + "parking_lot_core 0.6.3", + "rustc_version 0.2.3", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -4667,7 +4856,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", - "lock_api", + "lock_api 0.4.9", "parking_lot_core 0.8.5", ] @@ -4677,10 +4866,25 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ - "lock_api", + "lock_api 0.4.9", "parking_lot_core 0.9.5", ] +[[package]] +name = "parking_lot_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66b810a62be75176a80873726630147a5ca780cd33921e0b5709033e66b0a" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall 0.1.57", + "rustc_version 0.2.3", + "smallvec 0.6.14", + "winapi 0.3.9", +] + [[package]] name = "parking_lot_core" version = "0.8.5" @@ -4690,8 +4894,8 @@ dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall", - "smallvec", + "redox_syscall 0.2.16", + "smallvec 1.10.0", "winapi 0.3.9", ] @@ -4703,8 +4907,8 @@ checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall", - "smallvec", + "redox_syscall 0.2.16", + "smallvec 1.10.0", "windows-sys 0.42.0", ] @@ -5252,6 +5456,26 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redis" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152f3863635cbb76b73bc247845781098302c6c9ad2060e1a9a7de56840346b6" +dependencies = [ + "async-trait", + "bytes 1.3.0", + "combine 4.6.6", + "futures-util", + "itoa 1.0.4", + "percent-encoding", + "pin-project-lite 0.2.9", + "ryu", + "sha1 0.6.1", + "tokio 1.23.0", + "tokio-util 0.7.4", + "url", +] + [[package]] name = "redis" version = "0.22.1" @@ -5262,7 +5486,7 @@ dependencies = [ "async-trait", "bytes 1.3.0", "combine 4.6.6", - "futures", + "futures 0.3.25", "futures-util", "itoa 1.0.4", "percent-encoding", @@ -5302,6 +5526,12 @@ dependencies = [ "time 0.3.17", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "redox_syscall" version = "0.2.16" @@ -5453,7 +5683,7 @@ dependencies = [ "num-traits", "rhai_codegen", "serde", - "smallvec", + "smallvec 1.10.0", "smartstring 1.0.1", ] @@ -5550,7 +5780,7 @@ dependencies = [ "hashlink 0.7.0", "libsqlite3-sys", "memchr", - "smallvec", + "smallvec 1.10.0", ] [[package]] @@ -5562,7 +5792,7 @@ dependencies = [ "base64 0.13.1", "blake2b_simd", "constant_time_eq", - "crossbeam-utils", + "crossbeam-utils 0.8.14", ] [[package]] @@ -5721,6 +5951,12 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "rustversion" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" + [[package]] name = "ryu" version = "1.0.11" @@ -6120,6 +6356,15 @@ dependencies = [ "deunicode", ] +[[package]] +name = "smallvec" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" +dependencies = [ + "maybe-uninit", +] + [[package]] name = "smallvec" version = "1.10.0" @@ -6218,7 +6463,7 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09" dependencies = [ - "lock_api", + "lock_api 0.4.9", ] [[package]] @@ -6254,7 +6499,7 @@ dependencies = [ "byteorder", "bytes 1.3.0", "crc", - "crossbeam-queue", + "crossbeam-queue 0.3.8", "dotenvy", "either", "event-listener", @@ -6279,7 +6524,7 @@ dependencies = [ "rustls-pemfile 1.0.1", "serde", "sha2", - "smallvec", + "smallvec 1.10.0", "sqlformat", "sqlx-rt", "stringprep", @@ -6446,6 +6691,28 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "subprocess" version = "0.2.9" @@ -6494,7 +6761,7 @@ dependencies = [ "cfg-if 1.0.0", "fastrand", "libc", - "redox_syscall", + "redox_syscall 0.2.16", "remove_dir_all", "winapi 0.3.9", ] @@ -6791,6 +7058,30 @@ dependencies = [ "tera", ] +[[package]] +name = "tokio" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "mio 0.6.23", + "num_cpus", + "tokio-codec", + "tokio-current-thread", + "tokio-executor", + "tokio-fs", + "tokio-io", + "tokio-reactor", + "tokio-sync", + "tokio-tcp", + "tokio-threadpool", + "tokio-timer", + "tokio-udp", + "tokio-uds", +] + [[package]] name = "tokio" version = "0.2.25" @@ -6829,6 +7120,59 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "tokio-codec" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "tokio-io", +] + +[[package]] +name = "tokio-current-thread" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1de0e32a83f131e002238d7ccde18211c0a5397f60cbfffcb112868c2e0e20e" +dependencies = [ + "futures 0.1.31", + "tokio-executor", +] + +[[package]] +name = "tokio-executor" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", +] + +[[package]] +name = "tokio-fs" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" +dependencies = [ + "futures 0.1.31", + "tokio-io", + "tokio-threadpool", +] + +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", +] + [[package]] name = "tokio-macros" version = "1.8.2" @@ -6906,6 +7250,25 @@ dependencies = [ "tokio-util 0.7.4", ] +[[package]] +name = "tokio-reactor" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", + "lazy_static", + "log", + "mio 0.6.23", + "num_cpus", + "parking_lot 0.9.0", + "slab", + "tokio-executor", + "tokio-io", + "tokio-sync", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -6928,6 +7291,59 @@ dependencies = [ "tokio 1.23.0", ] +[[package]] +name = "tokio-sync" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" +dependencies = [ + "fnv", + "futures 0.1.31", +] + +[[package]] +name = "tokio-tcp" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98df18ed66e3b72e742f185882a9e201892407957e45fbff8da17ae7a7c51f72" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "iovec", + "mio 0.6.23", + "tokio-io", + "tokio-reactor", +] + +[[package]] +name = "tokio-threadpool" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89" +dependencies = [ + "crossbeam-deque 0.7.4", + "crossbeam-queue 0.2.3", + "crossbeam-utils 0.7.2", + "futures 0.1.31", + "lazy_static", + "log", + "num_cpus", + "slab", + "tokio-executor", +] + +[[package]] +name = "tokio-timer" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", + "slab", + "tokio-executor", +] + [[package]] name = "tokio-tls" version = "0.3.1" @@ -6938,6 +7354,39 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "tokio-udp" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", + "mio 0.6.23", + "tokio-codec", + "tokio-io", + "tokio-reactor", +] + +[[package]] +name = "tokio-uds" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "iovec", + "libc", + "log", + "mio 0.6.23", + "mio-uds", + "tokio-codec", + "tokio-io", + "tokio-reactor", +] + [[package]] name = "tokio-util" version = "0.3.1" @@ -7043,6 +7492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ "pin-project", + "tokio 0.1.22", "tracing", ] @@ -7064,7 +7514,7 @@ dependencies = [ "lazy_static", "log", "rand 0.8.5", - "smallvec", + "smallvec 1.10.0", "thiserror", "tinyvec", "tokio 1.23.0", @@ -7085,7 +7535,7 @@ dependencies = [ "lru-cache", "parking_lot 0.12.1", "resolv-conf", - "smallvec", + "smallvec 1.10.0", "thiserror", "tokio 1.23.0", "trust-dns-proto", @@ -7394,6 +7844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ "getrandom 0.2.8", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 77bc2da9..cfade366 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "auth/cookie-session", "auth/redis-session", "auth/simple-auth-server", + "background-jobs", "basics/basics", "basics/error-handling", "basics/hello-world", @@ -83,3 +84,5 @@ actix-web = "4" actix-web-actors = "4.1" actix-web-lab = "0.18" actix-ws = "0.2.5" + +env_logger = "0.10" diff --git a/background-jobs/Cargo.toml b/background-jobs/Cargo.toml new file mode 100644 index 00000000..83c99ebd --- /dev/null +++ b/background-jobs/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "background-jobs" +version = "1.0.0" +edition = "2021" + +[dependencies] +actix-web.workspace = true + +anyhow = "1" +apalis = { version = "0.3", features = ["redis"] } +chrono = { version = "0.4.20", default-features = false, features = ["clock", "serde"] } +dotenv = "0.15" +env_logger.workspace = true +log = "0.4" +rand = "0.8" +serde = { version = "1", features = ["derive"] } +tokio = { version = "1.13.1", features = ["sync", "rt-multi-thread", "macros"] } +tokio-util = "0.7.4" diff --git a/background-jobs/src/ephemeral_jobs.rs b/background-jobs/src/ephemeral_jobs.rs new file mode 100644 index 00000000..2ff0eeb4 --- /dev/null +++ b/background-jobs/src/ephemeral_jobs.rs @@ -0,0 +1,56 @@ +use std::{sync::Arc, time::Duration}; + +use chrono::Utc; +use tokio::{task::JoinHandle, time::sleep}; +use tokio_util::sync::CancellationToken; + +use crate::ItemCache; + +pub(crate) fn init_item_cache() -> (Arc, JoinHandle<()>, CancellationToken) { + // construct empty item cache + let cache = Arc::new(ItemCache::default()); + + // stop signal for cache purge job + let cache_sweep_cancel = CancellationToken::new(); + + // spawn cache purge job + ( + Arc::clone(&cache), + tokio::spawn(spawn_cache_sweep( + Arc::clone(&cache), + cache_sweep_cancel.clone(), + )), + cache_sweep_cancel, + ) +} + +async fn spawn_cache_sweep(cache: Arc, stop_signal: CancellationToken) { + loop { + // only _try_ to lock so reads and writes from route handlers do not get blocked + if let Ok(mut cache) = cache.try_lock() { + let size = cache.len(); + + // purge any cached entries where timestamp is in the past + cache.retain(|_k, v| *v > Utc::now()); + + let removed = size - cache.len(); + + if removed > 0 { + log::info!("removed {removed} cache entries"); + } else { + log::debug!("cache sweep removed no entries") + } + } + + tokio::select! { + _ = sleep(Duration::from_secs(10)) => { + continue; + } + + _ = stop_signal.cancelled() => { + log::info!("gracefully shutting down cache purge job"); + break; + } + }; + } +} diff --git a/background-jobs/src/main.rs b/background-jobs/src/main.rs new file mode 100644 index 00000000..62602529 --- /dev/null +++ b/background-jobs/src/main.rs @@ -0,0 +1,52 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use actix_web::{web::Data, App, HttpServer}; +use chrono::{DateTime, Utc}; + +mod ephemeral_jobs; +mod persistent_jobs; +mod routes; + +/// Maps data to its cache expiry time. +pub(crate) type ItemCache = Mutex>>; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + dotenv::dotenv().ok(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + // background jobs relating to local, disposable tasks + let (item_cache, cache_sweep_handle, cache_sweep_cancel) = ephemeral_jobs::init_item_cache(); + + // background jobs that should be run even if the server is restarted + let email_sender = persistent_jobs::start_processing_email_queue().await?; + + log::info!("starting HTTP server at http://localhost:8080"); + + HttpServer::new(move || { + App::new() + .app_data(Data::from(Arc::clone(&item_cache))) + .app_data(Data::new(email_sender.clone())) + .service(routes::view_cache) + .service(routes::cache_item) + .service(routes::send_email) + .service(routes::send_email_batch) + }) + .workers(2) + .bind(("127.0.0.1", 8080))? + .run() + .await?; + + // signal cache sweep task to stop running + cache_sweep_cancel.cancel(); + + // wait for the cache sweep job to exit it's loop gracefully + cache_sweep_handle.await.unwrap(); + + log::info!("application successfully shut down gracefully"); + + Ok(()) +} diff --git a/background-jobs/src/persistent_jobs.rs b/background-jobs/src/persistent_jobs.rs new file mode 100644 index 00000000..ad1d793c --- /dev/null +++ b/background-jobs/src/persistent_jobs.rs @@ -0,0 +1,66 @@ +//! Persistent background jobs using the [`apalis`] crate with a Redis storage backend. + +use std::time::Duration; + +use apalis::{prelude::*, redis::RedisStorage}; +use rand::Rng as _; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; + +#[derive(Debug, Deserialize, Serialize)] + +pub(crate) struct Email { + to: String, +} + +impl Email { + pub(crate) fn random() -> Self { + let user = (&mut rand::thread_rng()) + .sample_iter(rand::distributions::Alphanumeric) + .take(10) + .map(char::from) + .collect::(); + + let to = format!("{user}@fake-mail.com"); + + Self { to } + } +} + +impl Job for Email { + const NAME: &'static str = "send_email"; +} + +async fn process_email_job(job: Email, _ctx: JobContext) -> Result { + log::info!("sending email to {}", &job.to); + + // simulate time taken to send email + tokio::time::sleep(rand_delay_with_jitter()).await; + + Ok(JobResult::Success) +} + +pub(crate) async fn start_processing_email_queue() -> anyhow::Result> { + let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL"); + let storage = RedisStorage::connect(redis_url).await?; + + // create job monitor(s) and attach email job handler + let monitor = Monitor::new().register_with_count(2, { + let storage = storage.clone(); + move |_n| WorkerBuilder::new(storage.clone()).build_fn(process_email_job) + }); + + // spawn job monitor into background + let _ = tokio::spawn(async move { + // run_without_signals: don't listen for ctrl-c because Actix Web does + // the monitor manages itself otherwise so we don't need to return a join handle + monitor.run_without_signals().await; + }); + + Ok(storage) +} + +/// Returns a duration close to 1 second. +fn rand_delay_with_jitter() -> Duration { + Duration::from_millis(800_u64 + rand::random::() as u64 * 2) +} diff --git a/background-jobs/src/routes.rs b/background-jobs/src/routes.rs new file mode 100644 index 00000000..894fd30c --- /dev/null +++ b/background-jobs/src/routes.rs @@ -0,0 +1,65 @@ +use actix_web::{ + error, get, post, + web::{self, Data}, + HttpResponse, Responder, +}; +use apalis::{prelude::*, redis::RedisStorage}; +use chrono::{Duration, Utc}; +use serde::Deserialize; + +use crate::{persistent_jobs::Email, ItemCache}; + +#[derive(Debug, Deserialize)] +pub(crate) struct CacheInsert { + data: String, + duration: u64, +} + +#[get("/cache")] +pub(crate) async fn view_cache(cache: Data) -> actix_web::Result { + let cached_data = &*cache.lock().unwrap(); + Ok(HttpResponse::Ok().json(cached_data)) +} + +#[post("/cache")] +pub(crate) async fn cache_item( + cache: Data, + web::Json(form): web::Json, +) -> actix_web::Result { + let expires = Utc::now() + Duration::seconds(form.duration as i64); + + // insert into item cache + cache.lock().unwrap().insert(form.data, expires); + + Ok(HttpResponse::Ok().body(format!("data cached until {:?}", expires))) +} + +#[post("/email")] +pub(crate) async fn send_email( + sender: Data>, + web::Json(form): web::Json, +) -> actix_web::Result { + (**sender) + .clone() + .push(form) + .await + .map_err(error::ErrorInternalServerError)?; + + Ok(HttpResponse::Accepted()) +} + +#[post("/email-spam")] +pub(crate) async fn send_email_batch( + sender: Data>, +) -> actix_web::Result { + let mut sender = (**sender).clone(); + + for _ in 0..50 { + sender + .push(Email::random()) + .await + .map_err(error::ErrorInternalServerError)?; + } + + Ok(HttpResponse::Accepted()) +} diff --git a/middleware/middleware-http-to-https/cert.pem b/middleware/middleware-http-to-https/cert.pem index f9bcc09f..d46d8985 120000 --- a/middleware/middleware-http-to-https/cert.pem +++ b/middleware/middleware-http-to-https/cert.pem @@ -1 +1 @@ -cert.pem \ No newline at end of file +../../https-tls/rustls/cert.pem \ No newline at end of file diff --git a/middleware/middleware-http-to-https/key.pem b/middleware/middleware-http-to-https/key.pem index 9ba3d1b8..c25a2764 120000 --- a/middleware/middleware-http-to-https/key.pem +++ b/middleware/middleware-http-to-https/key.pem @@ -1 +1 @@ -key.pem \ No newline at end of file +../../https-tls/rustls/key.pem \ No newline at end of file