use actix::Addr;
use actix_redis::{resp_array, Command, RedisActor, RespValue};
use actix_web::cookie::time::Duration;
use anyhow::Error;
use super::SessionKey;
use crate::storage::{
interface::{LoadError, SaveError, SessionState, UpdateError},
utils::generate_session_key,
SessionStore,
};
/// Session store that issues its commands to Redis through a [`RedisActor`].
///
/// Instances are created with [`RedisActorSessionStore::new`] or, when the
/// cache-key generation needs customising, [`RedisActorSessionStore::builder`].
pub struct RedisActorSessionStore {
/// Settings applied to every operation; holds the cache-key generator.
configuration: CacheConfiguration,
/// Address of the `RedisActor` that every Redis command is sent to.
addr: Addr<RedisActor>,
}
impl RedisActorSessionStore {
    /// Returns a builder that starts from the default configuration and will
    /// use `connection_string` when the store is eventually built.
    ///
    /// Use this instead of [`RedisActorSessionStore::new`] when the cache-key
    /// generator has to be replaced before the store is created.
    pub fn builder<S>(connection_string: S) -> RedisActorSessionStoreBuilder
    where
        S: Into<String>,
    {
        let configuration = CacheConfiguration::default();
        RedisActorSessionStoreBuilder {
            configuration,
            connection_string: connection_string.into(),
        }
    }

    /// Creates a store with the default configuration.
    ///
    /// Shorthand for `Self::builder(connection_string).build()`.
    pub fn new<S>(connection_string: S) -> Self
    where
        S: Into<String>,
    {
        let builder = Self::builder(connection_string);
        builder.build()
    }
}
/// Settings shared between the builder and the finished store.
struct CacheConfiguration {
    /// Turns a session key into the key under which the state is kept in Redis.
    cache_keygen: Box<dyn Fn(&str) -> String>,
}

impl Default for CacheConfiguration {
    /// The default generator returns an owned copy of the session key, i.e.
    /// the session key is used unchanged as the cache key.
    fn default() -> Self {
        Self {
            cache_keygen: Box::new(|session_key: &str| session_key.to_owned()),
        }
    }
}
/// Builder for [`RedisActorSessionStore`].
///
/// Obtained from [`RedisActorSessionStore::builder`]; call
/// [`RedisActorSessionStoreBuilder::build`] to get the store.
#[must_use]
pub struct RedisActorSessionStoreBuilder {
/// Connection string handed to `RedisActor::start` when the store is built.
connection_string: String,
/// Settings that will be moved into the store; holds the cache-key generator.
configuration: CacheConfiguration,
}
impl RedisActorSessionStoreBuilder {
    /// Replaces the function that maps a session key to its Redis cache key.
    ///
    /// The previously configured generator is discarded.
    pub fn cache_keygen<F>(self, keygen: F) -> Self
    where
        F: Fn(&str) -> String + 'static,
    {
        let Self {
            connection_string,
            mut configuration,
        } = self;
        configuration.cache_keygen = Box::new(keygen);
        Self {
            connection_string,
            configuration,
        }
    }

    /// Consumes the builder, starts a `RedisActor` with the stored connection
    /// string and returns the store that talks to it.
    #[must_use]
    pub fn build(self) -> RedisActorSessionStore {
        let Self {
            connection_string,
            configuration,
        } = self;
        let addr = RedisActor::start(connection_string);
        RedisActorSessionStore {
            configuration,
            addr,
        }
    }
}
#[async_trait::async_trait(?Send)]
impl SessionStore for RedisActorSessionStore {
    /// Reads the state stored for `session_key` with a `GET`.
    ///
    /// Returns `Ok(None)` when the reply is neither an error nor a string
    /// (for instance when no record exists for the key).
    ///
    /// # Errors
    ///
    /// * `LoadError::Other` if sending the command fails, the command itself
    ///   fails, or Redis answers with an error reply.
    /// * `LoadError::Deserialization` if the stored payload is not valid JSON
    ///   for a session state.
    async fn load(&self, session_key: &SessionKey) -> Result<Option<SessionState>, LoadError> {
        let redis_key = (self.configuration.cache_keygen)(session_key.as_ref());
        let get = Command(resp_array!["GET", redis_key]);

        // `send(..).await` yields a nested `Result`; a failure in either layer
        // is reported as `LoadError::Other`.
        let reply = self
            .addr
            .send(get)
            .await
            .map_err(|err| LoadError::Other(err.into()))?
            .map_err(|err| LoadError::Other(err.into()))?;

        let parsed = match reply {
            RespValue::Error(message) => {
                return Err(LoadError::Other(anyhow::anyhow!(message)));
            }
            RespValue::SimpleString(text) => serde_json::from_str(&text),
            RespValue::BulkString(bytes) => serde_json::from_slice(&bytes),
            _ => return Ok(None),
        };
        parsed.map_err(|err| LoadError::Deserialization(err.into()))
    }

