1
0
mirror of https://github.com/actix/examples synced 2025-06-26 17:17:42 +02:00

add background-jobs example

This commit is contained in:
Rob Ede
2022-12-30 16:23:24 +00:00
parent aee47e0817
commit 061860743f
9 changed files with 751 additions and 40 deletions

View File

@ -0,0 +1,18 @@
[package]
name = "background-jobs"
version = "1.0.0"
edition = "2021"
[dependencies]
actix-web.workspace = true
anyhow = "1"
apalis = { version = "0.3", features = ["redis"] }
chrono = { version = "0.4.20", default-features = false, features = ["clock", "serde"] }
dotenv = "0.15"
env_logger.workspace = true
log = "0.4"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.13.1", features = ["sync", "rt-multi-thread", "macros"] }
tokio-util = "0.7.4"

View File

@ -0,0 +1,56 @@
use std::{sync::Arc, time::Duration};
use chrono::Utc;
use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
use crate::ItemCache;
pub(crate) fn init_item_cache() -> (Arc<ItemCache>, JoinHandle<()>, CancellationToken) {
// construct empty item cache
let cache = Arc::new(ItemCache::default());
// stop signal for cache purge job
let cache_sweep_cancel = CancellationToken::new();
// spawn cache purge job
(
Arc::clone(&cache),
tokio::spawn(spawn_cache_sweep(
Arc::clone(&cache),
cache_sweep_cancel.clone(),
)),
cache_sweep_cancel,
)
}
async fn spawn_cache_sweep(cache: Arc<ItemCache>, stop_signal: CancellationToken) {
loop {
// only _try_ to lock so reads and writes from route handlers do not get blocked
if let Ok(mut cache) = cache.try_lock() {
let size = cache.len();
// purge any cached entries where timestamp is in the past
cache.retain(|_k, v| *v > Utc::now());
let removed = size - cache.len();
if removed > 0 {
log::info!("removed {removed} cache entries");
} else {
log::debug!("cache sweep removed no entries")
}
}
tokio::select! {
_ = sleep(Duration::from_secs(10)) => {
continue;
}
_ = stop_signal.cancelled() => {
log::info!("gracefully shutting down cache purge job");
break;
}
};
}
}

View File

@ -0,0 +1,52 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use actix_web::{web::Data, App, HttpServer};
use chrono::{DateTime, Utc};
mod ephemeral_jobs;
mod persistent_jobs;
mod routes;
/// Maps data to its cache expiry time.
pub(crate) type ItemCache = Mutex<HashMap<String, DateTime<Utc>>>;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
// background jobs relating to local, disposable tasks
let (item_cache, cache_sweep_handle, cache_sweep_cancel) = ephemeral_jobs::init_item_cache();
// background jobs that should be run even if the server is restarted
let email_sender = persistent_jobs::start_processing_email_queue().await?;
log::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(move || {
App::new()
.app_data(Data::from(Arc::clone(&item_cache)))
.app_data(Data::new(email_sender.clone()))
.service(routes::view_cache)
.service(routes::cache_item)
.service(routes::send_email)
.service(routes::send_email_batch)
})
.workers(2)
.bind(("127.0.0.1", 8080))?
.run()
.await?;
// signal cache sweep task to stop running
cache_sweep_cancel.cancel();
// wait for the cache sweep job to exit it's loop gracefully
cache_sweep_handle.await.unwrap();
log::info!("application successfully shut down gracefully");
Ok(())
}

View File

@ -0,0 +1,66 @@
//! Persistent background jobs using the [`apalis`] crate with a Redis storage backend.
use std::time::Duration;
use apalis::{prelude::*, redis::RedisStorage};
use rand::Rng as _;
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Email {
to: String,
}
impl Email {
pub(crate) fn random() -> Self {
let user = (&mut rand::thread_rng())
.sample_iter(rand::distributions::Alphanumeric)
.take(10)
.map(char::from)
.collect::<String>();
let to = format!("{user}@fake-mail.com");
Self { to }
}
}
impl Job for Email {
const NAME: &'static str = "send_email";
}
async fn process_email_job(job: Email, _ctx: JobContext) -> Result<JobResult, JobError> {
log::info!("sending email to {}", &job.to);
// simulate time taken to send email
tokio::time::sleep(rand_delay_with_jitter()).await;
Ok(JobResult::Success)
}
pub(crate) async fn start_processing_email_queue() -> anyhow::Result<RedisStorage<Email>> {
let redis_url = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
let storage = RedisStorage::connect(redis_url).await?;
// create job monitor(s) and attach email job handler
let monitor = Monitor::new().register_with_count(2, {
let storage = storage.clone();
move |_n| WorkerBuilder::new(storage.clone()).build_fn(process_email_job)
});
// spawn job monitor into background
let _ = tokio::spawn(async move {
// run_without_signals: don't listen for ctrl-c because Actix Web does
// the monitor manages itself otherwise so we don't need to return a join handle
monitor.run_without_signals().await;
});
Ok(storage)
}
/// Returns a duration close to 1 second.
fn rand_delay_with_jitter() -> Duration {
Duration::from_millis(800_u64 + rand::random::<u8>() as u64 * 2)
}

View File

@ -0,0 +1,65 @@
use actix_web::{
error, get, post,
web::{self, Data},
HttpResponse, Responder,
};
use apalis::{prelude::*, redis::RedisStorage};
use chrono::{Duration, Utc};
use serde::Deserialize;
use crate::{persistent_jobs::Email, ItemCache};
#[derive(Debug, Deserialize)]
pub(crate) struct CacheInsert {
data: String,
duration: u64,
}
#[get("/cache")]
pub(crate) async fn view_cache(cache: Data<ItemCache>) -> actix_web::Result<impl Responder> {
let cached_data = &*cache.lock().unwrap();
Ok(HttpResponse::Ok().json(cached_data))
}
#[post("/cache")]
pub(crate) async fn cache_item(
cache: Data<ItemCache>,
web::Json(form): web::Json<CacheInsert>,
) -> actix_web::Result<impl Responder> {
let expires = Utc::now() + Duration::seconds(form.duration as i64);
// insert into item cache
cache.lock().unwrap().insert(form.data, expires);
Ok(HttpResponse::Ok().body(format!("data cached until {:?}", expires)))
}
#[post("/email")]
pub(crate) async fn send_email(
sender: Data<RedisStorage<Email>>,
web::Json(form): web::Json<Email>,
) -> actix_web::Result<impl Responder> {
(**sender)
.clone()
.push(form)
.await
.map_err(error::ErrorInternalServerError)?;
Ok(HttpResponse::Accepted())
}
#[post("/email-spam")]
pub(crate) async fn send_email_batch(
sender: Data<RedisStorage<Email>>,
) -> actix_web::Result<impl Responder> {
let mut sender = (**sender).clone();
for _ in 0..50 {
sender
.push(Email::random())
.await
.map_err(error::ErrorInternalServerError)?;
}
Ok(HttpResponse::Accepted())
}