use std::io; use std::collections::VecDeque; use actix::prelude::*; use actix::actors::{Connect, Connector}; use backoff::ExponentialBackoff; use backoff::backoff::Backoff; use futures::Future; use futures::unsync::oneshot; use tokio_io::AsyncRead; use tokio_io::io::WriteHalf; use tokio_io::codec::FramedRead; use tokio_core::net::TcpStream; use redis_async::error::Error as RespError; use redis_async::resp::{RespCodec, RespValue}; use Error; #[derive(Debug)] pub struct Command(pub RespValue); impl Message for Command { type Result = Result; } /// Redis comminucation actor pub struct RedisActor { addr: String, backoff: ExponentialBackoff, cell: Option, RespCodec>>, queue: VecDeque>>, } impl RedisActor { pub fn start>(addr: S) -> Addr { let addr = addr.into(); Supervisor::start(|_| { RedisActor { addr: addr, cell: None, backoff: ExponentialBackoff::default(), queue: VecDeque::new() } }) } } impl Actor for RedisActor { type Context = Context; fn started(&mut self, ctx: &mut Context) { Connector::from_registry().send(Connect::host(self.addr.as_str())) .into_actor(self) .map(|res, act, ctx| match res { Ok(stream) => { info!("Connected to redis server: {}", act.addr); let (r, w) = stream.split(); // configure write side of the connection let mut framed = actix::io::FramedWrite::new(w, RespCodec, ctx); act.cell = Some(framed); // read side of the connection ctx.add_stream(FramedRead::new(r, RespCodec)); act.backoff.reset(); }, Err(err) => { error!("Can not connect to redis server: {}", err); // re-connect with backoff time. // we stop currect context, supervisor will restart it. if let Some(timeout) = act.backoff.next_backoff() { ctx.run_later(timeout, |_, ctx| ctx.stop()); } else { ctx.stop(); } } }) .map_err(|err, act, ctx| { error!("Can not connect to redis server: {}", err); // re-connect with backoff time. // we stop currect context, supervisor will restart it. if let Some(timeout) = act.backoff.next_backoff() { ctx.run_later(timeout, |_, ctx| ctx.stop()); } else { ctx.stop(); } }) .wait(ctx); } } impl Supervised for RedisActor { fn restarting(&mut self, _: &mut Self::Context) { self.cell.take(); for tx in self.queue.drain(..) { let _ = tx.send(Err(Error::Disconnected)); } } } impl actix::io::WriteHandler for RedisActor { fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { warn!("Redis connection dropped: {} error: {}", self.addr, err); Running::Stop } } impl StreamHandler for RedisActor { fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running { if let Some(tx) = self.queue.pop_front() { let _ = tx.send(Err(err.into())); } Running::Stop } fn handle(&mut self, msg: RespValue, _: &mut Self::Context) { if let Some(tx) = self.queue.pop_front() { let _ = tx.send(Ok(msg)); } } } impl Handler for RedisActor { type Result = ResponseFuture; fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); if let Some(ref mut cell) = self.cell { self.queue.push_back(tx); cell.write(msg.0); } else { let _ = tx.send(Err(Error::NotConnected)); } Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res)) } }