1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-02-17 16:43:30 +01:00

actix-web 0.6 compatibility

This commit is contained in:
Nikolay Kim 2018-05-08 10:12:57 -07:00
parent 2a7ffe6e0a
commit 7af09390ee
9 changed files with 161 additions and 126 deletions

View File

@ -1,5 +1,9 @@
# Changes # Changes
## 0.4.0 (2018-05-08)
* Actix web 0.6 compatibility
## 0.3.0 (2018-04-10) ## 0.3.0 (2018-04-10)
* Actix web 0.5 compatibility * Actix web 0.5 compatibility

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-redis" name = "actix-redis"
version = "0.3.0" version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Redis integration for actix framework" description = "Redis integration for actix framework"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -38,7 +38,7 @@ tokio-core = "0.1"
redis-async = "0.0" redis-async = "0.0"
# actix web session # actix web session
actix-web = { version="0.5", optional=true } actix-web = { version="0.6", optional=true }
cookie = { version="0.10", features=["percent-encode", "secure"], optional=true } cookie = { version="0.10", features=["percent-encode", "secure"], optional=true }
http = { version="0.1", optional=true } http = { version="0.1", optional=true }
rand = { version="0.3", optional=true } rand = { version="0.3", optional=true }

View File

@ -8,7 +8,7 @@ Redis integration for actix framework.
* [API Documentation (Releases)](https://docs.rs/actix-redis/) * [API Documentation (Releases)](https://docs.rs/actix-redis/)
* [Chat on gitter](https://gitter.im/actix/actix) * [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-redis](https://crates.io/crates/actix-redis) * Cargo package: [actix-redis](https://crates.io/crates/actix-redis)
* Minimum supported Rust version: 1.21 or later * Minimum supported Rust version: 1.24 or later
## Redis session backend ## Redis session backend
@ -23,12 +23,12 @@ Note that whatever you write into your session is visible by the user (but not m
Constructor panics if key length is less than 32 bytes. Constructor panics if key length is less than 32 bytes.
```rust,ignore ```rust
extern crate actix_web; extern crate actix_web;
extern crate actix_redis; extern crate actix_redis;
use actix_web::{App, server}; use actix_web::{App, server, middleware};
use actix_web::middleware::{Logger, SessionStorage}; use actix_web::middleware::session::SessionStorage;
use actix_redis::RedisSessionBackend; use actix_redis::RedisSessionBackend;
fn main() { fn main() {
@ -39,7 +39,7 @@ fn main() {
server::new( server::new(
|| App::new() || App::new()
// enable logger // enable logger
.middleware(Logger::default()) .middleware(middleware::Logger::default())
// cookie session middleware // cookie session middleware
.middleware(SessionStorage::new( .middleware(SessionStorage::new(
RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) RedisSessionBackend::new("127.0.0.1:6379", &[0; 32])

View File

@ -1,16 +1,15 @@
#![allow(unused_variables)] #![allow(unused_variables)]
#![cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))] #![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
extern crate actix; extern crate actix;
extern crate actix_web;
extern crate actix_redis; extern crate actix_redis;
extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
use actix_web::{server, middleware, App, HttpRequest, HttpResponse, Result};
use actix_web::middleware::RequestSession;
use actix_redis::RedisSessionBackend; use actix_redis::RedisSessionBackend;
use actix_web::middleware::RequestSession;
use actix_web::{middleware, server, App, HttpRequest, HttpResponse, Result};
/// simple handler /// simple handler
fn index(mut req: HttpRequest) -> Result<HttpResponse> { fn index(mut req: HttpRequest) -> Result<HttpResponse> {
@ -19,7 +18,7 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
// session // session
if let Some(count) = req.session().get::<i32>("counter")? { if let Some(count) = req.session().get::<i32>("counter")? {
println!("SESSION value: {}", count); println!("SESSION value: {}", count);
req.session().set("counter", count+1)?; req.session().set("counter", count + 1)?;
} else { } else {
req.session().set("counter", 1)?; req.session().set("counter", 1)?;
} }
@ -32,8 +31,8 @@ fn main() {
env_logger::init(); env_logger::init();
let sys = actix::System::new("basic-example"); let sys = actix::System::new("basic-example");
server::new( server::new(|| {
|| App::new() App::new()
// enable logger // enable logger
.middleware(middleware::Logger::default()) .middleware(middleware::Logger::default())
// cookie session middleware // cookie session middleware
@ -41,8 +40,9 @@ fn main() {
RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) RedisSessionBackend::new("127.0.0.1:6379", &[0; 32])
)) ))
// register simple route, handle all methods // register simple route, handle all methods
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index))
.bind("0.0.0.0:8080").unwrap() }).bind("0.0.0.0:8080")
.unwrap()
.threads(1) .threads(1)
.start(); .start();

5
rustfmt.toml Normal file
View File

@ -0,0 +1,5 @@
max_width = 89
reorder_imports = true
wrap_comments = true
fn_args_density = "Compressed"
#use_small_heuristics = false

View File

@ -6,12 +6,12 @@
//! * [Chat on gitter](https://gitter.im/actix/actix) //! * [Chat on gitter](https://gitter.im/actix/actix)
//! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis) //! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis)
//! * Minimum supported Rust version: 1.21 or later //! * Minimum supported Rust version: 1.21 or later
//! //!
extern crate actix; extern crate actix;
extern crate backoff; extern crate backoff;
extern crate futures; extern crate futures;
extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
@ -22,34 +22,34 @@ extern crate failure;
mod redis; mod redis;
pub use redis::{Command, RedisActor}; pub use redis::{Command, RedisActor};
#[cfg(feature="web")] #[cfg(feature = "web")]
extern crate actix_web; extern crate actix_web;
#[cfg(feature="web")] #[cfg(feature = "web")]
extern crate cookie; extern crate cookie;
#[cfg(feature="web")] #[cfg(feature = "web")]
extern crate rand;
#[cfg(feature="web")]
extern crate http; extern crate http;
#[cfg(feature="web")] #[cfg(feature = "web")]
extern crate rand;
#[cfg(feature = "web")]
extern crate serde; extern crate serde;
#[cfg(feature="web")] #[cfg(feature = "web")]
extern crate serde_json; extern crate serde_json;
#[cfg(feature="web")] #[cfg(feature = "web")]
mod session; mod session;
#[cfg(feature="web")] #[cfg(feature = "web")]
pub use session::RedisSessionBackend; pub use session::RedisSessionBackend;
/// General purpose actix redis error /// General purpose actix redis error
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum Error { pub enum Error {
#[fail(display="Redis error {}", _0)] #[fail(display = "Redis error {}", _0)]
Redis(redis_async::error::Error), Redis(redis_async::error::Error),
/// Receiving message during reconnecting /// Receiving message during reconnecting
#[fail(display="Redis: Not connected")] #[fail(display = "Redis: Not connected")]
NotConnected, NotConnected,
/// Cancel all waters when connection get dropped /// Cancel all waters when connection get dropped
#[fail(display="Redis: Disconnected")] #[fail(display = "Redis: Disconnected")]
Disconnected, Disconnected,
} }
@ -63,5 +63,5 @@ impl From<redis_async::error::Error> for Error {
} }
// re-export // re-export
pub use redis_async::resp::RespValue;
pub use redis_async::error::Error as RespError; pub use redis_async::error::Error as RespError;
pub use redis_async::resp::RespValue;

View File

@ -1,22 +1,21 @@
use std::io;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io;
use actix::prelude::*;
use actix::actors::{Connect, Connector}; use actix::actors::{Connect, Connector};
use backoff::ExponentialBackoff; use actix::prelude::*;
use backoff::backoff::Backoff; use backoff::backoff::Backoff;
use futures::Future; use backoff::ExponentialBackoff;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use tokio_io::AsyncRead; use futures::Future;
use tokio_io::io::WriteHalf;
use tokio_io::codec::FramedRead;
use tokio_core::net::TcpStream;
use redis_async::error::Error as RespError; use redis_async::error::Error as RespError;
use redis_async::resp::{RespCodec, RespValue}; use redis_async::resp::{RespCodec, RespValue};
use tokio_core::net::TcpStream;
use tokio_io::codec::FramedRead;
use tokio_io::io::WriteHalf;
use tokio_io::AsyncRead;
use Error; use Error;
/// Command for send data to Redis /// Command for send data to Redis
#[derive(Debug)] #[derive(Debug)]
pub struct Command(pub RespValue); pub struct Command(pub RespValue);
@ -38,11 +37,11 @@ impl RedisActor {
pub fn start<S: Into<String>>(addr: S) -> Addr<Unsync, RedisActor> { pub fn start<S: Into<String>>(addr: S) -> Addr<Unsync, RedisActor> {
let addr = addr.into(); let addr = addr.into();
Supervisor::start(|_| { Supervisor::start(|_| RedisActor {
RedisActor { addr: addr, addr: addr,
cell: None, cell: None,
backoff: ExponentialBackoff::default(), backoff: ExponentialBackoff::default(),
queue: VecDeque::new() } queue: VecDeque::new(),
}) })
} }
} }
@ -51,7 +50,8 @@ impl Actor for RedisActor {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) { fn started(&mut self, ctx: &mut Context<Self>) {
Connector::from_registry().send(Connect::host(self.addr.as_str())) Connector::from_registry()
.send(Connect::host(self.addr.as_str()))
.into_actor(self) .into_actor(self)
.map(|res, act, ctx| match res { .map(|res, act, ctx| match res {
Ok(stream) => { Ok(stream) => {
@ -67,7 +67,7 @@ impl Actor for RedisActor {
ctx.add_stream(FramedRead::new(r, RespCodec)); ctx.add_stream(FramedRead::new(r, RespCodec));
act.backoff.reset(); act.backoff.reset();
}, }
Err(err) => { Err(err) => {
error!("Can not connect to redis server: {}", err); error!("Can not connect to redis server: {}", err);
// re-connect with backoff time. // re-connect with backoff time.
@ -103,15 +103,16 @@ impl Supervised for RedisActor {
} }
impl actix::io::WriteHandler<io::Error> for RedisActor { impl actix::io::WriteHandler<io::Error> for RedisActor {
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
warn!("Redis connection dropped: {} error: {}", self.addr, err); warn!(
"Redis connection dropped: {} error: {}",
self.addr, err
);
Running::Stop Running::Stop
} }
} }
impl StreamHandler<RespValue, RespError> for RedisActor { impl StreamHandler<RespValue, RespError> for RedisActor {
fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running { fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running {
if let Some(tx) = self.queue.pop_front() { if let Some(tx) = self.queue.pop_front() {
let _ = tx.send(Err(err.into())); let _ = tx.send(Err(err.into()));
@ -138,6 +139,9 @@ impl Handler<Command> for RedisActor {
let _ = tx.send(Err(Error::NotConnected)); let _ = tx.send(Err(Error::NotConnected));
} }
Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res)) Box::new(
rx.map_err(|_| Error::Disconnected)
.and_then(|res| res),
)
} }
} }

View File

@ -1,21 +1,21 @@
use std::rc::Rc;
use std::iter::FromIterator;
use std::collections::HashMap; use std::collections::HashMap;
use std::iter::FromIterator;
use std::rc::Rc;
use serde_json;
use rand::{self, Rng};
use futures::Future;
use futures::future::{Either, ok as FutOk, err as FutErr};
use redis_async::resp::RespValue;
use cookie::{CookieJar, Cookie, Key};
use http::header::{self, HeaderValue};
use actix::prelude::*; use actix::prelude::*;
use actix_web::{error, Error, Result, HttpRequest, HttpResponse}; use actix_web::middleware::session::{SessionBackend, SessionImpl};
use actix_web::middleware::{SessionImpl, SessionBackend, Response as MiddlewareResponse}; use actix_web::middleware::Response as MiddlewareResponse;
use actix_web::{error, Error, HttpRequest, HttpResponse, Result};
use cookie::{Cookie, CookieJar, Key};
use futures::future::{err as FutErr, ok as FutOk, Either};
use futures::Future;
use http::header::{self, HeaderValue};
use rand::{self, Rng};
use redis_async::resp::RespValue;
use serde_json;
use redis::{Command, RedisActor}; use redis::{Command, RedisActor};
/// Session that stores data in redis /// Session that stores data in redis
pub struct RedisSession { pub struct RedisSession {
changed: bool, changed: bool,
@ -25,7 +25,6 @@ pub struct RedisSession {
} }
impl SessionImpl for RedisSession { impl SessionImpl for RedisSession {
fn get(&self, key: &str) -> Option<&str> { fn get(&self, key: &str) -> Option<&str> {
self.state.get(key).map(|s| s.as_str()) self.state.get(key).map(|s| s.as_str())
} }
@ -47,8 +46,11 @@ impl SessionImpl for RedisSession {
fn write(&self, resp: HttpResponse) -> Result<MiddlewareResponse> { fn write(&self, resp: HttpResponse) -> Result<MiddlewareResponse> {
if self.changed { if self.changed {
Ok(MiddlewareResponse::Future( Ok(MiddlewareResponse::Future(self.inner.update(
self.inner.update(&self.state, resp, self.value.as_ref()))) &self.state,
resp,
self.value.as_ref(),
)))
} else { } else {
Ok(MiddlewareResponse::Done(resp)) Ok(MiddlewareResponse::Done(resp))
} }
@ -58,10 +60,11 @@ impl SessionImpl for RedisSession {
/// Use redis as session storage. /// Use redis as session storage.
/// ///
/// You need to pass an address of the redis server and random value to the /// You need to pass an address of the redis server and random value to the
/// constructor of `RedisSessionBackend`. This is private key for cookie session, /// constructor of `RedisSessionBackend`. This is private key for cookie
/// When this value is changed, all session data is lost. /// session, When this value is changed, all session data is lost.
/// ///
/// Note that whatever you write into your session is visible by the user (but not modifiable). /// Note that whatever you write into your session is visible by the user (but
/// not modifiable).
/// ///
/// Constructor panics if key length is less than 32 bytes. /// Constructor panics if key length is less than 32 bytes.
pub struct RedisSessionBackend(Rc<Inner>); pub struct RedisSessionBackend(Rc<Inner>);
@ -71,11 +74,12 @@ impl RedisSessionBackend {
/// ///
/// * `addr` - address of the redis server /// * `addr` - address of the redis server
pub fn new<S: Into<String>>(addr: S, key: &[u8]) -> RedisSessionBackend { pub fn new<S: Into<String>>(addr: S, key: &[u8]) -> RedisSessionBackend {
RedisSessionBackend( RedisSessionBackend(Rc::new(Inner {
Rc::new(Inner{key: Key::from_master(key), key: Key::from_master(key),
ttl: "7200".to_owned(), ttl: "7200".to_owned(),
addr: RedisActor::start(addr), addr: RedisActor::start(addr),
name: "actix-session".to_owned()})) name: "actix-session".to_owned(),
}))
} }
/// Set time to live in seconds for session value /// Set time to live in seconds for session value
@ -92,9 +96,8 @@ impl RedisSessionBackend {
} }
impl<S> SessionBackend<S> for RedisSessionBackend { impl<S> SessionBackend<S> for RedisSessionBackend {
type Session = RedisSession; type Session = RedisSession;
type ReadFuture = Box<Future<Item=RedisSession, Error=Error>>; type ReadFuture = Box<Future<Item = RedisSession, Error = Error>>;
fn from_request(&self, req: &mut HttpRequest<S>) -> Self::ReadFuture { fn from_request(&self, req: &mut HttpRequest<S>) -> Self::ReadFuture {
let inner = Rc::clone(&self.0); let inner = Rc::clone(&self.0);
@ -105,13 +108,15 @@ impl<S> SessionBackend<S> for RedisSessionBackend {
changed: false, changed: false,
inner: inner, inner: inner,
state: state, state: state,
value: Some(value) } value: Some(value),
}
} else { } else {
RedisSession { RedisSession {
changed: false, changed: false,
inner: inner, inner: inner,
state: HashMap::new(), state: HashMap::new(),
value: None } value: None,
}
} }
})) }))
} }
@ -126,8 +131,9 @@ struct Inner {
impl Inner { impl Inner {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
fn load<S>(&self, req: &mut HttpRequest<S>) fn load<S>(
-> Box<Future<Item=Option<(HashMap<String, String>, String)>, Error=Error>> &self, req: &mut HttpRequest<S>,
) -> Box<Future<Item = Option<(HashMap<String, String>, String)>, Error = Error>>
{ {
if let Ok(cookies) = req.cookies() { if let Ok(cookies) = req.cookies() {
for cookie in cookies { for cookie in cookies {
@ -137,31 +143,42 @@ impl Inner {
if let Some(cookie) = jar.signed(&self.key).get(&self.name) { if let Some(cookie) = jar.signed(&self.key).get(&self.name) {
let value = cookie.value().to_owned(); let value = cookie.value().to_owned();
return Box::new( return Box::new(
self.addr.send(Command(resp_array!["GET", cookie.value()])) self.addr
.send(Command(resp_array!["GET", cookie.value()]))
.map_err(Error::from) .map_err(Error::from)
.and_then(move |res| match res { .and_then(move |res| match res {
Ok(val) => { Ok(val) => {
match val { match val {
RespValue::Error(err) => RespValue::Error(err) => {
return Err( return Err(
error::ErrorInternalServerError(err).into()), error::ErrorInternalServerError(err)
RespValue::SimpleString(s) => .into(),
if let Ok(val) = serde_json::from_str(&s) { )
return Ok(Some((val, value))) }
}, RespValue::SimpleString(s) => {
RespValue::BulkString(s) => { if let Ok(val) = serde_json::from_str(&s)
if let Ok(val) = serde_json::from_slice(&s) { {
return Ok(Some((val, value))) return Ok(Some((val, value)));
} }
}, }
RespValue::BulkString(s) => {
if let Ok(val) =
serde_json::from_slice(&s)
{
return Ok(Some((val, value)));
}
}
_ => (), _ => (),
} }
Ok(None) Ok(None)
}, }
Err(err) => Err(error::ErrorInternalServerError(err).into()) Err(err) => {
})) Err(error::ErrorInternalServerError(err).into())
}
}),
);
} else { } else {
return Box::new(FutOk(None)) return Box::new(FutOk(None));
} }
} }
} }
@ -169,10 +186,10 @@ impl Inner {
Box::new(FutOk(None)) Box::new(FutOk(None))
} }
fn update(&self, state: &HashMap<String, String>, fn update(
mut resp: HttpResponse, &self, state: &HashMap<String, String>, mut resp: HttpResponse,
value: Option<&String>) -> Box<Future<Item=HttpResponse, Error=Error>> value: Option<&String>,
{ ) -> Box<Future<Item = HttpResponse, Error = Error>> {
let (value, jar) = if let Some(value) = value { let (value, jar) = if let Some(value) = value {
(value.clone(), None) (value.clone(), None)
} else { } else {
@ -190,25 +207,28 @@ impl Inner {
(value, Some(jar)) (value, Some(jar))
}; };
Box::new( Box::new(match serde_json::to_string(state) {
match serde_json::to_string(state) { Err(e) => Either::A(FutErr(e.into())),
Err(e) => Either::A(FutErr(e.into())), Ok(body) => Either::B(
Ok(body) => Either::B( self.addr
self.addr.send(Command(resp_array!["SET", value, body,"EX", &self.ttl])) .send(Command(resp_array![
.map_err(Error::from) "SET", value, body, "EX", &self.ttl
.and_then(move |res| match res { ]))
Ok(_) => { .map_err(Error::from)
if let Some(jar) = jar { .and_then(move |res| match res {
for cookie in jar.delta() { Ok(_) => {
let val = HeaderValue::from_str( if let Some(jar) = jar {
&cookie.to_string())?; for cookie in jar.delta() {
resp.headers_mut().append(header::SET_COOKIE, val); let val =
} HeaderValue::from_str(&cookie.to_string())?;
resp.headers_mut().append(header::SET_COOKIE, val);
} }
Ok(resp) }
}, Ok(resp)
Err(err) => Err(error::ErrorInternalServerError(err).into()) }
})) Err(err) => Err(error::ErrorInternalServerError(err).into()),
}) }),
),
})
} }
} }

View File

@ -2,11 +2,11 @@ extern crate actix;
extern crate actix_redis; extern crate actix_redis;
#[macro_use] #[macro_use]
extern crate redis_async; extern crate redis_async;
extern crate futures;
extern crate env_logger; extern crate env_logger;
extern crate futures;
use actix::prelude::*; use actix::prelude::*;
use actix_redis::{RedisActor, Command, Error, RespValue}; use actix_redis::{Command, Error, RedisActor, RespValue};
use futures::Future; use futures::Future;
#[test] #[test]
@ -31,7 +31,6 @@ fn test_error_connect() {
sys.run(); sys.run();
} }
#[test] #[test]
fn test_redis() { fn test_redis() {
env_logger::init(); env_logger::init();
@ -46,20 +45,23 @@ fn test_redis() {
.then(move |res| match res { .then(move |res| match res {
Ok(Ok(resp)) => { Ok(Ok(resp)) => {
assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); assert_eq!(resp, RespValue::SimpleString("OK".to_owned()));
addr2.send(Command(resp_array!["GET", "test"])) addr2
.send(Command(resp_array!["GET", "test"]))
.then(|res| { .then(|res| {
match res { match res {
Ok(Ok(resp)) => { Ok(Ok(resp)) => {
println!("RESP: {:?}", resp); println!("RESP: {:?}", resp);
assert_eq!( assert_eq!(
resp, RespValue::BulkString((&b"value"[..]).into())); resp,
}, RespValue::BulkString((&b"value"[..]).into())
);
}
_ => panic!("Should not happen {:?}", res), _ => panic!("Should not happen {:?}", res),
} }
Arbiter::system().do_send(actix::msgs::SystemExit(0)); Arbiter::system().do_send(actix::msgs::SystemExit(0));
Ok(()) Ok(())
}) })
}, }
_ => panic!("Should not happen {:?}", res), _ => panic!("Should not happen {:?}", res),
}) })
}); });