1
0
mirror of https://github.com/actix/examples synced 2024-11-23 14:31:07 +01:00

simplify data_factory example

This commit is contained in:
Rob Ede 2022-10-16 19:24:17 +01:00
parent fa323545e9
commit 6953e927ac
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
4 changed files with 44 additions and 157 deletions

57
Cargo.lock generated
View File

@ -809,6 +809,15 @@ dependencies = [
"futures-core", "futures-core",
] ]
[[package]]
name = "async-data"
version = "1.0.0"
dependencies = [
"actix-web",
"env_logger 0.9.0",
"log",
]
[[package]] [[package]]
name = "async-graphql" name = "async-graphql"
version = "4.0.1" version = "4.0.1"
@ -945,16 +954,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "async_data_factory"
version = "1.0.0"
dependencies = [
"actix-web",
"num_cpus",
"redis 0.21.6",
"redis_tang",
]
[[package]] [[package]]
name = "async_db" name = "async_db"
version = "1.0.0" version = "1.0.0"
@ -2111,7 +2110,7 @@ dependencies = [
"env_logger 0.9.0", "env_logger 0.9.0",
"futures-util", "futures-util",
"log", "log",
"redis 0.22.0", "redis",
"serde", "serde",
] ]
@ -4860,25 +4859,6 @@ dependencies = [
"rand_core 0.5.1", "rand_core 0.5.1",
] ]
[[package]]
name = "redis"
version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "571c252c68d09a2ad3e49edd14e9ee48932f3e0f27b06b4ea4c9b2a706d31103"
dependencies = [
"async-trait",
"bytes 1.2.1",
"combine 4.6.6",
"futures-util",
"itoa 1.0.3",
"percent-encoding",
"pin-project-lite 0.2.9",
"ryu",
"tokio 1.21.0",
"tokio-util 0.7.4",
"url",
]
[[package]] [[package]]
name = "redis" name = "redis"
version = "0.22.0" version = "0.22.0"
@ -4929,17 +4909,6 @@ dependencies = [
"time 0.3.14", "time 0.3.14",
] ]
[[package]]
name = "redis_tang"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcabf56cf2286df31463567c8f8d67ee582e32acf78aaa6f7303f791006aa75d"
dependencies = [
"redis 0.21.6",
"tang-rs",
"tokio 1.21.0",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.2.16"
@ -6079,12 +6048,6 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tang-rs"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b625cb7d76f7bb1887a11d2e7b97677539924010773844ed17252c6ec7877595"
[[package]] [[package]]
name = "tap" name = "tap"
version = "1.0.1" version = "1.0.1"

View File

@ -1,11 +1,10 @@
[package] [package]
name = "async_data_factory" name = "async-data"
version = "1.0.0" version = "1.0.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
actix-web = "4" actix-web = "4"
num_cpus = "1.13"
redis = { version = "0.21", default-features = false, features = ["tokio-comp"] } env_logger = "0.9"
# redis_tang is an redis pool for test purpose log = "0.4"
redis_tang = "0.3"

View File

@ -1,27 +1,14 @@
# Usage: # Async Data Factory
This is an example demonstrating the construction of async state with `App::data_factory` This is an example demonstrating the construction of async state with `App::data_factory`.
## Reason: ## Reason:
Use of a `data_factory` would make sense in these situations: Use of a `data_factory` would make sense in these situations:
- When async state does not necessarily have to be shared between workers/threads. - When async state does not necessarily have to be shared between workers/threads.
- When an async state would spawn tasks. If state was centralized there could be a possibility the tasks get an unbalanced distribution on the workers/threads.
- When an async state would spawn tasks on `actix-rt`. If state was centralized there could be a possibility the tasks get an unbalanced distribution on the workers/threads (`actix-rt` would spawn tasks on local thread whenever it's called) ## Context
## Requirement: The real world difference can be vary by the work you are doing but in general it's a good idea to spread your similarly-expensive async tasks evenly between threads and have as little cross threads synchronization as possible.
- `rustc 1.58 stable`
- `redis` server listen on `127.0.0.1:6379`(or use `REDIS_URL` env argument when starting the example)
## Endpoints:
- use a work load generator(e.g wrk) to benchmark the end points:
http://127.0.0.1:8080/pool prebuilt shared redis pool
http://127.0.0.1:8080/pool2 data_factory redis pool
## Context:
The real world difference can be vary by the work you are doing but in general it's a good idea to spread your _identical_ async tasks evenly between threads and have as little cross threads synchronization as possible.

View File

@ -1,105 +1,43 @@
use actix_web::{get, web::Data, App, HttpServer}; use actix_web::{web, App, HttpResponse, HttpServer};
use redis_tang::{Builder, Pool, RedisManager};
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let redis_url = env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
std::env::var("REDIS_URL").unwrap_or_else(|_| String::from("redis://127.0.0.1"));
let num_cpus = num_cpus::get(); log::info!("starting HTTP server at http://localhost:8080");
// a shared redis pool for work load comparison.
let pool = pool_builder(num_cpus, redis_url.as_str())
.await
.expect("fail to build pool");
let pool = Data::new(RedisWrapper(pool));
HttpServer::new(move || { HttpServer::new(move || {
let redis_url = redis_url.clone();
App::new() App::new()
.app_data(pool.clone())
// a dummy data_factory implementation // a dummy data_factory implementation
.data_factory(|| { .data_factory(|| {
/* // App::data_factory would accept a future as return type and poll the future when
App::data_factory would accept a future as return type and poll the future when // App is initialized.
App is initialized. //
// The Output of the future must be Result<T, E> and T will be the transformed to
The Output of the future must be Result<T, E> and T will be the transformed to // App::Data<T> that can be extracted from handler/request. (The E will only be used
App::Data<T> that can be extracted from handler/request. // to trigger an error log.)
(The E will only be used to trigger a log::error.) //
// This data is bound to worker thread and you get an instance of it for each worker
This data is bound to worker thread and you get an instance of it for every // of the HttpServer, hence the name data_factory.
worker of the HttpServer.(hence the name data_factory) //
*. It is NOT shared between workers // It is NOT shared between workers
(unless the underlying data is a smart pointer like Arc<T>). // (unless the underlying data is a smart pointer like Arc<T>).
*/
async { async {
// 123usize would be transformed into Data<usize> // would be transformed into Data<usize>
Ok::<usize, ()>(123) Ok::<_, ()>(123_usize)
} }
}) })
// a data_factory redis pool for work load comparison. .route(
.data_factory(move || pool_builder(1, redis_url.clone())) "/",
.service(pool_shared_prebuilt) web::to(|data: web::Data<usize>| async move {
.service(pool_local) assert_eq!(**data, 123);
HttpResponse::NoContent()
}),
)
}) })
.workers(2)
.bind(("127.0.0.1", 8080))? .bind(("127.0.0.1", 8080))?
.run() .run()
.await .await
} }
/*
This pool is shared between workers. We have all redis connections spawned tasks on main thread
therefore it puts too much pressure on one thread.
*. This is the case for redis::aio::MultiplexedConnection and it may not apply to other async
redis connection type.
*/
#[get("/pool")]
async fn pool_shared_prebuilt(pool: Data<RedisWrapper>) -> &'static str {
ping(&pool.as_ref().0).await
}
/*
This pool is built with App::data_factory and we have 2 connections fixed for every worker.
It's evenly distributed and have no cross workers synchronization.
*/
#[get("/pool2")]
async fn pool_local(data: Data<usize>, pool: Data<Pool<RedisManager>>) -> &'static str {
assert_eq!(data.get_ref(), &123);
ping(pool.as_ref()).await
}
// boiler plate for redis pool
#[derive(Clone)]
struct RedisWrapper(Pool<RedisManager>);
async fn pool_builder(
num_cpus: usize,
redis_url: impl redis::IntoConnectionInfo,
) -> Result<Pool<RedisManager>, ()> {
let mgr = RedisManager::new(redis_url);
Builder::new()
.always_check(false)
.idle_timeout(None)
.max_lifetime(None)
.min_idle(num_cpus * 2)
.max_size(num_cpus * 2)
.build(mgr)
.await
.map_err(|_| ())
}
async fn ping(pool: &Pool<RedisManager>) -> &'static str {
let mut client = pool.get().await.unwrap().clone();
redis::cmd("PING")
.query_async::<_, ()>(&mut client)
.await
.unwrap();
"Done"
}