diff --git a/actix-redis/.gitignore b/actix-redis/.gitignore new file mode 100644 index 000000000..c786504ea --- /dev/null +++ b/actix-redis/.gitignore @@ -0,0 +1,15 @@ +Cargo.lock +target/ +guide/build/ +/gh-pages +/.history + +*.so +*.out +*.pyc +*.pid +*.sock +*~ + +# These are backup files generated by rustfmt +**/*.rs.bk diff --git a/actix-redis/.travis.yml b/actix-redis/.travis.yml new file mode 100644 index 000000000..cb3e21855 --- /dev/null +++ b/actix-redis/.travis.yml @@ -0,0 +1,65 @@ +language: rust +rust: + - stable + - beta + - nightly + +sudo: required +dist: trusty + +services: + - redis-server + +env: + global: + - RUSTFLAGS="-C link-dead-code" + +addons: + apt: + packages: + - libcurl4-openssl-dev + - libelf-dev + - libdw-dev + - cmake + - gcc + - binutils-dev + - libiberty-dev + +# Add clippy +before_script: + - | + if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then + ( ( cargo install clippy && export CLIPPY=true ) || export CLIPPY=false ); + fi + - export PATH=$PATH:~/.cargo/bin + +script: + - | + if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then + USE_SKEPTIC=1 cargo test + else + cargo test + fi + - | + if [[ "$TRAVIS_RUST_VERSION" == "nightly" && $CLIPPY ]]; then + cargo clippy + fi + +# Upload docs +after_success: + - | + if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "stable" ]]; then + cargo doc --no-deps && + echo "" > target/doc/index.html && + git clone https://github.com/davisp/ghp-import.git && + ./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc && + echo "Uploaded documentation" + fi + + - | + if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then + bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh) + USE_SKEPTIC=1 cargo tarpaulin --out Xml + bash <(curl -s https://codecov.io/bash) + echo "Uploaded code coverage" + fi diff --git a/actix-redis/CHANGES.md b/actix-redis/CHANGES.md new file mode 100644 index 000000000..8e0f972bc --- /dev/null +++ b/actix-redis/CHANGES.md @@ -0,0 +1,65 @@ +# Changes + +## [0.8.0] 2019-12-20 + +* Release + +## [0.8.0-alpha.1] 2019-12-16 + +* Migrate to actix 0.9 + +## 0.7 (2019-09-25) + +* added cache_keygen functionality to RedisSession builder, enabling support for + customizable cache key creation + + +## 0.6.1 (2019-07-19) + +* remove ClonableService usage + +* added comprehensive tests for session workflow + + +## 0.6.0 (2019-07-08) + +* actix-web 1.0.0 compatibility + +* Upgraded logic that evaluates session state, including new SessionStatus field, + and introduced ``session.renew()`` and ``session.purge()`` functionality. + Use ``renew()`` to cycle the session key at successful login. ``renew()`` keeps a + session's state while replacing the old cookie and session key with new ones. + Use ``purge()`` at logout to invalidate the session cookie and remove the + session's redis cache entry. + + + +## 0.5.1 (2018-08-02) + +* Use cookie 0.11 + + +## 0.5.0 (2018-07-21) + +* Session cookie configuration + +* Actix/Actix-web 0.7 compatibility + + +## 0.4.0 (2018-05-08) + +* Actix web 0.6 compatibility + +## 0.3.0 (2018-04-10) + +* Actix web 0.5 compatibility + +## 0.2.0 (2018-02-28) + +* Use resolver actor from actix + +* Use actix web 0.5 + +## 0.1.0 (2018-01-23) + +* First release diff --git a/actix-redis/Cargo.toml b/actix-redis/Cargo.toml new file mode 100644 index 000000000..add06caa7 --- /dev/null +++ b/actix-redis/Cargo.toml @@ -0,0 +1,54 @@ +[package] +name = "actix-redis" +version = "0.8.0" +authors = ["Nikolay Kim "] +description = "Redis integration for actix framework" +license = "MIT/Apache-2.0" +readme = "README.md" +keywords = ["web", "redis", "async", "actix", "tokio"] +homepage = "https://github.com/actix/actix-redis" +repository = "https://github.com/actix/actix-redis.git" +documentation = "https://docs.rs/actix-redis/" +categories = ["network-programming", "asynchronous"] +exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +edition = "2018" + +[lib] +name = "actix_redis" +path = "src/lib.rs" + +[badges] +travis-ci = { repository = "actix/actix-redis", branch = "master" } +codecov = { repository = "actix/actix-redis", branch = "master", service = "github" } + +[features] +default = ["web"] + +# actix-web integration +web = ["actix/http", "actix-service", "actix-web", "actix-session/cookie-session", "rand", "serde", "serde_json"] + +[dependencies] +actix = "0.9.0" +actix-utils = "1.0.3" + +log = "0.4.6" +backoff = "0.1.5" +derive_more = "0.99.2" +futures = "0.3.1" +redis-async = "0.6.1" +actix-rt = "1.0.0" +time = "0.1.42" +tokio = "0.2.6" +tokio-util = "0.2.0" + +# actix web session +actix-web = { version = "2.0.0", optional = true } +actix-service = { version = "1.0.0", optional = true } +actix-session = { version = "0.3.0", optional = true } +rand = { version = "0.7.0", optional = true } +serde = { version = "1.0.101", optional = true, features = ["derive"] } +serde_json = { version = "1.0.40", optional = true } +env_logger = "0.6.2" + +[dev-dependencies] +env_logger = "0.6" diff --git a/actix-redis/LICENSE-APACHE b/actix-redis/LICENSE-APACHE new file mode 100644 index 000000000..6cdf2d16c --- /dev/null +++ b/actix-redis/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017-NOW Nikolay Kim + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/actix-redis/LICENSE-MIT b/actix-redis/LICENSE-MIT new file mode 100644 index 000000000..410ce45a4 --- /dev/null +++ b/actix-redis/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2017 Nikilay Kim + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/actix-redis/README.md b/actix-redis/README.md new file mode 100644 index 000000000..5b973fb83 --- /dev/null +++ b/actix-redis/README.md @@ -0,0 +1,61 @@ +# Actix redis [![Build Status](https://travis-ci.org/actix/actix-redis.svg?branch=master)](https://travis-ci.org/actix/actix-redis) [![codecov](https://codecov.io/gh/actix/actix-redis/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-redis) [![crates.io](http://meritbadge.herokuapp.com/actix-redis)](https://crates.io/crates/actix-redis) + +Redis integration for actix framework. + +## Documentation + +* [API Documentation](http://actix.github.io/actix-redis/actix_redis/) +* [Chat on gitter](https://gitter.im/actix/actix) +* Cargo package: [actix-redis](https://crates.io/crates/actix-redis) +* Minimum supported Rust version: 1.39 or later + + +## Redis session backend + +Use redis as session storage. + +You need to pass an address of the redis server and random value to the +constructor of `RedisSessionBackend`. This is private key for cookie session, +When this value is changed, all session data is lost. + +Note that whatever you write into your session is visible by the user (but not modifiable). + +Constructor panics if key length is less than 32 bytes. + +```rust +use actix_web::{App, HttpServer, web, middleware}; +use actix_web::middleware::session::SessionStorage; +use actix_redis::RedisSessionBackend; + +#[actix_rt::main] +async fn main() -> std::io::Result { + HttpServer::new(|| App::new() + // enable logger + .middleware(middleware::Logger::default()) + // cookie session middleware + .middleware(SessionStorage::new( + RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) + )) + // register simple route, handle all methods + .service(web::resource("/").to(index)) + ) + .bind("0.0.0.0:8080")? + .start() + .await +} +``` + +## License + +This project is licensed under either of + +* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or [http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)) +* MIT license ([LICENSE-MIT](LICENSE-MIT) or [http://opensource.org/licenses/MIT](http://opensource.org/licenses/MIT)) + +at your option. + +## Code of Conduct + +Contribution to the actix-redis crate is organized under the terms of the +Contributor Covenant, the maintainer of actix-redis, @fafhrd91, promises to +intervene to uphold that code of conduct. diff --git a/actix-redis/examples/basic.rs b/actix-redis/examples/basic.rs new file mode 100644 index 000000000..0d95cad80 --- /dev/null +++ b/actix-redis/examples/basic.rs @@ -0,0 +1,37 @@ +use actix_redis::RedisSession; +use actix_session::Session; +use actix_web::{middleware, web, App, Error, HttpRequest, HttpServer, Responder}; + +/// simple handler +async fn index(req: HttpRequest, session: Session) -> Result { + println!("{:?}", req); + + // session + if let Some(count) = session.get::("counter")? { + println!("SESSION value: {}", count); + session.set("counter", count + 1)?; + } else { + session.set("counter", 1)?; + } + + Ok("Welcome!") +} + +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); + env_logger::init(); + + HttpServer::new(|| { + App::new() + // enable logger + .wrap(middleware::Logger::default()) + // cookie session middleware + .wrap(RedisSession::new("127.0.0.1:6379", &[0; 32])) + // register simple route, handle all methods + .service(web::resource("/").to(index)) + }) + .bind("0.0.0.0:8080")? + .run() + .await +} diff --git a/actix-redis/rustfmt.toml b/actix-redis/rustfmt.toml new file mode 100644 index 000000000..94bd11d51 --- /dev/null +++ b/actix-redis/rustfmt.toml @@ -0,0 +1,2 @@ +max_width = 89 +reorder_imports = true diff --git a/actix-redis/src/lib.rs b/actix-redis/src/lib.rs new file mode 100644 index 000000000..5ee762010 --- /dev/null +++ b/actix-redis/src/lib.rs @@ -0,0 +1,45 @@ +//! Redis integration for Actix framework. +//! +//! ## Documentation +//! * [API Documentation (Development)](http://actix.github.io/actix-redis/actix_redis/) +//! * [API Documentation (Releases)](https://docs.rs/actix-redis/) +//! * [Chat on gitter](https://gitter.im/actix/actix) +//! * Cargo package: [actix-redis](https://crates.io/crates/actix-redis) +//! * Minimum supported Rust version: 1.26 or later +//! +#[macro_use] +extern crate log; +#[macro_use] +extern crate redis_async; +#[macro_use] +extern crate derive_more; + +mod redis; +pub use redis::{Command, RedisActor}; + +#[cfg(feature = "web")] +mod session; +#[cfg(feature = "web")] +pub use actix_web::cookie::SameSite; +#[cfg(feature = "web")] +pub use session::RedisSession; + +/// General purpose actix redis error +#[derive(Debug, Display, From)] +pub enum Error { + #[display(fmt = "Redis error {}", _0)] + Redis(redis_async::error::Error), + /// Receiving message during reconnecting + #[display(fmt = "Redis: Not connected")] + NotConnected, + /// Cancel all waters when connection get dropped + #[display(fmt = "Redis: Disconnected")] + Disconnected, +} + +#[cfg(feature = "web")] +impl actix_web::ResponseError for Error {} + +// re-export +pub use redis_async::error::Error as RespError; +pub use redis_async::resp::RespValue; diff --git a/actix-redis/src/redis.rs b/actix-redis/src/redis.rs new file mode 100644 index 000000000..6a4c81a3f --- /dev/null +++ b/actix-redis/src/redis.rs @@ -0,0 +1,147 @@ +use std::collections::VecDeque; +use std::io; + +use actix::actors::resolver::{Connect, Resolver}; +use actix::prelude::*; +use actix_utils::oneshot; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use futures::FutureExt; +use redis_async::error::Error as RespError; +use redis_async::resp::{RespCodec, RespValue}; +use tokio::io::{split, WriteHalf}; +use tokio::net::TcpStream; +use tokio_util::codec::FramedRead; + +use crate::Error; + +/// Command for send data to Redis +#[derive(Debug)] +pub struct Command(pub RespValue); + +impl Message for Command { + type Result = Result; +} + +/// Redis comminucation actor +pub struct RedisActor { + addr: String, + backoff: ExponentialBackoff, + cell: Option, RespCodec>>, + queue: VecDeque>>, +} + +impl RedisActor { + /// Start new `Supervisor` with `RedisActor`. + pub fn start>(addr: S) -> Addr { + let addr = addr.into(); + + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = None; + + Supervisor::start(|_| RedisActor { + addr, + cell: None, + backoff, + queue: VecDeque::new(), + }) + } +} + +impl Actor for RedisActor { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + Resolver::from_registry() + .send(Connect::host(self.addr.as_str())) + .into_actor(self) + .map(|res, act, ctx| match res { + Ok(res) => match res { + Ok(stream) => { + info!("Connected to redis server: {}", act.addr); + + let (r, w) = split(stream); + + // configure write side of the connection + let framed = actix::io::FramedWrite::new(w, RespCodec, ctx); + act.cell = Some(framed); + + // read side of the connection + ctx.add_stream(FramedRead::new(r, RespCodec)); + + act.backoff.reset(); + } + Err(err) => { + error!("Can not connect to redis server: {}", err); + // re-connect with backoff time. + // we stop current context, supervisor will restart it. + if let Some(timeout) = act.backoff.next_backoff() { + ctx.run_later(timeout, |_, ctx| ctx.stop()); + } + } + }, + Err(err) => { + error!("Can not connect to redis server: {}", err); + // re-connect with backoff time. + // we stop current context, supervisor will restart it. + if let Some(timeout) = act.backoff.next_backoff() { + ctx.run_later(timeout, |_, ctx| ctx.stop()); + } + } + }) + .wait(ctx); + } +} + +impl Supervised for RedisActor { + fn restarting(&mut self, _: &mut Self::Context) { + self.cell.take(); + for tx in self.queue.drain(..) { + let _ = tx.send(Err(Error::Disconnected)); + } + } +} + +impl actix::io::WriteHandler for RedisActor { + fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { + warn!("Redis connection dropped: {} error: {}", self.addr, err); + Running::Stop + } +} + +impl StreamHandler> for RedisActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Err(e) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Err(e.into())); + } + ctx.stop(); + } + Ok(val) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Ok(val)); + } + } + } + } +} + +impl Handler for RedisActor { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { + let (tx, rx) = oneshot::channel(); + if let Some(ref mut cell) = self.cell { + self.queue.push_back(tx); + cell.write(msg.0); + } else { + let _ = tx.send(Err(Error::NotConnected)); + } + + Box::pin(rx.map(|res| match res { + Ok(res) => res, + Err(_) => Err(Error::Disconnected), + })) + } +} diff --git a/actix-redis/src/session.rs b/actix-redis/src/session.rs new file mode 100644 index 000000000..ced1822ad --- /dev/null +++ b/actix-redis/src/session.rs @@ -0,0 +1,660 @@ +use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{collections::HashMap, iter, rc::Rc}; + +use actix::prelude::*; +use actix_service::{Service, Transform}; +use actix_session::{Session, SessionStatus}; +use actix_web::cookie::{Cookie, CookieJar, Key, SameSite}; +use actix_web::dev::{ServiceRequest, ServiceResponse}; +use actix_web::http::header::{self, HeaderValue}; +use actix_web::{error, Error, HttpMessage}; +use futures::future::{ok, Future, Ready}; +use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; +use redis_async::resp::RespValue; +use time::{self, Duration}; + +use crate::redis::{Command, RedisActor}; + +/// Use redis as session storage. +/// +/// You need to pass an address of the redis server and random value to the +/// constructor of `RedisSessionBackend`. This is private key for cookie +/// session, When this value is changed, all session data is lost. +/// +/// Constructor panics if key length is less than 32 bytes. +pub struct RedisSession(Rc); + +impl RedisSession { + /// Create new redis session backend + /// + /// * `addr` - address of the redis server + pub fn new>(addr: S, key: &[u8]) -> RedisSession { + RedisSession(Rc::new(Inner { + key: Key::from_master(key), + cache_keygen: Box::new(|key: &str| format!("session:{}", &key)), + ttl: "7200".to_owned(), + addr: RedisActor::start(addr), + name: "actix-session".to_owned(), + path: "/".to_owned(), + domain: None, + secure: false, + max_age: Some(Duration::days(7)), + same_site: None, + })) + } + + /// Set time to live in seconds for session value + pub fn ttl(mut self, ttl: u16) -> Self { + Rc::get_mut(&mut self.0).unwrap().ttl = format!("{}", ttl); + self + } + + /// Set custom cookie name for session id + pub fn cookie_name(mut self, name: &str) -> Self { + Rc::get_mut(&mut self.0).unwrap().name = name.to_owned(); + self + } + + /// Set custom cookie path + pub fn cookie_path(mut self, path: &str) -> Self { + Rc::get_mut(&mut self.0).unwrap().path = path.to_owned(); + self + } + + /// Set custom cookie domain + pub fn cookie_domain(mut self, domain: &str) -> Self { + Rc::get_mut(&mut self.0).unwrap().domain = Some(domain.to_owned()); + self + } + + /// Set custom cookie secure + /// If the `secure` field is set, a cookie will only be transmitted when the + /// connection is secure - i.e. `https` + pub fn cookie_secure(mut self, secure: bool) -> Self { + Rc::get_mut(&mut self.0).unwrap().secure = secure; + self + } + + /// Set custom cookie max-age + pub fn cookie_max_age(mut self, max_age: Duration) -> Self { + Rc::get_mut(&mut self.0).unwrap().max_age = Some(max_age); + self + } + + /// Set custom cookie SameSite + pub fn cookie_same_site(mut self, same_site: SameSite) -> Self { + Rc::get_mut(&mut self.0).unwrap().same_site = Some(same_site); + self + } + + /// Set a custom cache key generation strategy, expecting session key as input + pub fn cache_keygen(mut self, keygen: Box String>) -> Self { + Rc::get_mut(&mut self.0).unwrap().cache_keygen = keygen; + self + } +} + +impl Transform for RedisSession +where + S: Service, Error = Error> + + 'static, + S::Future: 'static, + B: 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = S::Error; + type InitError = (); + type Transform = RedisSessionMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(RedisSessionMiddleware { + service: Rc::new(RefCell::new(service)), + inner: self.0.clone(), + }) + } +} + +/// Cookie session middleware +pub struct RedisSessionMiddleware { + service: Rc>, + inner: Rc, +} + +impl Service for RedisSessionMiddleware +where + S: Service, Error = Error> + + 'static, + S::Future: 'static, + B: 'static, +{ + type Request = ServiceRequest; + type Response = ServiceResponse; + type Error = Error; + type Future = Pin>>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.borrow_mut().poll_ready(cx) + } + + fn call(&mut self, mut req: ServiceRequest) -> Self::Future { + let mut srv = self.service.clone(); + let inner = self.inner.clone(); + + Box::pin(async move { + let state = inner.load(&req).await?; + let value = if let Some((state, value)) = state { + Session::set_session(state.into_iter(), &mut req); + Some(value) + } else { + None + }; + + let mut res = srv.call(req).await?; + + match Session::get_changes(&mut res) { + (SessionStatus::Unchanged, None) => Ok(res), + (SessionStatus::Unchanged, Some(state)) => { + if value.is_none() { + // implies the session is new + inner.update(res, state, value).await + } else { + Ok(res) + } + } + (SessionStatus::Changed, Some(state)) => { + inner.update(res, state, value).await + } + (SessionStatus::Purged, Some(_)) => { + if let Some(val) = value { + inner.clear_cache(val).await?; + match inner.remove_cookie(&mut res) { + Ok(_) => Ok(res), + Err(_err) => Err(error::ErrorInternalServerError(_err)), + } + } else { + Err(error::ErrorInternalServerError("unexpected")) + } + } + (SessionStatus::Renewed, Some(state)) => { + if let Some(val) = value { + inner.clear_cache(val).await?; + inner.update(res, state, None).await + } else { + inner.update(res, state, None).await + } + } + (_, None) => unreachable!(), + } + }) + } +} + +struct Inner { + key: Key, + cache_keygen: Box String>, + ttl: String, + addr: Addr, + name: String, + path: String, + domain: Option, + secure: bool, + max_age: Option, + same_site: Option, +} + +impl Inner { + async fn load( + &self, + req: &ServiceRequest, + ) -> Result, String)>, Error> { + if let Ok(cookies) = req.cookies() { + for cookie in cookies.iter() { + if cookie.name() == self.name { + let mut jar = CookieJar::new(); + jar.add_original(cookie.clone()); + if let Some(cookie) = jar.signed(&self.key).get(&self.name) { + let value = cookie.value().to_owned(); + let cachekey = (self.cache_keygen)(&cookie.value()); + return match self + .addr + .send(Command(resp_array!["GET", cachekey])) + .await + { + Err(e) => Err(Error::from(e)), + Ok(res) => match res { + Ok(val) => { + match val { + RespValue::Error(err) => { + return Err( + error::ErrorInternalServerError(err), + ); + } + RespValue::SimpleString(s) => { + if let Ok(val) = serde_json::from_str(&s) { + return Ok(Some((val, value))); + } + } + RespValue::BulkString(s) => { + if let Ok(val) = serde_json::from_slice(&s) { + return Ok(Some((val, value))); + } + } + _ => (), + } + Ok(None) + } + Err(err) => Err(error::ErrorInternalServerError(err)), + }, + }; + } else { + return Ok(None); + } + } + } + } + Ok(None) + } + + async fn update( + &self, + mut res: ServiceResponse, + state: impl Iterator, + value: Option, + ) -> Result, Error> { + let (value, jar) = if let Some(value) = value { + (value.clone(), None) + } else { + let value: String = iter::repeat(()) + .map(|()| OsRng.sample(Alphanumeric)) + .take(32) + .collect(); + + // prepare session id cookie + let mut cookie = Cookie::new(self.name.clone(), value.clone()); + cookie.set_path(self.path.clone()); + cookie.set_secure(self.secure); + cookie.set_http_only(true); + + if let Some(ref domain) = self.domain { + cookie.set_domain(domain.clone()); + } + + if let Some(max_age) = self.max_age { + cookie.set_max_age(max_age); + } + + if let Some(same_site) = self.same_site { + cookie.set_same_site(same_site); + } + + // set cookie + let mut jar = CookieJar::new(); + jar.signed(&self.key).add(cookie); + + (value, Some(jar)) + }; + + let cachekey = (self.cache_keygen)(&value); + + let state: HashMap<_, _> = state.collect(); + match serde_json::to_string(&state) { + Err(e) => Err(e.into()), + Ok(body) => { + match self + .addr + .send(Command(resp_array!["SET", cachekey, body, "EX", &self.ttl])) + .await + { + Err(e) => Err(Error::from(e)), + Ok(redis_result) => match redis_result { + Ok(_) => { + if let Some(jar) = jar { + for cookie in jar.delta() { + let val = + HeaderValue::from_str(&cookie.to_string())?; + res.headers_mut().append(header::SET_COOKIE, val); + } + } + Ok(res) + } + Err(err) => Err(error::ErrorInternalServerError(err)), + }, + } + } + } + } + + /// removes cache entry + async fn clear_cache(&self, key: String) -> Result<(), Error> { + let cachekey = (self.cache_keygen)(&key); + + match self.addr.send(Command(resp_array!["DEL", cachekey])).await { + Err(e) => Err(Error::from(e)), + Ok(res) => { + match res { + // redis responds with number of deleted records + Ok(RespValue::Integer(x)) if x > 0 => Ok(()), + _ => Err(error::ErrorInternalServerError( + "failed to remove session from cache", + )), + } + } + } + } + + /// invalidates session cookie + fn remove_cookie(&self, res: &mut ServiceResponse) -> Result<(), Error> { + let mut cookie = Cookie::named(self.name.clone()); + cookie.set_value(""); + cookie.set_max_age(Duration::seconds(0)); + cookie.set_expires(time::now() - Duration::days(365)); + + let val = HeaderValue::from_str(&cookie.to_string()) + .map_err(error::ErrorInternalServerError)?; + res.headers_mut().append(header::SET_COOKIE, val); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use actix_session::Session; + use actix_web::{ + middleware, test, web, + web::{get, post, resource}, + App, HttpResponse, Result, + }; + use serde::{Deserialize, Serialize}; + use serde_json::json; + use time; + + #[derive(Serialize, Deserialize, Debug, PartialEq)] + pub struct IndexResponse { + user_id: Option, + counter: i32, + } + + async fn index(session: Session) -> Result { + let user_id: Option = session.get::("user_id").unwrap(); + let counter: i32 = session + .get::("counter") + .unwrap_or(Some(0)) + .unwrap_or(0); + + Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter })) + } + + async fn do_something(session: Session) -> Result { + let user_id: Option = session.get::("user_id").unwrap(); + let counter: i32 = session + .get::("counter") + .unwrap_or(Some(0)) + .map_or(1, |inner| inner + 1); + session.set("counter", counter)?; + + Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter })) + } + + #[derive(Deserialize)] + struct Identity { + user_id: String, + } + + async fn login( + user_id: web::Json, + session: Session, + ) -> Result { + let id = user_id.into_inner().user_id; + session.set("user_id", &id)?; + session.renew(); + + let counter: i32 = session + .get::("counter") + .unwrap_or(Some(0)) + .unwrap_or(0); + + Ok(HttpResponse::Ok().json(IndexResponse { + user_id: Some(id), + counter, + })) + } + + async fn logout(session: Session) -> Result { + let id: Option = session.get("user_id")?; + if let Some(x) = id { + session.purge(); + Ok(format!("Logged out: {}", x).into()) + } else { + Ok("Could not log out anonymous user".into()) + } + } + + #[actix_rt::test] + async fn test_workflow() { + // Step 1: GET index + // - set-cookie actix-session will be in response (session cookie #1) + // - response should be: {"counter": 0, "user_id": None} + // Step 2: GET index, including session cookie #1 in request + // - set-cookie will *not* be in response + // - response should be: {"counter": 0, "user_id": None} + // Step 3: POST to do_something, including session cookie #1 in request + // - adds new session state in redis: {"counter": 1} + // - response should be: {"counter": 1, "user_id": None} + // Step 4: POST again to do_something, including session cookie #1 in request + // - updates session state in redis: {"counter": 2} + // - response should be: {"counter": 2, "user_id": None} + // Step 5: POST to login, including session cookie #1 in request + // - set-cookie actix-session will be in response (session cookie #2) + // - updates session state in redis: {"counter": 2, "user_id": "ferris"} + // Step 6: GET index, including session cookie #2 in request + // - response should be: {"counter": 2, "user_id": "ferris"} + // Step 7: POST again to do_something, including session cookie #2 in request + // - updates session state in redis: {"counter": 3, "user_id": "ferris"} + // - response should be: {"counter": 2, "user_id": None} + // Step 8: GET index, including session cookie #1 in request + // - set-cookie actix-session will be in response (session cookie #3) + // - response should be: {"counter": 0, "user_id": None} + // Step 9: POST to logout, including session cookie #2 + // - set-cookie actix-session will be in response with session cookie #2 + // invalidation logic + // Step 10: GET index, including session cookie #2 in request + // - set-cookie actix-session will be in response (session cookie #3) + // - response should be: {"counter": 0, "user_id": None} + + let srv = test::start(|| { + App::new() + .wrap( + RedisSession::new("127.0.0.1:6379", &[0; 32]) + .cookie_name("test-session"), + ) + .wrap(middleware::Logger::default()) + .service(resource("/").route(get().to(index))) + .service(resource("/do_something").route(post().to(do_something))) + .service(resource("/login").route(post().to(login))) + .service(resource("/logout").route(post().to(logout))) + }); + + // Step 1: GET index + // - set-cookie actix-session will be in response (session cookie #1) + // - response should be: {"counter": 0, "user_id": None} + let req_1a = srv.get("/").send(); + let mut resp_1 = req_1a.await.unwrap(); + let cookie_1 = resp_1 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session") + .unwrap(); + let result_1 = resp_1.json::().await.unwrap(); + assert_eq!( + result_1, + IndexResponse { + user_id: None, + counter: 0 + } + ); + + // Step 2: GET index, including session cookie #1 in request + // - set-cookie will *not* be in response + // - response should be: {"counter": 0, "user_id": None} + let req_2 = srv.get("/").cookie(cookie_1.clone()).send(); + let resp_2 = req_2.await.unwrap(); + let cookie_2 = resp_2 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session"); + assert_eq!(cookie_2, None); + + // Step 3: POST to do_something, including session cookie #1 in request + // - adds new session state in redis: {"counter": 1} + // - response should be: {"counter": 1, "user_id": None} + let req_3 = srv.post("/do_something").cookie(cookie_1.clone()).send(); + let mut resp_3 = req_3.await.unwrap(); + let result_3 = resp_3.json::().await.unwrap(); + assert_eq!( + result_3, + IndexResponse { + user_id: None, + counter: 1 + } + ); + + // Step 4: POST again to do_something, including session cookie #1 in request + // - updates session state in redis: {"counter": 2} + // - response should be: {"counter": 2, "user_id": None} + let req_4 = srv.post("/do_something").cookie(cookie_1.clone()).send(); + let mut resp_4 = req_4.await.unwrap(); + let result_4 = resp_4.json::().await.unwrap(); + assert_eq!( + result_4, + IndexResponse { + user_id: None, + counter: 2 + } + ); + + // Step 5: POST to login, including session cookie #1 in request + // - set-cookie actix-session will be in response (session cookie #2) + // - updates session state in redis: {"counter": 2, "user_id": "ferris"} + let req_5 = srv + .post("/login") + .cookie(cookie_1.clone()) + .send_json(&json!({"user_id": "ferris"})); + let mut resp_5 = req_5.await.unwrap(); + let cookie_2 = resp_5 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session") + .unwrap(); + assert_eq!( + true, + cookie_1.value().to_string() != cookie_2.value().to_string() + ); + + let result_5 = resp_5.json::().await.unwrap(); + assert_eq!( + result_5, + IndexResponse { + user_id: Some("ferris".into()), + counter: 2 + } + ); + + // Step 6: GET index, including session cookie #2 in request + // - response should be: {"counter": 2, "user_id": "ferris"} + let req_6 = srv.get("/").cookie(cookie_2.clone()).send(); + let mut resp_6 = req_6.await.unwrap(); + let result_6 = resp_6.json::().await.unwrap(); + assert_eq!( + result_6, + IndexResponse { + user_id: Some("ferris".into()), + counter: 2 + } + ); + + // Step 7: POST again to do_something, including session cookie #2 in request + // - updates session state in redis: {"counter": 3, "user_id": "ferris"} + // - response should be: {"counter": 2, "user_id": None} + let req_7 = srv.post("/do_something").cookie(cookie_2.clone()).send(); + let mut resp_7 = req_7.await.unwrap(); + let result_7 = resp_7.json::().await.unwrap(); + assert_eq!( + result_7, + IndexResponse { + user_id: Some("ferris".into()), + counter: 3 + } + ); + + // Step 8: GET index, including session cookie #1 in request + // - set-cookie actix-session will be in response (session cookie #3) + // - response should be: {"counter": 0, "user_id": None} + let req_8 = srv.get("/").cookie(cookie_1.clone()).send(); + let mut resp_8 = req_8.await.unwrap(); + let cookie_3 = resp_8 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session") + .unwrap(); + let result_8 = resp_8.json::().await.unwrap(); + assert_eq!( + result_8, + IndexResponse { + user_id: None, + counter: 0 + } + ); + assert!(cookie_3.value().to_string() != cookie_2.value().to_string()); + + // Step 9: POST to logout, including session cookie #2 + // - set-cookie actix-session will be in response with session cookie #2 + // invalidation logic + let req_9 = srv.post("/logout").cookie(cookie_2.clone()).send(); + let resp_9 = req_9.await.unwrap(); + let cookie_4 = resp_9 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session") + .unwrap(); + assert!(&time::now().tm_year != &cookie_4.expires().map(|t| t.tm_year).unwrap()); + + // Step 10: GET index, including session cookie #2 in request + // - set-cookie actix-session will be in response (session cookie #3) + // - response should be: {"counter": 0, "user_id": None} + let req_10 = srv.get("/").cookie(cookie_2.clone()).send(); + let mut resp_10 = req_10.await.unwrap(); + let result_10 = resp_10.json::().await.unwrap(); + assert_eq!( + result_10, + IndexResponse { + user_id: None, + counter: 0 + } + ); + + let cookie_5 = resp_10 + .cookies() + .unwrap() + .clone() + .into_iter() + .find(|c| c.name() == "test-session") + .unwrap(); + assert!(cookie_5.value().to_string() != cookie_2.value().to_string()); + } +} diff --git a/actix-redis/tests/test_redis.rs b/actix-redis/tests/test_redis.rs new file mode 100644 index 000000000..b9bb9c390 --- /dev/null +++ b/actix-redis/tests/test_redis.rs @@ -0,0 +1,42 @@ +#[macro_use] +extern crate redis_async; + +use actix_redis::{Command, Error, RedisActor, RespValue}; + +#[actix_rt::test] +async fn test_error_connect() { + let addr = RedisActor::start("localhost:54000"); + let _addr2 = addr.clone(); + + let res = addr.send(Command(resp_array!["GET", "test"])).await; + match res { + Ok(Err(Error::NotConnected)) => (), + _ => panic!("Should not happen {:?}", res), + } +} + +#[actix_rt::test] +async fn test_redis() { + env_logger::init(); + + let addr = RedisActor::start("127.0.0.1:6379"); + let res = addr + .send(Command(resp_array!["SET", "test", "value"])) + .await; + + match res { + Ok(Ok(resp)) => { + assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); + + let res = addr.send(Command(resp_array!["GET", "test"])).await; + match res { + Ok(Ok(resp)) => { + println!("RESP: {:?}", resp); + assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into())); + } + _ => panic!("Should not happen {:?}", res), + } + } + _ => panic!("Should not happen {:?}", res), + } +}