From b0854ed1441fcc9e5eb38da42fda0380f8ad34e8 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 21 Mar 2021 22:07:45 -0700 Subject: [PATCH] fix actix-redis by revert most recent changes (#164) --- actix-redis/Cargo.toml | 37 +++--- actix-redis/src/lib.rs | 8 +- actix-redis/src/redis.rs | 203 +++++++++++++++++++------------- actix-redis/src/session.rs | 72 +++++------ actix-redis/tests/test_redis.rs | 37 ++++-- 5 files changed, 207 insertions(+), 150 deletions(-) diff --git a/actix-redis/Cargo.toml b/actix-redis/Cargo.toml index ac6e12f7d..1516d7d47 100644 --- a/actix-redis/Cargo.toml +++ b/actix-redis/Cargo.toml @@ -22,8 +22,8 @@ default = ["web"] # actix-web integration web = [ - "actix-service", - "actix-web", + "actix-web/cookies", + "actix-web/secure-cookies", "actix-session/cookie-session", "rand", "serde", @@ -31,27 +31,30 @@ web = [ ] [dependencies] -log = "0.4.6" -derive_more = "0.99.2" -futures-util = { version = "0.3.7", default-features = false } -futures-channel = { version = "0.3.5", default-features = false } -redis-async = "0.9" -time = "0.2.23" -actix-rt = "2" -tokio = "1" -tokio-util = "0.6" +actix = { version = "0.11.0", default-features = false } +actix-rt = { version = "2.1", default-features = false } +actix-service = "2.0.0-beta.5" +actix-tls = { version = "3.0.0-beta.4", default-features = false, features = ["connect"] } -trust-dns-resolver = { version = "0.20.0", default-features = false, features = ["tokio-runtime", "system-config"] } +log = "0.4.6" +backoff = "0.2.1" +derive_more = "0.99.2" +futures-core = { version = "0.3.7", default-features = false } +redis2 = { package = "redis", version = "0.19.0", features = ["tokio-comp", "tokio-native-tls-comp"] } +redis-async = { version = "0.8", default-features = false, features = ["tokio10"] } +time = "0.2.23" +tokio = { version = "1", features = ["sync"] } +tokio-util = "0.6.1" # actix-session -actix-web = { version = "4.0.0-beta.4", default_features = false, optional = true } -actix-http = { version = "3.0.0-beta.4", optional = true } -actix-service = { version = "2.0.0-beta.5", optional = true } +actix-web = { version = "4.0.0-beta.3", default_features = false, optional = true } actix-session = { version = "0.4.0", optional = true } -rand = { version = "0.8", optional = true } +rand = { version = "0.8.0", optional = true } serde = { version = "1.0.101", optional = true } serde_json = { version = "1.0.40", optional = true } [dev-dependencies] -env_logger = "0.8" +actix-http = "3.0.0-beta.4" +actix-rt = "2.1" +env_logger = "0.7" serde_derive = "1.0" diff --git a/actix-redis/src/lib.rs b/actix-redis/src/lib.rs index 107beb762..a582583f5 100644 --- a/actix-redis/src/lib.rs +++ b/actix-redis/src/lib.rs @@ -3,7 +3,7 @@ #![deny(rust_2018_idioms)] mod redis; -pub use redis::RedisClient; +pub use redis::{Command, RedisActor}; use derive_more::{Display, Error, From}; @@ -25,12 +25,6 @@ pub enum Error { /// Cancel all waters when connection get dropped #[display(fmt = "Redis: Disconnected")] Disconnected, - /// Invalid address - #[display(fmt = "Redis: Invalid address")] - InvalidAddress, - /// DNS resolve error - #[display(fmt = "Redis: DNS resolve error")] - ResolveError, } #[cfg(feature = "web")] diff --git a/actix-redis/src/redis.rs b/actix-redis/src/redis.rs index b1b92b83f..2586b1866 100644 --- a/actix-redis/src/redis.rs +++ b/actix-redis/src/redis.rs @@ -1,98 +1,141 @@ use std::collections::VecDeque; -use std::net::SocketAddr; +use std::io; -use redis_async::client::{paired_connect, PairedConnection}; -use redis_async::resp::RespValue; -use tokio::sync::Mutex; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; +use actix::prelude::*; +use actix_rt::net::TcpStream; +use actix_service::boxed::{service, BoxService}; +use actix_tls::connect::{default_connector, Connect, ConnectError, Connection}; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use log::{error, info, warn}; +use redis_async::error::Error as RespError; +use redis_async::resp::{RespCodec, RespValue}; +use tokio::io::{split, WriteHalf}; +use tokio::sync::oneshot; +use tokio_util::codec::FramedRead; use crate::Error; -pub struct RedisClient { - addr: String, - connection: Mutex>, +/// Command for send data to Redis +#[derive(Debug)] +pub struct Command(pub RespValue); + +impl Message for Command { + type Result = Result; } -impl RedisClient { - pub fn new(addr: impl Into) -> Self { - Self { - addr: addr.into(), - connection: Mutex::new(None), +/// Redis communication actor +pub struct RedisActor { + addr: String, + connector: BoxService, Connection, ConnectError>, + backoff: ExponentialBackoff, + cell: Option, RespCodec>>, + queue: VecDeque>>, +} + +impl RedisActor { + /// Start new `Supervisor` with `RedisActor`. + pub fn start>(addr: S) -> Addr { + let addr = addr.into(); + + let backoff = ExponentialBackoff { + max_elapsed_time: None, + ..Default::default() + }; + + Supervisor::start(|_| RedisActor { + addr, + connector: service(default_connector()), + cell: None, + backoff, + queue: VecDeque::new(), + }) + } +} + +impl Actor for RedisActor { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + let req = Connect::new(self.addr.to_owned()); + self.connector + .call(req) + .into_actor(self) + .map(|res, act, ctx| match res { + Ok(conn) => { + let stream = conn.into_parts().0; + info!("Connected to redis server: {}", act.addr); + + let (r, w) = split(stream); + + // configure write side of the connection + let framed = actix::io::FramedWrite::new(w, RespCodec, ctx); + act.cell = Some(framed); + + // read side of the connection + ctx.add_stream(FramedRead::new(r, RespCodec)); + + act.backoff.reset(); + } + Err(err) => { + error!("Can not connect to redis server: {}", err); + // re-connect with backoff time. + // we stop current context, supervisor will restart it. + if let Some(timeout) = act.backoff.next_backoff() { + ctx.run_later(timeout, |_, ctx| ctx.stop()); + } + } + }) + .wait(ctx); + } +} + +impl Supervised for RedisActor { + fn restarting(&mut self, _: &mut Self::Context) { + self.cell.take(); + for tx in self.queue.drain(..) { + let _ = tx.send(Err(Error::Disconnected)); } } +} - async fn get_connection(&self) -> Result { - let mut connection = self.connection.lock().await; - if let Some(ref connection) = *connection { - return Ok(connection.clone()); - } +impl actix::io::WriteHandler for RedisActor { + fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { + warn!("Redis connection dropped: {} error: {}", self.addr, err); + Running::Stop + } +} - let mut addrs = resolve(&self.addr).await?; - loop { - // try to connect - let socket_addr = addrs.pop_front().ok_or_else(|| { - log::warn!("Cannot connect to {}.", self.addr); - Error::NotConnected - })?; - match paired_connect(socket_addr).await { - Ok(conn) => { - *connection = Some(conn.clone()); - return Ok(conn); +impl StreamHandler> for RedisActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Err(e) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Err(e.into())); + } + ctx.stop(); + } + Ok(val) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Ok(val)); } - Err(err) => log::warn!( - "Attempt to connect to {} as {} failed: {}.", - self.addr, - socket_addr, - err - ), } } } +} - pub async fn send(&self, req: RespValue) -> Result { - let res = self.get_connection().await?.send(req).await?; - Ok(res) +impl Handler for RedisActor { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { + let (tx, rx) = oneshot::channel(); + if let Some(ref mut cell) = self.cell { + self.queue.push_back(tx); + cell.write(msg.0); + } else { + let _ = tx.send(Err(Error::NotConnected)); + } + + Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? }) } } - -fn parse_addr(addr: &str, default_port: u16) -> Option<(&str, u16)> { - // split the string by ':' and convert the second part to u16 - let mut parts_iter = addr.splitn(2, ':'); - let host = parts_iter.next()?; - let port_str = parts_iter.next().unwrap_or(""); - let port: u16 = port_str.parse().unwrap_or(default_port); - Some((host, port)) -} - -async fn resolve(addr: &str) -> Result, Error> { - // try to parse as a regular SocketAddr first - if let Ok(addr) = addr.parse::() { - let mut addrs = VecDeque::new(); - addrs.push_back(addr); - return Ok(addrs); - } - - let (host, port) = parse_addr(addr, 6379).ok_or(Error::InvalidAddress)?; - - // we need to do dns resolution - let resolver = AsyncResolver::tokio_from_system_conf() - .or_else(|err| { - log::warn!("Cannot create system DNS resolver: {}", err); - AsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()) - }) - .map_err(|err| { - log::error!("Cannot create DNS resolver: {}", err); - Error::ResolveError - })?; - - let addrs = resolver - .lookup_ip(host) - .await - .map_err(|_| Error::ResolveError)? - .into_iter() - .map(|ip| SocketAddr::new(ip, port)) - .collect(); - - Ok(addrs) -} diff --git a/actix-redis/src/session.rs b/actix-redis/src/session.rs index d185aefa4..be80eccbb 100644 --- a/actix-redis/src/session.rs +++ b/actix-redis/src/session.rs @@ -1,21 +1,19 @@ -use std::cell::RefCell; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::{collections::HashMap, iter, rc::Rc}; +use actix::prelude::*; use actix_service::{Service, Transform}; use actix_session::{Session, SessionStatus}; use actix_web::cookie::{Cookie, CookieJar, Key, SameSite}; use actix_web::dev::{ServiceRequest, ServiceResponse}; use actix_web::http::header::{self, HeaderValue}; use actix_web::{error, Error, HttpMessage}; -use futures_util::future::{ok, Future, Ready}; +use futures_core::future::LocalBoxFuture; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use redis_async::resp::RespValue; use redis_async::resp_array; use time::{self, Duration, OffsetDateTime}; -use crate::redis::RedisClient; +use crate::redis::{Command, RedisActor}; /// Use redis as session storage. /// @@ -35,7 +33,7 @@ impl RedisSession { key: Key::derive_from(key), cache_keygen: Box::new(|key: &str| format!("session:{}", &key)), ttl: "7200".to_owned(), - redis_client: RedisClient::new(addr), + addr: RedisActor::start(addr), name: "actix-session".to_owned(), path: "/".to_owned(), domain: None, @@ -120,21 +118,24 @@ where { type Response = ServiceResponse; type Error = S::Error; - type InitError = (); type Transform = RedisSessionMiddleware; - type Future = Ready>; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; fn new_transform(&self, service: S) -> Self::Future { - ok(RedisSessionMiddleware { - service: Rc::new(RefCell::new(service)), - inner: self.0.clone(), + let inner = self.0.clone(); + Box::pin(async { + Ok(RedisSessionMiddleware { + service: Rc::new(service), + inner, + }) }) } } /// Cookie session middleware pub struct RedisSessionMiddleware { - service: Rc>, + service: Rc, inner: Rc, } @@ -146,12 +147,9 @@ where { type Response = ServiceResponse; type Error = Error; - #[allow(clippy::type_complexity)] - type Future = Pin>>>; + type Future = LocalBoxFuture<'static, Result>; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.service.borrow_mut().poll_ready(cx) - } + actix_service::forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { let srv = self.service.clone(); @@ -210,7 +208,7 @@ struct Inner { key: Key, cache_keygen: Box String>, ttl: String, - redis_client: RedisClient, + addr: Addr, name: String, path: String, domain: Option, @@ -252,9 +250,11 @@ impl Inner { }; let val = self - .redis_client - .send(resp_array!["GET", cache_key]) - .await?; + .addr + .send(Command(resp_array!["GET", cache_key])) + .await + .map_err(error::ErrorInternalServerError)? + .map_err(error::ErrorInternalServerError)?; match val { RespValue::Error(err) => { @@ -285,11 +285,11 @@ impl Inner { let (value, jar) = if let Some(value) = value { (value, None) } else { - let value: String = iter::repeat(()) + let value = iter::repeat(()) .map(|()| OsRng.sample(Alphanumeric)) - .map(char::from) .take(32) - .collect(); + .collect::>(); + let value = String::from_utf8(value).unwrap_or_default(); // prepare session id cookie let mut cookie = Cookie::new(self.name.clone(), value.clone()); @@ -325,9 +325,13 @@ impl Inner { Ok(body) => body, }; - self.redis_client - .send(resp_array!["SET", cache_key, body, "EX", &self.ttl]) - .await?; + let cmd = Command(resp_array!["SET", cache_key, body, "EX", &self.ttl]); + + self.addr + .send(cmd) + .await + .map_err(error::ErrorInternalServerError)? + .map_err(error::ErrorInternalServerError)?; if let Some(jar) = jar { for cookie in jar.delta() { @@ -343,13 +347,15 @@ impl Inner { async fn clear_cache(&self, key: String) -> Result<(), Error> { let cache_key = (self.cache_keygen)(&key); - match self - .redis_client - .send(resp_array!["DEL", cache_key]) - .await? - { + let res = self + .addr + .send(Command(resp_array!["DEL", cache_key])) + .await + .map_err(error::ErrorInternalServerError)?; + + match res { // redis responds with number of deleted records - RespValue::Integer(x) if x > 0 => Ok(()), + Ok(RespValue::Integer(x)) if x > 0 => Ok(()), _ => Err(error::ErrorInternalServerError( "failed to remove session from cache", )), diff --git a/actix-redis/tests/test_redis.rs b/actix-redis/tests/test_redis.rs index 6d53aec79..b9bb9c390 100644 --- a/actix-redis/tests/test_redis.rs +++ b/actix-redis/tests/test_redis.rs @@ -1,31 +1,42 @@ #[macro_use] extern crate redis_async; -use actix_redis::{Error, RedisClient, RespValue}; +use actix_redis::{Command, Error, RedisActor, RespValue}; #[actix_rt::test] async fn test_error_connect() { - let addr = RedisClient::new("localhost:54000"); + let addr = RedisActor::start("localhost:54000"); + let _addr2 = addr.clone(); - let res = addr.send(resp_array!["GET", "test"]).await; + let res = addr.send(Command(resp_array!["GET", "test"])).await; match res { - Err(Error::NotConnected) => (), + Ok(Err(Error::NotConnected)) => (), _ => panic!("Should not happen {:?}", res), } } #[actix_rt::test] -async fn test_redis() -> Result<(), Error> { +async fn test_redis() { env_logger::init(); - let addr = RedisClient::new("127.0.0.1:6379"); + let addr = RedisActor::start("127.0.0.1:6379"); + let res = addr + .send(Command(resp_array!["SET", "test", "value"])) + .await; - let resp = addr.send(resp_array!["SET", "test", "value"]).await?; + match res { + Ok(Ok(resp)) => { + assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); - assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); - - let resp = addr.send(resp_array!["GET", "test"]).await?; - println!("RESP: {:?}", resp); - assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into())); - Ok(()) + let res = addr.send(Command(resp_array!["GET", "test"])).await; + match res { + Ok(Ok(resp)) => { + println!("RESP: {:?}", resp); + assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into())); + } + _ => panic!("Should not happen {:?}", res), + } + } + _ => panic!("Should not happen {:?}", res), + } }