From 8d2a4cfe6867604a4cbc623a76a71a855c5feff9 Mon Sep 17 00:00:00 2001 From: robjtede Date: Fri, 25 Mar 2022 19:10:05 +0000 Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20=20@=2004f49?= =?UTF-8?q?340013204fe324dd04752a30c0dbf9da91d=20=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/struct.RedisSessionStore.html | 8 +- actix_session/storage/trait.SessionStore.html | 2 +- src/actix_session/storage/redis_rs.rs.html | 112 ++++++++++++++---- 3 files changed, 93 insertions(+), 29 deletions(-) diff --git a/actix_session/storage/struct.RedisSessionStore.html b/actix_session/storage/struct.RedisSessionStore.html index 07d7f713d..280a57631 100644 --- a/actix_session/storage/struct.RedisSessionStore.html +++ b/actix_session/storage/struct.RedisSessionStore.html @@ -51,10 +51,10 @@ It takes as input the only required input to create a new instance of Trait Implementations

Returns a copy of the value. Read more

Performs copy-assignment from source. Read more

-

Loads the session state associated to a session key.

-

Persist the session state for a newly created session. Read more

-

Updates the session state associated to a pre-existing session key.

-

Deletes a session from the store.

+

Loads the session state associated to a session key.

+

Persist the session state for a newly created session. Read more

+

Updates the session state associated to a pre-existing session key.

+

Deletes a session from the store.

Auto Trait Implementations

Blanket Implementations

Gets the TypeId of self. Read more

Immutably borrows from an owned value. Read more

Mutably borrows from an owned value. Read more

diff --git a/actix_session/storage/trait.SessionStore.html b/actix_session/storage/trait.SessionStore.html index 2e2082ea0..91a75075d 100644 --- a/actix_session/storage/trait.SessionStore.html +++ b/actix_session/storage/trait.SessionStore.html @@ -21,5 +21,5 @@ is required for implementations, too. In particular, we use the send-optional va

Returns the corresponding session key.

Updates the session state associated to a pre-existing session key.

Deletes a session from the store.

-

Implementors

+

Implementors

\ No newline at end of file diff --git a/src/actix_session/storage/redis_rs.rs.html b/src/actix_session/storage/redis_rs.rs.html index fd784f62f..9ba1ac663 100644 --- a/src/actix_session/storage/redis_rs.rs.html +++ b/src/actix_session/storage/redis_rs.rs.html @@ -302,9 +302,41 @@ 295 296 297 +298 +299 +300 +301 +302 +303 +304 +305 +306 +307 +308 +309 +310 +311 +312 +313 +314 +315 +316 +317 +318 +319 +320 +321 +322 +323 +324 +325 +326 +327 +328 +329
use std::sync::Arc;
 
-use redis::{aio::ConnectionManager, AsyncCommands, Value};
+use redis::{aio::ConnectionManager, Cmd, FromRedisValue, RedisResult, Value};
 use time::{self, Duration};
 
 use super::SessionKey;
@@ -440,10 +472,9 @@
 impl SessionStore for RedisSessionStore {
     async fn load(&self, session_key: &SessionKey) -> Result<Option<SessionState>, LoadError> {
         let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
+
         let value: Option<String> = self
-            .client
-            .clone()
-            .get(cache_key)
+            .execute_command(redis::cmd("GET").arg(&[&cache_key]))
             .await
             .map_err(Into::into)
             .map_err(LoadError::Other)?;
@@ -467,18 +498,16 @@
         let session_key = generate_session_key();
         let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
 
-        redis::cmd("SET")
-            .arg(&[
-                &cache_key,
-                &body,
-                "NX", // NX: only set the key if it does not already exist
-                "EX", // EX: set expiry
-                &format!("{}", ttl.whole_seconds()),
-            ])
-            .query_async(&mut self.client.clone())
-            .await
-            .map_err(Into::into)
-            .map_err(SaveError::Other)?;
+        self.execute_command(redis::cmd("SET").arg(&[
+            &cache_key,
+            &body,
+            "NX", // NX: only set the key if it does not already exist
+            "EX", // EX: set expiry
+            &format!("{}", ttl.whole_seconds()),
+        ]))
+        .await
+        .map_err(Into::into)
+        .map_err(SaveError::Other)?;
 
         Ok(session_key)
     }
@@ -495,15 +524,14 @@
 
         let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
 
-        let v: redis::Value = redis::cmd("SET")
-            .arg(&[
+        let v: redis::Value = self
+            .execute_command(redis::cmd("SET").arg(&[
                 &cache_key,
                 &body,
                 "XX", // XX: Only set the key if it already exist.
                 "EX", // EX: set expiry
                 &format!("{}", ttl.whole_seconds()),
-            ])
-            .query_async(&mut self.client.clone())
+            ]))
             .await
             .map_err(Into::into)
             .map_err(UpdateError::Other)?;
@@ -531,10 +559,7 @@
 
     async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> {
         let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
-
-        self.client
-            .clone()
-            .del(&cache_key)
+        self.execute_command(redis::cmd("DEL").arg(&[&cache_key]))
             .await
             .map_err(Into::into)
             .map_err(UpdateError::Other)?;
@@ -543,6 +568,45 @@
     }
 }
 
+impl RedisSessionStore {
+    /// Execute Redis command and retry once in certain cases.
+    ///
+    /// `ConnectionManager` automatically reconnects when it encounters an error talking to Redis.
+    /// The request that bumped into the error, though, fails.
+    ///
+    /// This is generally OK, but there is an unpleasant edge case: Redis client timeouts. The
+    /// server is configured to drop connections who have been active longer than a pre-determined
+    /// threshold. `redis-rs` does not proactively detect that the connection has been dropped - you
+    /// only find out when you try to use it.
+    ///
+    /// This helper method catches this case (`.is_connection_dropped`) to execute a retry. The
+    /// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for
+    /// a different more meaningful reason).
+    async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
+        let mut can_retry = true;
+
+        loop {
+            match cmd.query_async(&mut self.client.clone()).await {
+                Ok(value) => return Ok(value),
+                Err(err) => {
+                    if can_retry && err.is_connection_dropped() {
+                        tracing::debug!(
+                            "Connection dropped while trying to talk to Redis. Retrying."
+                        );
+
+                        // Retry at most once
+                        can_retry = false;
+
+                        continue;
+                    } else {
+                        return Err(err);
+                    }
+                }
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;