1
0
mirror of https://github.com/actix/examples synced 2025-06-26 17:17:42 +02:00

Chore: Update apalis to the latest version

This commit is contained in:
geofmureithi
2024-07-12 22:41:00 +03:00
parent 42acb27186
commit 642fbc84ad
4 changed files with 72 additions and 147 deletions

View File

@ -6,7 +6,8 @@ edition = "2021"
[dependencies]
actix-web.workspace = true
apalis = { version = "0.4", features = ["redis"] }
apalis = { version = "0.6.0-rc.2" }
apalis-redis = { version = "0.6.0-rc.2" }
chrono.workspace = true
color-eyre.workspace = true
dotenvy.workspace = true

View File

@ -2,7 +2,8 @@
use std::time::Duration;
use apalis::{prelude::*, redis::RedisStorage};
use apalis::prelude::*;
use apalis_redis::{Config, RedisStorage};
use rand::distributions::{Alphanumeric, DistString as _};
use serde::{Deserialize, Serialize};
@ -19,37 +20,31 @@ impl Email {
}
}
impl Job for Email {
const NAME: &'static str = "send_email";
}
async fn process_email_job(job: Email, _ctx: JobContext) -> Result<(), JobError> {
async fn process_email_job(job: Email) {
log::info!("sending email to {}", &job.to);
// simulate time taken to send email
tokio::time::sleep(rand_delay_with_jitter()).await;
Ok(())
// Ok(())
}
pub(crate) async fn start_processing_email_queue() -> eyre::Result<RedisStorage<Email>> {
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let storage = RedisStorage::connect(redis_url).await?;
let conn = apalis_redis::connect(redis_url).await?;
let config = Config::default().set_namespace("send_email");
let storage = RedisStorage::new_with_config(conn, config);
// create job monitor(s) and attach email job handler
let monitor = Monitor::new().register_with_count(2, {
let storage = storage.clone();
move |n| {
WorkerBuilder::new(format!("job-handler-{n}"))
.with_storage(storage.clone())
.build_fn(process_email_job)
}
});
// create unmonitored workers for handling emails
let workers = WorkerBuilder::new("job-handler")
.backend(storage.clone())
.build_fn(process_email_job)
.with_executor_instances(2, TokioExecutor);
// spawn job monitor into background
// the monitor manages itself otherwise so we don't need to return a join handle
#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(monitor.run());
for worker in workers {
#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(worker.run());
}
Ok(storage)
}

View File

@ -3,7 +3,8 @@ use actix_web::{
web::{self, Data},
HttpResponse, Responder,
};
use apalis::{prelude::*, redis::RedisStorage};
use apalis::prelude::*;
use apalis_redis::RedisStorage;
use chrono::{TimeDelta, Utc};
use serde::Deserialize;