2022-12-30 17:23:24 +01:00
|
|
|
//! Persistent background jobs using the [`apalis`] crate with a Redis storage backend.
|
|
|
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use apalis::{prelude::*, redis::RedisStorage};
|
|
|
|
use rand::Rng as _;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
|
|
|
#[derive(Debug, Deserialize, Serialize)]
|
|
|
|
pub(crate) struct Email {
|
|
|
|
to: String,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Email {
|
|
|
|
pub(crate) fn random() -> Self {
|
|
|
|
let user = (&mut rand::thread_rng())
|
|
|
|
.sample_iter(rand::distributions::Alphanumeric)
|
|
|
|
.take(10)
|
|
|
|
.map(char::from)
|
|
|
|
.collect::<String>();
|
|
|
|
|
|
|
|
let to = format!("{user}@fake-mail.com");
|
|
|
|
|
|
|
|
Self { to }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Registers [`Email`] as an apalis job type.
impl Job for Email {
    /// Job name constant required by the [`Job`] trait.
    const NAME: &'static str = "send_email";
}
|
|
|
|
|
2023-07-09 01:56:56 +02:00
|
|
|
async fn process_email_job(job: Email, _ctx: JobContext) -> Result<(), JobError> {
|
2022-12-30 17:23:24 +01:00
|
|
|
log::info!("sending email to {}", &job.to);
|
|
|
|
|
|
|
|
// simulate time taken to send email
|
|
|
|
tokio::time::sleep(rand_delay_with_jitter()).await;
|
|
|
|
|
2023-07-09 01:56:56 +02:00
|
|
|
Ok(())
|
2022-12-30 17:23:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) async fn start_processing_email_queue() -> anyhow::Result<RedisStorage<Email>> {
|
|
|
|
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
|
|
|
|
let storage = RedisStorage::connect(redis_url).await?;
|
|
|
|
|
|
|
|
// create job monitor(s) and attach email job handler
|
|
|
|
let monitor = Monitor::new().register_with_count(2, {
|
|
|
|
let storage = storage.clone();
|
2023-07-09 01:56:56 +02:00
|
|
|
move |n| {
|
|
|
|
WorkerBuilder::new(format!("job-handler-{n}"))
|
|
|
|
.with_storage(storage.clone())
|
|
|
|
.build_fn(process_email_job)
|
|
|
|
}
|
2022-12-30 17:23:24 +01:00
|
|
|
});
|
|
|
|
|
|
|
|
// spawn job monitor into background
|
2022-12-30 17:30:16 +01:00
|
|
|
// the monitor manages itself otherwise so we don't need to return a join handle
|
|
|
|
#[allow(clippy::let_underscore_future)]
|
2023-07-09 01:56:56 +02:00
|
|
|
let _ = tokio::spawn(monitor.run());
|
2022-12-30 17:23:24 +01:00
|
|
|
|
|
|
|
Ok(storage)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a duration close to 1 second.
|
|
|
|
fn rand_delay_with_jitter() -> Duration {
|
|
|
|
Duration::from_millis(800_u64 + rand::random::<u8>() as u64 * 2)
|
|
|
|
}
|