1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00
actix-extras/src/redis.rs
2018-02-17 21:37:30 +03:00

144 lines
4.3 KiB
Rust

use std::io;
use std::collections::VecDeque;
use actix::prelude::*;
use actix::actors::{Connect, Connector};
use backoff::ExponentialBackoff;
use backoff::backoff::Backoff;
use futures::Future;
use futures::unsync::oneshot;
use tokio_io::AsyncRead;
use tokio_io::io::WriteHalf;
use tokio_io::codec::FramedRead;
use tokio_core::net::TcpStream;
use redis_async::error::Error as RespError;
use redis_async::resp::{RespCodec, RespValue};
use Error;
/// Command to send to the Redis server.
///
/// Wraps the `RespValue` that `RedisActor` writes to the connection. The
/// reply to this message is the `RespValue` received back, or an `Error`.
#[derive(Debug)]
pub struct Command(pub RespValue);
/// Makes `Command` usable as an actor message.
impl Message for Command {
// Reply delivered to the sender: the response value on success, or an
// `Error` (e.g. `NotConnected` / `Disconnected`) on failure.
type Result = Result<RespValue, Error>;
}
/// Redis communication actor.
///
/// Owns the write half of a TCP connection to a Redis server and matches
/// incoming responses to pending commands in FIFO order.
pub struct RedisActor {
// Address of the Redis server; passed to the connector each time the actor starts.
addr: String,
// Backoff policy that yields the delay before the actor stops (and is
// restarted) after a failed connection attempt; reset on a successful connect.
backoff: ExponentialBackoff,
// Framed writer over the connection's write half; `None` while not connected.
cell: Option<actix::io::FramedWrite<WriteHalf<TcpStream>, RespCodec>>,
// Reply channels for commands already written, oldest first. Each incoming
// response (or stream error) is delivered to the sender at the front.
queue: VecDeque<oneshot::Sender<Result<RespValue, Error>>>,
}
impl RedisActor {
    /// Start new `Supervisor` with `RedisActor`.
    ///
    /// `addr` is the address of the Redis server; it is converted into an owned
    /// `String` and reused for every connection attempt. The actor begins with no
    /// connection and an empty queue of pending replies.
    ///
    /// The reconnect backoff is created with its elapsed-time limit removed. With
    /// the default limit in place, `next_backoff()` starts returning `None` once
    /// the limit is exceeded, and the actor's failure path then stops (and is
    /// restarted) with no delay at all, turning a long outage into a tight
    /// reconnect loop.
    pub fn start<S: Into<String>>(addr: S) -> Addr<Unsync, RedisActor> {
        let addr = addr.into();

        // Keep backing off for as long as the server is unreachable.
        let mut backoff = ExponentialBackoff::default();
        backoff.max_elapsed_time = None;

        Supervisor::start(|_| RedisActor {
            addr,
            backoff,
            cell: None,
            queue: VecDeque::new(),
        })
    }
}
impl Actor for RedisActor {
    type Context = Context<Self>;

    /// Connect to the Redis server whenever the actor is (re)started.
    ///
    /// Asks the registry `Connector` for a connection to `self.addr`. On success
    /// the stream is split: the write half is wrapped in a `FramedWrite` and
    /// stored in `self.cell`, the read half is registered as a stream of
    /// responses, and the backoff is reset. On any failure (connect error or a
    /// failure of the request to the connector itself) the error is logged and
    /// the context is stopped after the next backoff delay.
    fn started(&mut self, ctx: &mut Context<Self>) {
        /// Stop the context so the supervisor restarts the actor: after the next
        /// backoff delay if the policy provides one, immediately otherwise.
        fn stop_after_backoff(act: &mut RedisActor, ctx: &mut Context<RedisActor>) {
            if let Some(timeout) = act.backoff.next_backoff() {
                ctx.run_later(timeout, |_, ctx| ctx.stop());
            } else {
                ctx.stop();
            }
        }

        Connector::from_registry()
            .send(Connect::host(self.addr.as_str()))
            .into_actor(self)
            .map(|res, act, ctx| match res {
                Ok(stream) => {
                    info!("Connected to redis server: {}", act.addr);

                    let (r, w) = stream.split();

                    // configure write side of the connection
                    act.cell = Some(actix::io::FramedWrite::new(w, RespCodec, ctx));

                    // read side of the connection
                    ctx.add_stream(FramedRead::new(r, RespCodec));

                    // a successful connect starts the backoff sequence over
                    act.backoff.reset();
                }
                Err(err) => {
                    error!("Can not connect to redis server: {}", err);
                    // re-connect with backoff time.
                    // we stop current context, supervisor will restart it.
                    stop_after_backoff(act, ctx);
                }
            })
            .map_err(|err, act, ctx| {
                // the request to the connector itself failed
                error!("Can not connect to redis server: {}", err);
                // re-connect with backoff time.
                // we stop current context, supervisor will restart it.
                stop_after_backoff(act, ctx);
            })
            .wait(ctx);
    }
}
impl Supervised for RedisActor {
fn restarting(&mut self, _: &mut Self::Context) {
self.cell.take();
for tx in self.queue.drain(..) {
let _ = tx.send(Err(Error::Disconnected));
}
}
}
impl actix::io::WriteHandler<io::Error> for RedisActor {
fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
warn!("Redis connection dropped: {} error: {}", self.addr, err);
Running::Stop
}
}
impl StreamHandler<RespValue, RespError> for RedisActor {
    /// Handle a decoding/stream error from the read side of the connection.
    ///
    /// The error is converted into `Error` and delivered to the oldest pending
    /// request, if there is one; the method then returns `Running::Stop`.
    fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running {
        let oldest = self.queue.pop_front();
        if let Some(waiter) = oldest {
            // The requester may have gone away; ignore a failed send.
            let _ = waiter.send(Err(err.into()));
        }
        Running::Stop
    }

    /// Handle a response received from Redis.
    ///
    /// The value is delivered to the oldest pending request. If nothing is
    /// waiting, the value is dropped.
    fn handle(&mut self, msg: RespValue, _: &mut Self::Context) {
        let waiter = match self.queue.pop_front() {
            Some(waiter) => waiter,
            None => return,
        };
        // The requester may have gone away; ignore a failed send.
        let _ = waiter.send(Ok(msg));
    }
}
impl Handler<Command> for RedisActor {
type Result = ResponseFuture<RespValue, Error>;
fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
let (tx, rx) = oneshot::channel();
if let Some(ref mut cell) = self.cell {
self.queue.push_back(tx);
cell.write(msg.0);
} else {
let _ = tx.send(Err(Error::NotConnected));
}
Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res))
}
}