From 237030dbfc27b5bce64f452054ba5dbec53e3c73 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 22 Jan 2018 00:40:50 -0800 Subject: [PATCH] better connection handling --- Cargo.toml | 25 ++++--- README.md | 1 - examples/basic.rs | 5 +- src/connect.rs | 174 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 29 +++++--- src/redis.rs | 90 +++++++++++++++++++----- src/session.rs | 30 ++------ 7 files changed, 291 insertions(+), 63 deletions(-) create mode 100644 src/connect.rs diff --git a/Cargo.toml b/Cargo.toml index 281a1c56c..3fe330e44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,23 +20,30 @@ path = "src/lib.rs" default = ["web"] # actix-web integration -web = ["actix-web"] +web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"] [dependencies] -rand = "0.3" -http = "0.1" -bytes = "0.4" +actix = "^0.4.3" + +log = "0.4" +backoff = "0.1" failure = "^0.1.1" futures = "0.1" -serde = "1.0" -serde_json = "1.0" tokio-io = "0.1" tokio-core = "0.1" redis-async = "0.0" -cookie = { version="0.10", features=["percent-encode", "secure"] } +trust-dns-resolver = "0.7" -actix = "^0.4.2" +# actix web session actix-web = { version="0.3", optional=true } +cookie = { version="0.10", features=["percent-encode", "secure"], optional=true } +http = { version="0.1", optional=true } +rand = { version="0.3", optional=true } +serde = { version="1.0", optional=true } +serde_json = { version="1.0", optional=true } [dev-dependencies] -env_logger = "0.4" +env_logger = "0.5" + +[patch.crates-io] +"actix" = { git = 'https://github.com/actix/actix.git' } diff --git a/README.md b/README.md index 7fdc1297d..c2d500259 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,6 @@ fn main() { // cookie session middleware .middleware(SessionStorage::new( RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) - .expect("Can not connect to redis server") )) // register simple route, handle all methods .resource("/", |r| r.f(index))) diff --git a/examples/basic.rs b/examples/basic.rs index 53e6bd3b3..e7e5874bd 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -28,8 +28,8 @@ fn index(mut req: HttpRequest) -> Result { } fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info"); - let _ = env_logger::init(); + ::std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); + env_logger::init(); let sys = actix::System::new("basic-example"); HttpServer::new( @@ -39,7 +39,6 @@ fn main() { // cookie session middleware .middleware(middleware::SessionStorage::new( RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) - .expect("Can not connect to redis server") )) // register simple route, handle all methods .resource("/", |r| r.f(index))) diff --git a/src/connect.rs b/src/connect.rs new file mode 100644 index 000000000..f2c1d052b --- /dev/null +++ b/src/connect.rs @@ -0,0 +1,174 @@ +use std::io; +use std::net::SocketAddr; +use std::collections::VecDeque; +use std::time::Duration; + +use actix::Arbiter; +use trust_dns_resolver::ResolverFuture; +use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; +use trust_dns_resolver::lookup_ip::LookupIpFuture; +use futures::{Async, Future, Poll}; +use tokio_core::reactor::Timeout; +use tokio_core::net::{TcpStream, TcpStreamNew}; + + +#[derive(Fail, Debug)] +pub enum TcpConnectorError { + /// Failed to resolve the hostname + #[fail(display = "Failed resolving hostname: {}", _0)] + Dns(String), + + /// Address is invalid + #[fail(display = "Invalid input: {}", _0)] + InvalidInput(&'static str), + + /// Connecting took too long + #[fail(display = "Timeout out while establishing connection")] + Timeout, + + /// Connection io error + #[fail(display = "{}", _0)] + IoError(io::Error), +} + +pub struct TcpConnector { + lookup: Option, + port: u16, + ips: VecDeque, + error: Option, + timeout: Timeout, + stream: Option, +} + +impl TcpConnector { + + pub fn new>(addr: S) -> TcpConnector { + TcpConnector::with_timeout(addr, Duration::from_secs(1)) + } + + pub fn with_timeout>(addr: S, timeout: Duration) -> TcpConnector { + // try to parse as a regular SocketAddr first + if let Ok(addr) = addr.as_ref().parse() { + let mut ips = VecDeque::new(); + ips.push_back(addr); + + TcpConnector { + lookup: None, + port: 0, + ips: ips, + error: None, + stream: None, + timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() } + } else { + match TcpConnector::parse(addr.as_ref()) { + Ok((host, port)) => { + // we need to do dns resolution + let resolve = match ResolverFuture::from_system_conf(Arbiter::handle()) { + Ok(resolve) => resolve, + Err(err) => { + warn!("Can not create system dns resolver: {}", err); + ResolverFuture::new( + ResolverConfig::default(), + ResolverOpts::default(), + Arbiter::handle()) + } + }; + + TcpConnector { + lookup: Some(resolve.lookup_ip(host)), + port: port, + ips: VecDeque::new(), + error: None, + stream: None, + timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() } + }, + Err(err) => + TcpConnector { + lookup: None, + port: 0, + ips: VecDeque::new(), + error: Some(err), + stream: None, + timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }, + } + } + } + + fn parse(addr: &str) -> Result<(&str, u16), TcpConnectorError> { + macro_rules! try_opt { + ($e:expr, $msg:expr) => ( + match $e { + Some(r) => r, + None => return Err(TcpConnectorError::InvalidInput($msg)), + } + ) + } + + // split the string by ':' and convert the second part to u16 + let mut parts_iter = addr.rsplitn(2, ':'); + let port_str = try_opt!(parts_iter.next(), "invalid socket address"); + let host = try_opt!(parts_iter.next(), "invalid socket address"); + let port: u16 = try_opt!(port_str.parse().ok(), "invalid port value"); + + Ok((host, port)) + } +} + +impl Future for TcpConnector { + type Item = TcpStream; + type Error = TcpConnectorError; + + fn poll(&mut self) -> Poll { + if let Some(err) = self.error.take() { + Err(err) + } else { + // timeout + if let Ok(Async::Ready(_)) = self.timeout.poll() { + return Err(TcpConnectorError::Timeout) + } + + // lookip ips + if let Some(mut lookup) = self.lookup.take() { + match lookup.poll() { + Ok(Async::NotReady) => { + self.lookup = Some(lookup); + return Ok(Async::NotReady) + }, + Ok(Async::Ready(ips)) => { + let port = self.port; + let ips = ips.iter().map(|ip| SocketAddr::new(ip, port)); + self.ips.extend(ips); + if self.ips.is_empty() { + return Err(TcpConnectorError::Dns( + "Expect at least one A dns record".to_owned())) + } + }, + Err(err) => return Err(TcpConnectorError::Dns(format!("{}", err))), + } + } + + // connect + loop { + if let Some(mut new) = self.stream.take() { + match new.poll() { + Ok(Async::Ready(sock)) => + return Ok(Async::Ready(sock)), + Ok(Async::NotReady) => { + self.stream = Some(new); + return Ok(Async::NotReady) + }, + Err(err) => { + if self.ips.is_empty() { + return Err(TcpConnectorError::IoError(err)) + } + } + } + } + + // try to connect + let addr = self.ips.pop_front().unwrap(); + self.stream = Some(TcpStream::connect(&addr, Arbiter::handle())); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 73f28f561..f25bf0337 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,23 +1,34 @@ extern crate actix; -extern crate bytes; -extern crate cookie; +extern crate backoff; extern crate futures; -extern crate serde; -extern crate serde_json; -extern crate rand; -extern crate http; extern crate tokio_io; extern crate tokio_core; #[macro_use] +extern crate log; +#[macro_use] extern crate redis_async; #[macro_use] extern crate failure; +extern crate trust_dns_resolver; + +mod redis; +mod connect; + +pub use redis::RedisActor; +pub use connect::TcpConnector; #[cfg(feature="web")] extern crate actix_web; - -mod redis; -pub use redis::RedisActor; +#[cfg(feature="web")] +extern crate cookie; +#[cfg(feature="web")] +extern crate rand; +#[cfg(feature="web")] +extern crate http; +#[cfg(feature="web")] +extern crate serde; +#[cfg(feature="web")] +extern crate serde_json; #[cfg(feature="web")] mod session; diff --git a/src/redis.rs b/src/redis.rs index 173c01bef..149c35778 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -1,30 +1,32 @@ use std::io; use std::collections::VecDeque; +use actix::prelude::*; +use backoff::ExponentialBackoff; +use backoff::backoff::Backoff; use futures::Future; use futures::unsync::oneshot; +use tokio_io::AsyncRead; use tokio_core::net::TcpStream; use redis_async::{resp, error}; -use actix::prelude::*; +use connect::TcpConnector; #[derive(Fail, Debug)] pub enum Error { - #[fail(display="Io error: {}", _0)] - Io(io::Error), - #[fail(display="Redis error")] + #[fail(display="Redis error {}", _0)] Redis(error::Error), + /// Receiving message during reconnecting + #[fail(display="Redis: Not connected")] + NotConnected, + /// Cancel all waters when connection get dropped + #[fail(display="Redis: Disconnected")] + Disconnected, } unsafe impl Send for Error {} unsafe impl Sync for Error {} -impl From for Error { - fn from(err: io::Error) -> Error { - Error::Io(err) - } -} - impl From for Error { fn from(err: error::Error) -> Error { Error::Redis(err) @@ -40,23 +42,73 @@ impl ResponseType for Command { /// Redis comminucation actor pub struct RedisActor { + addr: String, + backoff: ExponentialBackoff, + cell: Option>, queue: VecDeque>>, } impl RedisActor { - pub fn start(io: TcpStream) -> Address { - RedisActor{queue: VecDeque::new()}.framed(io, resp::RespCodec) + pub fn start>(addr: S) -> Address { + let addr = addr.into(); + + Supervisor::start(|_| { + RedisActor { addr: addr, + cell: None, + backoff: ExponentialBackoff::default(), + queue: VecDeque::new() } + }).0 } } impl Actor for RedisActor { - type Context = FramedContext; + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + TcpConnector::new(self.addr.as_str()) + .into_actor(self) + .map(|stream, act, ctx| { + info!("Connected to redis server: {}", act.addr); + act.backoff.reset(); + act.cell = Some(act.add_framed(ctx, stream.framed(resp::RespCodec))); + }) + .map_err(|err, act, ctx| { + error!("Can not connect to redis server: {}", err); + debug!("{:?}", err); + if let Some(timeout) = act.backoff.next_backoff() { + // delay re-connect, drop all messages during this period + ctx.run_later(timeout, |_, ctx| { + ctx.stop() + }); + } else { + ctx.stop(); + } + }) + .wait(ctx); + } +} + +impl Supervised for RedisActor { + fn restarting(&mut self, _: &mut Self::Context) { + self.cell.take(); + for tx in self.queue.drain(..) { + let _ = tx.send(Err(Error::Disconnected)); + } + } } impl FramedActor for RedisActor { type Io = TcpStream; type Codec = resp::RespCodec; + fn closed(&mut self, error: Option, _: &mut Self::Context) { + if let Some(err) = error { + warn!("Redis connection dropped: {} error: {}", self.addr, err); + } else { + warn!("Redis connection dropped: {}", self.addr); + } + } + fn handle(&mut self, msg: Result, _ctx: &mut Self::Context) { if let Some(tx) = self.queue.pop_front() { let _ = tx.send(msg.map_err(|e| e.into())); @@ -67,13 +119,17 @@ impl FramedActor for RedisActor { impl Handler for RedisActor { type Result = ResponseFuture; - fn handle(&mut self, msg: Command, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); - self.queue.push_back(tx); - let _ = ctx.send(msg.0); + if let Some(ref mut cell) = self.cell { + self.queue.push_back(tx); + cell.send(msg.0); + } else { + let _ = tx.send(Err(Error::NotConnected)); + } Box::new( - rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "").into()) + rx.map_err(|_| Error::Disconnected) .and_then(|res| res) .actfuture()) } diff --git a/src/session.rs b/src/session.rs index c34318e31..bc7a5684d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,4 +1,3 @@ -use std::{io, net}; use std::rc::Rc; use std::iter::FromIterator; use std::collections::HashMap; @@ -7,7 +6,6 @@ use serde_json; use rand::{self, Rng}; use futures::Future; use futures::future::{Either, ok as FutOk, err as FutErr}; -use tokio_core::net::TcpStream; use redis_async::resp::RespValue; use cookie::{CookieJar, Cookie, Key}; use http::header::{self, HeaderValue}; @@ -76,28 +74,12 @@ impl RedisSessionBackend { /// Create new redis session backend /// /// * `addr` - address of the redis server - pub fn new(addr: S, key: &[u8]) -> io::Result { - let h = Arbiter::handle(); - let mut err = None; - for addr in addr.to_socket_addrs()? { - match net::TcpStream::connect(&addr) { - Err(e) => err = Some(e), - Ok(conn) => { - let addr = RedisActor::start( - TcpStream::from_stream(conn, h).expect("Can not create tcp stream")); - return Ok(RedisSessionBackend( - Rc::new(Inner{key: Key::from_master(key), - ttl: "7200".to_owned(), - addr: addr, - name: "actix-session".to_owned()}))); - }, - } - } - if let Some(e) = err.take() { - Err(e) - } else { - Err(io::Error::new(io::ErrorKind::Other, "Can not connect to redis server.")) - } + pub fn new>(addr: S, key: &[u8]) -> RedisSessionBackend { + RedisSessionBackend( + Rc::new(Inner{key: Key::from_master(key), + ttl: "7200".to_owned(), + addr: RedisActor::start(addr), + name: "actix-session".to_owned()})) } /// Set time to live in seconds for session value