From c7660c8f8648e4367810d354841ac3709fb4e52f Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 28 Jun 2020 06:15:34 +0800 Subject: [PATCH] fixes according to reviews Fix typos. Format the comments. Use macro for routes. etc --- Cargo.toml | 2 +- async_data_factory/Cargo.toml | 4 +- async_data_factory/README.md | 4 +- async_data_factory/src/main.rs | 98 +++++++++++++++++++--------------- 4 files changed, 61 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 07d29a78..53f6240c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ + "async_data_factory", "async_db", "async_ex1", "async_ex2", "async_pg", - "async_data_factory", "awc_https", "basics", "casbin", diff --git a/async_data_factory/Cargo.toml b/async_data_factory/Cargo.toml index de4894ed..47f6d73c 100644 --- a/async_data_factory/Cargo.toml +++ b/async_data_factory/Cargo.toml @@ -11,5 +11,5 @@ actix-rt = "1.1.1" actix-web = { version = "2.0.0" } num_cpus = "1.13.0" redis = { version = "0.16.0", default-features = false, features = ["tokio-rt-core"] } -# tang-rs is an object pool for test purpose -redis_tang = { git = "https://github.com/fakeshadow/tang_rs", branch = "atomic" } +# redis_tang is an redis pool for test purpose +redis_tang = "0.1.0" diff --git a/async_data_factory/README.md b/async_data_factory/README.md index 294d655d..98c31e07 100644 --- a/async_data_factory/README.md +++ b/async_data_factory/README.md @@ -5,12 +5,12 @@ This is an example on constructing async state with `App::data_factory` `data_factory` would make sense in these situations: - When async state not necessarily have to be shared between workers/threads. -- When async state would spawn tasks on `actix-rt`. If we centralized the state there could be a possibilitythe tasks get a very unbalanced distribution on the workers/threads +- When async state would spawn tasks on `actix-rt`. 
If we centralized the state there could be a possibility the tasks get a very unbalanced distribution on the workers/threads (`actix-rt` would spawn tasks on local thread whenever it's called) ## Requirement: - `rustc 1.43 stable` -- `redis` server listen on `127.0.0.1:6379`(or make change to const var `REDIS_URL` in `main.rs`) +- `redis` server listen on `127.0.0.1:6379`(or use `REDIS_URL` env argument when starting the example) ## Endpoints: - use a work load generator(e.g wrk) to benchmark the end points: diff --git a/async_data_factory/src/main.rs b/async_data_factory/src/main.rs index d41e92d2..007af704 100644 --- a/async_data_factory/src/main.rs +++ b/async_data_factory/src/main.rs @@ -1,32 +1,40 @@ use actix_web::web::Data; -use actix_web::{web, App, HttpServer}; +use actix_web::{get, App, HttpServer}; use redis_tang::{Builder, Pool, RedisManager}; -// change according to your redis setting. -const REDIS_URL: &str = "redis://127.0.0.1"; - #[actix_rt::main] async fn main() -> std::io::Result<()> { + let redis_url = + std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1")); + let num_cpus = num_cpus::get(); - // a shared redis pool for work load comparision. - let pool = pool_builder(num_cpus).await.expect("fail to build pool"); - let pool = Wrapper(pool); + // a shared redis pool for work load comparison. + let pool = pool_builder(num_cpus, redis_url.as_str()) + .await + .expect("fail to build pool"); + let pool = RedisWrapper(pool); HttpServer::new(move || { + let redis_url = redis_url.clone(); + App::new() .data(pool.clone()) // a dummy data_factory implementation .data_factory(|| { /* - App::data_factory would accept a future as return type and poll the future when App is initialized. + App::data_factory would accept a future as return type and poll the future when + App is initialized. - The Output of the future must be Result and T will be the transformed to App::Data that can be extracted from handler/request. 
+                    The Output of the future must be Result<T, E> and T will be the transformed to
+                    App::Data<T> that can be extracted from handler/request.
                     (The E will only be used to trigger a log::error.)
 
-                    This data is bound to worker thread and you get an instance of it for every worker of the HttpServer.(hence the name data_factory)
-                    *. It is NOT shared between workers(unless the underlying data is a smart pointer like Arc).
+                    This data is bound to worker thread and you get an instance of it for every
+                    worker of the HttpServer.(hence the name data_factory)
+                    *. It is NOT shared between workers
+                    (unless the underlying data is a smart pointer like Arc).
                 */
 
                 async {
@@ -34,52 +42,47 @@ async fn main() -> std::io::Result<()> {
                     Ok::<usize, ()>(123)
                 }
             })
