diff --git a/Cargo.toml b/Cargo.toml index a42a93dae..e8d15987e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,10 +32,11 @@ serde = "1.0" serde_json = "1.0" tokio-io = "0.1" tokio-core = "0.1" -actix = "^0.3.5" redis-async = "0.0" -cookie = { version="0.10", features=["percent-encode", "secure"] } +# cookie = { version="0.10", features=["percent-encode", "secure"] } +cookie = { git="https://github.com/alexcrichton/cookie-rs.git", features=["percent-encode", "secure"] } +actix = "0.4" actix-web = { git="https://github.com/actix/actix-web.git", optional=true } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 763936b53..73f28f561 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,11 +17,9 @@ extern crate failure; extern crate actix_web; mod redis; - -#[cfg(feature="web")] -mod session; - pub use redis::RedisActor; +#[cfg(feature="web")] +mod session; #[cfg(feature="web")] pub use session::RedisSessionBackend; diff --git a/src/redis.rs b/src/redis.rs index 6ecbec22a..173c01bef 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -1,11 +1,9 @@ use std::io; use std::collections::VecDeque; -use bytes::BytesMut; use futures::Future; use futures::unsync::oneshot; use tokio_core::net::TcpStream; -use tokio_io::codec::{Decoder, Encoder}; use redis_async::{resp, error}; use actix::prelude::*; @@ -27,34 +25,9 @@ impl From for Error { } } -#[derive(Message)] -pub struct Value(resp::RespValue); - -/// Redis codec wrapper -pub struct RedisCodec; - -impl Encoder for RedisCodec { - type Item = Value; - type Error = Error; - - fn encode(&mut self, msg: Value, buf: &mut BytesMut) -> Result<(), Self::Error> { - match resp::RespCodec.encode(msg.0, buf) { - Ok(()) => Ok(()), - Err(err) => Err(Error::Io(err)) - } - } -} - -impl Decoder for RedisCodec { - type Item = Value; - type Error = Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - match resp::RespCodec.decode(buf) { - Ok(Some(item)) => Ok(Some(Value(item))), - Ok(None) => Ok(None), - Err(err) => Err(Error::Redis(err)), - } +impl From for Error { + fn from(err: error::Error) -> Error { + Error::Redis(err) } } @@ -72,7 +45,7 @@ pub struct RedisActor { impl RedisActor { pub fn start(io: TcpStream) -> Address { - RedisActor{queue: VecDeque::new()}.framed(io, RedisCodec) + RedisActor{queue: VecDeque::new()}.framed(io, resp::RespCodec) } } @@ -82,34 +55,24 @@ impl Actor for RedisActor { impl FramedActor for RedisActor { type Io = TcpStream; - type Codec = RedisCodec; -} + type Codec = resp::RespCodec; -impl StreamHandler for RedisActor {} - -impl Handler for RedisActor { - - fn error(&mut self, err: Error, _: &mut Self::Context) { + fn handle(&mut self, msg: Result, _ctx: &mut Self::Context) { if let Some(tx) = self.queue.pop_front() { - let _ = tx.send(Err(err)); + let _ = tx.send(msg.map_err(|e| e.into())); } } - - fn handle(&mut self, msg: Value, _ctx: &mut Self::Context) -> Response { - if let Some(tx) = self.queue.pop_front() { - let _ = tx.send(Ok(msg.0)); - } - Self::empty() - } } impl Handler for RedisActor { - fn handle(&mut self, msg: Command, ctx: &mut Self::Context) -> Response { + type Result = ResponseFuture; + + fn handle(&mut self, msg: Command, ctx: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); self.queue.push_back(tx); - let _ = ctx.send(Value(msg.0)); + let _ = ctx.send(msg.0); - Self::async_reply( + Box::new( rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "").into()) .and_then(|res| res) .actfuture())