From f11752437c5b57c3b6d68504b49913bc068a4bd3 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 15 Feb 2018 16:53:05 -0800 Subject: [PATCH] use latest actix api --- Cargo.toml | 7 +- src/connect.rs | 174 -------------------------------------------- src/lib.rs | 4 - src/redis.rs | 82 ++++++++++++++------- src/session.rs | 7 +- tests/test_redis.rs | 10 +-- 6 files changed, 67 insertions(+), 217 deletions(-) delete mode 100644 src/connect.rs diff --git a/Cargo.toml b/Cargo.toml index 3527e61a9..c92c93d27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,8 @@ default = ["web"] web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"] [dependencies] -actix = "^0.4.5" +#actix = "^0.4.5" +actix = { git = "https://github.com/actix/actix.git" } log = "0.4" backoff = "0.1" @@ -36,10 +37,10 @@ futures = "0.1" tokio-io = "0.1" tokio-core = "0.1" redis-async = "0.0" -trust-dns-resolver = "0.7" # actix web session -actix-web = { version="0.3", optional=true } +# actix-web = { version="0.3", optional=true } +actix-web = { git = "https://github.com/actix/actix-web.git", optional=true } cookie = { version="0.10", features=["percent-encode", "secure"], optional=true } http = { version="0.1", optional=true } rand = { version="0.3", optional=true } diff --git a/src/connect.rs b/src/connect.rs deleted file mode 100644 index f2c1d052b..000000000 --- a/src/connect.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::io; -use std::net::SocketAddr; -use std::collections::VecDeque; -use std::time::Duration; - -use actix::Arbiter; -use trust_dns_resolver::ResolverFuture; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use trust_dns_resolver::lookup_ip::LookupIpFuture; -use futures::{Async, Future, Poll}; -use tokio_core::reactor::Timeout; -use tokio_core::net::{TcpStream, TcpStreamNew}; - - -#[derive(Fail, Debug)] -pub enum TcpConnectorError { - /// Failed to resolve the hostname - #[fail(display = "Failed resolving hostname: {}", _0)] - Dns(String), - - /// Address is invalid - #[fail(display = "Invalid input: {}", _0)] - InvalidInput(&'static str), - - /// Connecting took too long - #[fail(display = "Timeout out while establishing connection")] - Timeout, - - /// Connection io error - #[fail(display = "{}", _0)] - IoError(io::Error), -} - -pub struct TcpConnector { - lookup: Option, - port: u16, - ips: VecDeque, - error: Option, - timeout: Timeout, - stream: Option, -} - -impl TcpConnector { - - pub fn new>(addr: S) -> TcpConnector { - TcpConnector::with_timeout(addr, Duration::from_secs(1)) - } - - pub fn with_timeout>(addr: S, timeout: Duration) -> TcpConnector { - // try to parse as a regular SocketAddr first - if let Ok(addr) = addr.as_ref().parse() { - let mut ips = VecDeque::new(); - ips.push_back(addr); - - TcpConnector { - lookup: None, - port: 0, - ips: ips, - error: None, - stream: None, - timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() } - } else { - match TcpConnector::parse(addr.as_ref()) { - Ok((host, port)) => { - // we need to do dns resolution - let resolve = match ResolverFuture::from_system_conf(Arbiter::handle()) { - Ok(resolve) => resolve, - Err(err) => { - warn!("Can not create system dns resolver: {}", err); - ResolverFuture::new( - ResolverConfig::default(), - ResolverOpts::default(), - Arbiter::handle()) - } - }; - - TcpConnector { - lookup: Some(resolve.lookup_ip(host)), - port: port, - ips: VecDeque::new(), - error: None, - stream: None, - timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() } - }, - Err(err) => - TcpConnector { - lookup: None, - port: 0, - ips: VecDeque::new(), - error: Some(err), - stream: None, - timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }, - } - } - } - - fn parse(addr: &str) -> Result<(&str, u16), TcpConnectorError> { - macro_rules! try_opt { - ($e:expr, $msg:expr) => ( - match $e { - Some(r) => r, - None => return Err(TcpConnectorError::InvalidInput($msg)), - } - ) - } - - // split the string by ':' and convert the second part to u16 - let mut parts_iter = addr.rsplitn(2, ':'); - let port_str = try_opt!(parts_iter.next(), "invalid socket address"); - let host = try_opt!(parts_iter.next(), "invalid socket address"); - let port: u16 = try_opt!(port_str.parse().ok(), "invalid port value"); - - Ok((host, port)) - } -} - -impl Future for TcpConnector { - type Item = TcpStream; - type Error = TcpConnectorError; - - fn poll(&mut self) -> Poll { - if let Some(err) = self.error.take() { - Err(err) - } else { - // timeout - if let Ok(Async::Ready(_)) = self.timeout.poll() { - return Err(TcpConnectorError::Timeout) - } - - // lookip ips - if let Some(mut lookup) = self.lookup.take() { - match lookup.poll() { - Ok(Async::NotReady) => { - self.lookup = Some(lookup); - return Ok(Async::NotReady) - }, - Ok(Async::Ready(ips)) => { - let port = self.port; - let ips = ips.iter().map(|ip| SocketAddr::new(ip, port)); - self.ips.extend(ips); - if self.ips.is_empty() { - return Err(TcpConnectorError::Dns( - "Expect at least one A dns record".to_owned())) - } - }, - Err(err) => return Err(TcpConnectorError::Dns(format!("{}", err))), - } - } - - // connect - loop { - if let Some(mut new) = self.stream.take() { - match new.poll() { - Ok(Async::Ready(sock)) => - return Ok(Async::Ready(sock)), - Ok(Async::NotReady) => { - self.stream = Some(new); - return Ok(Async::NotReady) - }, - Err(err) => { - if self.ips.is_empty() { - return Err(TcpConnectorError::IoError(err)) - } - } - } - } - - // try to connect - let addr = self.ips.pop_front().unwrap(); - self.stream = Some(TcpStream::connect(&addr, Arbiter::handle())); - } - } - } -} diff --git a/src/lib.rs b/src/lib.rs index a5b40f4fa..3df10fbda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,9 @@ extern crate log; extern crate redis_async; #[macro_use] extern crate failure; -extern crate trust_dns_resolver; mod redis; -mod connect; - pub use redis::{Command, RedisActor}; -pub use connect::TcpConnector; #[cfg(feature="web")] extern crate actix_web; diff --git a/src/redis.rs b/src/redis.rs index ddaebdd95..ac6efab81 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -2,32 +2,38 @@ use std::io; use std::collections::VecDeque; use actix::prelude::*; +use actix::actors::{Connect, Connector}; use backoff::ExponentialBackoff; use backoff::backoff::Backoff; use futures::Future; use futures::unsync::oneshot; use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; use tokio_core::net::TcpStream; use redis_async::error::Error as RespError; use redis_async::resp::{RespCodec, RespValue}; use Error; -use connect::TcpConnector; -#[derive(Message, Debug)] -#[rtype(RespValue, Error)] + +#[derive(Debug)] pub struct Command(pub RespValue); +impl Message for Command { + type Result = Result; +} + /// Redis comminucation actor pub struct RedisActor { addr: String, backoff: ExponentialBackoff, - cell: Option>, + cell: Option, RespCodec>>, queue: VecDeque>>, } impl RedisActor { - pub fn start>(addr: S) -> Address { + pub fn start>(addr: S) -> Addr { let addr = addr.into(); Supervisor::start(|_| { @@ -43,16 +49,36 @@ impl Actor for RedisActor { type Context = Context; fn started(&mut self, ctx: &mut Context) { - TcpConnector::new(self.addr.as_str()) + Connector::from_registry().send(Connect::host(self.addr.as_str())) .into_actor(self) - .map(|stream, act, ctx| { - info!("Connected to redis server: {}", act.addr); - act.backoff.reset(); - act.cell = Some(act.add_framed(stream.framed(RespCodec), ctx)); + .map(|res, act, ctx| match res { + Ok(stream) => { + info!("Connected to redis server: {}", act.addr); + + let (r, w) = stream.split(); + + // configure write side of the connection + let mut framed = actix::io::FramedWrite::new(w, RespCodec, ctx); + act.cell = Some(framed); + + // read side of the connection + ctx.add_stream(FramedRead::new(r, RespCodec)); + + act.backoff.reset(); + }, + Err(err) => { + error!("Can not connect to redis server: {}", err); + // re-connect with backoff time. + // we stop currect context, supervisor will restart it. + if let Some(timeout) = act.backoff.next_backoff() { + ctx.run_later(timeout, |_, ctx| ctx.stop()); + } else { + ctx.stop(); + } + } }) .map_err(|err, act, ctx| { error!("Can not connect to redis server: {}", err); - debug!("{:?}", err); // re-connect with backoff time. // we stop currect context, supervisor will restart it. if let Some(timeout) = act.backoff.next_backoff() { @@ -74,40 +100,42 @@ impl Supervised for RedisActor { } } -impl FramedActor for RedisActor { - type Io = TcpStream; - type Codec = RespCodec; +impl actix::io::WriteHandler for RedisActor { - fn closed(&mut self, error: Option, _: &mut Self::Context) { - if let Some(err) = error { - warn!("Redis connection dropped: {} error: {}", self.addr, err); - } else { - warn!("Redis connection dropped: {}", self.addr); + fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running { + warn!("Redis connection dropped: {} error: {}", self.addr, err); + Running::Stop + } +} + +impl StreamHandler for RedisActor { + + fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Err(err.into())); } + Running::Stop } - fn handle(&mut self, msg: Result, _ctx: &mut Self::Context) { + fn handle(&mut self, msg: RespValue, _: &mut Self::Context) { if let Some(tx) = self.queue.pop_front() { - let _ = tx.send(msg.map_err(|e| e.into())); + let _ = tx.send(Ok(msg)); } } } impl Handler for RedisActor { - type Result = ResponseFuture; + type Result = ResponseFuture; fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); if let Some(ref mut cell) = self.cell { self.queue.push_back(tx); - cell.send(msg.0); + cell.write(msg.0); } else { let _ = tx.send(Err(Error::NotConnected)); } - Box::new( - rx.map_err(|_| Error::Disconnected) - .and_then(|res| res) - .actfuture()) + Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res)) } } diff --git a/src/session.rs b/src/session.rs index 38466c169..efc1733c9 100644 --- a/src/session.rs +++ b/src/session.rs @@ -121,7 +121,7 @@ struct Inner { key: Key, ttl: String, name: String, - addr: Address, + addr: Addr, } impl Inner { @@ -137,7 +137,7 @@ impl Inner { if let Some(cookie) = jar.signed(&self.key).get(&self.name) { let value = cookie.value().to_owned(); return Box::new( - self.addr.call_fut(Command(resp_array!["GET", cookie.value()])) + self.addr.send(Command(resp_array!["GET", cookie.value()])) .map_err(Error::from) .and_then(move |res| match res { Ok(val) => { @@ -194,8 +194,7 @@ impl Inner { match serde_json::to_string(state) { Err(e) => Either::A(FutErr(e.into())), Ok(body) => Either::B( - self.addr.call_fut( - Command(resp_array!["SET", value, body,"EX", &self.ttl])) + self.addr.send(Command(resp_array!["SET", value, body,"EX", &self.ttl])) .map_err(Error::from) .and_then(move |res| match res { Ok(_) => { diff --git a/tests/test_redis.rs b/tests/test_redis.rs index 2123c7e1b..f19cfcec5 100644 --- a/tests/test_redis.rs +++ b/tests/test_redis.rs @@ -17,13 +17,13 @@ fn test_error_connect() { let _addr2 = addr.clone(); Arbiter::handle().spawn_fn(move || { - addr.call_fut(Command(resp_array!["GET", "test"])) + addr.send(Command(resp_array!["GET", "test"])) .then(|res| { match res { Ok(Err(Error::NotConnected)) => (), _ => panic!("Should not happen {:?}", res), } - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); Ok(()) }) }); @@ -42,11 +42,11 @@ fn test_redis() { Arbiter::handle().spawn_fn(move || { let addr2 = addr.clone(); - addr.call_fut(Command(resp_array!["SET", "test", "value"])) + addr.send(Command(resp_array!["SET", "test", "value"])) .then(move |res| match res { Ok(Ok(resp)) => { assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); - addr2.call_fut(Command(resp_array!["GET", "test"])) + addr2.send(Command(resp_array!["GET", "test"])) .then(|res| { match res { Ok(Ok(resp)) => { @@ -56,7 +56,7 @@ fn test_redis() { }, _ => panic!("Should not happen {:?}", res), } - Arbiter::system().send(actix::msgs::SystemExit(0)); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); Ok(()) }) },