1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-07-01 20:25:09 +02:00

Update dependencies (Tokio 1.0) (#144)

This commit is contained in:
Andrey Kutejko
2021-03-21 23:50:26 +01:00
committed by GitHub
parent 86ff1302ad
commit ca85f6b245
34 changed files with 429 additions and 503 deletions

View File

@ -3,7 +3,7 @@
#![deny(rust_2018_idioms)]
mod redis;
pub use redis::{Command, RedisActor};
pub use redis::RedisClient;
use derive_more::{Display, Error, From};
@ -25,6 +25,12 @@ pub enum Error {
/// Cancel all waters when connection get dropped
#[display(fmt = "Redis: Disconnected")]
Disconnected,
/// Invalid address
#[display(fmt = "Redis: Invalid address")]
InvalidAddress,
/// DNS resolve error
#[display(fmt = "Redis: DNS resolve error")]
ResolveError,
}
#[cfg(feature = "web")]

View File

@ -1,150 +1,98 @@
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use actix::actors::resolver::{Connect, Resolver};
use actix::prelude::*;
use actix_utils::oneshot;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use futures_util::FutureExt;
use log::{error, info, warn};
use redis_async::error::Error as RespError;
use redis_async::resp::{RespCodec, RespValue};
use tokio::io::{split, WriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use redis_async::client::{paired_connect, PairedConnection};
use redis_async::resp::RespValue;
use tokio::sync::Mutex;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
use crate::Error;
/// Command for send data to Redis
#[derive(Debug)]
pub struct Command(pub RespValue);
impl Message for Command {
type Result = Result<RespValue, Error>;
}
/// Redis comminucation actor
pub struct RedisActor {
pub struct RedisClient {
addr: String,
backoff: ExponentialBackoff,
cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
connection: Mutex<Option<PairedConnection>>,
}
impl RedisActor {
/// Start new `Supervisor` with `RedisActor`.
pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
let addr = addr.into();
impl RedisClient {
pub fn new(addr: impl Into<String>) -> Self {
Self {
addr: addr.into(),
connection: Mutex::new(None),
}
}
let backoff = ExponentialBackoff {
max_elapsed_time: None,
..Default::default()
};
async fn get_connection(&self) -> Result<PairedConnection, Error> {
let mut connection = self.connection.lock().await;
if let Some(ref connection) = *connection {
return Ok(connection.clone());
}
Supervisor::start(|_| RedisActor {
addr,
cell: None,
backoff,
queue: VecDeque::new(),
let mut addrs = resolve(&self.addr).await?;
loop {
// try to connect
let socket_addr = addrs.pop_front().ok_or_else(|| {
log::warn!("Cannot connect to {}.", self.addr);
Error::NotConnected
})?;
match paired_connect(socket_addr).await {
Ok(conn) => {
*connection = Some(conn.clone());
return Ok(conn);
}
Err(err) => log::warn!(
"Attempt to connect to {} as {} failed: {}.",
self.addr,
socket_addr,
err
),
}
}
}
pub async fn send(&self, req: RespValue) -> Result<RespValue, Error> {
let res = self.get_connection().await?.send(req).await?;
Ok(res)
}
}
fn parse_addr(addr: &str, default_port: u16) -> Option<(&str, u16)> {
// split the string by ':' and convert the second part to u16
let mut parts_iter = addr.splitn(2, ':');
let host = parts_iter.next()?;
let port_str = parts_iter.next().unwrap_or("");
let port: u16 = port_str.parse().unwrap_or(default_port);
Some((host, port))
}
async fn resolve(addr: &str) -> Result<VecDeque<SocketAddr>, Error> {
// try to parse as a regular SocketAddr first
if let Ok(addr) = addr.parse::<SocketAddr>() {
let mut addrs = VecDeque::new();
addrs.push_back(addr);
return Ok(addrs);
}
let (host, port) = parse_addr(addr, 6379).ok_or(Error::InvalidAddress)?;
// we need to do dns resolution
let resolver = AsyncResolver::tokio_from_system_conf()
.or_else(|err| {
log::warn!("Cannot create system DNS resolver: {}", err);
AsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
})
}
}
impl Actor for RedisActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
Resolver::from_registry()
.send(Connect::host(self.addr.as_str()))
.into_actor(self)
.map(|res, act, ctx| match res {
Ok(res) => match res {
Ok(stream) => {
info!("Connected to redis server: {}", act.addr);
let (r, w) = split(stream);
// configure write side of the connection
let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
act.cell = Some(framed);
// read side of the connection
ctx.add_stream(FramedRead::new(r, RespCodec));
act.backoff.reset();
}
Err(err) => {
error!("Can not connect to redis server: {}", err);
// re-connect with backoff time.
// we stop current context, supervisor will restart it.
if let Some(timeout) = act.backoff.next_backoff() {
ctx.run_later(timeout, |_, ctx| ctx.stop());
}
}
},
Err(err) => {
error!("Can not connect to redis server: {}", err);
// re-connect with backoff time.
// we stop current context, supervisor will restart it.
if let Some(timeout) = act.backoff.next_backoff() {
ctx.run_later(timeout, |_, ctx| ctx.stop());
}
}
})
.wait(ctx);
}
}
impl Supervised for RedisActor {
fn restarting(&mut self, _: &mut Self::Context) {
self.cell.take();
for tx in self.queue.drain(..) {
let _ = tx.send(Err(Error::Disconnected));
}
}
}
impl actix::io::WriteHandler<io::Error> for RedisActor {
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
warn!("Redis connection dropped: {} error: {}", self.addr, err);
Running::Stop
}
}
impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
match msg {
Err(e) => {
if let Some(tx) = self.queue.pop_front() {
let _ = tx.send(Err(e.into()));
}
ctx.stop();
}
Ok(val) => {
if let Some(tx) = self.queue.pop_front() {
let _ = tx.send(Ok(val));
}
}
}
}
}
impl Handler<Command> for RedisActor {
type Result = ResponseFuture<Result<RespValue, Error>>;
fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
let (tx, rx) = oneshot::channel();
if let Some(ref mut cell) = self.cell {
self.queue.push_back(tx);
cell.write(msg.0);
} else {
let _ = tx.send(Err(Error::NotConnected));
}
Box::pin(rx.map(|res| match res {
Ok(res) => res,
Err(_) => Err(Error::Disconnected),
}))
}
.map_err(|err| {
log::error!("Cannot create DNS resolver: {}", err);
Error::ResolveError
})?;
let addrs = resolver
.lookup_ip(host)
.await
.map_err(|_| Error::ResolveError)?
.into_iter()
.map(|ip| SocketAddr::new(ip, port))
.collect();
Ok(addrs)
}

