From 1e61a03d5fb2afe08b066d6a5ec278d27331f193 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 22 Jun 2020 21:16:25 +0800 Subject: [PATCH 1/2] add example for App::data_factory --- Cargo.toml | 1 + async_data_factory/Cargo.toml | 15 ++++++ async_data_factory/README.md | 23 +++++++++ async_data_factory/src/main.rs | 92 ++++++++++++++++++++++++++++++++++ 4 files changed, 131 insertions(+) create mode 100644 async_data_factory/Cargo.toml create mode 100644 async_data_factory/README.md create mode 100644 async_data_factory/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index cb65c0f..07d29a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "async_ex1", "async_ex2", "async_pg", + "async_data_factory", "awc_https", "basics", "casbin", diff --git a/async_data_factory/Cargo.toml b/async_data_factory/Cargo.toml new file mode 100644 index 0000000..de4894e --- /dev/null +++ b/async_data_factory/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "async_data_factory" +version = "0.1.0" +authors = ["fakeshadow <24548779@qq.com>"] +edition = "2018" +workspace = ".." + + +[dependencies] +actix-rt = "1.1.1" +actix-web = { version = "2.0.0" } +num_cpus = "1.13.0" +redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] } +# tang-rs is an object pool for test purpose +redis_tang = { git = "https://github.com/fakeshadow/tang_rs", branch = "atomic" } diff --git a/async_data_factory/README.md b/async_data_factory/README.md new file mode 100644 index 0000000..294d655 --- /dev/null +++ b/async_data_factory/README.md @@ -0,0 +1,23 @@ +## Usage: +This is an example on constructing async state with `App::data_factory` + +## Reason: +`data_factory` would make sense in these situations: +- When async state not necessarily have to be shared between workers/threads. + +- When async state would spawn tasks on `actix-rt`. If we centralized the state there could be a possibilitythe tasks get a very unbalanced distribution on the workers/threads +(`actix-rt` would spawn tasks on local thread whenever it's called) + +## Requirement: +- `rustc 1.43 stable` +- `redis` server listen on `127.0.0.1:6379`(or make change to const var `REDIS_URL` in `main.rs`) + +## Endpoints: +- use a work load generator(e.g wrk) to benchmark the end points: + + http://127.0.0.1:8080/pool prebuilt shared redis pool + http://127.0.0.1:8080/pool2 data_factory redis pool + +## Context: +The real world difference can be vary by the work you are doing but in general it's a good idea to +spread your *identical* async tasks evenly between threads and have as little cross threads synchronization as possible. diff --git a/async_data_factory/src/main.rs b/async_data_factory/src/main.rs new file mode 100644 index 0000000..d41e92d --- /dev/null +++ b/async_data_factory/src/main.rs @@ -0,0 +1,92 @@ +use actix_web::web::Data; +use actix_web::{web, App, HttpServer}; + +use redis_tang::{Builder, Pool, RedisManager}; + +// change according to your redis setting. +const REDIS_URL: &str = "redis://127.0.0.1"; + +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + let num_cpus = num_cpus::get(); + + // a shared redis pool for work load comparision. + let pool = pool_builder(num_cpus).await.expect("fail to build pool"); + let pool = Wrapper(pool); + + HttpServer::new(move || { + App::new() + .data(pool.clone()) + // a dummy data_factory implementation + .data_factory(|| { + /* + App::data_factory would accept a future as return type and poll the future when App is initialized. + + The Output of the future must be Result and T will be the transformed to App::Data that can be extracted from handler/request. + (The E will only be used to trigger a log::error.) + + This data is bound to worker thread and you get an instance of it for every worker of the HttpServer.(hence the name data_factory) + *. It is NOT shared between workers(unless the underlying data is a smart pointer like Arc). + */ + + async { + // 123usize would be transformed into Data + Ok::(123) + } + }) + // a data_factory redis pool for work load comparision. + .data_factory(|| pool_builder(1)) + .service(web::resource("/pool").route(web::get().to(pool_shared_prebuilt))) + .service(web::resource("/pool2").route(web::get().to(pool_local))) + }) + .bind("127.0.0.1:8080")? + .workers(num_cpus) + .run() + .await +} + +// this pool is shared between workers. +// we have all redis connections spawned tasks on main thread therefore it puts too much pressure on one thread. +// *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async redis connection type. +async fn pool_shared_prebuilt(pool: Data) -> &'static str { + let mut client = pool.0.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +} + +// this pool is built with App::data_factory and we have 2 connections fixed for every worker. +// It's evenly distributed and have no cross workers synchronization. +async fn pool_local(data: Data, pool: Data>) -> &'static str { + assert_eq!(data.get_ref(), &123); + + let mut client = pool.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +} + +// some boiler plate for create redis pool +#[derive(Clone)] +struct Wrapper(Pool); + +async fn pool_builder(num_cpus: usize) -> Result, ()> { + let mgr = RedisManager::new(REDIS_URL); + Builder::new() + .always_check(false) + .idle_timeout(None) + .max_lifetime(None) + .min_idle(num_cpus * 2) + .max_size(num_cpus * 2) + .build(mgr) + .await + .map_err(|_| ()) +} From c7660c8f8648e4367810d354841ac3709fb4e52f Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 28 Jun 2020 06:15:34 +0800 Subject: [PATCH 2/2] fixes according to reviews Fix typos. Format the comments. Use macro for routes. etc --- Cargo.toml | 2 +- async_data_factory/Cargo.toml | 4 +- async_data_factory/README.md | 4 +- async_data_factory/src/main.rs | 98 +++++++++++++++++++--------------- 4 files changed, 61 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 07d29a7..53f6240 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ + "async_data_factory", "async_db", "async_ex1", "async_ex2", "async_pg", - "async_data_factory", "awc_https", "basics", "casbin", diff --git a/async_data_factory/Cargo.toml b/async_data_factory/Cargo.toml index de4894e..47f6d73 100644 --- a/async_data_factory/Cargo.toml +++ b/async_data_factory/Cargo.toml @@ -11,5 +11,5 @@ actix-rt = "1.1.1" actix-web = { version = "2.0.0" } num_cpus = "1.13.0" redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] } -# tang-rs is an object pool for test purpose -redis_tang = { git = "https://github.com/fakeshadow/tang_rs", branch = "atomic" } +# redis_tang is an redis pool for test purpose +redis_tang = "0.1.0" diff --git a/async_data_factory/README.md b/async_data_factory/README.md index 294d655..98c31e0 100644 --- a/async_data_factory/README.md +++ b/async_data_factory/README.md @@ -5,12 +5,12 @@ This is an example on constructing async state with `App::data_factory` `data_factory` would make sense in these situations: - When async state not necessarily have to be shared between workers/threads. -- When async state would spawn tasks on `actix-rt`. If we centralized the state there could be a possibilitythe tasks get a very unbalanced distribution on the workers/threads +- When async state would spawn tasks on `actix-rt`. If we centralized the state there could be a possibility the tasks get a very unbalanced distribution on the workers/threads (`actix-rt` would spawn tasks on local thread whenever it's called) ## Requirement: - `rustc 1.43 stable` -- `redis` server listen on `127.0.0.1:6379`(or make change to const var `REDIS_URL` in `main.rs`) +- `redis` server listen on `127.0.0.1:6379`(or use `REDIS_URL` env argument when starting the example) ## Endpoints: - use a work load generator(e.g wrk) to benchmark the end points: diff --git a/async_data_factory/src/main.rs b/async_data_factory/src/main.rs index d41e92d..007af70 100644 --- a/async_data_factory/src/main.rs +++ b/async_data_factory/src/main.rs @@ -1,32 +1,40 @@ use actix_web::web::Data; -use actix_web::{web, App, HttpServer}; +use actix_web::{get, App, HttpServer}; use redis_tang::{Builder, Pool, RedisManager}; -// change according to your redis setting. -const REDIS_URL: &str = "redis://127.0.0.1"; - #[actix_rt::main] async fn main() -> std::io::Result<()> { + let redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1")); + let num_cpus = num_cpus::get(); - // a shared redis pool for work load comparision. - let pool = pool_builder(num_cpus).await.expect("fail to build pool"); - let pool = Wrapper(pool); + // a shared redis pool for work load comparison. + let pool = pool_builder(num_cpus, redis_url.as_str()) + .await + .expect("fail to build pool"); + let pool = RedisWrapper(pool); HttpServer::new(move || { + let redis_url = redis_url.clone(); + App::new() .data(pool.clone()) // a dummy data_factory implementation .data_factory(|| { /* - App::data_factory would accept a future as return type and poll the future when App is initialized. + App::data_factory would accept a future as return type and poll the future when + App is initialized. - The Output of the future must be Result and T will be the transformed to App::Data that can be extracted from handler/request. + The Output of the future must be Result and T will be the transformed to + App::Data that can be extracted from handler/request. (The E will only be used to trigger a log::error.) - This data is bound to worker thread and you get an instance of it for every worker of the HttpServer.(hence the name data_factory) - *. It is NOT shared between workers(unless the underlying data is a smart pointer like Arc). + This data is bound to worker thread and you get an instance of it for every + worker of the HttpServer.(hence the name data_factory) + *. It is NOT shared between workers + (unless the underlying data is a smart pointer like Arc). */ async { @@ -34,52 +42,47 @@ async fn main() -> std::io::Result<()> { Ok::(123) } }) - // a data_factory redis pool for work load comparision. - .data_factory(|| pool_builder(1)) - .service(web::resource("/pool").route(web::get().to(pool_shared_prebuilt))) - .service(web::resource("/pool2").route(web::get().to(pool_local))) + // a data_factory redis pool for work load comparison. + .data_factory(move || pool_builder(1, redis_url.clone())) + .service(pool_shared_prebuilt) + .service(pool_local) }) .bind("127.0.0.1:8080")? - .workers(num_cpus) .run() .await } -// this pool is shared between workers. -// we have all redis connections spawned tasks on main thread therefore it puts too much pressure on one thread. -// *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async redis connection type. -async fn pool_shared_prebuilt(pool: Data) -> &'static str { - let mut client = pool.0.get().await.unwrap().clone(); - - redis::cmd("PING") - .query_async::<_, ()>(&mut client) - .await - .unwrap(); - - "Done" +/* + This pool is shared between workers. We have all redis connections spawned tasks on main thread + therefore it puts too much pressure on one thread. + *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async + redis connection type. +*/ +#[get("/pool")] +async fn pool_shared_prebuilt(pool: Data) -> &'static str { + ping(&pool.as_ref().0).await } -// this pool is built with App::data_factory and we have 2 connections fixed for every worker. -// It's evenly distributed and have no cross workers synchronization. +/* + This pool is built with App::data_factory and we have 2 connections fixed for every worker. + It's evenly distributed and have no cross workers synchronization. +*/ +#[get("/pool2")] async fn pool_local(data: Data, pool: Data>) -> &'static str { assert_eq!(data.get_ref(), &123); - let mut client = pool.get().await.unwrap().clone(); - - redis::cmd("PING") - .query_async::<_, ()>(&mut client) - .await - .unwrap(); - - "Done" + ping(pool.as_ref()).await } -// some boiler plate for create redis pool +// boiler plate for redis pool #[derive(Clone)] -struct Wrapper(Pool); +struct RedisWrapper(Pool); -async fn pool_builder(num_cpus: usize) -> Result, ()> { - let mgr = RedisManager::new(REDIS_URL); +async fn pool_builder( + num_cpus: usize, + redis_url: impl redis::IntoConnectionInfo, +) -> Result, ()> { + let mgr = RedisManager::new(redis_url); Builder::new() .always_check(false) .idle_timeout(None) @@ -90,3 +93,14 @@ async fn pool_builder(num_cpus: usize) -> Result, ()> { .await .map_err(|_| ()) } + +async fn ping(pool: &Pool) -> &'static str { + let mut client = pool.get().await.unwrap().clone(); + + redis::cmd("PING") + .query_async::<_, ()>(&mut client) + .await + .unwrap(); + + "Done" +}