mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-27 17:22:57 +01:00
[actix-session/redis-rs] Catch server-driven disconnections and prevent a user-visible issue via a retry (#235)
Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
parent
4d77e26e1e
commit
04f4934001
@ -2,10 +2,12 @@
|
|||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
- Implement `SessionExt` for `GuardContext`. [#234]
|
- Implement `SessionExt` for `GuardContext`. [#234]
|
||||||
|
- `RedisSessionStore` will prevent connection timeouts from causing user-visible errors. [#235]
|
||||||
- Do not leak internal implementation details to callers when errors occur. [#236]
|
- Do not leak internal implementation details to callers when errors occur. [#236]
|
||||||
|
|
||||||
[#234]: https://github.com/actix/actix-extras/pull/234
|
[#234]: https://github.com/actix/actix-extras/pull/234
|
||||||
[#236]: https://github.com/actix/actix-extras/pull/236
|
[#236]: https://github.com/actix/actix-extras/pull/236
|
||||||
|
[#235]: https://github.com/actix/actix-extras/pull/235
|
||||||
|
|
||||||
|
|
||||||
## 0.6.1 - 2022-03-21
|
## 0.6.1 - 2022-03-21
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use redis::{aio::ConnectionManager, AsyncCommands, Value};
|
use redis::{aio::ConnectionManager, Cmd, FromRedisValue, RedisResult, Value};
|
||||||
use time::{self, Duration};
|
use time::{self, Duration};
|
||||||
|
|
||||||
use super::SessionKey;
|
use super::SessionKey;
|
||||||
@ -136,10 +136,9 @@ impl RedisSessionStoreBuilder {
|
|||||||
impl SessionStore for RedisSessionStore {
|
impl SessionStore for RedisSessionStore {
|
||||||
async fn load(&self, session_key: &SessionKey) -> Result<Option<SessionState>, LoadError> {
|
async fn load(&self, session_key: &SessionKey) -> Result<Option<SessionState>, LoadError> {
|
||||||
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
||||||
|
|
||||||
let value: Option<String> = self
|
let value: Option<String> = self
|
||||||
.client
|
.execute_command(redis::cmd("GET").arg(&[&cache_key]))
|
||||||
.clone()
|
|
||||||
.get(cache_key)
|
|
||||||
.await
|
.await
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
.map_err(LoadError::Other)?;
|
.map_err(LoadError::Other)?;
|
||||||
@ -163,15 +162,13 @@ impl SessionStore for RedisSessionStore {
|
|||||||
let session_key = generate_session_key();
|
let session_key = generate_session_key();
|
||||||
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
||||||
|
|
||||||
redis::cmd("SET")
|
self.execute_command(redis::cmd("SET").arg(&[
|
||||||
.arg(&[
|
|
||||||
&cache_key,
|
&cache_key,
|
||||||
&body,
|
&body,
|
||||||
"NX", // NX: only set the key if it does not already exist
|
"NX", // NX: only set the key if it does not already exist
|
||||||
"EX", // EX: set expiry
|
"EX", // EX: set expiry
|
||||||
&format!("{}", ttl.whole_seconds()),
|
&format!("{}", ttl.whole_seconds()),
|
||||||
])
|
]))
|
||||||
.query_async(&mut self.client.clone())
|
|
||||||
.await
|
.await
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
.map_err(SaveError::Other)?;
|
.map_err(SaveError::Other)?;
|
||||||
@ -191,15 +188,14 @@ impl SessionStore for RedisSessionStore {
|
|||||||
|
|
||||||
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
||||||
|
|
||||||
let v: redis::Value = redis::cmd("SET")
|
let v: redis::Value = self
|
||||||
.arg(&[
|
.execute_command(redis::cmd("SET").arg(&[
|
||||||
&cache_key,
|
&cache_key,
|
||||||
&body,
|
&body,
|
||||||
"XX", // XX: Only set the key if it already exist.
|
"XX", // XX: Only set the key if it already exist.
|
||||||
"EX", // EX: set expiry
|
"EX", // EX: set expiry
|
||||||
&format!("{}", ttl.whole_seconds()),
|
&format!("{}", ttl.whole_seconds()),
|
||||||
])
|
]))
|
||||||
.query_async(&mut self.client.clone())
|
|
||||||
.await
|
.await
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
.map_err(UpdateError::Other)?;
|
.map_err(UpdateError::Other)?;
|
||||||
@ -227,10 +223,7 @@ impl SessionStore for RedisSessionStore {
|
|||||||
|
|
||||||
async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> {
|
async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> {
|
||||||
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
let cache_key = (self.configuration.cache_keygen)(session_key.as_ref());
|
||||||
|
self.execute_command(redis::cmd("DEL").arg(&[&cache_key]))
|
||||||
self.client
|
|
||||||
.clone()
|
|
||||||
.del(&cache_key)
|
|
||||||
.await
|
.await
|
||||||
.map_err(Into::into)
|
.map_err(Into::into)
|
||||||
.map_err(UpdateError::Other)?;
|
.map_err(UpdateError::Other)?;
|
||||||
@ -239,6 +232,45 @@ impl SessionStore for RedisSessionStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RedisSessionStore {
|
||||||
|
/// Execute Redis command and retry once in certain cases.
|
||||||
|
///
|
||||||
|
/// `ConnectionManager` automatically reconnects when it encounters an error talking to Redis.
|
||||||
|
/// The request that bumped into the error, though, fails.
|
||||||
|
///
|
||||||
|
/// This is generally OK, but there is an unpleasant edge case: Redis client timeouts. The
|
||||||
|
/// server is configured to drop connections who have been active longer than a pre-determined
|
||||||
|
/// threshold. `redis-rs` does not proactively detect that the connection has been dropped - you
|
||||||
|
/// only find out when you try to use it.
|
||||||
|
///
|
||||||
|
/// This helper method catches this case (`.is_connection_dropped`) to execute a retry. The
|
||||||
|
/// retry will be executed on a fresh connection, therefore it is likely to succeed (or fail for
|
||||||
|
/// a different more meaningful reason).
|
||||||
|
async fn execute_command<T: FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
|
||||||
|
let mut can_retry = true;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match cmd.query_async(&mut self.client.clone()).await {
|
||||||
|
Ok(value) => return Ok(value),
|
||||||
|
Err(err) => {
|
||||||
|
if can_retry && err.is_connection_dropped() {
|
||||||
|
tracing::debug!(
|
||||||
|
"Connection dropped while trying to talk to Redis. Retrying."
|
||||||
|
);
|
||||||
|
|
||||||
|
// Retry at most once
|
||||||
|
can_retry = false;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
Loading…
Reference in New Issue
Block a user