From 6d90615e487746a8574949f217951177bb9fe24f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 29 Mar 2019 11:31:48 -0700 Subject: [PATCH] update actix and actix-web --- Cargo.toml | 19 +++-- examples/basic.rs | 49 ++++------- rustfmt.toml | 3 - src/lib.rs | 40 ++------- src/redis.rs | 4 +- src/session.rs | 198 +++++++++++++++++++++++--------------------- tests/test_redis.rs | 12 +-- 7 files changed, 146 insertions(+), 179 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 66ef46a31..9ee01175d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-redis" -version = "0.5.1" +version = "0.6.0" authors = ["Nikolay Kim "] description = "Redis integration for actix framework" license = "MIT/Apache-2.0" @@ -11,6 +11,7 @@ repository = "https://github.com/actix/actix-redis.git" documentation = "https://docs.rs/actix-redis/" categories = ["network-programming", "asynchronous"] exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +edition = "2018" [lib] name = "actix_redis" @@ -24,14 +25,14 @@ codecov = { repository = "actix/actix-redis", branch = "master", service = "gith default = ["web"] # actix-web integration -web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"] +web = ["actix/http", "actix-service", "actix-utils", "actix-web/cookies", "actix-session", "cookie", "rand", "serde", "serde_json"] [dependencies] -actix = "0.7" +actix = "0.8.0-alpha.2" log = "0.4" backoff = "0.1" -failure = "^0.1.1" +derive_more = "0.14" futures = "0.1" tokio-io = "0.1" tokio-codec = "0.1" @@ -40,12 +41,14 @@ redis-async = "0.4" time = "0.1" # actix web session -actix-web = { version = "^0.7.3", optional=true } +actix-web = { version = "1.0.0-alpha.1", optional=true } +actix-utils = { version = "0.3.3", optional=true } +actix-service = { version = "0.3.4", optional=true } +actix-session = { version = "0.1.0-alpha.1", optional=true } cookie = { version="0.11", features=["percent-encode", "secure"], optional=true } -http = { version="0.1", optional=true } -rand = { version="0.5", optional=true } +rand = { version="0.6.5", optional=true } serde = { version="1.0", optional=true } serde_json = { version="1.0", optional=true } [dev-dependencies] -env_logger = "0.5" +env_logger = "0.6" diff --git a/examples/basic.rs b/examples/basic.rs index 4dbed88a7..14e1ce9ba 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,50 +1,35 @@ -#![allow(unused_variables)] -#![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] - -extern crate actix; -extern crate actix_redis; -extern crate actix_web; -extern crate env_logger; -extern crate futures; - -use actix_redis::RedisSessionBackend; -use actix_web::middleware::session::{self, RequestSession}; -use actix_web::{middleware, server, App, HttpRequest, HttpResponse, Result}; +use actix_redis::RedisSession; +use actix_session::Session; +use actix_web::{middleware, web, App, Error, HttpRequest, HttpServer, Responder}; /// simple handler -fn index(req: &HttpRequest) -> Result { +fn index(req: HttpRequest, session: Session) -> Result { println!("{:?}", req); // session - if let Some(count) = req.session().get::("counter")? { + if let Some(count) = session.get::("counter")? { println!("SESSION value: {}", count); - req.session().set("counter", count + 1)?; + session.set("counter", count + 1)?; } else { - req.session().set("counter", 1)?; + session.set("counter", 1)?; } - Ok("Welcome!".into()) + Ok("Welcome!") } -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); +fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); env_logger::init(); - let sys = actix::System::new("basic-example"); - server::new(|| { + HttpServer::new(|| { App::new() // enable logger - .middleware(middleware::Logger::default()) + .wrap(middleware::Logger::default()) // cookie session middleware - .middleware(session::SessionStorage::new( - RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) - )) + .wrap(RedisSession::new("127.0.0.1:6379", &[0; 32])) // register simple route, handle all methods - .resource("/", |r| r.f(index)) - }).bind("0.0.0.0:8080") - .unwrap() - .workers(1) - .start(); - - let _ = sys.run(); + .service(web::resource("/").to(index)) + }) + .bind("0.0.0.0:8080")? + .run() } diff --git a/rustfmt.toml b/rustfmt.toml index 6db67ed28..94bd11d51 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,5 +1,2 @@ max_width = 89 reorder_imports = true -wrap_comments = true -fn_args_density = "Compressed" -#use_small_heuristics = false diff --git a/src/lib.rs b/src/lib.rs index 62deca66e..baa446c3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,62 +7,36 @@ //! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis) //! * Minimum supported Rust version: 1.26 or later //! -extern crate actix; -extern crate backoff; -extern crate futures; -extern crate tokio_codec; -extern crate tokio_io; -extern crate tokio_tcp; #[macro_use] extern crate log; #[macro_use] extern crate redis_async; #[macro_use] -extern crate failure; -extern crate time; +extern crate derive_more; mod redis; pub use redis::{Command, RedisActor}; -#[cfg(feature = "web")] -extern crate actix_web; -#[cfg(feature = "web")] -extern crate cookie; -#[cfg(feature = "web")] -extern crate http; -#[cfg(feature = "web")] -extern crate rand; -#[cfg(feature = "web")] -extern crate serde; -#[cfg(feature = "web")] -extern crate serde_json; - #[cfg(feature = "web")] mod session; #[cfg(feature = "web")] -pub use session::RedisSessionBackend; +pub use cookie::SameSite; #[cfg(feature = "web")] -pub use cookie::{SameSite}; +pub use session::RedisSession; /// General purpose actix redis error -#[derive(Fail, Debug)] +#[derive(Debug, Display, From)] pub enum Error { - #[fail(display = "Redis error {}", _0)] + #[display(fmt = "Redis error {}", _0)] Redis(redis_async::error::Error), /// Receiving message during reconnecting - #[fail(display = "Redis: Not connected")] + #[display(fmt = "Redis: Not connected")] NotConnected, /// Cancel all waters when connection get dropped - #[fail(display = "Redis: Disconnected")] + #[display(fmt = "Redis: Disconnected")] Disconnected, } -impl From for Error { - fn from(err: redis_async::error::Error) -> Error { - Error::Redis(err) - } -} - // re-export pub use redis_async::error::Error as RespError; pub use redis_async::resp::RespValue; diff --git a/src/redis.rs b/src/redis.rs index f2f0aa6da..7d4debadb 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -14,7 +14,7 @@ use tokio_io::io::WriteHalf; use tokio_io::AsyncRead; use tokio_tcp::TcpStream; -use Error; +use crate::Error; /// Command for send data to Redis #[derive(Debug)] @@ -63,7 +63,7 @@ impl Actor for RedisActor { let (r, w) = stream.split(); // configure write side of the connection - let mut framed = actix::io::FramedWrite::new(w, RespCodec, ctx); + let framed = actix::io::FramedWrite::new(w, RespCodec, ctx); act.cell = Some(framed); // read side of the connection diff --git a/src/session.rs b/src/session.rs index 8f067a276..2b1ebbbcf 100644 --- a/src/session.rs +++ b/src/session.rs @@ -3,61 +3,20 @@ use std::iter; use std::rc::Rc; use actix::prelude::*; -use actix_web::middleware::session::{SessionBackend, SessionImpl}; -use actix_web::middleware::Response as MiddlewareResponse; -use actix_web::{error, Error, HttpRequest, HttpResponse, Result}; +use actix_service::{Service, Transform}; +use actix_session::Session; +use actix_utils::cloneable::CloneableService; +use actix_web::dev::{ServiceRequest, ServiceResponse}; +use actix_web::http::header::{self, HeaderValue}; +use actix_web::{error, Error, HttpMessage}; use cookie::{Cookie, CookieJar, Key, SameSite}; -use futures::future::{err as FutErr, ok as FutOk, Either}; -use futures::Future; -use http::header::{self, HeaderValue}; -use rand::distributions::Alphanumeric; -use rand::{self, Rng}; +use futures::future::{err, ok, Either, Future, FutureResult}; +use futures::Poll; +use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use redis_async::resp::RespValue; -use serde_json; use time::Duration; -use redis::{Command, RedisActor}; - -/// Session that stores data in redis -pub struct RedisSession { - changed: bool, - inner: Rc, - state: HashMap, - value: Option, -} - -impl SessionImpl for RedisSession { - fn get(&self, key: &str) -> Option<&str> { - self.state.get(key).map(|s| s.as_str()) - } - - fn set(&mut self, key: &str, value: String) { - self.changed = true; - self.state.insert(key.to_owned(), value); - } - - fn remove(&mut self, key: &str) { - self.changed = true; - self.state.remove(key); - } - - fn clear(&mut self) { - self.changed = true; - self.state.clear() - } - - fn write(&self, resp: HttpResponse) -> Result { - if self.changed { - Ok(MiddlewareResponse::Future(self.inner.update( - &self.state, - resp, - self.value.as_ref(), - ))) - } else { - Ok(MiddlewareResponse::Done(resp)) - } - } -} +use crate::redis::{Command, RedisActor}; /// Use redis as session storage. /// @@ -66,14 +25,14 @@ impl SessionImpl for RedisSession { /// session, When this value is changed, all session data is lost. /// /// Constructor panics if key length is less than 32 bytes. -pub struct RedisSessionBackend(Rc); +pub struct RedisSession(Rc); -impl RedisSessionBackend { +impl RedisSession { /// Create new redis session backend /// /// * `addr` - address of the redis server - pub fn new>(addr: S, key: &[u8]) -> RedisSessionBackend { - RedisSessionBackend(Rc::new(Inner { + pub fn new>(addr: S, key: &[u8]) -> RedisSession { + RedisSession(Rc::new(Inner { key: Key::from_master(key), ttl: "7200".to_owned(), addr: RedisActor::start(addr), @@ -131,29 +90,77 @@ impl RedisSessionBackend { } } -impl SessionBackend for RedisSessionBackend { - type Session = RedisSession; - type ReadFuture = Box>; +impl Transform for RedisSession +where + S: Service< + Request = ServiceRequest

