From 504e89403b523cf0b03309f05cb1fdfeb62e5ee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Fernandes?= <38193239+0rangeFox@users.noreply.github.com> Date: Tue, 6 Aug 2024 14:47:49 +0100 Subject: [PATCH] feature(session): add deadpool-redis compatibility (#381) * Add compatibility of `deadpool-redis` for the storage `redis_rs`. * Keep up-to-date the `actix-redis` version. * Format the project issued by command `cargo +nightly fmt`. * Add `deadpool-redis` into the documentation and tests. * Update CHANGES.md. * Update the documentation of `Deadpool Redis` section on `redis_rs`. * Replace `no_run` with `ignore` attribute on "Deadpool Redis" example to skip the doc tests failure. * Rollback the renaming `redis::cmd` to `cmd` for better reading and avoid shadowing, fix the wrong return type on builder function comment. * Format the project issued by command `cargo +nightly fmt`. * Format. * Fix feature naming from the last merge. * Fix feature missing from the last merge. * Format the project issued by command `cargo +nightly fmt`. * Re-import `cookie-session` feature. (Maybe was removed accidentally from the last merge?) * tmp * chore: bump deadpool-redis to 0.16 * chore: fixup rest of redis code for pool * fix: add missing cfg guard * docs: fix pool docs --------- Co-authored-by: Rob Ede --- actix-session/CHANGES.md | 1 + actix-session/Cargo.toml | 2 + actix-session/src/storage/mod.rs | 9 +- actix-session/src/storage/redis_rs.rs | 240 ++++++++++++++++++++------ 4 files changed, 196 insertions(+), 56 deletions(-) diff --git a/actix-session/CHANGES.md b/actix-session/CHANGES.md index 0e987ce6d..0f1f28cf1 100644 --- a/actix-session/CHANGES.md +++ b/actix-session/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Add `redis-session-rustls` crate feature that enables `rustls`-secured Redis sessions. +- Add `redis-pool` crate feature (off-by-default) which enables `RedisSessionStore::{new, builder}_pooled()` constructors. - Rename `redis-rs-session` crate feature to `redis-session`. - Rename `redis-rs-tls-session` crate feature to `redis-session-native-tls`. - Remove `redis-actor-session` crate feature (and, therefore, the `actix-redis` based storage backend). diff --git a/actix-session/Cargo.toml b/actix-session/Cargo.toml index 3de6f0e36..bffae7e19 100644 --- a/actix-session/Cargo.toml +++ b/actix-session/Cargo.toml @@ -23,6 +23,7 @@ cookie-session = [] redis-session = ["dep:redis", "dep:rand"] redis-session-native-tls = ["redis-session", "redis/tokio-native-tls-comp"] redis-session-rustls = ["redis-session", "redis/tokio-rustls-comp"] +redis-pool = ["dep:deadpool-redis"] [dependencies] actix-service = "2" @@ -38,6 +39,7 @@ tracing = { version = "0.1.30", default-features = false, features = ["log"] } # redis-session redis = { version = "0.26", default-features = false, features = ["tokio-comp", "connection-manager"], optional = true } +deadpool-redis = { version = "0.16", optional = true } [dev-dependencies] actix-session = { path = ".", features = ["cookie-session", "redis-session"] } diff --git a/actix-session/src/storage/mod.rs b/actix-session/src/storage/mod.rs index b3fba4a00..39395f0cb 100644 --- a/actix-session/src/storage/mod.rs +++ b/actix-session/src/storage/mod.rs @@ -11,11 +11,12 @@ mod utils; #[cfg(feature = "cookie-session")] pub use self::cookie::CookieSessionStore; -#[cfg(feature = "redis-session")] -pub use self::redis_rs::{RedisSessionStore, RedisSessionStoreBuilder}; -#[cfg(feature = "redis-session")] -pub use self::utils::generate_session_key; pub use self::{ interface::{LoadError, SaveError, SessionStore, UpdateError}, session_key::SessionKey, }; +#[cfg(feature = "redis-session")] +pub use self::{ + redis_rs::{RedisSessionStore, RedisSessionStoreBuilder}, + utils::generate_session_key, +}; diff --git a/actix-session/src/storage/redis_rs.rs b/actix-session/src/storage/redis_rs.rs index ca9aeb5e6..c88e7028c 100644 --- a/actix-session/src/storage/redis_rs.rs +++ b/actix-session/src/storage/redis_rs.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use actix_web::cookie::time::Duration; use anyhow::Error; -use redis::{aio::ConnectionManager, AsyncCommands, Cmd, FromRedisValue, RedisResult, Value}; +use redis::{aio::ConnectionManager, AsyncCommands, Client, Cmd, FromRedisValue, Value}; use super::SessionKey; use crate::storage::{ @@ -56,14 +56,38 @@ use crate::storage::{ /// # }) /// ``` /// -/// # Implementation notes -/// `RedisSessionStore` leverages [`redis-rs`] as Redis client. +/// # Pooled Redis Connections /// -/// [`redis-rs`]: https://github.com/mitsuhiko/redis-rs +/// When the `redis-pool` crate feature is enabled, a pre-existing pool from [`deadpool_redis`] can +/// be provided. +/// +/// ```no_run +/// use actix_session::storage::RedisSessionStore; +/// use deadpool_redis::{Config, Runtime}; +/// +/// let redis_cfg = Config::from_url("redis://127.0.0.1:6379"); +/// let redis_pool = redis_cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); +/// +/// let store = RedisSessionStore::new_pooled(redis_pool); +/// ``` +/// +/// # Implementation notes +/// +/// `RedisSessionStore` leverages the [`redis`] crate as the underlying Redis client. #[derive(Clone)] pub struct RedisSessionStore { configuration: CacheConfiguration, - client: ConnectionManager, + client: RedisSessionConn, +} + +#[derive(Clone)] +enum RedisSessionConn { + /// Single connection. + Single(ConnectionManager), + + /// Connection pool. + #[cfg(feature = "redis-pool")] + Pool(deadpool_redis::Pool), } #[derive(Clone)] @@ -80,34 +104,77 @@ impl Default for CacheConfiguration { } impl RedisSessionStore { - /// A fluent API to configure [`RedisSessionStore`]. - /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] - a - /// connection string for Redis. - pub fn builder>(connection_string: S) -> RedisSessionStoreBuilder { + /// Returns a fluent API builder to configure [`RedisSessionStore`]. + /// + /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] + /// - a connection string for Redis. + pub fn builder(connection_string: impl Into) -> RedisSessionStoreBuilder { RedisSessionStoreBuilder { configuration: CacheConfiguration::default(), - connection_string: connection_string.into(), + conn_builder: RedisSessionConnBuilder::Single(connection_string.into()), } } - /// Create a new instance of [`RedisSessionStore`] using the default configuration. - /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] - a - /// connection string for Redis. - pub async fn new>( - connection_string: S, - ) -> Result { + /// Returns a fluent API builder to configure [`RedisSessionStore`]. + /// + /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] + /// - a pool object for Redis. + #[cfg(feature = "redis-pool")] + pub fn builder_pooled(pool: impl Into) -> RedisSessionStoreBuilder { + RedisSessionStoreBuilder { + configuration: CacheConfiguration::default(), + conn_builder: RedisSessionConnBuilder::Pool(pool.into()), + } + } + + /// Creates a new instance of [`RedisSessionStore`] using the default configuration. + /// + /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] + /// - a connection string for Redis. + pub async fn new(connection_string: impl Into) -> Result { Self::builder(connection_string).build().await } + + /// Creates a new instance of [`RedisSessionStore`] using the default configuration. + /// + /// It takes as input the only required input to create a new instance of [`RedisSessionStore`] + /// - a pool object for Redis. + #[cfg(feature = "redis-pool")] + pub async fn new_pooled( + pool: impl Into, + ) -> anyhow::Result { + Self::builder_pooled(pool).build().await + } } /// A fluent builder to construct a [`RedisSessionStore`] instance with custom configuration /// parameters. -/// -/// [`RedisSessionStore`]: crate::storage::RedisSessionStore #[must_use] pub struct RedisSessionStoreBuilder { - connection_string: String, configuration: CacheConfiguration, + conn_builder: RedisSessionConnBuilder, +} + +enum RedisSessionConnBuilder { + /// Single connection string. + Single(String), + + /// Pre-built connection pool. + #[cfg(feature = "redis-pool")] + Pool(deadpool_redis::Pool), +} + +impl RedisSessionConnBuilder { + async fn into_client(self) -> anyhow::Result { + Ok(match self { + RedisSessionConnBuilder::Single(conn_string) => { + RedisSessionConn::Single(ConnectionManager::new(Client::open(conn_string)?).await?) + } + + #[cfg(feature = "redis-pool")] + RedisSessionConnBuilder::Pool(pool) => RedisSessionConn::Pool(pool), + }) + } } impl RedisSessionStoreBuilder { @@ -120,9 +187,10 @@ impl RedisSessionStoreBuilder { self } - /// Finalise the builder and return a [`RedisSessionStore`] instance. - pub async fn build(self) -> Result { - let client = ConnectionManager::new(redis::Client::open(self.connection_string)?).await?; + /// Finalises builder and returns a [`RedisSessionStore`] instance. + pub async fn build(self) -> anyhow::Result { + let client = self.conn_builder.into_client().await?; + Ok(RedisSessionStore { configuration: self.configuration, client, @@ -190,7 +258,7 @@ impl SessionStore for RedisSessionStore { let cache_key = (self.configuration.cache_keygen)(session_key.as_ref()); - let v: redis::Value = self + let v: Value = self .execute_command(redis::cmd("SET").arg(&[ &cache_key, &body, @@ -223,18 +291,29 @@ impl SessionStore for RedisSessionStore { } } - async fn update_ttl(&self, session_key: &SessionKey, ttl: &Duration) -> Result<(), Error> { + async fn update_ttl(&self, session_key: &SessionKey, ttl: &Duration) -> anyhow::Result<()> { let cache_key = (self.configuration.cache_keygen)(session_key.as_ref()); - self.client - .clone() - .expire::<_, ()>(&cache_key, ttl.whole_seconds()) - .await?; + match self.client { + RedisSessionConn::Single(ref conn) => { + conn.clone() + .expire::<_, ()>(&cache_key, ttl.whole_seconds()) + .await?; + } + + #[cfg(feature = "redis-pool")] + RedisSessionConn::Pool(ref pool) => { + pool.get() + .await? + .expire::<_, ()>(&cache_key, ttl.whole_seconds()) + .await?; + } + } Ok(()) } - async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> { + async fn delete(&self, session_key: &SessionKey) -> Result<(), Error> { let cache_key = (self.configuration.cache_keygen)(session_key.as_ref()); self.execute_command::<()>(redis::cmd("DEL").arg(&[&cache_key])) @@ -261,24 +340,55 @@ impl RedisSessionStore { /// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for /// a different more meaningful reason). #[allow(clippy::needless_pass_by_ref_mut)] - async fn execute_command(&self, cmd: &mut Cmd) -> RedisResult { + async fn execute_command(&self, cmd: &mut Cmd) -> anyhow::Result { let mut can_retry = true; - loop { - match cmd.query_async(&mut self.client.clone()).await { - Ok(value) => return Ok(value), - Err(err) => { - if can_retry && err.is_connection_dropped() { - tracing::debug!( - "Connection dropped while trying to talk to Redis. Retrying." - ); + match self.client { + RedisSessionConn::Single(ref conn) => { + let mut conn = conn.clone(); - // Retry at most once - can_retry = false; + loop { + match cmd.query_async(&mut conn).await { + Ok(value) => return Ok(value), + Err(err) => { + if can_retry && err.is_connection_dropped() { + tracing::debug!( + "Connection dropped while trying to talk to Redis. Retrying." + ); - continue; - } else { - return Err(err); + // Retry at most once + can_retry = false; + + continue; + } else { + return Err(err.into()); + } + } + } + } + } + + #[cfg(feature = "redis-pool")] + RedisSessionConn::Pool(ref pool) => { + let mut conn = pool.get().await?; + + loop { + match cmd.query_async(&mut conn).await { + Ok(value) => return Ok(value), + Err(err) => { + if can_retry && err.is_connection_dropped() { + tracing::debug!( + "Connection dropped while trying to talk to Redis. Retrying." + ); + + // Retry at most once + can_retry = false; + + continue; + } else { + return Err(err.into()); + } + } } } } @@ -291,14 +401,27 @@ mod tests { use std::collections::HashMap; use actix_web::cookie::time; + #[cfg(not(feature = "redis-session"))] + use deadpool_redis::{Config, Runtime}; use super::*; use crate::test_helpers::acceptance_test_suite; async fn redis_store() -> RedisSessionStore { - RedisSessionStore::new("redis://127.0.0.1:6379") - .await - .unwrap() + #[cfg(feature = "redis-session")] + { + RedisSessionStore::new("redis://127.0.0.1:6379") + .await + .unwrap() + } + + #[cfg(not(feature = "redis-session"))] + { + let redis_pool = Config::from_url("redis://127.0.0.1:6379") + .create_pool(Some(Runtime::Tokio1)) + .unwrap(); + RedisSessionStore::new(redis_pool.clone()) + } } #[actix_web::test] @@ -318,12 +441,25 @@ mod tests { async fn loading_an_invalid_session_state_returns_deserialization_error() { let store = redis_store().await; let session_key = generate_session_key(); - store - .client - .clone() - .set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json") - .await - .unwrap(); + + match store.client { + RedisSessionConn::Single(ref conn) => conn + .clone() + .set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json") + .await + .unwrap(), + + #[cfg(feature = "redis-pool")] + RedisSessionConn::Pool(ref pool) => { + pool.get() + .await + .unwrap() + .set::<_, _, ()>(session_key.as_ref(), "random-thing-which-is-not-json") + .await + .unwrap(); + } + } + assert!(matches!( store.load(&session_key).await.unwrap_err(), LoadError::Deserialization(_),