1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-02-25 11:32:50 +01:00

update actix and actix-web

This commit is contained in:
Nikolay Kim 2019-03-29 11:31:48 -07:00
parent 55fbdf1d00
commit 6d90615e48
7 changed files with 146 additions and 179 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-redis" name = "actix-redis"
version = "0.5.1" version = "0.6.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Redis integration for actix framework" description = "Redis integration for actix framework"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
@ -11,6 +11,7 @@ repository = "https://github.com/actix/actix-redis.git"
documentation = "https://docs.rs/actix-redis/" documentation = "https://docs.rs/actix-redis/"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
[lib] [lib]
name = "actix_redis" name = "actix_redis"
@ -24,14 +25,14 @@ codecov = { repository = "actix/actix-redis", branch = "master", service = "gith
default = ["web"] default = ["web"]
# actix-web integration # actix-web integration
web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"] web = ["actix/http", "actix-service", "actix-utils", "actix-web/cookies", "actix-session", "cookie", "rand", "serde", "serde_json"]
[dependencies] [dependencies]
actix = "0.7" actix = "0.8.0-alpha.2"
log = "0.4" log = "0.4"
backoff = "0.1" backoff = "0.1"
failure = "^0.1.1" derive_more = "0.14"
futures = "0.1" futures = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-codec = "0.1" tokio-codec = "0.1"
@ -40,12 +41,14 @@ redis-async = "0.4"
time = "0.1" time = "0.1"
# actix web session # actix web session
actix-web = { version = "^0.7.3", optional=true } actix-web = { version = "1.0.0-alpha.1", optional=true }
actix-utils = { version = "0.3.3", optional=true }
actix-service = { version = "0.3.4", optional=true }
actix-session = { version = "0.1.0-alpha.1", optional=true }
cookie = { version="0.11", features=["percent-encode", "secure"], optional=true } cookie = { version="0.11", features=["percent-encode", "secure"], optional=true }
http = { version="0.1", optional=true } rand = { version="0.6.5", optional=true }
rand = { version="0.5", optional=true }
serde = { version="1.0", optional=true } serde = { version="1.0", optional=true }
serde_json = { version="1.0", optional=true } serde_json = { version="1.0", optional=true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.5" env_logger = "0.6"

View File

@ -1,50 +1,35 @@
#![allow(unused_variables)] use actix_redis::RedisSession;
#![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] use actix_session::Session;
use actix_web::{middleware, web, App, Error, HttpRequest, HttpServer, Responder};
extern crate actix;
extern crate actix_redis;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
use actix_redis::RedisSessionBackend;
use actix_web::middleware::session::{self, RequestSession};
use actix_web::{middleware, server, App, HttpRequest, HttpResponse, Result};
/// simple handler /// simple handler
fn index(req: &HttpRequest) -> Result<HttpResponse> { fn index(req: HttpRequest, session: Session) -> Result<impl Responder, Error> {
println!("{:?}", req); println!("{:?}", req);
// session // session
if let Some(count) = req.session().get::<i32>("counter")? { if let Some(count) = session.get::<i32>("counter")? {
println!("SESSION value: {}", count); println!("SESSION value: {}", count);
req.session().set("counter", count + 1)?; session.set("counter", count + 1)?;
} else { } else {
req.session().set("counter", 1)?; session.set("counter", 1)?;
} }
Ok("Welcome!".into()) Ok("Welcome!")
} }
fn main() { fn main() -> std::io::Result<()> {
::std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info");
env_logger::init(); env_logger::init();
let sys = actix::System::new("basic-example");
server::new(|| { HttpServer::new(|| {
App::new() App::new()
// enable logger // enable logger
.middleware(middleware::Logger::default()) .wrap(middleware::Logger::default())
// cookie session middleware // cookie session middleware
.middleware(session::SessionStorage::new( .wrap(RedisSession::new("127.0.0.1:6379", &[0; 32]))
RedisSessionBackend::new("127.0.0.1:6379", &[0; 32])
))
// register simple route, handle all methods // register simple route, handle all methods
.resource("/", |r| r.f(index)) .service(web::resource("/").to(index))
}).bind("0.0.0.0:8080") })
.unwrap() .bind("0.0.0.0:8080")?
.workers(1) .run()
.start();
let _ = sys.run();
} }

View File

@ -1,5 +1,2 @@
max_width = 89 max_width = 89
reorder_imports = true reorder_imports = true
wrap_comments = true
fn_args_density = "Compressed"
#use_small_heuristics = false

View File

@ -7,62 +7,36 @@
//! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis) //! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis)
//! * Minimum supported Rust version: 1.26 or later //! * Minimum supported Rust version: 1.26 or later
//! //!
extern crate actix;
extern crate backoff;
extern crate futures;
extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_tcp;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate redis_async; extern crate redis_async;
#[macro_use] #[macro_use]
extern crate failure; extern crate derive_more;
extern crate time;
mod redis; mod redis;
pub use redis::{Command, RedisActor}; pub use redis::{Command, RedisActor};
#[cfg(feature = "web")]
extern crate actix_web;
#[cfg(feature = "web")]
extern crate cookie;
#[cfg(feature = "web")]
extern crate http;
#[cfg(feature = "web")]
extern crate rand;
#[cfg(feature = "web")]
extern crate serde;
#[cfg(feature = "web")]
extern crate serde_json;
#[cfg(feature = "web")] #[cfg(feature = "web")]
mod session; mod session;
#[cfg(feature = "web")] #[cfg(feature = "web")]
pub use session::RedisSessionBackend; pub use cookie::SameSite;
#[cfg(feature = "web")] #[cfg(feature = "web")]
pub use cookie::{SameSite}; pub use session::RedisSession;
/// General purpose actix redis error /// General purpose actix redis error
#[derive(Fail, Debug)] #[derive(Debug, Display, From)]
pub enum Error { pub enum Error {
#[fail(display = "Redis error {}", _0)] #[display(fmt = "Redis error {}", _0)]
Redis(redis_async::error::Error), Redis(redis_async::error::Error),
/// Receiving message during reconnecting /// Receiving message during reconnecting
#[fail(display = "Redis: Not connected")] #[display(fmt = "Redis: Not connected")]
NotConnected, NotConnected,
/// Cancel all waters when connection get dropped /// Cancel all waters when connection get dropped
#[fail(display = "Redis: Disconnected")] #[display(fmt = "Redis: Disconnected")]
Disconnected, Disconnected,
} }
impl From<redis_async::error::Error> for Error {
fn from(err: redis_async::error::Error) -> Error {
Error::Redis(err)
}
}
// re-export // re-export
pub use redis_async::error::Error as RespError; pub use redis_async::error::Error as RespError;
pub use redis_async::resp::RespValue; pub use redis_async::resp::RespValue;

View File

@ -14,7 +14,7 @@ use tokio_io::io::WriteHalf;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use Error; use crate::Error;
/// Command for send data to Redis /// Command for send data to Redis
#[derive(Debug)] #[derive(Debug)]
@ -63,7 +63,7 @@ impl Actor for RedisActor {
let (r, w) = stream.split(); let (r, w) = stream.split();
// configure write side of the connection // configure write side of the connection
let mut framed = actix::io::FramedWrite::new(w, RespCodec, ctx); let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
act.cell = Some(framed); act.cell = Some(framed);
// read side of the connection // read side of the connection

View File

@ -3,61 +3,20 @@ use std::iter;
use std::rc::Rc; use std::rc::Rc;
use actix::prelude::*; use actix::prelude::*;
use actix_web::middleware::session::{SessionBackend, SessionImpl}; use actix_service::{Service, Transform};
use actix_web::middleware::Response as MiddlewareResponse; use actix_session::Session;
use actix_web::{error, Error, HttpRequest, HttpResponse, Result}; use actix_utils::cloneable::CloneableService;
use actix_web::dev::{ServiceRequest, ServiceResponse};
use actix_web::http::header::{self, HeaderValue};
use actix_web::{error, Error, HttpMessage};
use cookie::{Cookie, CookieJar, Key, SameSite}; use cookie::{Cookie, CookieJar, Key, SameSite};
use futures::future::{err as FutErr, ok as FutOk, Either}; use futures::future::{err, ok, Either, Future, FutureResult};
use futures::Future; use futures::Poll;
use http::header::{self, HeaderValue}; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng};
use rand::distributions::Alphanumeric;
use rand::{self, Rng};
use redis_async::resp::RespValue; use redis_async::resp::RespValue;
use serde_json;
use time::Duration; use time::Duration;
use redis::{Command, RedisActor}; use crate::redis::{Command, RedisActor};
/// Session that stores data in redis
pub struct RedisSession {
changed: bool,
inner: Rc<Inner>,
state: HashMap<String, String>,
value: Option<String>,
}
impl SessionImpl for RedisSession {
fn get(&self, key: &str) -> Option<&str> {
self.state.get(key).map(|s| s.as_str())
}
fn set(&mut self, key: &str, value: String) {
self.changed = true;
self.state.insert(key.to_owned(), value);
}
fn remove(&mut self, key: &str) {
self.changed = true;
self.state.remove(key);
}
fn clear(&mut self) {
self.changed = true;
self.state.clear()
}
fn write(&self, resp: HttpResponse) -> Result<MiddlewareResponse> {
if self.changed {
Ok(MiddlewareResponse::Future(self.inner.update(
&self.state,
resp,
self.value.as_ref(),
)))
} else {
Ok(MiddlewareResponse::Done(resp))
}
}
}
/// Use redis as session storage. /// Use redis as session storage.
/// ///
@ -66,14 +25,14 @@ impl SessionImpl for RedisSession {
/// session, When this value is changed, all session data is lost. /// session, When this value is changed, all session data is lost.
/// ///
/// Constructor panics if key length is less than 32 bytes. /// Constructor panics if key length is less than 32 bytes.
pub struct RedisSessionBackend(Rc<Inner>); pub struct RedisSession(Rc<Inner>);
impl RedisSessionBackend { impl RedisSession {
/// Create new redis session backend /// Create new redis session backend
/// ///
/// * `addr` - address of the redis server /// * `addr` - address of the redis server
pub fn new<S: Into<String>>(addr: S, key: &[u8]) -> RedisSessionBackend { pub fn new<S: Into<String>>(addr: S, key: &[u8]) -> RedisSession {
RedisSessionBackend(Rc::new(Inner { RedisSession(Rc::new(Inner {
key: Key::from_master(key), key: Key::from_master(key),
ttl: "7200".to_owned(), ttl: "7200".to_owned(),
addr: RedisActor::start(addr), addr: RedisActor::start(addr),
@ -131,29 +90,77 @@ impl RedisSessionBackend {
} }
} }
impl<S> SessionBackend<S> for RedisSessionBackend { impl<S, P, B> Transform<S> for RedisSession
type Session = RedisSession; where
type ReadFuture = Box<Future<Item = RedisSession, Error = Error>>; S: Service<
Request = ServiceRequest<P>,
Response = ServiceResponse<B>,
Error = Error,
> + 'static,
S::Future: 'static,
P: 'static,
B: 'static,
{
type Request = ServiceRequest<P>;
type Response = ServiceResponse<B>;
type Error = S::Error;
type InitError = ();
type Transform = RedisSessionMiddleware<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;
fn from_request(&self, req: &mut HttpRequest<S>) -> Self::ReadFuture { fn new_transform(&self, service: S) -> Self::Future {
let inner = Rc::clone(&self.0); ok(RedisSessionMiddleware {
service: CloneableService::new(service),
inner: self.0.clone(),
})
}
}
Box::new(self.0.load(req).map(move |state| { /// Cookie session middleware
if let Some((state, value)) = state { pub struct RedisSessionMiddleware<S: 'static> {
RedisSession { service: CloneableService<S>,
inner, inner: Rc<Inner>,
state, }
changed: false,
value: Some(value), impl<S, P, B> Service for RedisSessionMiddleware<S>
} where
S: Service<
Request = ServiceRequest<P>,
Response = ServiceResponse<B>,
Error = Error,
> + 'static,
S::Future: 'static,
P: 'static,
B: 'static,
{
type Request = ServiceRequest<P>;
type Response = ServiceResponse<B>;
type Error = Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, mut req: ServiceRequest<P>) -> Self::Future {
let mut srv = self.service.clone();
let inner = self.inner.clone();
Box::new(self.inner.load(&req).and_then(move |state| {
let value = if let Some((state, value)) = state {
Session::set_session(state.into_iter(), &mut req);
Some(value)
} else { } else {
RedisSession { None
inner, };
changed: false,
state: HashMap::new(), srv.call(req).and_then(move |mut res| {
value: None, if let Some(state) = Session::get_changes(&mut res) {
Either::A(inner.update(res, state, value))
} else {
Either::B(ok(res))
} }
} })
})) }))
} }
} }
@ -171,10 +178,10 @@ struct Inner {
} }
impl Inner { impl Inner {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] fn load<P>(
fn load<S>( &self,
&self, req: &mut HttpRequest<S>, req: &ServiceRequest<P>,
) -> Box<Future<Item = Option<(HashMap<String, String>, String)>, Error = Error>> ) -> impl Future<Item = Option<(HashMap<String, String>, String)>, Error = Error>
{ {
if let Ok(cookies) = req.cookies() { if let Ok(cookies) = req.cookies() {
for cookie in cookies.iter() { for cookie in cookies.iter() {
@ -183,7 +190,7 @@ impl Inner {
jar.add_original(cookie.clone()); jar.add_original(cookie.clone());
if let Some(cookie) = jar.signed(&self.key).get(&self.name) { if let Some(cookie) = jar.signed(&self.key).get(&self.name) {
let value = cookie.value().to_owned(); let value = cookie.value().to_owned();
return Box::new( return Either::A(
self.addr self.addr
.send(Command(resp_array!["GET", cookie.value()])) .send(Command(resp_array!["GET", cookie.value()]))
.map_err(Error::from) .map_err(Error::from)
@ -193,7 +200,7 @@ impl Inner {
RespValue::Error(err) => { RespValue::Error(err) => {
return Err( return Err(
error::ErrorInternalServerError(err), error::ErrorInternalServerError(err),
) );
} }
RespValue::SimpleString(s) => { RespValue::SimpleString(s) => {
if let Ok(val) = serde_json::from_str(&s) if let Ok(val) = serde_json::from_str(&s)
@ -218,27 +225,30 @@ impl Inner {
}), }),
); );
} else { } else {
return Box::new(FutOk(None)); return Either::B(ok(None));
} }
} }
} }
} }
Box::new(FutOk(None)) Either::B(ok(None))
} }
fn update( fn update<B>(
&self, state: &HashMap<String, String>, mut resp: HttpResponse, &self,
value: Option<&String>, mut res: ServiceResponse<B>,
) -> Box<Future<Item = HttpResponse, Error = Error>> { state: impl Iterator<Item = (String, String)>,
value: Option<String>,
) -> impl Future<Item = ServiceResponse<B>, Error = Error> {
let (value, jar) = if let Some(value) = value { let (value, jar) = if let Some(value) = value {
(value.clone(), None) (value.clone(), None)
} else { } else {
let mut rng = rand::OsRng::new().unwrap(); let mut rng = OsRng::new().unwrap();
let value: String = iter::repeat(()) let value: String = iter::repeat(())
.map(|()| rng.sample(Alphanumeric)) .map(|()| rng.sample(Alphanumeric))
.take(32) .take(32)
.collect(); .collect();
// prepare session id cookie
let mut cookie = Cookie::new(self.name.clone(), value.clone()); let mut cookie = Cookie::new(self.name.clone(), value.clone());
cookie.set_path(self.path.clone()); cookie.set_path(self.path.clone());
cookie.set_secure(self.secure); cookie.set_secure(self.secure);
@ -263,26 +273,28 @@ impl Inner {
(value, Some(jar)) (value, Some(jar))
}; };
Box::new(match serde_json::to_string(state) { let state: HashMap<_, _> = state.collect();
Err(e) => Either::A(FutErr(e.into())),
match serde_json::to_string(&state) {
Err(e) => Either::A(err(e.into())),
Ok(body) => Either::B( Ok(body) => Either::B(
self.addr self.addr
.send(Command(resp_array!["SET", value, body, "EX", &self.ttl])) .send(Command(resp_array!["SET", value, body, "EX", &self.ttl]))
.map_err(Error::from) .map_err(Error::from)
.and_then(move |res| match res { .and_then(move |redis_result| match redis_result {
Ok(_) => { Ok(_) => {
if let Some(jar) = jar { if let Some(jar) = jar {
for cookie in jar.delta() { for cookie in jar.delta() {
let val = let val =
HeaderValue::from_str(&cookie.to_string())?; HeaderValue::from_str(&cookie.to_string())?;
resp.headers_mut().append(header::SET_COOKIE, val); res.headers_mut().append(header::SET_COOKIE, val);
} }
} }
Ok(resp) Ok(res)
} }
Err(err) => Err(error::ErrorInternalServerError(err)), Err(err) => Err(error::ErrorInternalServerError(err)),
}), }),
), ),
}) }
} }
} }

View File

@ -1,16 +1,12 @@
extern crate actix;
extern crate actix_redis;
#[macro_use] #[macro_use]
extern crate redis_async; extern crate redis_async;
extern crate env_logger;
extern crate futures;
use actix::prelude::*; use actix::prelude::*;
use actix_redis::{Command, Error, RedisActor, RespValue}; use actix_redis::{Command, Error, RedisActor, RespValue};
use futures::Future; use futures::Future;
#[test] #[test]
fn test_error_connect() { fn test_error_connect() -> std::io::Result<()> {
let sys = System::new("test"); let sys = System::new("test");
let addr = RedisActor::start("localhost:54000"); let addr = RedisActor::start("localhost:54000");
@ -27,11 +23,11 @@ fn test_error_connect() {
}) })
}); });
sys.run(); sys.run()
} }
#[test] #[test]
fn test_redis() { fn test_redis() -> std::io::Result<()> {
env_logger::init(); env_logger::init();
let sys = System::new("test"); let sys = System::new("test");
@ -63,5 +59,5 @@ fn test_redis() {
}) })
}); });
sys.run(); sys.run()
} }