, + Response = ServiceResponse, + Error = Error, + > + 'static, + S::Future: 'static, + P: 'static, + B: 'static, +{ + type Request = ServiceRequest

; + type Response = ServiceResponse; + type Error = S::Error; + type InitError = (); + type Transform = RedisSessionMiddleware; + type Future = FutureResult; - fn from_request(&self, req: &mut HttpRequest) -> Self::ReadFuture { - let inner = Rc::clone(&self.0); + fn new_transform(&self, service: S) -> Self::Future { + ok(RedisSessionMiddleware { + service: CloneableService::new(service), + inner: self.0.clone(), + }) + } +} - Box::new(self.0.load(req).map(move |state| { - if let Some((state, value)) = state { - RedisSession { - inner, - state, - changed: false, - value: Some(value), - } +/// Cookie session middleware +pub struct RedisSessionMiddleware { + service: CloneableService, + inner: Rc, +} + +impl Service for RedisSessionMiddleware +where + S: Service< + Request = ServiceRequest

, + Response = ServiceResponse, + Error = Error, + > + 'static, + S::Future: 'static, + P: 'static, + B: 'static, +{ + type Request = ServiceRequest

; + type Response = ServiceResponse; + type Error = Error; + type Future = Box>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready() + } + + fn call(&mut self, mut req: ServiceRequest

) -> Self::Future { + let mut srv = self.service.clone(); + let inner = self.inner.clone(); + + Box::new(self.inner.load(&req).and_then(move |state| { + let value = if let Some((state, value)) = state { + Session::set_session(state.into_iter(), &mut req); + Some(value) } else { - RedisSession { - inner, - changed: false, - state: HashMap::new(), - value: None, + None + }; + + srv.call(req).and_then(move |mut res| { + if let Some(state) = Session::get_changes(&mut res) { + Either::A(inner.update(res, state, value)) + } else { + Either::B(ok(res)) } - } + }) })) } } @@ -171,10 +178,10 @@ struct Inner { } impl Inner { - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - fn load( - &self, req: &mut HttpRequest, - ) -> Box, String)>, Error = Error>> + fn load

( + &self, + req: &ServiceRequest

, + ) -> impl Future, String)>, Error = Error> { if let Ok(cookies) = req.cookies() { for cookie in cookies.iter() { @@ -183,7 +190,7 @@ impl Inner { jar.add_original(cookie.clone()); if let Some(cookie) = jar.signed(&self.key).get(&self.name) { let value = cookie.value().to_owned(); - return Box::new( + return Either::A( self.addr .send(Command(resp_array!["GET", cookie.value()])) .map_err(Error::from) @@ -193,7 +200,7 @@ impl Inner { RespValue::Error(err) => { return Err( error::ErrorInternalServerError(err), - ) + ); } RespValue::SimpleString(s) => { if let Ok(val) = serde_json::from_str(&s) @@ -218,27 +225,30 @@ impl Inner { }), ); } else { - return Box::new(FutOk(None)); + return Either::B(ok(None)); } } } } - Box::new(FutOk(None)) + Either::B(ok(None)) } - fn update( - &self, state: &HashMap, mut resp: HttpResponse, - value: Option<&String>, - ) -> Box> { + fn update( + &self, + mut res: ServiceResponse, + state: impl Iterator, + value: Option, + ) -> impl Future, Error = Error> { let (value, jar) = if let Some(value) = value { (value.clone(), None) } else { - let mut rng = rand::OsRng::new().unwrap(); + let mut rng = OsRng::new().unwrap(); let value: String = iter::repeat(()) .map(|()| rng.sample(Alphanumeric)) .take(32) .collect(); + // prepare session id cookie let mut cookie = Cookie::new(self.name.clone(), value.clone()); cookie.set_path(self.path.clone()); cookie.set_secure(self.secure); @@ -263,26 +273,28 @@ impl Inner { (value, Some(jar)) }; - Box::new(match serde_json::to_string(state) { - Err(e) => Either::A(FutErr(e.into())), + let state: HashMap<_, _> = state.collect(); + + match serde_json::to_string(&state) { + Err(e) => Either::A(err(e.into())), Ok(body) => Either::B( self.addr .send(Command(resp_array!["SET", value, body, "EX", &self.ttl])) .map_err(Error::from) - .and_then(move |res| match res { + .and_then(move |redis_result| match redis_result { Ok(_) => { if let Some(jar) = jar { for cookie in jar.delta() { let val = HeaderValue::from_str(&cookie.to_string())?; - resp.headers_mut().append(header::SET_COOKIE, val); + res.headers_mut().append(header::SET_COOKIE, val); } } - Ok(resp) + Ok(res) } Err(err) => Err(error::ErrorInternalServerError(err)), }), ), - }) + } } } diff --git a/tests/test_redis.rs b/tests/test_redis.rs index a9fef4a74..979c0230a 100644 --- a/tests/test_redis.rs +++ b/tests/test_redis.rs @@ -1,16 +1,12 @@ -extern crate actix; -extern crate actix_redis; #[macro_use] extern crate redis_async; -extern crate env_logger; -extern crate futures; use actix::prelude::*; use actix_redis::{Command, Error, RedisActor, RespValue}; use futures::Future; #[test] -fn test_error_connect() { +fn test_error_connect() -> std::io::Result<()> { let sys = System::new("test"); let addr = RedisActor::start("localhost:54000"); @@ -27,11 +23,11 @@ fn test_error_connect() { }) }); - sys.run(); + sys.run() } #[test] -fn test_redis() { +fn test_redis() -> std::io::Result<()> { env_logger::init(); let sys = System::new("test"); @@ -63,5 +59,5 @@ fn test_redis() { }) }); - sys.run(); + sys.run() }