mirror of
https://github.com/actix/actix-extras.git
synced 2024-12-02 19:12:24 +01:00
upgrade to actix/actix-web 0.7
This commit is contained in:
parent
93707884d2
commit
62c382d280
13
Cargo.toml
13
Cargo.toml
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-redis"
|
name = "actix-redis"
|
||||||
version = "0.4.0"
|
version = "0.5.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Redis integration for actix framework"
|
description = "Redis integration for actix framework"
|
||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
@ -27,21 +27,22 @@ default = ["web"]
|
|||||||
web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"]
|
web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix = "0.5"
|
actix = "0.7"
|
||||||
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
backoff = "0.1"
|
backoff = "0.1"
|
||||||
failure = "^0.1.1"
|
failure = "^0.1.1"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
tokio-io = "0.1"
|
tokio-io = "0.1"
|
||||||
tokio-core = "0.1"
|
tokio-codec = "0.1"
|
||||||
redis-async = "0.0"
|
tokio-tcp = "0.1"
|
||||||
|
redis-async = "0.3.2"
|
||||||
|
|
||||||
# actix web session
|
# actix web session
|
||||||
actix-web = { version="0.6", optional=true }
|
actix-web = { git = "https://github.com/actix/actix-web.git", optional=true }
|
||||||
cookie = { version="0.10", features=["percent-encode", "secure"], optional=true }
|
cookie = { version="0.10", features=["percent-encode", "secure"], optional=true }
|
||||||
http = { version="0.1", optional=true }
|
http = { version="0.1", optional=true }
|
||||||
rand = { version="0.3", optional=true }
|
rand = { version="0.5", optional=true }
|
||||||
serde = { version="1.0", optional=true }
|
serde = { version="1.0", optional=true }
|
||||||
serde_json = { version="1.0", optional=true }
|
serde_json = { version="1.0", optional=true }
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ use actix_web::middleware::session::{self, RequestSession};
|
|||||||
use actix_web::{middleware, server, App, HttpRequest, HttpResponse, Result};
|
use actix_web::{middleware, server, App, HttpRequest, HttpResponse, Result};
|
||||||
|
|
||||||
/// simple handler
|
/// simple handler
|
||||||
fn index(req: HttpRequest) -> Result<HttpResponse> {
|
fn index(req: &HttpRequest) -> Result<HttpResponse> {
|
||||||
println!("{:?}", req);
|
println!("{:?}", req);
|
||||||
|
|
||||||
// session
|
// session
|
||||||
|
@ -5,13 +5,14 @@
|
|||||||
//! * [API Documentation (Releases)](https://docs.rs/actix-redis/)
|
//! * [API Documentation (Releases)](https://docs.rs/actix-redis/)
|
||||||
//! * [Chat on gitter](https://gitter.im/actix/actix)
|
//! * [Chat on gitter](https://gitter.im/actix/actix)
|
||||||
//! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis)
|
//! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis)
|
||||||
//! * Minimum supported Rust version: 1.21 or later
|
//! * Minimum supported Rust version: 1.26 or later
|
||||||
//!
|
//!
|
||||||
extern crate actix;
|
extern crate actix;
|
||||||
extern crate backoff;
|
extern crate backoff;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate tokio_core;
|
extern crate tokio_codec;
|
||||||
extern crate tokio_io;
|
extern crate tokio_io;
|
||||||
|
extern crate tokio_tcp;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
@ -53,9 +54,6 @@ pub enum Error {
|
|||||||
Disconnected,
|
Disconnected,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for Error {}
|
|
||||||
unsafe impl Sync for Error {}
|
|
||||||
|
|
||||||
impl From<redis_async::error::Error> for Error {
|
impl From<redis_async::error::Error> for Error {
|
||||||
fn from(err: redis_async::error::Error) -> Error {
|
fn from(err: redis_async::error::Error) -> Error {
|
||||||
Error::Redis(err)
|
Error::Redis(err)
|
||||||
|
22
src/redis.rs
22
src/redis.rs
@ -1,7 +1,7 @@
|
|||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use actix::actors::{Connect, Connector};
|
use actix::actors::resolver::{Connect, Resolver};
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use backoff::backoff::Backoff;
|
use backoff::backoff::Backoff;
|
||||||
use backoff::ExponentialBackoff;
|
use backoff::ExponentialBackoff;
|
||||||
@ -9,10 +9,10 @@ use futures::unsync::oneshot;
|
|||||||
use futures::Future;
|
use futures::Future;
|
||||||
use redis_async::error::Error as RespError;
|
use redis_async::error::Error as RespError;
|
||||||
use redis_async::resp::{RespCodec, RespValue};
|
use redis_async::resp::{RespCodec, RespValue};
|
||||||
use tokio_core::net::TcpStream;
|
use tokio_codec::FramedRead;
|
||||||
use tokio_io::codec::FramedRead;
|
|
||||||
use tokio_io::io::WriteHalf;
|
use tokio_io::io::WriteHalf;
|
||||||
use tokio_io::AsyncRead;
|
use tokio_io::AsyncRead;
|
||||||
|
use tokio_tcp::TcpStream;
|
||||||
|
|
||||||
use Error;
|
use Error;
|
||||||
|
|
||||||
@ -34,11 +34,11 @@ pub struct RedisActor {
|
|||||||
|
|
||||||
impl RedisActor {
|
impl RedisActor {
|
||||||
/// Start new `Supervisor` with `RedisActor`.
|
/// Start new `Supervisor` with `RedisActor`.
|
||||||
pub fn start<S: Into<String>>(addr: S) -> Addr<Unsync, RedisActor> {
|
pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
|
||||||
let addr = addr.into();
|
let addr = addr.into();
|
||||||
|
|
||||||
Supervisor::start(|_| RedisActor {
|
Supervisor::start(|_| RedisActor {
|
||||||
addr: addr,
|
addr,
|
||||||
cell: None,
|
cell: None,
|
||||||
backoff: ExponentialBackoff::default(),
|
backoff: ExponentialBackoff::default(),
|
||||||
queue: VecDeque::new(),
|
queue: VecDeque::new(),
|
||||||
@ -50,7 +50,7 @@ impl Actor for RedisActor {
|
|||||||
type Context = Context<Self>;
|
type Context = Context<Self>;
|
||||||
|
|
||||||
fn started(&mut self, ctx: &mut Context<Self>) {
|
fn started(&mut self, ctx: &mut Context<Self>) {
|
||||||
Connector::from_registry()
|
Resolver::from_registry()
|
||||||
.send(Connect::host(self.addr.as_str()))
|
.send(Connect::host(self.addr.as_str()))
|
||||||
.into_actor(self)
|
.into_actor(self)
|
||||||
.map(|res, act, ctx| match res {
|
.map(|res, act, ctx| match res {
|
||||||
@ -104,10 +104,7 @@ impl Supervised for RedisActor {
|
|||||||
|
|
||||||
impl actix::io::WriteHandler<io::Error> for RedisActor {
|
impl actix::io::WriteHandler<io::Error> for RedisActor {
|
||||||
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
|
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
|
||||||
warn!(
|
warn!("Redis connection dropped: {} error: {}", self.addr, err);
|
||||||
"Redis connection dropped: {} error: {}",
|
|
||||||
self.addr, err
|
|
||||||
);
|
|
||||||
Running::Stop
|
Running::Stop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,9 +136,6 @@ impl Handler<Command> for RedisActor {
|
|||||||
let _ = tx.send(Err(Error::NotConnected));
|
let _ = tx.send(Err(Error::NotConnected));
|
||||||
}
|
}
|
||||||
|
|
||||||
Box::new(
|
Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res))
|
||||||
rx.map_err(|_| Error::Disconnected)
|
|
||||||
.and_then(|res| res),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::iter::FromIterator;
|
use std::iter;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
@ -10,6 +10,7 @@ use cookie::{Cookie, CookieJar, Key};
|
|||||||
use futures::future::{err as FutErr, ok as FutOk, Either};
|
use futures::future::{err as FutErr, ok as FutOk, Either};
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use http::header::{self, HeaderValue};
|
use http::header::{self, HeaderValue};
|
||||||
|
use rand::distributions::Alphanumeric;
|
||||||
use rand::{self, Rng};
|
use rand::{self, Rng};
|
||||||
use redis_async::resp::RespValue;
|
use redis_async::resp::RespValue;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
@ -105,15 +106,15 @@ impl<S> SessionBackend<S> for RedisSessionBackend {
|
|||||||
Box::new(self.0.load(req).map(move |state| {
|
Box::new(self.0.load(req).map(move |state| {
|
||||||
if let Some((state, value)) = state {
|
if let Some((state, value)) = state {
|
||||||
RedisSession {
|
RedisSession {
|
||||||
|
inner,
|
||||||
|
state,
|
||||||
changed: false,
|
changed: false,
|
||||||
inner: inner,
|
|
||||||
state: state,
|
|
||||||
value: Some(value),
|
value: Some(value),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
RedisSession {
|
RedisSession {
|
||||||
|
inner,
|
||||||
changed: false,
|
changed: false,
|
||||||
inner: inner,
|
|
||||||
state: HashMap::new(),
|
state: HashMap::new(),
|
||||||
value: None,
|
value: None,
|
||||||
}
|
}
|
||||||
@ -126,7 +127,7 @@ struct Inner {
|
|||||||
key: Key,
|
key: Key,
|
||||||
ttl: String,
|
ttl: String,
|
||||||
name: String,
|
name: String,
|
||||||
addr: Addr<Unsync, RedisActor>,
|
addr: Addr<RedisActor>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Inner {
|
impl Inner {
|
||||||
@ -136,7 +137,7 @@ impl Inner {
|
|||||||
) -> Box<Future<Item = Option<(HashMap<String, String>, String)>, Error = Error>>
|
) -> Box<Future<Item = Option<(HashMap<String, String>, String)>, Error = Error>>
|
||||||
{
|
{
|
||||||
if let Ok(cookies) = req.cookies() {
|
if let Ok(cookies) = req.cookies() {
|
||||||
for cookie in cookies {
|
for cookie in cookies.iter() {
|
||||||
if cookie.name() == self.name {
|
if cookie.name() == self.name {
|
||||||
let mut jar = CookieJar::new();
|
let mut jar = CookieJar::new();
|
||||||
jar.add_original(cookie.clone());
|
jar.add_original(cookie.clone());
|
||||||
@ -151,8 +152,7 @@ impl Inner {
|
|||||||
match val {
|
match val {
|
||||||
RespValue::Error(err) => {
|
RespValue::Error(err) => {
|
||||||
return Err(
|
return Err(
|
||||||
error::ErrorInternalServerError(err)
|
error::ErrorInternalServerError(err),
|
||||||
.into(),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
RespValue::SimpleString(s) => {
|
RespValue::SimpleString(s) => {
|
||||||
@ -173,7 +173,7 @@ impl Inner {
|
|||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
Err(error::ErrorInternalServerError(err).into())
|
Err(error::ErrorInternalServerError(err))
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
@ -194,7 +194,10 @@ impl Inner {
|
|||||||
(value.clone(), None)
|
(value.clone(), None)
|
||||||
} else {
|
} else {
|
||||||
let mut rng = rand::OsRng::new().unwrap();
|
let mut rng = rand::OsRng::new().unwrap();
|
||||||
let value = String::from_iter(rng.gen_ascii_chars().take(32));
|
let value: String = iter::repeat(())
|
||||||
|
.map(|()| rng.sample(Alphanumeric))
|
||||||
|
.take(32)
|
||||||
|
.collect();
|
||||||
|
|
||||||
let mut cookie = Cookie::new(self.name.clone(), value.clone());
|
let mut cookie = Cookie::new(self.name.clone(), value.clone());
|
||||||
cookie.set_path("/");
|
cookie.set_path("/");
|
||||||
@ -211,9 +214,7 @@ impl Inner {
|
|||||||
Err(e) => Either::A(FutErr(e.into())),
|
Err(e) => Either::A(FutErr(e.into())),
|
||||||
Ok(body) => Either::B(
|
Ok(body) => Either::B(
|
||||||
self.addr
|
self.addr
|
||||||
.send(Command(resp_array![
|
.send(Command(resp_array!["SET", value, body, "EX", &self.ttl]))
|
||||||
"SET", value, body, "EX", &self.ttl
|
|
||||||
]))
|
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.and_then(move |res| match res {
|
.and_then(move |res| match res {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@ -226,7 +227,7 @@ impl Inner {
|
|||||||
}
|
}
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
Err(err) => Err(error::ErrorInternalServerError(err).into()),
|
Err(err) => Err(error::ErrorInternalServerError(err)),
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
})
|
})
|
||||||
|
@ -16,14 +16,13 @@ fn test_error_connect() {
|
|||||||
let addr = RedisActor::start("localhost:54000");
|
let addr = RedisActor::start("localhost:54000");
|
||||||
let _addr2 = addr.clone();
|
let _addr2 = addr.clone();
|
||||||
|
|
||||||
Arbiter::handle().spawn_fn(move || {
|
Arbiter::spawn_fn(move || {
|
||||||
addr.send(Command(resp_array!["GET", "test"]))
|
addr.send(Command(resp_array!["GET", "test"])).then(|res| {
|
||||||
.then(|res| {
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Err(Error::NotConnected)) => (),
|
Ok(Err(Error::NotConnected)) => (),
|
||||||
_ => panic!("Should not happen {:?}", res),
|
_ => panic!("Should not happen {:?}", res),
|
||||||
}
|
}
|
||||||
Arbiter::system().do_send(actix::msgs::SystemExit(0));
|
System::current().stop();
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
@ -39,15 +38,13 @@ fn test_redis() {
|
|||||||
let addr = RedisActor::start("127.0.0.1:6379");
|
let addr = RedisActor::start("127.0.0.1:6379");
|
||||||
let _addr2 = addr.clone();
|
let _addr2 = addr.clone();
|
||||||
|
|
||||||
Arbiter::handle().spawn_fn(move || {
|
Arbiter::spawn_fn(move || {
|
||||||
let addr2 = addr.clone();
|
let addr2 = addr.clone();
|
||||||
addr.send(Command(resp_array!["SET", "test", "value"]))
|
addr.send(Command(resp_array!["SET", "test", "value"]))
|
||||||
.then(move |res| match res {
|
.then(move |res| match res {
|
||||||
Ok(Ok(resp)) => {
|
Ok(Ok(resp)) => {
|
||||||
assert_eq!(resp, RespValue::SimpleString("OK".to_owned()));
|
assert_eq!(resp, RespValue::SimpleString("OK".to_owned()));
|
||||||
addr2
|
addr2.send(Command(resp_array!["GET", "test"])).then(|res| {
|
||||||
.send(Command(resp_array!["GET", "test"]))
|
|
||||||
.then(|res| {
|
|
||||||
match res {
|
match res {
|
||||||
Ok(Ok(resp)) => {
|
Ok(Ok(resp)) => {
|
||||||
println!("RESP: {:?}", resp);
|
println!("RESP: {:?}", resp);
|
||||||
@ -58,7 +55,7 @@ fn test_redis() {
|
|||||||
}
|
}
|
||||||
_ => panic!("Should not happen {:?}", res),
|
_ => panic!("Should not happen {:?}", res),
|
||||||
}
|
}
|
||||||
Arbiter::system().do_send(actix::msgs::SystemExit(0));
|
System::current().stop();
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user