diff --git a/src/lib.rs b/src/lib.rs index f25bf0337..29146b9d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,3 +34,25 @@ extern crate serde_json; mod session; #[cfg(feature="web")] pub use session::RedisSessionBackend; + + +#[derive(Fail, Debug)] +pub enum Error { + #[fail(display="Redis error {}", _0)] + Redis(redis_async::error::Error), + /// Receiving message during reconnecting + #[fail(display="Redis: Not connected")] + NotConnected, + /// Cancel all waters when connection get dropped + #[fail(display="Redis: Disconnected")] + Disconnected, +} + +unsafe impl Send for Error {} +unsafe impl Sync for Error {} + +impl From for Error { + fn from(err: redis_async::error::Error) -> Error { + Error::Redis(err) + } +} diff --git a/src/redis.rs b/src/redis.rs index 6d8665759..184eb8621 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -8,44 +8,22 @@ use futures::Future; use futures::unsync::oneshot; use tokio_io::AsyncRead; use tokio_core::net::TcpStream; -use redis_async::{resp, error}; +use redis_async::error::Error as RespError; +use redis_async::resp::{RespCodec, RespValue}; +use Error; use connect::TcpConnector; -#[derive(Fail, Debug)] -pub enum Error { - #[fail(display="Redis error {}", _0)] - Redis(error::Error), - /// Receiving message during reconnecting - #[fail(display="Redis: Not connected")] - NotConnected, - /// Cancel all waters when connection get dropped - #[fail(display="Redis: Disconnected")] - Disconnected, -} - -unsafe impl Send for Error {} -unsafe impl Sync for Error {} - -impl From for Error { - fn from(err: error::Error) -> Error { - Error::Redis(err) - } -} - -pub struct Command(pub resp::RespValue); - -impl ResponseType for Command { - type Item = resp::RespValue; - type Error = Error; -} +#[derive(Message)] +#[rtype(RespValue, Error)] +pub struct Command(pub RespValue); /// Redis comminucation actor pub struct RedisActor { addr: String, backoff: ExponentialBackoff, cell: Option>, - queue: VecDeque>>, + queue: VecDeque>>, } impl RedisActor { @@ -70,16 +48,14 @@ impl Actor for RedisActor { .map(|stream, act, ctx| { info!("Connected to redis server: {}", act.addr); act.backoff.reset(); - act.cell = Some(act.add_framed(stream.framed(resp::RespCodec), ctx)); + act.cell = Some(act.add_framed(stream.framed(RespCodec), ctx)); }) .map_err(|err, act, ctx| { error!("Can not connect to redis server: {}", err); debug!("{:?}", err); + // re-connect with backoff time if let Some(timeout) = act.backoff.next_backoff() { - // delay re-connect, drop all messages during this period - ctx.run_later(timeout, |_, ctx| { - ctx.stop() - }); + ctx.run_later(timeout, |_, ctx| ctx.stop()); } else { ctx.stop(); } @@ -99,7 +75,7 @@ impl Supervised for RedisActor { impl FramedActor for RedisActor { type Io = TcpStream; - type Codec = resp::RespCodec; + type Codec = RespCodec; fn closed(&mut self, error: Option, _: &mut Self::Context) { if let Some(err) = error { @@ -109,7 +85,7 @@ impl FramedActor for RedisActor { } } - fn handle(&mut self, msg: Result, _ctx: &mut Self::Context) { + fn handle(&mut self, msg: Result, _ctx: &mut Self::Context) { if let Some(tx) = self.queue.pop_front() { let _ = tx.send(msg.map_err(|e| e.into())); }