diff --git a/Cargo.toml b/Cargo.toml index 6487165b..0be6323b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "async_data_factory", "async_db", "async_ex1", "async_ex2", diff --git a/async_data_factory/Cargo.toml b/async_data_factory/Cargo.toml new file mode 100644 index 00000000..47f6d73c --- /dev/null +++ b/async_data_factory/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "async_data_factory" +version = "0.1.0" +authors = ["fakeshadow <24548779@qq.com>"] +edition = "2018" +workspace = ".." + + +[dependencies] +actix-rt = "1.1.1" +actix-web = { version = "2.0.0" } +num_cpus = "1.13.0" +redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] } +# redis_tang is an redis pool for test purpose +redis_tang = "0.1.0" diff --git a/async_data_factory/README.md b/async_data_factory/README.md new file mode 100644 index 00000000..98c31e07 --- /dev/null +++ b/async_data_factory/README.md @@ -0,0 +1,23 @@ +## Usage: +This is an example on constructing async state with `App::data_factory` + +## Reason: +`data_factory` would make sense in these situations: +- When async state not necessarily have to be shared between workers/threads. + +- When async state would spawn tasks on `actix-rt`. If we centralized the state there could be a possibility the tasks get a very unbalanced distribution on the workers/threads +(`actix-rt` would spawn tasks on local thread whenever it's called) + +## Requirement: +- `rustc 1.43 stable` +- `redis` server listen on `127.0.0.1:6379`(or use `REDIS_URL` env argument when starting the example) + +## Endpoints: +- use a work load generator(e.g wrk) to benchmark the end points: + + http://127.0.0.1:8080/pool prebuilt shared redis pool + http://127.0.0.1:8080/pool2 data_factory redis pool + +## Context: +The real world difference can be vary by the work you are doing but in general it's a good idea to +spread your *identical* async tasks evenly between threads and have as little cross threads synchronization as possible. diff --git a/async_data_factory/src/main.rs b/async_data_factory/src/main.rs new file mode 100644 index 00000000..007af704 --- /dev/null +++ b/async_data_factory/src/main.rs @@ -0,0 +1,106 @@ +use actix_web::web::Data; +use actix_web::{get, App, HttpServer}; + +use redis_tang::{Builder, Pool, RedisManager}; + +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + let redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1")); + + let num_cpus = num_cpus::get(); + + // a shared redis pool for work load comparison. + let pool = pool_builder(num_cpus, redis_url.as_str()) + .await + .expect("fail to build pool"); + let pool = RedisWrapper(pool); + + HttpServer::new(move || { + let redis_url = redis_url.clone(); + + App::new() + .data(pool.clone()) + // a dummy data_factory implementation + .data_factory(|| { + /* + App::data_factory would accept a future as return type and poll the future when + App is initialized. + + The Output of the future must be Result and T will be the transformed to + App::Data that can be extracted from handler/request. + (The E will only be used to trigger a log::error.) + + This data is bound to worker thread and you get an instance of it for every + worker of the HttpServer.(hence the name data_factory) + *. It is NOT shared between workers + (unless the underlying data is a smart pointer like Arc). + */ + + async { + // 123usize would be transformed into Data + Ok::(123) + } + }) + // a data_factory redis pool for work load comparison. + .data_factory(move || pool_builder(1, redis_url.clone())) + .service(pool_shared_prebuilt) + .service(pool_local) + }) + .bind("127.0.0.1:8080")? + .run() + .await +} + +/* + This pool is shared between workers. We have all redis connections spawned tasks on main thread + therefore it puts too much pressure on one thread. + *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async + redis connection type. +*/ +#[get("/pool")] +async fn pool_shared_prebuilt(pool: Data) -> &'static str { + ping(&pool.as_ref().0).await +} + +/* + This pool is built with App::data_factory and we have 2 connections fixed for every worker. + It's evenly distributed and have no cross workers synchronization. +*/ +#[get("/pool2")] +async fn pool_local(data: Data, pool: Data>) -> &'static str { + assert_eq!(data.get_ref(), &123); + + ping(pool.as_ref()).await +} + +// boiler plate for redis pool +#[derive(Clone)] +struct RedisWrapper(Pool); + +async fn pool_builder( + num_cpus: usize, + redis_url: impl redis::IntoConnectionInfo, +) -> Result, ()> { + let mgr = RedisManager::new(redis_url); + Builder::new() + .always_check(false) + .idle_timeout(None) + .max_lifetime(None) + .min_idle(num_cpus * 2) + .max_size(num_cpus * 2) + .build(mgr) + .await + .map_err(|_| ()) +} + +async fn ping(pool: &Pool) -> &'static str { + let mut client = pool.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +}