1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

better connection handling

This commit is contained in:
Nikolay Kim 2018-01-22 00:40:50 -08:00
parent 69c3ab2f92
commit 237030dbfc
7 changed files with 291 additions and 63 deletions

View File

@ -20,23 +20,30 @@ path = "src/lib.rs"
default = ["web"] default = ["web"]
# actix-web integration # actix-web integration
web = ["actix-web"] web = ["actix-web", "cookie", "http", "rand", "serde", "serde_json"]
[dependencies] [dependencies]
rand = "0.3" actix = "^0.4.3"
http = "0.1"
bytes = "0.4" log = "0.4"
backoff = "0.1"
failure = "^0.1.1" failure = "^0.1.1"
futures = "0.1" futures = "0.1"
serde = "1.0"
serde_json = "1.0"
tokio-io = "0.1" tokio-io = "0.1"
tokio-core = "0.1" tokio-core = "0.1"
redis-async = "0.0" redis-async = "0.0"
cookie = { version="0.10", features=["percent-encode", "secure"] } trust-dns-resolver = "0.7"
actix = "^0.4.2" # actix web session
actix-web = { version="0.3", optional=true } actix-web = { version="0.3", optional=true }
cookie = { version="0.10", features=["percent-encode", "secure"], optional=true }
http = { version="0.1", optional=true }
rand = { version="0.3", optional=true }
serde = { version="1.0", optional=true }
serde_json = { version="1.0", optional=true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.4" env_logger = "0.5"
[patch.crates-io]
"actix" = { git = 'https://github.com/actix/actix.git' }

View File

@ -32,7 +32,6 @@ fn main() {
// cookie session middleware // cookie session middleware
.middleware(SessionStorage::new( .middleware(SessionStorage::new(
RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) RedisSessionBackend::new("127.0.0.1:6379", &[0; 32])
.expect("Can not connect to redis server")
)) ))
// register simple route, handle all methods // register simple route, handle all methods
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))

View File

