1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

Merge pull request #856 from geofmureithi/update/apalis

Chore: Update apalis to the latest version
This commit is contained in:
Rob Ede 2024-07-20 00:58:04 +00:00 committed by GitHub
commit cec0aba354
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 71 additions and 148 deletions

176
Cargo.lock generated
View File

@ -767,60 +767,43 @@ checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "apalis"
version = "0.4.9"
version = "0.6.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78bbaeebf00817d5aa561515b313ef0d280bf4b92592e4709b21925c1233f613"
checksum = "3e78684d69b4361d01bddf318a5117c18179c52396c7792fc0ebb04da46abbb2"
dependencies = [
"apalis-core",
"apalis-cron",
"apalis-redis",
"apalis-sql",
]
[[package]]
name = "apalis-core"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1deb48475efcdece1f23a0553209ee842f264c2a5e9bcc4928bfa6a15a044cde"
dependencies = [
"async-stream",
"async-trait",
"chrono",
"futures",
"graceful-shutdown",
"http 1.0.0",
"log",
"pin-project-lite",
"serde",
"strum",
"thiserror",
"tokio",
"tower",
"tracing",
"tracing-futures",
]
[[package]]
name = "apalis-core"
version = "0.6.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94c70a31bd587ce6ffe53af31d9693257af40b47f515999b3cf70b8d7353d259"
dependencies = [
"async-oneshot",
"futures",
"futures-timer",
"pin-project-lite",
"serde",
"serde_json",
"thiserror",
"tower",
"ulid",
]
[[package]]
name = "apalis-cron"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43310b7e0132f9520b09224fb6faafb32eec82a672aa79c09e46b5b488ed505b"
dependencies = [
"apalis-core",
"async-stream",
"chrono",
"cron",
"futures",
"tokio",
"tower",
]
[[package]]
name = "apalis-redis"
version = "0.4.9"
version = "0.6.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2abee8225fd875e57b530abbcf2d9c3122c1a2cce905367b67c6410b6f9654d7"
checksum = "d2ae52c1baf49b8faa59b2b03d5cb47ebf9bed90411d7b5878bc97600752761c"
dependencies = [
"apalis-core",
"async-stream",
@ -828,28 +811,8 @@ dependencies = [
"chrono",
"futures",
"log",
"redis 0.24.0",
"redis 0.25.4",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "apalis-sql"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5899bfd124e460f1449ffab643f6bac0dc417e7ca234f34c732c923c6a3addbf"
dependencies = [
"apalis-core",
"async-stream",
"async-trait",
"chrono",
"debounced",
"futures",
"futures-lite",
"serde",
"serde_json",
"sqlx",
"tokio",
]
@ -1048,6 +1011,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "async-oneshot"
version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae47de2a02d543205f3f5457a90b6ecbc9494db70557bd29590ec8f1ddff5463"
dependencies = [
"futures-micro",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -1614,6 +1586,7 @@ version = "1.0.0"
dependencies = [
"actix-web",
"apalis",
"apalis-redis",
"chrono",
"color-eyre",
"dotenvy",
@ -2348,17 +2321,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "cron"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07"
dependencies = [
"chrono",
"nom",
"once_cell",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
@ -2658,16 +2620,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "debounced"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d8b0346b9fa0aa01a3fa4bcce48d62f8738e9c2956e92f275bbf6cf9d6fab5"
dependencies = [
"futures-timer",
"futures-util",
]
[[package]]
name = "der"
version = "0.6.1"
@ -3327,9 +3279,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
@ -3358,9 +3310,9 @@ checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
@ -3384,19 +3336,6 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-lite"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]
[[package]]
name = "futures-macro"
version = "0.3.30"
@ -3408,6 +3347,15 @@ dependencies = [
"syn 2.0.69",
]
[[package]]
name = "futures-micro"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b460264b3593d68b16a7bc35f7bc226ddfebdf9a1c8db1ed95d5cc6b7168c826"
dependencies = [
"pin-project-lite",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -3472,8 +3420,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasm-bindgen",
]
[[package]]
@ -3542,17 +3492,6 @@ dependencies = [
"spinning_top",
]
[[package]]
name = "graceful-shutdown"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3effbaf774a1da3462925bb182ccf975c284cf46edca5569ea93420a657af484"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "group"
version = "0.12.1"
@ -5407,12 +5346,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -5659,9 +5592,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.13"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
@ -6033,7 +5966,6 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"tokio",
"tokio-retry",
"tokio-util",
@ -7205,7 +7137,6 @@ dependencies = [
"atoi",
"byteorder",
"bytes",
"chrono",
"crc",
"crossbeam-queue",
"either",
@ -7269,7 +7200,6 @@ dependencies = [
"sha2",
"sqlx-core",
"sqlx-mysql",
"sqlx-postgres",
"sqlx-sqlite",
"syn 1.0.109",
"tempfile",
@ -7288,7 +7218,6 @@ dependencies = [
"bitflags 2.4.1",
"byteorder",
"bytes",
"chrono",
"crc",
"digest",
"dotenvy",
@ -7330,7 +7259,6 @@ dependencies = [
"base64 0.21.7",
"bitflags 2.4.1",
"byteorder",
"chrono",
"crc",
"dotenvy",
"etcetera",
@ -7366,7 +7294,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b244ef0a8414da0bed4bb1910426e890b19e5e9bccc27ada6b797d05c55ae0aa"
dependencies = [
"atoi",
"chrono",
"flume",
"futures-channel",
"futures-core",
@ -8266,7 +8193,6 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
@ -8457,11 +8383,13 @@ checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "ulid"
version = "1.1.0"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e37c4b6cbcc59a8dcd09a6429fbc7890286bcbb79215cea7b38a3c4c0921d93"
checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259"
dependencies = [
"getrandom",
"rand",
"web-time",
]
[[package]]

View File

@ -6,7 +6,8 @@ edition = "2021"
[dependencies]
actix-web.workspace = true
apalis = { version = "0.4", features = ["redis"] }
apalis = { version = "0.6.0-rc.2" }
apalis-redis = { version = "0.6.0-rc.2" }
chrono.workspace = true
color-eyre.workspace = true
dotenvy.workspace = true

View File

@ -2,7 +2,8 @@
use std::time::Duration;
use apalis::{prelude::*, redis::RedisStorage};
use apalis::prelude::*;
use apalis_redis::{Config, RedisStorage};
use rand::distributions::{Alphanumeric, DistString as _};
use serde::{Deserialize, Serialize};
@ -19,37 +20,29 @@ impl Email {
}
}
impl Job for Email {
const NAME: &'static str = "send_email";
}
async fn process_email_job(job: Email, _ctx: JobContext) -> Result<(), JobError> {
async fn process_email_job(job: Email) {
log::info!("sending email to {}", &job.to);
// simulate time taken to send email
tokio::time::sleep(rand_delay_with_jitter()).await;
Ok(())
}
pub(crate) async fn start_processing_email_queue() -> eyre::Result<RedisStorage<Email>> {
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let storage = RedisStorage::connect(redis_url).await?;
let conn = apalis_redis::connect(redis_url).await?;
let config = Config::default().set_namespace("send_email");
let storage = RedisStorage::new_with_config(conn, config);
// create job monitor(s) and attach email job handler
let monitor = Monitor::new().register_with_count(2, {
let storage = storage.clone();
move |n| {
WorkerBuilder::new(format!("job-handler-{n}"))
.with_storage(storage.clone())
// create unmonitored workers for handling emails
let workers = WorkerBuilder::new("job-handler")
.backend(storage.clone())
.build_fn(process_email_job)
}
});
.with_executor_instances(2, TokioExecutor);
// spawn job monitor into background
// the monitor manages itself otherwise so we don't need to return a join handle
for worker in workers {
#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(monitor.run());
let _ = tokio::spawn(worker.run());
}
Ok(storage)
}

View File

@ -3,7 +3,8 @@ use actix_web::{
web::{self, Data},
HttpResponse, Responder,
};
use apalis::{prelude::*, redis::RedisStorage};
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use chrono::{TimeDelta, Utc};
use serde::Deserialize;