From 6953e927ac62890339204c33e8f85d3450272d2a Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sun, 16 Oct 2022 19:24:17 +0100 Subject: [PATCH] simplify data_factory example --- Cargo.lock | 57 ++++---------------- data-factory/Cargo.toml | 9 ++-- data-factory/README.md | 23 ++------ data-factory/src/main.rs | 112 +++++++++------------------------------ 4 files changed, 44 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4c2c8b..b655756 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -809,6 +809,15 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-data" +version = "1.0.0" +dependencies = [ + "actix-web", + "env_logger 0.9.0", + "log", +] + [[package]] name = "async-graphql" version = "4.0.1" @@ -945,16 +954,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async_data_factory" -version = "1.0.0" -dependencies = [ - "actix-web", - "num_cpus", - "redis 0.21.6", - "redis_tang", -] - [[package]] name = "async_db" version = "1.0.0" @@ -2111,7 +2110,7 @@ dependencies = [ "env_logger 0.9.0", "futures-util", "log", - "redis 0.22.0", + "redis", "serde", ] @@ -4860,25 +4859,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "redis" -version = "0.21.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "571c252c68d09a2ad3e49edd14e9ee48932f3e0f27b06b4ea4c9b2a706d31103" -dependencies = [ - "async-trait", - "bytes 1.2.1", - "combine 4.6.6", - "futures-util", - "itoa 1.0.3", - "percent-encoding", - "pin-project-lite 0.2.9", - "ryu", - "tokio 1.21.0", - "tokio-util 0.7.4", - "url", -] - [[package]] name = "redis" version = "0.22.0" @@ -4929,17 +4909,6 @@ dependencies = [ "time 0.3.14", ] -[[package]] -name = "redis_tang" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcabf56cf2286df31463567c8f8d67ee582e32acf78aaa6f7303f791006aa75d" -dependencies = [ - "redis 0.21.6", - "tang-rs", - "tokio 1.21.0", -] - [[package]] name = "redox_syscall" version = "0.2.16" @@ -6079,12 +6048,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" -[[package]] -name = "tang-rs" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b625cb7d76f7bb1887a11d2e7b97677539924010773844ed17252c6ec7877595" - [[package]] name = "tap" version = "1.0.1" diff --git a/data-factory/Cargo.toml b/data-factory/Cargo.toml index 13c7c4d..d48724a 100644 --- a/data-factory/Cargo.toml +++ b/data-factory/Cargo.toml @@ -1,11 +1,10 @@ [package] -name = "async_data_factory" +name = "async-data" version = "1.0.0" edition = "2021" [dependencies] actix-web = "4" -num_cpus = "1.13" -redis = { version = "0.21", default-features = false, features = ["tokio-comp"] } -# redis_tang is an redis pool for test purpose -redis_tang = "0.3" + +env_logger = "0.9" +log = "0.4" diff --git a/data-factory/README.md b/data-factory/README.md index e956c37..9f19810 100644 --- a/data-factory/README.md +++ b/data-factory/README.md @@ -1,27 +1,14 @@ -# Usage: +# Async Data Factory -This is an example demonstrating the construction of async state with `App::data_factory` +This is an example demonstrating the construction of async state with `App::data_factory`. ## Reason: Use of a `data_factory` would make sense in these situations: - When async state does not necessarily have to be shared between workers/threads. +- When an async state would spawn tasks. If state was centralized there could be a possibility the tasks get an unbalanced distribution on the workers/threads. -- When an async state would spawn tasks on `actix-rt`. If state was centralized there could be a possibility the tasks get an unbalanced distribution on the workers/threads (`actix-rt` would spawn tasks on local thread whenever it's called) +## Context -## Requirement: - -- `rustc 1.58 stable` -- `redis` server listen on `127.0.0.1:6379`(or use `REDIS_URL` env argument when starting the example) - -## Endpoints: - -- use a work load generator(e.g wrk) to benchmark the end points: - - http://127.0.0.1:8080/pool prebuilt shared redis pool - http://127.0.0.1:8080/pool2 data_factory redis pool - -## Context: - -The real world difference can be vary by the work you are doing but in general it's a good idea to spread your _identical_ async tasks evenly between threads and have as little cross threads synchronization as possible. +The real world difference can be vary by the work you are doing but in general it's a good idea to spread your similarly-expensive async tasks evenly between threads and have as little cross threads synchronization as possible. diff --git a/data-factory/src/main.rs b/data-factory/src/main.rs index 6f878bc..2369d75 100644 --- a/data-factory/src/main.rs +++ b/data-factory/src/main.rs @@ -1,105 +1,43 @@ -use actix_web::{get, web::Data, App, HttpServer}; -use redis_tang::{Builder, Pool, RedisManager}; +use actix_web::{web, App, HttpResponse, HttpServer}; #[actix_web::main] async fn main() -> std::io::Result<()> { - let redis_url = - std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1")); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); - let num_cpus = num_cpus::get(); - - // a shared redis pool for work load comparison. - let pool = pool_builder(num_cpus, redis_url.as_str()) - .await - .expect("fail to build pool"); - - let pool = Data::new(RedisWrapper(pool)); + log::info!("starting HTTP server at http://localhost:8080"); HttpServer::new(move || { - let redis_url = redis_url.clone(); - App::new() - .app_data(pool.clone()) // a dummy data_factory implementation .data_factory(|| { - /* - App::data_factory would accept a future as return type and poll the future when - App is initialized. - - The Output of the future must be Result and T will be the transformed to - App::Data that can be extracted from handler/request. - (The E will only be used to trigger a log::error.) - - This data is bound to worker thread and you get an instance of it for every - worker of the HttpServer.(hence the name data_factory) - *. It is NOT shared between workers - (unless the underlying data is a smart pointer like Arc). - */ + // App::data_factory would accept a future as return type and poll the future when + // App is initialized. + // + // The Output of the future must be Result and T will be the transformed to + // App::Data that can be extracted from handler/request. (The E will only be used + // to trigger an error log.) + // + // This data is bound to worker thread and you get an instance of it for each worker + // of the HttpServer, hence the name data_factory. + // + // It is NOT shared between workers + // (unless the underlying data is a smart pointer like Arc). async { - // 123usize would be transformed into Data - Ok::(123) + // would be transformed into Data + Ok::<_, ()>(123_usize) } }) - // a data_factory redis pool for work load comparison. - .data_factory(move || pool_builder(1, redis_url.clone())) - .service(pool_shared_prebuilt) - .service(pool_local) + .route( + "/", + web::to(|data: web::Data| async move { + assert_eq!(**data, 123); + HttpResponse::NoContent() + }), + ) }) + .workers(2) .bind(("127.0.0.1", 8080))? .run() .await } - -/* - This pool is shared between workers. We have all redis connections spawned tasks on main thread - therefore it puts too much pressure on one thread. - *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async - redis connection type. -*/ -#[get("/pool")] -async fn pool_shared_prebuilt(pool: Data) -> &'static str { - ping(&pool.as_ref().0).await -} - -/* - This pool is built with App::data_factory and we have 2 connections fixed for every worker. - It's evenly distributed and have no cross workers synchronization. -*/ -#[get("/pool2")] -async fn pool_local(data: Data, pool: Data>) -> &'static str { - assert_eq!(data.get_ref(), &123); - - ping(pool.as_ref()).await -} - -// boiler plate for redis pool -#[derive(Clone)] -struct RedisWrapper(Pool); - -async fn pool_builder( - num_cpus: usize, - redis_url: impl redis::IntoConnectionInfo, -) -> Result, ()> { - let mgr = RedisManager::new(redis_url); - Builder::new() - .always_check(false) - .idle_timeout(None) - .max_lifetime(None) - .min_idle(num_cpus * 2) - .max_size(num_cpus * 2) - .build(mgr) - .await - .map_err(|_| ()) -} - -async fn ping(pool: &Pool) -> &'static str { - let mut client = pool.get().await.unwrap().clone(); - - redis::cmd("PING") - .query_async::<_, ()>(&mut client) - .await - .unwrap(); - - "Done" -}