@ -28,8 +28,8 @@ fn index(mut req: HttpRequest) -> Result<HttpResponse> {
} }
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info"); ::std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info");
let _ = env_logger::init(); env_logger::init();
let sys = actix::System::new("basic-example"); let sys = actix::System::new("basic-example");
HttpServer::new( HttpServer::new(
@ -39,7 +39,6 @@ fn main() {
// cookie session middleware // cookie session middleware
.middleware(middleware::SessionStorage::new( .middleware(middleware::SessionStorage::new(
RedisSessionBackend::new("127.0.0.1:6379", &[0; 32]) RedisSessionBackend::new("127.0.0.1:6379", &[0; 32])
.expect("Can not connect to redis server")
)) ))
// register simple route, handle all methods // register simple route, handle all methods
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))

174
src/connect.rs Normal file
View File

@ -0,0 +1,174 @@
use std::io;
use std::net::SocketAddr;
use std::collections::VecDeque;
use std::time::Duration;
use actix::Arbiter;
use trust_dns_resolver::ResolverFuture;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::lookup_ip::LookupIpFuture;
use futures::{Async, Future, Poll};
use tokio_core::reactor::Timeout;
use tokio_core::net::{TcpStream, TcpStreamNew};
#[derive(Fail, Debug)]
pub enum TcpConnectorError {
/// Failed to resolve the hostname
#[fail(display = "Failed resolving hostname: {}", _0)]
Dns(String),
/// Address is invalid
#[fail(display = "Invalid input: {}", _0)]
InvalidInput(&'static str),
/// Connecting took too long
#[fail(display = "Timeout out while establishing connection")]
Timeout,
/// Connection io error
#[fail(display = "{}", _0)]
IoError(io::Error),
}
pub struct TcpConnector {
lookup: Option<LookupIpFuture>,
port: u16,
ips: VecDeque<SocketAddr>,
error: Option<TcpConnectorError>,
timeout: Timeout,
stream: Option<TcpStreamNew>,
}
impl TcpConnector {
pub fn new<S: AsRef<str>>(addr: S) -> TcpConnector {
TcpConnector::with_timeout(addr, Duration::from_secs(1))
}
pub fn with_timeout<S: AsRef<str>>(addr: S, timeout: Duration) -> TcpConnector {
// try to parse as a regular SocketAddr first
if let Ok(addr) = addr.as_ref().parse() {
let mut ips = VecDeque::new();
ips.push_back(addr);
TcpConnector {
lookup: None,
port: 0,
ips: ips,
error: None,
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }
} else {
match TcpConnector::parse(addr.as_ref()) {
Ok((host, port)) => {
// we need to do dns resolution
let resolve = match ResolverFuture::from_system_conf(Arbiter::handle()) {
Ok(resolve) => resolve,
Err(err) => {
warn!("Can not create system dns resolver: {}", err);
ResolverFuture::new(
ResolverConfig::default(),
ResolverOpts::default(),
Arbiter::handle())
}
};
TcpConnector {
lookup: Some(resolve.lookup_ip(host)),
port: port,
ips: VecDeque::new(),
error: None,
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() }
},
Err(err) =>
TcpConnector {
lookup: None,
port: 0,
ips: VecDeque::new(),
error: Some(err),
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap() },
}
}
}
fn parse(addr: &str) -> Result<(&str, u16), TcpConnectorError> {
macro_rules! try_opt {
($e:expr, $msg:expr) => (
match $e {
Some(r) => r,
None => return Err(TcpConnectorError::InvalidInput($msg)),
}
)
}
// split the string by ':' and convert the second part to u16
let mut parts_iter = addr.rsplitn(2, ':');
let port_str = try_opt!(parts_iter.next(), "invalid socket address");
let host = try_opt!(parts_iter.next(), "invalid socket address");
let port: u16 = try_opt!(port_str.parse().ok(), "invalid port value");
Ok((host, port))
}
}
impl Future for TcpConnector {
type Item = TcpStream;
type Error = TcpConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() {
Err(err)
} else {
// timeout
if let Ok(Async::Ready(_)) = self.timeout.poll() {
return Err(TcpConnectorError::Timeout)
}
// lookip ips
if let Some(mut lookup) = self.lookup.take() {
match lookup.poll() {
Ok(Async::NotReady) => {
self.lookup = Some(lookup);
return Ok(Async::NotReady)
},
Ok(Async::Ready(ips)) => {
let port = self.port;
let ips = ips.iter().map(|ip| SocketAddr::new(ip, port));
self.ips.extend(ips);
if self.ips.is_empty() {
return Err(TcpConnectorError::Dns(
"Expect at least one A dns record".to_owned()))
}
},
Err(err) => return Err(TcpConnectorError::Dns(format!("{}", err))),
}
}
// connect
loop {
if let Some(mut new) = self.stream.take() {
match new.poll() {
Ok(Async::Ready(sock)) =>
return Ok(Async::Ready(sock)),
Ok(Async::NotReady) => {
self.stream = Some(new);
return Ok(Async::NotReady)
},
Err(err) => {
if self.ips.is_empty() {
return Err(TcpConnectorError::IoError(err))
}
}
}
}
// try to connect
let addr = self.ips.pop_front().unwrap();
self.stream = Some(TcpStream::connect(&addr, Arbiter::handle()));
}
}
}
}

View File

@ -1,23 +1,34 @@
extern crate actix; extern crate actix;
extern crate bytes; extern crate backoff;
extern crate cookie;
extern crate futures; extern crate futures;
extern crate serde;
extern crate serde_json;
extern crate rand;
extern crate http;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
#[macro_use] #[macro_use]
extern crate log;
#[macro_use]
extern crate redis_async; extern crate redis_async;
#[macro_use] #[macro_use]
extern crate failure; extern crate failure;
extern crate trust_dns_resolver;
mod redis;
mod connect;
pub use redis::RedisActor;
pub use connect::TcpConnector;
#[cfg(feature="web")] #[cfg(feature="web")]
extern crate actix_web; extern crate actix_web;
#[cfg(feature="web")]
mod redis; extern crate cookie;
pub use redis::RedisActor; #[cfg(feature="web")]
extern crate rand;
#[cfg(feature="web")]
extern crate http;
#[cfg(feature="web")]
extern crate serde;
#[cfg(feature="web")]
extern crate serde_json;
#[cfg(feature="web")] #[cfg(feature="web")]
mod session; mod session;

View File

@ -1,30 +1,32 @@
use std::io; use std::io;
use std::collections::VecDeque; use std::collections::VecDeque;
use actix::prelude::*;
use backoff::ExponentialBackoff;
use backoff::backoff::Backoff;
use futures::Future; use futures::Future;
use futures::unsync::oneshot; use futures::unsync::oneshot;
use tokio_io::AsyncRead;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use redis_async::{resp, error}; use redis_async::{resp, error};
use actix::prelude::*; use connect::TcpConnector;
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum Error { pub enum Error {
#[fail(display="Io error: {}", _0)] #[fail(display="Redis error {}", _0)]
Io(io::Error),
#[fail(display="Redis error")]
Redis(error::Error), Redis(error::Error),
/// Receiving message during reconnecting
#[fail(display="Redis: Not connected")]
NotConnected,
/// Cancel all waters when connection get dropped
#[fail(display="Redis: Disconnected")]
Disconnected,
} }
unsafe impl Send for Error {} unsafe impl Send for Error {}
unsafe impl Sync for Error {} unsafe impl Sync for Error {}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl From<error::Error> for Error { impl From<error::Error> for Error {
fn from(err: error::Error) -> Error { fn from(err: error::Error) -> Error {
Error::Redis(err) Error::Redis(err)
@ -40,23 +42,73 @@ impl ResponseType for Command {
/// Redis comminucation actor /// Redis comminucation actor
pub struct RedisActor { pub struct RedisActor {
addr: String,
backoff: ExponentialBackoff,
cell: Option<FramedCell<RedisActor>>,
queue: VecDeque<oneshot::Sender<Result<resp::RespValue, Error>>>, queue: VecDeque<oneshot::Sender<Result<resp::RespValue, Error>>>,
} }
impl RedisActor { impl RedisActor {
pub fn start(io: TcpStream) -> Address<RedisActor> { pub fn start<S: Into<String>>(addr: S) -> Address<RedisActor> {
RedisActor{queue: VecDeque::new()}.framed(io, resp::RespCodec) let addr = addr.into();
Supervisor::start(|_| {
RedisActor { addr: addr,
cell: None,
backoff: ExponentialBackoff::default(),
queue: VecDeque::new() }
}).0
} }
} }
impl Actor for RedisActor { impl Actor for RedisActor {
type Context = FramedContext<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
TcpConnector::new(self.addr.as_str())
.into_actor(self)
.map(|stream, act, ctx| {
info!("Connected to redis server: {}", act.addr);
act.backoff.reset();
act.cell = Some(act.add_framed(ctx, stream.framed(resp::RespCodec)));
})
.map_err(|err, act, ctx| {
error!("Can not connect to redis server: {}", err);
debug!("{:?}", err);
if let Some(timeout) = act.backoff.next_backoff() {
// delay re-connect, drop all messages during this period
ctx.run_later(timeout, |_, ctx| {
ctx.stop()
});
} else {
ctx.stop();
}
})
.wait(ctx);
}
}
impl Supervised for RedisActor {
fn restarting(&mut self, _: &mut Self::Context) {
self.cell.take();
for tx in self.queue.drain(..) {
let _ = tx.send(Err(Error::Disconnected));
}
}
} }
impl FramedActor for RedisActor { impl FramedActor for RedisActor {
type Io = TcpStream; type Io = TcpStream;
type Codec = resp::RespCodec; type Codec = resp::RespCodec;
fn closed(&mut self, error: Option<io::Error>, _: &mut Self::Context) {
if let Some(err) = error {
warn!("Redis connection dropped: {} error: {}", self.addr, err);
} else {
warn!("Redis connection dropped: {}", self.addr);
}
}
fn handle(&mut self, msg: Result<resp::RespValue, error::Error>, _ctx: &mut Self::Context) { fn handle(&mut self, msg: Result<resp::RespValue, error::Error>, _ctx: &mut Self::Context) {
if let Some(tx) = self.queue.pop_front() { if let Some(tx) = self.queue.pop_front() {
let _ = tx.send(msg.map_err(|e| e.into())); let _ = tx.send(msg.map_err(|e| e.into()));
@ -67,13 +119,17 @@ impl FramedActor for RedisActor {
impl Handler<Command> for RedisActor { impl Handler<Command> for RedisActor {
type Result = ResponseFuture<Self, Command>; type Result = ResponseFuture<Self, Command>;
fn handle(&mut self, msg: Command, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.queue.push_back(tx); if let Some(ref mut cell) = self.cell {
let _ = ctx.send(msg.0); self.queue.push_back(tx);
cell.send(msg.0);
} else {
let _ = tx.send(Err(Error::NotConnected));
}
Box::new( Box::new(
rx.map_err(|_| io::Error::new(io::ErrorKind::Other, "").into()) rx.map_err(|_| Error::Disconnected)
.and_then(|res| res) .and_then(|res| res)
.actfuture()) .actfuture())
} }

View File

@ -1,4 +1,3 @@
use std::{io, net};
use std::rc::Rc; use std::rc::Rc;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::collections::HashMap; use std::collections::HashMap;
@ -7,7 +6,6 @@ use serde_json;
use rand::{self, Rng}; use rand::{self, Rng};
use futures::Future; use futures::Future;
use futures::future::{Either, ok as FutOk, err as FutErr}; use futures::future::{Either, ok as FutOk, err as FutErr};
use tokio_core::net::TcpStream;
use redis_async::resp::RespValue; use redis_async::resp::RespValue;
use cookie::{CookieJar, Cookie, Key}; use cookie::{CookieJar, Cookie, Key};
use http::header::{self, HeaderValue}; use http::header::{self, HeaderValue};
@ -76,28 +74,12 @@ impl RedisSessionBackend {
/// Create new redis session backend /// Create new redis session backend
/// ///
/// * `addr` - address of the redis server /// * `addr` - address of the redis server
pub fn new<S: net::ToSocketAddrs>(addr: S, key: &[u8]) -> io::Result<RedisSessionBackend> { pub fn new<S: Into<String>>(addr: S, key: &[u8]) -> RedisSessionBackend {
let h = Arbiter::handle(); RedisSessionBackend(
let mut err = None; Rc::new(Inner{key: Key::from_master(key),
for addr in addr.to_socket_addrs()? { ttl: "7200".to_owned(),
match net::TcpStream::connect(&addr) { addr: RedisActor::start(addr),
Err(e) => err = Some(e), name: "actix-session".to_owned()}))
Ok(conn) => {
let addr = RedisActor::start(
TcpStream::from_stream(conn, h).expect("Can not create tcp stream"));
return Ok(RedisSessionBackend(
Rc::new(Inner{key: Key::from_master(key),
ttl: "7200".to_owned(),
addr: addr,
name: "actix-session".to_owned()})));
},
}
}
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(io::ErrorKind::Other, "Can not connect to redis server."))
}
} }
/// Set time to live in seconds for session value /// Set time to live in seconds for session value