1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-27 10:39:03 +02:00

complete redis session backend

This commit is contained in:
Nikolay Kim
2017-12-29 01:10:27 -08:00
parent fab3c35ba9
commit 8924335338
6 changed files with 185 additions and 43 deletions

View File

@ -1,9 +1,12 @@
extern crate actix;
extern crate actix_web;
extern crate bytes;
extern crate cookie;
extern crate futures;
extern crate serde;
extern crate serde_json;
extern crate rand;
extern crate http;
extern crate tokio_io;
extern crate tokio_core;
#[macro_use]
@ -14,4 +17,5 @@ extern crate failure;
mod redis;
mod session;
pub use redis::RedisActor;
pub use session::RedisSessionBackend;

View File

@ -3,7 +3,6 @@ use std::collections::VecDeque;
use bytes::BytesMut;
use futures::Future;
use futures::future::Either;
use futures::unsync::oneshot;
use tokio_core::net::TcpStream;
use tokio_io::codec::{Decoder, Encoder};
@ -108,7 +107,7 @@ impl Handler<Command> for RedisActor {
fn handle(&mut self, msg: Command, ctx: &mut Self::Context) -> Response<Self, Command> {
let (tx, rx) = oneshot::channel();
self.queue.push_back(tx);
ctx.send(Value(msg.0));
let _ = ctx.send(Value(msg.0));
Self::async_reply(
rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "").into())

View File

@ -1,12 +1,16 @@
use std::{io, net};
use std::rc::Rc;
use std::time::Duration;
use std::iter::FromIterator;
use std::collections::HashMap;
use serde_json;
use rand::{self, Rng};
use futures::Future;
use futures::future::{Either, ok as FutOk, err as FutErr};
use tokio_core::net::TcpStream;
use redis_async::resp::RespValue;
use cookie::{CookieJar, Cookie, Key};
use http::header::{self, HeaderValue};
use actix::prelude::*;
use actix_web::{error, Error, HttpRequest, HttpResponse};
use actix_web::middleware::{SessionImpl, SessionBackend, Response as MiddlewareResponse};
@ -19,6 +23,7 @@ pub struct RedisSession {
changed: bool,
inner: Rc<Inner>,
state: HashMap<String, String>,
value: Option<String>,
}
impl SessionImpl for RedisSession {
@ -46,27 +51,44 @@ impl SessionImpl for RedisSession {
self.state.clear()
}
fn write(&self, mut resp: HttpResponse) -> MiddlewareResponse {
fn write(&self, resp: HttpResponse) -> MiddlewareResponse {
if self.changed {
let _ = self.inner.update(&self.state);
MiddlewareResponse::Future(self.inner.update(&self.state, resp, self.value.as_ref()))
} else {
MiddlewareResponse::Done(resp)
}
MiddlewareResponse::Done(resp)
}
}
/// Use redis as session storage.
///
/// You need to pass an address of the redis server and random value to the
/// constructor of `RedisSessionBackend`. This is private key for cookie session,
/// When this value is changed, all session data is lost.
///
/// Note that whatever you write into your session is visible by the user (but not modifiable).
///
/// Constructor panics if key length is less than 32 bytes.
pub struct RedisSessionBackend(Rc<Inner>);
impl RedisSessionBackend {
/// Create new redis session backend
pub fn new<S: net::ToSocketAddrs>(addr: S, ttl: Duration) -> io::Result<RedisSessionBackend> {
///
/// * `addr` - address of the redis server
pub fn new<S: net::ToSocketAddrs>(addr: S, key: &[u8]) -> io::Result<RedisSessionBackend> {
let h = Arbiter::handle();
let mut err = None;
for addr in addr.to_socket_addrs()? {
match TcpStream::connect(&addr, &h).wait() {
match net::TcpStream::connect(&addr) {
Err(e) => err = Some(e),
Ok(conn) => {
let addr = RedisActor::start(conn);
return Ok(RedisSessionBackend(Rc::new(Inner{ttl: ttl, addr: addr})));
let addr = RedisActor::start(
TcpStream::from_stream(conn, h).expect("Can not create tcp stream"));
return Ok(RedisSessionBackend(
Rc::new(Inner{key: Key::from_master(key),
ttl: "7200".to_owned(),
addr: addr,
name: "actix-session".to_owned()})));
},
}
}
@ -76,6 +98,17 @@ impl RedisSessionBackend {
Err(io::Error::new(io::ErrorKind::Other, "Can not connect to redis server."))
}
}
/// Set time to live in seconds for session value
pub fn ttl(mut self, ttl: u16) -> Self {
Rc::get_mut(&mut self.0).unwrap().ttl = format!("{}", ttl);
self
}
pub fn cookie_name(mut self, name: &str) -> Self {
Rc::get_mut(&mut self.0).unwrap().name = name.to_owned();
self
}
}
impl<S> SessionBackend<S> for RedisSessionBackend {
@ -87,17 +120,19 @@ impl<S> SessionBackend<S> for RedisSessionBackend {
let inner = Rc::clone(&self.0);
Box::new(self.0.load(req).map(move |state| {
if let Some(state) = state {
if let Some((state, value)) = state {
RedisSession {
changed: false,
inner: inner,
state: state,
value: Some(value),
}
} else {
RedisSession {
changed: false,
inner: inner,
state: HashMap::new(),
value: None,
}
}
}))
@ -105,49 +140,99 @@ impl<S> SessionBackend<S> for RedisSessionBackend {
}
struct Inner {
ttl: Duration,
key: Key,
ttl: String,
name: String,
addr: Address<RedisActor>,
}
impl Inner {
fn load<S>(&self, req: &mut HttpRequest<S>)
-> Box<Future<Item=Option<HashMap<String, String>>, Error=Error>>
{
-> Box<Future<Item=Option<(HashMap<String, String>, String)>, Error=Error>> {
if let Ok(cookies) = req.cookies() {
for cookie in cookies {
if cookie.name() == "actix-session" {
return Box::new(
self.addr.call_fut(Command(resp_array!["GET", cookie.value()]))
.map_err(Error::from)
.and_then(|res| {
match res {
Ok(val) => {
println!("VAL {:?}", val);
Ok(Some(HashMap::new()))
},
Err(err) => Err(
io::Error::new(io::ErrorKind::Other, "Error").into())
}
}))
if cookie.name() == self.name {
let mut jar = CookieJar::new();
jar.add_original(cookie.clone());
if let Some(cookie) = jar.signed(&self.key).get(&self.name) {
let value = cookie.value().to_owned();
return Box::new(
self.addr.call_fut(Command(resp_array!["GET", cookie.value()]))
.map_err(Error::from)
.and_then(move |res| {
match res {
Ok(val) => {
match val {
RespValue::Error(err) =>
return Err(
error::ErrorInternalServerError(err).into()),
RespValue::SimpleString(s) =>
if let Ok(val) = serde_json::from_str(&s) {
return Ok(Some((val, value)))
},
RespValue::BulkString(s) => {
if let Ok(val) = serde_json::from_slice(&s) {
return Ok(Some((val, value)))
}
},
_ => (),
}
Ok(None)
},
Err(err) => Err(error::ErrorInternalServerError(err).into())
}
}))
} else {
return Box::new(FutOk(None))
}
}
}
}
Box::new(FutOk(None))
}
fn update(&self, state: &HashMap<String, String>) -> Box<Future<Item=(), Error=Error>> {
fn update(&self, state: &HashMap<String, String>,
mut resp: HttpResponse,
value: Option<&String>) -> Box<Future<Item=HttpResponse, Error=Error>>
{
let (value, jar) = if let Some(value) = value {
(value.clone(), None)
} else {
let mut rng = rand::OsRng::new().unwrap();
let value = String::from_iter(rng.gen_ascii_chars().take(32));
let mut cookie = Cookie::new(self.name.clone(), value.clone());
cookie.set_path("/");
cookie.set_http_only(true);
// set cookie
let mut jar = CookieJar::new();
jar.signed(&self.key).add(cookie);
(value, Some(jar))
};
Box::new(
match serde_json::to_string(state) {
Err(e) => Either::A(FutErr(e.into())),
Ok(body) => {
Either::B(
self.addr.call_fut(Command(resp_array!["GET", "test"]))
self.addr.call_fut(
Command(resp_array!["SET", value, body,"EX", &self.ttl]))
.map_err(Error::from)
.and_then(|res| {
.and_then(move |res| {
match res {
Ok(val) => Ok(()),
Err(err) => Err(
error::ErrorInternalServerError(err).into())
Ok(_) => {
if let Some(jar) = jar {
for cookie in jar.delta() {
let val = HeaderValue::from_str(
&cookie.to_string())?;
resp.headers_mut().append(header::SET_COOKIE, val);
}
}
Ok(resp)
},
Err(err) => Err(error::ErrorInternalServerError(err).into())
}
}))
}