View File

@ -3,7 +3,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::{collections::HashMap, iter, rc::Rc};
use actix::prelude::*;
use actix_service::{Service, Transform};
use actix_session::{Session, SessionStatus};
use actix_web::cookie::{Cookie, CookieJar, Key, SameSite};
@ -16,7 +15,7 @@ use redis_async::resp::RespValue;
use redis_async::resp_array;
use time::{self, Duration, OffsetDateTime};
use crate::redis::{Command, RedisActor};
use crate::redis::RedisClient;
/// Use redis as session storage.
///
@ -36,7 +35,7 @@ impl RedisSession {
key: Key::derive_from(key),
cache_keygen: Box::new(|key: &str| format!("session:{}", &key)),
ttl: "7200".to_owned(),
addr: RedisActor::start(addr),
redis_client: RedisClient::new(addr),
name: "actix-session".to_owned(),
path: "/".to_owned(),
domain: None,
@ -113,14 +112,12 @@ impl RedisSession {
}
}
impl<S, B> Transform<S> for RedisSession
impl<S, B> Transform<S, ServiceRequest> for RedisSession
where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>
+ 'static,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = S::Error;
type InitError = ();
@ -141,25 +138,23 @@ pub struct RedisSessionMiddleware<S: 'static> {
inner: Rc<Inner>,
}
impl<S, B> Service for RedisSessionMiddleware<S>
impl<S, B> Service<ServiceRequest> for RedisSessionMiddleware<S>
where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>
+ 'static,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = Error;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.borrow_mut().poll_ready(cx)
}
fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
let mut srv = self.service.clone();
fn call(&self, mut req: ServiceRequest) -> Self::Future {
let srv = self.service.clone();
let inner = self.inner.clone();
Box::pin(async move {
@ -215,7 +210,7 @@ struct Inner {
key: Key,
cache_keygen: Box<dyn Fn(&str) -> String>,
ttl: String,
addr: Addr<RedisActor>,
redis_client: RedisClient,
name: String,
path: String,
domain: Option<String>,
@ -256,13 +251,11 @@ impl Inner {
}
};
let res = self
.addr
.send(Command(resp_array!["GET", cache_key]))
let val = self
.redis_client
.send(resp_array!["GET", cache_key])
.await?;
let val = res.map_err(error::ErrorInternalServerError)?;
match val {
RespValue::Error(err) => {
return Err(error::ErrorInternalServerError(err));
@ -294,6 +287,7 @@ impl Inner {
} else {
let value: String = iter::repeat(())
.map(|()| OsRng.sample(Alphanumeric))
.map(char::from)
.take(32)
.collect();
@ -331,12 +325,9 @@ impl Inner {
Ok(body) => body,
};
let cmd = Command(resp_array!["SET", cache_key, body, "EX", &self.ttl]);
self.addr
.send(cmd)
.await?
.map_err(error::ErrorInternalServerError)?;
self.redis_client
.send(resp_array!["SET", cache_key, body, "EX", &self.ttl])
.await?;
if let Some(jar) = jar {
for cookie in jar.delta() {
@ -352,17 +343,16 @@ impl Inner {
async fn clear_cache(&self, key: String) -> Result<(), Error> {
let cache_key = (self.cache_keygen)(&key);
match self.addr.send(Command(resp_array!["DEL", cache_key])).await {
Err(e) => Err(Error::from(e)),
Ok(res) => {
match res {
// redis responds with number of deleted records
Ok(RespValue::Integer(x)) if x > 0 => Ok(()),
_ => Err(error::ErrorInternalServerError(
"failed to remove session from cache",
)),
}
}
match self
.redis_client
.send(resp_array!["DEL", cache_key])
.await?
{
// redis responds with number of deleted records
RespValue::Integer(x) if x > 0 => Ok(()),
_ => Err(error::ErrorInternalServerError(
"failed to remove session from cache",
)),
}
}
@ -406,7 +396,7 @@ mod test {
.unwrap_or(Some(0))
.unwrap_or(0);
Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter }))
Ok(HttpResponse::Ok().json(&IndexResponse { user_id, counter }))
}
async fn do_something(session: Session) -> Result<HttpResponse> {
@ -417,7 +407,7 @@ mod test {
.map_or(1, |inner| inner + 1);
session.set("counter", counter)?;
Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter }))
Ok(HttpResponse::Ok().json(&IndexResponse { user_id, counter }))
}
#[derive(Deserialize)]
@ -438,7 +428,7 @@ mod test {
.unwrap_or(Some(0))
.unwrap_or(0);
Ok(HttpResponse::Ok().json(IndexResponse {
Ok(HttpResponse::Ok().json(&IndexResponse {
user_id: Some(id),
counter,
}))