diff --git a/Cargo.toml b/Cargo.toml index cb65c0fa..07d29a78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "async_ex1", "async_ex2", "async_pg", + "async_data_factory", "awc_https", "basics", "casbin", diff --git a/async_data_factory/Cargo.toml b/async_data_factory/Cargo.toml new file mode 100644 index 00000000..de4894ed --- /dev/null +++ b/async_data_factory/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "async_data_factory" +version = "0.1.0" +authors = ["fakeshadow <24548779@qq.com>"] +edition = "2018" +workspace = ".." + + +[dependencies] +actix-rt = "1.1.1" +actix-web = { version = "2.0.0" } +num_cpus = "1.13.0" +redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] } +# tang-rs is an object pool for test purpose +redis_tang = { git = "https://github.com/fakeshadow/tang_rs", branch = "atomic" } diff --git a/async_data_factory/README.md b/async_data_factory/README.md new file mode 100644 index 00000000..294d655d --- /dev/null +++ b/async_data_factory/README.md @@ -0,0 +1,23 @@ +## Usage: +This is an example of constructing async state with `App::data_factory` + +## Reason: +`data_factory` would make sense in these situations: +- When the async state does not necessarily have to be shared between workers/threads. + +- When async state would spawn tasks on `actix-rt`. 
If we centralized the state there could be a possibility that the tasks get a very unbalanced distribution on the workers/threads +(`actix-rt` would spawn tasks on local thread whenever it's called) + +## Requirement: +- `rustc 1.43 stable` +- `redis` server listen on `127.0.0.1:6379`(or make change to const var `REDIS_URL` in `main.rs`) + +## Endpoints: +- use a work load generator(e.g wrk) to benchmark the end points: + + http://127.0.0.1:8080/pool prebuilt shared redis pool + http://127.0.0.1:8080/pool2 data_factory redis pool + +## Context: +The real world difference can vary by the work you are doing but in general it's a good idea to +spread your *identical* async tasks evenly between threads and have as little cross threads synchronization as possible. diff --git a/async_data_factory/src/main.rs b/async_data_factory/src/main.rs new file mode 100644 index 00000000..d41e92d2 --- /dev/null +++ b/async_data_factory/src/main.rs @@ -0,0 +1,92 @@ +use actix_web::web::Data; +use actix_web::{web, App, HttpServer}; + +use redis_tang::{Builder, Pool, RedisManager}; + +// change according to your redis setting. +const REDIS_URL: &str = "redis://127.0.0.1"; + +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + let num_cpus = num_cpus::get(); + + // a shared redis pool for work load comparison. + let pool = pool_builder(num_cpus).await.expect("fail to build pool"); + let pool = Wrapper(pool); + + HttpServer::new(move || { + App::new() + .data(pool.clone()) + // a dummy data_factory implementation + .data_factory(|| { + /* + App::data_factory would accept a future as return type and poll the future when App is initialized. + + The Output of the future must be Result<T, E> and T will be transformed to App::Data<T> that can be extracted from handler/request. + (The E will only be used to trigger a log::error.) + + This data is bound to worker thread and you get an instance of it for every worker of the HttpServer.(hence the name data_factory) + *. 
It is NOT shared between workers(unless the underlying data is a smart pointer like Arc). + */ + + async { + // 123usize would be transformed into Data<usize> + Ok::<usize, ()>(123) + } + }) + // a data_factory redis pool for work load comparison. + .data_factory(|| pool_builder(1)) + .service(web::resource("/pool").route(web::get().to(pool_shared_prebuilt))) + .service(web::resource("/pool2").route(web::get().to(pool_local))) + }) + .bind("127.0.0.1:8080")? + .workers(num_cpus) + .run() + .await +} + +// this pool is shared between workers. +// we have all redis connections spawned tasks on main thread therefore it puts too much pressure on one thread. +// *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async redis connection type. +async fn pool_shared_prebuilt(pool: Data<Wrapper>) -> &'static str { + let mut client = pool.0.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +} + +// this pool is built with App::data_factory and we have 2 connections fixed for every worker. +// It's evenly distributed and has no cross-worker synchronization. +async fn pool_local(data: Data<usize>, pool: Data<Pool<RedisManager>>) -> &'static str { + assert_eq!(data.get_ref(), &123); + + let mut client = pool.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +} + +// some boilerplate for creating a redis pool +#[derive(Clone)] +struct Wrapper(Pool<RedisManager>); + +async fn pool_builder(num_cpus: usize) -> Result<Pool<RedisManager>, ()> { + let mgr = RedisManager::new(REDIS_URL); + Builder::new() + .always_check(false) + .idle_timeout(None) + .max_lifetime(None) + .min_idle(num_cpus * 2) + .max_size(num_cpus * 2) + .build(mgr) + .await + .map_err(|_| ()) +}