    /// Serialises `session_state` to JSON and stores it under a newly
    /// generated session key using `SET .. NX EX <ttl seconds>`.
    ///
    /// Returns the generated session key on success.
    ///
    /// # Errors
    ///
    /// * `SaveError::Serialization` if the state cannot be turned into JSON.
    /// * `SaveError::Other` if the command cannot be sent or fails, if Redis
    ///   replies with nil (a record with the same key already existed), or if
    ///   any other unexpected reply comes back.
    async fn save(
        &self,
        session_state: SessionState,
        ttl: &Duration,
    ) -> Result<SessionKey, SaveError> {
        let payload = serde_json::to_string(&session_state)
            .map_err(|err| SaveError::Serialization(err.into()))?;

        // The key is only generated once serialisation has succeeded.
        let session_key = generate_session_key();
        let redis_key = (self.configuration.cache_keygen)(session_key.as_ref());
        let ttl_seconds = ttl.whole_seconds().to_string();
        let set_if_absent = Command(resp_array![
            "SET",
            redis_key,
            payload,
            "NX",
            "EX",
            ttl_seconds
        ]);

        let reply = self
            .addr
            .send(set_if_absent)
            .await
            .map_err(|err| SaveError::Other(err.into()))?
            .map_err(|err| SaveError::Other(err.into()))?;

        match reply {
            RespValue::SimpleString(_) => Ok(session_key),
            RespValue::Nil => Err(SaveError::Other(anyhow::anyhow!(
                "Failed to save session state. A record with the same key already existed in Redis"
            ))),
            unexpected => Err(SaveError::Other(anyhow::anyhow!(
                "Failed to save session state. {:?}",
                unexpected
            ))),
        }
    }

    /// Overwrites the state stored for `session_key` using
    /// `SET .. XX EX <ttl seconds>`.
    ///
    /// When Redis replies with nil (nothing was overwritten), the state is
    /// stored again through [`SessionStore::save`], which assigns a *new*
    /// session key. The returned key is therefore either the one passed in or
    /// the freshly generated one.
    ///
    /// # Errors
    ///
    /// * `UpdateError::Serialization` if the state cannot be turned into JSON.
    /// * `UpdateError::Other` if the command cannot be sent or fails, if the
    ///   fallback save fails for a non-serialisation reason, or if Redis sends
    ///   an unexpected reply.
    async fn update(
        &self,
        session_key: SessionKey,
        session_state: SessionState,
        ttl: &Duration,
    ) -> Result<SessionKey, UpdateError> {
        let payload = serde_json::to_string(&session_state)
            .map_err(|err| UpdateError::Serialization(err.into()))?;

        let redis_key = (self.configuration.cache_keygen)(session_key.as_ref());
        let ttl_seconds = ttl.whole_seconds().to_string();
        let set_if_present = Command(resp_array![
            "SET",
            redis_key,
            payload,
            "XX",
            "EX",
            ttl_seconds
        ]);

        let reply = self
            .addr
            .send(set_if_present)
            .await
            .map_err(|err| UpdateError::Other(err.into()))?
            .map_err(|err| UpdateError::Other(err.into()))?;

        match reply {
            RespValue::SimpleString(_) => Ok(session_key),
            // Nothing was overwritten: store the state as a brand-new session
            // and translate the save error into the matching update error.
            RespValue::Nil => match self.save(session_state, ttl).await {
                Ok(new_key) => Ok(new_key),
                Err(SaveError::Serialization(cause)) => Err(UpdateError::Serialization(cause)),
                Err(SaveError::Other(cause)) => Err(UpdateError::Other(cause)),
            },
            unexpected => Err(UpdateError::Other(anyhow::anyhow!(
                "Failed to update session state. {:?}",
                unexpected
            ))),
        }
    }

    /// Sets the time-to-live of the record for `session_key` with `EXPIRE`.
    ///
    /// Any integer reply counts as success.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent, or if the outcome is anything
    /// other than a successful integer reply.
    async fn update_ttl(&self, session_key: &SessionKey, ttl: &Duration) -> Result<(), Error> {
        let redis_key = (self.configuration.cache_keygen)(session_key.as_ref());
        let ttl_seconds = ttl.whole_seconds().to_string();
        let expire = Command(resp_array!["EXPIRE", redis_key, ttl_seconds]);

        let outcome = self.addr.send(expire).await?;
        if matches!(outcome, Ok(RespValue::Integer(_))) {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "Failed to update the session state TTL: {:?}",
                outcome
            ))
        }
    }

    /// Removes the record for `session_key` with `DEL`.
    ///
    /// Any integer reply counts as success.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent, or if the outcome is anything
    /// other than a successful integer reply.
    async fn delete(&self, session_key: &SessionKey) -> Result<(), anyhow::Error> {
        let redis_key = (self.configuration.cache_keygen)(session_key.as_ref());
        let del = Command(resp_array!["DEL", redis_key]);

        match self.addr.send(del).await? {
            Ok(RespValue::Integer(_)) => Ok(()),
            outcome => Err(anyhow::anyhow!(
                "Failed to remove session from cache. {:?}",
                outcome
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use actix_web::cookie::time::Duration;

    use super::*;
    use crate::test_helpers::acceptance_test_suite;

    /// Creates a store that targets `127.0.0.1:6379`.
    fn redis_actor_store() -> RedisActorSessionStore {
        let connection_string = "127.0.0.1:6379";
        RedisActorSessionStore::new(connection_string)
    }

    /// Runs the shared acceptance suite against the Redis actor store.
    #[actix_web::test]
    async fn test_session_workflow() {
        acceptance_test_suite(redis_actor_store, true).await;
    }

    /// A key that was never saved must load as `None` rather than an error.
    #[actix_web::test]
    async fn loading_a_missing_session_returns_none() {
        let store = redis_actor_store();
        let unknown_key = generate_session_key();

        let loaded = store.load(&unknown_key).await.unwrap();

        assert!(loaded.is_none());
    }

    /// Updating a key with no record behind it must succeed and hand back a
    /// different session key.
    #[actix_web::test]
    async fn updating_of_an_expired_state_is_handled_gracefully() {
        let store = redis_actor_store();
        let unknown_key = generate_session_key();
        let initial_session_key = unknown_key.as_ref().to_owned();
        let ttl = Duration::seconds(1);

        let updated_session_key = store
            .update(unknown_key, HashMap::new(), &ttl)
            .await
            .unwrap();

        assert_ne!(initial_session_key, updated_session_key.as_ref());
    }
}