diff --git a/Cargo.lock b/Cargo.lock index 1c3e8c8..6d52563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -767,60 +767,43 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "apalis" -version = "0.4.9" +version = "0.6.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78bbaeebf00817d5aa561515b313ef0d280bf4b92592e4709b21925c1233f613" +checksum = "3e78684d69b4361d01bddf318a5117c18179c52396c7792fc0ebb04da46abbb2" dependencies = [ "apalis-core", - "apalis-cron", - "apalis-redis", - "apalis-sql", -] - -[[package]] -name = "apalis-core" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1deb48475efcdece1f23a0553209ee842f264c2a5e9bcc4928bfa6a15a044cde" -dependencies = [ - "async-stream", - "async-trait", - "chrono", "futures", - "graceful-shutdown", - "http 1.0.0", - "log", "pin-project-lite", "serde", - "strum", "thiserror", "tokio", "tower", "tracing", "tracing-futures", +] + +[[package]] +name = "apalis-core" +version = "0.6.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94c70a31bd587ce6ffe53af31d9693257af40b47f515999b3cf70b8d7353d259" +dependencies = [ + "async-oneshot", + "futures", + "futures-timer", + "pin-project-lite", + "serde", + "serde_json", + "thiserror", + "tower", "ulid", ] -[[package]] -name = "apalis-cron" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43310b7e0132f9520b09224fb6faafb32eec82a672aa79c09e46b5b488ed505b" -dependencies = [ - "apalis-core", - "async-stream", - "chrono", - "cron", - "futures", - "tokio", - "tower", -] - [[package]] name = "apalis-redis" -version = "0.4.9" +version = "0.6.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2abee8225fd875e57b530abbcf2d9c3122c1a2cce905367b67c6410b6f9654d7" +checksum = "d2ae52c1baf49b8faa59b2b03d5cb47ebf9bed90411d7b5878bc97600752761c" dependencies = [ "apalis-core", "async-stream", @@ -828,28 +811,8 @@ dependencies = [ "chrono", "futures", "log", - "redis 0.24.0", + "redis 0.25.4", "serde", - "serde_json", - "tokio", -] - -[[package]] -name = "apalis-sql" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5899bfd124e460f1449ffab643f6bac0dc417e7ca234f34c732c923c6a3addbf" -dependencies = [ - "apalis-core", - "async-stream", - "async-trait", - "chrono", - "debounced", - "futures", - "futures-lite", - "serde", - "serde_json", - "sqlx", "tokio", ] @@ -1048,6 +1011,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-oneshot" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae47de2a02d543205f3f5457a90b6ecbc9494db70557bd29590ec8f1ddff5463" +dependencies = [ + "futures-micro", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -1614,6 +1586,7 @@ version = "1.0.0" dependencies = [ "actix-web", "apalis", + "apalis-redis", "chrono", "color-eyre", "dotenvy", @@ -2348,17 +2321,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "cron" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" -dependencies = [ - "chrono", - "nom", - "once_cell", -] - [[package]] name = "crossbeam" version = "0.8.4" @@ -2658,16 +2620,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "debounced" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d8b0346b9fa0aa01a3fa4bcce48d62f8738e9c2956e92f275bbf6cf9d6fab5" -dependencies = [ - "futures-timer", - "futures-util", -] - [[package]] name = "der" version = "0.6.1" @@ -3327,9 +3279,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -3358,9 +3310,9 @@ checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -3384,19 +3336,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-lite" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.30" @@ -3408,6 +3347,15 @@ dependencies = [ "syn 2.0.69", ] +[[package]] +name = "futures-micro" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b460264b3593d68b16a7bc35f7bc226ddfebdf9a1c8db1ed95d5cc6b7168c826" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -3472,8 +3420,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -3542,17 +3492,6 @@ dependencies = [ "spinning_top", ] -[[package]] -name = "graceful-shutdown" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3effbaf774a1da3462925bb182ccf975c284cf46edca5569ea93420a657af484" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "group" version = "0.12.1" @@ -5407,12 +5346,6 @@ dependencies = [ "sha2", ] -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.11.2" @@ -5659,9 +5592,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -6033,7 +5966,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "ryu", - "sha1_smol", "tokio", "tokio-retry", "tokio-util", @@ -7205,7 +7137,6 @@ dependencies = [ "atoi", "byteorder", "bytes", - "chrono", "crc", "crossbeam-queue", "either", @@ -7269,7 +7200,6 @@ dependencies = [ "sha2", "sqlx-core", "sqlx-mysql", - "sqlx-postgres", "sqlx-sqlite", "syn 1.0.109", "tempfile", @@ -7288,7 +7218,6 @@ dependencies = [ "bitflags 2.4.1", "byteorder", "bytes", - "chrono", "crc", "digest", "dotenvy", @@ -7330,7 +7259,6 @@ dependencies = [ "base64 0.21.7", "bitflags 2.4.1", "byteorder", - "chrono", "crc", "dotenvy", "etcetera", @@ -7366,7 +7294,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa" dependencies = [ "atoi", - "chrono", "flume", "futures-channel", "futures-core", @@ -8266,7 +8193,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project", "tracing", ] @@ -8457,11 +8383,13 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "ulid" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93" +checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" dependencies = [ + "getrandom", "rand", + "web-time", ] [[package]] diff --git a/background-jobs/Cargo.toml b/background-jobs/Cargo.toml index c5cac7d..a9a6e94 100644 --- a/background-jobs/Cargo.toml +++ b/background-jobs/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" [dependencies] actix-web.workspace = true -apalis = { version = "0.4", features = ["redis"] } +apalis = { version = "0.6.0-rc.2" } +apalis-redis = { version = "0.6.0-rc.2" } chrono.workspace = true color-eyre.workspace = true dotenvy.workspace = true diff --git a/background-jobs/src/persistent_jobs.rs b/background-jobs/src/persistent_jobs.rs index 416fe48..4fdbe01 100644 --- a/background-jobs/src/persistent_jobs.rs +++ b/background-jobs/src/persistent_jobs.rs @@ -2,7 +2,8 @@ use std::time::Duration; -use apalis::{prelude::*, redis::RedisStorage}; +use apalis::prelude::*; +use apalis_redis::{Config, RedisStorage}; use rand::distributions::{Alphanumeric, DistString as _}; use serde::{Deserialize, Serialize}; @@ -19,37 +20,31 @@ impl Email { } } -impl Job for Email { - const NAME: &'static str = "send_email"; -} - -async fn process_email_job(job: Email, _ctx: JobContext) -> Result<(), JobError> { +async fn process_email_job(job: Email) { log::info!("sending email to {}", &job.to); // simulate time taken to send email tokio::time::sleep(rand_delay_with_jitter()).await; - Ok(()) + // Ok(()) } pub(crate) async fn start_processing_email_queue() -> eyre::Result> { let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL"); - let storage = RedisStorage::connect(redis_url).await?; + let conn = apalis_redis::connect(redis_url).await?; + let config = Config::default().set_namespace("send_email"); + let storage = RedisStorage::new_with_config(conn, config); - // create job monitor(s) and attach email job handler - let monitor = Monitor::new().register_with_count(2, { - let storage = storage.clone(); - move |n| { - WorkerBuilder::new(format!("job-handler-{n}")) - .with_storage(storage.clone()) - .build_fn(process_email_job) - } - }); + // create unmonitored workers for handling emails + let workers = WorkerBuilder::new("job-handler") + .backend(storage.clone()) + .build_fn(process_email_job) + .with_executor_instances(2, TokioExecutor); - // spawn job monitor into background - // the monitor manages itself otherwise so we don't need to return a join handle - #[allow(clippy::let_underscore_future)] - let _ = tokio::spawn(monitor.run()); + for worker in workers { + #[allow(clippy::let_underscore_future)] + let _ = tokio::spawn(worker.run()); + } Ok(storage) } diff --git a/background-jobs/src/routes.rs b/background-jobs/src/routes.rs index b715ec2..5da2ca8 100644 --- a/background-jobs/src/routes.rs +++ b/background-jobs/src/routes.rs @@ -3,7 +3,8 @@ use actix_web::{ web::{self, Data}, HttpResponse, Responder, }; -use apalis::{prelude::*, redis::RedisStorage}; +use apalis::prelude::*; +use apalis_redis::RedisStorage; use chrono::{TimeDelta, Utc}; use serde::Deserialize;