2017-12-28 21:14:04 -08:00
|
|
|
use std::collections::VecDeque;
|
2021-03-21 22:07:45 -07:00
|
|
|
use std::io;
|
2017-12-28 21:14:04 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
use actix::prelude::*;
|
2022-02-03 22:40:02 +00:00
|
|
|
use actix_rt::net::TcpStream;
|
2021-12-11 16:05:21 +00:00
|
|
|
use actix_service::boxed::{self, BoxService};
|
|
|
|
use actix_tls::connect::{ConnectError, ConnectInfo, Connection, ConnectorService};
|
2021-03-21 22:07:45 -07:00
|
|
|
use backoff::backoff::Backoff;
|
|
|
|
use backoff::ExponentialBackoff;
|
|
|
|
use log::{error, info, warn};
|
|
|
|
use redis_async::error::Error as RespError;
|
|
|
|
use redis_async::resp::{RespCodec, RespValue};
|
|
|
|
use tokio::io::{split, WriteHalf};
|
|
|
|
use tokio::sync::oneshot;
|
|
|
|
use tokio_util::codec::FramedRead;
|
2017-12-28 21:14:04 -08:00
|
|
|
|
2019-03-29 11:31:48 -07:00
|
|
|
use crate::Error;
|
2017-12-28 21:14:04 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
/// Command for send data to Redis
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct Command(pub RespValue);
|
|
|
|
|
|
|
|
impl Message for Command {
|
|
|
|
type Result = Result<RespValue, Error>;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Redis communication actor
|
|
|
|
pub struct RedisActor {
|
2018-01-22 00:40:50 -08:00
|
|
|
addr: String,
|
2021-12-11 16:05:21 +00:00
|
|
|
connector: BoxService<ConnectInfo<String>, Connection<String, TcpStream>, ConnectError>,
|
2021-03-21 22:07:45 -07:00
|
|
|
backoff: ExponentialBackoff,
|
|
|
|
cell: Option<actix::io::FramedWrite<RespValue, WriteHalf<TcpStream>, RespCodec>>,
|
|
|
|
queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
|
2017-12-28 21:14:04 -08:00
|
|
|
}
|
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl RedisActor {
|
|
|
|
/// Start new `Supervisor` with `RedisActor`.
|
|
|
|
pub fn start<S: Into<String>>(addr: S) -> Addr<RedisActor> {
|
|
|
|
let addr = addr.into();
|
|
|
|
|
|
|
|
let backoff = ExponentialBackoff {
|
|
|
|
max_elapsed_time: None,
|
|
|
|
..Default::default()
|
|
|
|
};
|
|
|
|
|
|
|
|
Supervisor::start(|_| RedisActor {
|
|
|
|
addr,
|
2021-12-11 16:05:21 +00:00
|
|
|
connector: boxed::service(ConnectorService::default()),
|
2021-03-21 22:07:45 -07:00
|
|
|
cell: None,
|
|
|
|
backoff,
|
|
|
|
queue: VecDeque::new(),
|
|
|
|
})
|
2017-12-28 21:14:04 -08:00
|
|
|
}
|
2021-03-21 22:07:45 -07:00
|
|
|
}
|
2018-02-15 16:53:05 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl Actor for RedisActor {
|
|
|
|
type Context = Context<Self>;
|
2018-02-15 16:53:05 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
fn started(&mut self, ctx: &mut Context<Self>) {
|
2021-12-11 16:05:21 +00:00
|
|
|
let req = ConnectInfo::new(self.addr.to_owned());
|
2021-03-21 22:07:45 -07:00
|
|
|
self.connector
|
|
|
|
.call(req)
|
|
|
|
.into_actor(self)
|
|
|
|
.map(|res, act, ctx| match res {
|
2021-03-21 23:50:26 +01:00
|
|
|
Ok(conn) => {
|
2021-03-21 22:07:45 -07:00
|
|
|
let stream = conn.into_parts().0;
|
|
|
|
info!("Connected to redis server: {}", act.addr);
|
|
|
|
|
|
|
|
let (r, w) = split(stream);
|
|
|
|
|
|
|
|
// configure write side of the connection
|
|
|
|
let framed = actix::io::FramedWrite::new(w, RespCodec, ctx);
|
|
|
|
act.cell = Some(framed);
|
|
|
|
|
|
|
|
// read side of the connection
|
|
|
|
ctx.add_stream(FramedRead::new(r, RespCodec));
|
|
|
|
|
|
|
|
act.backoff.reset();
|
2018-02-15 16:53:05 -08:00
|
|
|
}
|
2021-03-21 22:07:45 -07:00
|
|
|
Err(err) => {
|
|
|
|
error!("Can not connect to redis server: {}", err);
|
|
|
|
// re-connect with backoff time.
|
|
|
|
// we stop current context, supervisor will restart it.
|
|
|
|
if let Some(timeout) = act.backoff.next_backoff() {
|
|
|
|
ctx.run_later(timeout, |_, ctx| ctx.stop());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.wait(ctx);
|
2018-01-22 00:40:50 -08:00
|
|
|
}
|
2021-03-21 22:07:45 -07:00
|
|
|
}
|
2017-12-28 21:14:04 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl Supervised for RedisActor {
|
|
|
|
fn restarting(&mut self, _: &mut Self::Context) {
|
|
|
|
self.cell.take();
|
|
|
|
for tx in self.queue.drain(..) {
|
|
|
|
let _ = tx.send(Err(Error::Disconnected));
|
|
|
|
}
|
2018-02-15 16:53:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl actix::io::WriteHandler<io::Error> for RedisActor {
|
|
|
|
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
|
|
|
|
warn!("Redis connection dropped: {} error: {}", self.addr, err);
|
|
|
|
Running::Stop
|
|
|
|
}
|
2017-12-28 21:14:04 -08:00
|
|
|
}
|
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl StreamHandler<Result<RespValue, RespError>> for RedisActor {
|
|
|
|
fn handle(&mut self, msg: Result<RespValue, RespError>, ctx: &mut Self::Context) {
|
|
|
|
match msg {
|
|
|
|
Err(e) => {
|
|
|
|
if let Some(tx) = self.queue.pop_front() {
|
|
|
|
let _ = tx.send(Err(e.into()));
|
|
|
|
}
|
|
|
|
ctx.stop();
|
|
|
|
}
|
|
|
|
Ok(val) => {
|
|
|
|
if let Some(tx) = self.queue.pop_front() {
|
|
|
|
let _ = tx.send(Ok(val));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-03-21 23:50:26 +01:00
|
|
|
}
|
2021-03-21 22:07:45 -07:00
|
|
|
}
|
2018-01-05 14:52:07 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
impl Handler<Command> for RedisActor {
|
|
|
|
type Result = ResponseFuture<Result<RespValue, Error>>;
|
2017-12-28 21:14:04 -08:00
|
|
|
|
2021-03-21 22:07:45 -07:00
|
|
|
fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
if let Some(ref mut cell) = self.cell {
|
|
|
|
self.queue.push_back(tx);
|
|
|
|
cell.write(msg.0);
|
|
|
|
} else {
|
|
|
|
let _ = tx.send(Err(Error::NotConnected));
|
|
|
|
}
|
|
|
|
|
|
|
|
Box::pin(async move { rx.await.map_err(|_| Error::Disconnected)? })
|
|
|
|
}
|
2017-12-28 21:14:04 -08:00
|
|
|
}
|