-            // a data_factory redis pool for work load comparision.
-            .data_factory(|| pool_builder(1))
-            .service(web::resource("/pool").route(web::get().to(pool_shared_prebuilt)))
-            .service(web::resource("/pool2").route(web::get().to(pool_local)))
+            // a data_factory redis pool for work load comparison.
+            .data_factory(move || pool_builder(1, redis_url.clone()))
+            .service(pool_shared_prebuilt)
+            .service(pool_local)
     })
     .bind("127.0.0.1:8080")?
-    .workers(num_cpus)
     .run()
     .await
 }
 
-// this pool is shared between workers.
-// we have all redis connections spawned tasks on main thread therefore it puts too much pressure on one thread.
-// *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async redis connection type.
-async fn pool_shared_prebuilt(pool: Data<Wrapper>) -> &'static str {
-    let mut client = pool.0.get().await.unwrap().clone();
-
-    redis::cmd("PING")
-        .query_async::<_, ()>(&mut client)
-        .await
-        .unwrap();
-
-    "Done"
+/*
+    This pool is shared between workers. We have all redis connections spawned tasks on main thread
+    therefore it puts too much pressure on one thread.
+    *. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async
+    redis connection type.
+*/
+#[get("/pool")]
+async fn pool_shared_prebuilt(pool: Data<RedisWrapper>) -> &'static str {
+    ping(&pool.as_ref().0).await
 }
 
-// this pool is built with App::data_factory and we have 2 connections fixed for every worker.
-// It's evenly distributed and have no cross workers synchronization.
+/*
+    This pool is built with App::data_factory and we have 2 connections fixed for every worker.
+    It's evenly distributed and have no cross workers synchronization.
+*/
+#[get("/pool2")]
 async fn pool_local(data: Data<usize>, pool: Data<Pool<RedisManager>>) -> &'static str {
     assert_eq!(data.get_ref(), &123);
 
-    let mut client = pool.get().await.unwrap().clone();
-
-    redis::cmd("PING")
-        .query_async::<_, ()>(&mut client)
-        .await
-        .unwrap();
-
-    "Done"
+    ping(pool.as_ref()).await
 }
 
-// some boiler plate for create redis pool
+// boiler plate for redis pool
 #[derive(Clone)]
-struct Wrapper(Pool<RedisManager>);
+struct RedisWrapper(Pool<RedisManager>);
 
-async fn pool_builder(num_cpus: usize) -> Result<Pool<RedisManager>, ()> {
-    let mgr = RedisManager::new(REDIS_URL);
+async fn pool_builder(
+    num_cpus: usize,
+    redis_url: impl redis::IntoConnectionInfo,
+) -> Result<Pool<RedisManager>, ()> {
+    let mgr = RedisManager::new(redis_url);
     Builder::new()
         .always_check(false)
         .idle_timeout(None)
@@ -90,3 +93,14 @@ async fn pool_builder(num_cpus: usize) -> Result<Pool<RedisManager>, ()> {
         .await
         .map_err(|_| ())
 }
+
+async fn ping(pool: &Pool<RedisManager>) -> &'static str {
+    let mut client = pool.get().await.unwrap().clone();
+
+    redis::cmd("PING")
+        .query_async::<_, ()>(&mut client)
+        .await
+        .unwrap();
+
+    "Done"
+}