1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

client connector wait timeout

This commit is contained in:
Nikolay Kim 2018-04-05 18:33:58 -07:00
parent 800f711cc1
commit 6c55501252
2 changed files with 139 additions and 34 deletions

View File

@ -1,11 +1,11 @@
use std::{fmt, mem, io, time}; use std::{fmt, mem, io, time};
use std::cell::RefCell; use std::cell::{Cell, RefCell};
use std::rc::Rc; use std::rc::Rc;
use std::net::Shutdown; use std::net::Shutdown;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context, AsyncContext, use actix::{fut, Actor, ActorFuture, Arbiter, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised, ContextFutureSpawner}; Handler, Message, ActorResponse, Supervised, ContextFutureSpawner};
use actix::registry::ArbiterService; use actix::registry::ArbiterService;
use actix::fut::WrapFuture; use actix::fut::WrapFuture;
@ -16,6 +16,7 @@ use futures::{Async, Future, Poll};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use futures::unsync::oneshot; use futures::unsync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Timeout;
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError}; use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError};
@ -35,8 +36,9 @@ use server::IoStream;
/// `Connect` type represents message that can be send to `ClientConnector` /// `Connect` type represents message that can be send to `ClientConnector`
/// with connection request. /// with connection request.
pub struct Connect { pub struct Connect {
pub uri: Uri, pub(crate) uri: Uri,
pub conn_timeout: Duration, pub(crate) wait_time: Duration,
pub(crate) conn_timeout: Duration,
} }
impl Connect { impl Connect {
@ -44,9 +46,25 @@ impl Connect {
pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> { pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> {
Ok(Connect { Ok(Connect {
uri: Uri::try_from(uri).map_err(|e| e.into())?, uri: Uri::try_from(uri).map_err(|e| e.into())?,
conn_timeout: Duration::from_secs(1) wait_time: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
}) })
} }
/// Connection timeout, max time to connect to remote host.
/// By default connect timeout is 1 seccond.
pub fn conn_timeout(mut self, timeout: Duration) -> Self {
self.conn_timeout = timeout;
self
}
/// If connection pool limits are enabled, wait time indicates
/// max time to wait for available connection.
/// By default connect timeout is 5 secconds.
pub fn wait_time(mut self, timeout: Duration) -> Self {
self.wait_time = timeout;
self
}
} }
impl Message for Connect { impl Message for Connect {
@ -102,6 +120,7 @@ impl From<ConnectorError> for ClientConnectorError {
struct Waiter { struct Waiter {
tx: oneshot::Sender<Result<Connection, ClientConnectorError>>, tx: oneshot::Sender<Result<Connection, ClientConnectorError>>,
wait: Instant,
conn_timeout: Duration, conn_timeout: Duration,
} }
@ -114,6 +133,8 @@ pub struct ClientConnector {
connector: TlsConnector, connector: TlsConnector,
pool: Rc<Pool>, pool: Rc<Pool>,
pool_modified: Rc<Cell<bool>>,
conn_lifetime: Duration, conn_lifetime: Duration,
conn_keep_alive: Duration, conn_keep_alive: Duration,
limit: usize, limit: usize,
@ -123,6 +144,7 @@ pub struct ClientConnector {
available: HashMap<Key, VecDeque<Conn>>, available: HashMap<Key, VecDeque<Conn>>,
to_close: Vec<Connection>, to_close: Vec<Connection>,
waiters: HashMap<Key, VecDeque<Waiter>>, waiters: HashMap<Key, VecDeque<Waiter>>,
wait_timeout: Option<(Instant, Timeout)>,
} }
impl Actor for ClientConnector { impl Actor for ClientConnector {
@ -140,6 +162,8 @@ impl ArbiterService for ClientConnector {}
impl Default for ClientConnector { impl Default for ClientConnector {
fn default() -> ClientConnector { fn default() -> ClientConnector {
let modified = Rc::new(Cell::new(false));
#[cfg(all(feature="alpn"))] #[cfg(all(feature="alpn"))]
{ {
let builder = SslConnector::builder(SslMethod::tls()).unwrap(); let builder = SslConnector::builder(SslMethod::tls()).unwrap();
@ -149,7 +173,8 @@ impl Default for ClientConnector {
{ {
let builder = TlsConnector::builder().unwrap(); let builder = TlsConnector::builder().unwrap();
ClientConnector { ClientConnector {
pool: Rc::new(Pool::new()), pool: Rc::new(Pool::new(Rc::clone(&modified))),
pool_modified: modified,
connector: builder.build().unwrap(), connector: builder.build().unwrap(),
conn_lifetime: Duration::from_secs(15), conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(75),
@ -160,11 +185,13 @@ impl Default for ClientConnector {
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None,
} }
} }
#[cfg(not(any(feature="alpn", feature="tls")))] #[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {pool: Rc::new(Pool::new()), ClientConnector {pool: Rc::new(Pool::new(Rc::clone(&modified))),
pool_modified: modified,
conn_lifetime: Duration::from_secs(15), conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(75),
limit: 100, limit: 100,
@ -174,6 +201,7 @@ impl Default for ClientConnector {
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None,
} }
} }
} }
@ -224,9 +252,11 @@ impl ClientConnector {
/// } /// }
/// ``` /// ```
pub fn with_connector(connector: SslConnector) -> ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector {
let modified = Rc::new(Cell::new(false));
ClientConnector { ClientConnector {
connector, connector,
pool: Rc::new(Pool::new()), pool: Rc::new(Pool::new(Rc::clone(&modified))),
pool_modified: modified,
conn_lifetime: Duration::from_secs(15), conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(75),
limit: 100, limit: 100,
@ -236,6 +266,7 @@ impl ClientConnector {
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None,
} }
} }
@ -357,31 +388,33 @@ impl ClientConnector {
fn collect(&mut self, periodic: bool) { fn collect(&mut self, periodic: bool) {
let now = Instant::now(); let now = Instant::now();
// collect half acquire keys if self.pool_modified.get() {
if let Some(keys) = self.pool.collect_keys() { // collect half acquire keys
for key in keys { if let Some(keys) = self.pool.collect_keys() {
self.release_key(&key); for key in keys {
self.release_key(&key);
}
} }
}
// collect connections for close // collect connections for close
if let Some(to_close) = self.pool.collect_close() { if let Some(to_close) = self.pool.collect_close() {
for conn in to_close { for conn in to_close {
self.release_key(&conn.key); self.release_key(&conn.key);
self.to_close.push(conn); self.to_close.push(conn);
}
} }
}
// connection connections // connection connections
if let Some(to_release) = self.pool.collect_release() { if let Some(to_release) = self.pool.collect_release() {
for conn in to_release { for conn in to_release {
self.release_key(&conn.key); self.release_key(&conn.key);
// check connection lifetime and the return to available pool // check connection lifetime and the return to available pool
if (now - conn.ts) < self.conn_lifetime { if (now - conn.ts) < self.conn_lifetime {
self.available.entry(conn.key.clone()) self.available.entry(conn.key.clone())
.or_insert_with(VecDeque::new) .or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn)); .push_back(Conn(Instant::now(), conn));
}
} }
} }
} }
@ -412,6 +445,8 @@ impl ClientConnector {
} }
} }
} }
self.pool_modified.set(false);
} }
fn collect_periodic(&mut self, ctx: &mut Context<Self>) { fn collect_periodic(&mut self, ctx: &mut Context<Self>) {
@ -419,15 +454,58 @@ impl ClientConnector {
// re-schedule next collect period // re-schedule next collect period
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx));
} }
fn collect_waiters(&mut self) {
let now = Instant::now();
let mut next = None;
for (_, waiters) in &mut self.waiters {
let mut idx = 0;
while idx < waiters.len() {
if waiters[idx].wait <= now {
let waiter = waiters.swap_remove_back(idx).unwrap();
let _ = waiter.tx.send(Err(ClientConnectorError::Timeout));
} else {
if let Some(n) = next {
if waiters[idx].wait < n {
next = Some(waiters[idx].wait);
}
} else {
next = Some(waiters[idx].wait);
}
idx += 1;
}
}
}
if next.is_some() {
self.install_wait_timeout(next.unwrap());
}
}
fn install_wait_timeout(&mut self, time: Instant) {
if let Some(ref mut wait) = self.wait_timeout {
if wait.0 < time {
return
}
}
let mut timeout = Timeout::new(time-Instant::now(), Arbiter::handle()).unwrap();
let _ = timeout.poll();
self.wait_timeout = Some((time, timeout));
}
} }
impl Handler<Connect> for ClientConnector { impl Handler<Connect> for ClientConnector {
type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>; type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
self.collect(false); if self.pool_modified.get() {
self.collect(false);
}
let uri = &msg.uri; let uri = &msg.uri;
let wait_time = msg.wait_time;
let conn_timeout = msg.conn_timeout; let conn_timeout = msg.conn_timeout;
// host name is required // host name is required
@ -469,7 +547,11 @@ impl Handler<Connect> for ClientConnector {
Acquire::NotAvailable => { Acquire::NotAvailable => {
// connection is not available, wait // connection is not available, wait
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let waiter = Waiter{ tx, conn_timeout };
let wait = Instant::now() + wait_time;
self.install_wait_timeout(wait);
let waiter = Waiter{ tx, wait, conn_timeout };
self.waiters.entry(key.clone()).or_insert_with(VecDeque::new) self.waiters.entry(key.clone()).or_insert_with(VecDeque::new)
.push_back(waiter); .push_back(waiter);
return ActorResponse::async( return ActorResponse::async(
@ -563,8 +645,13 @@ impl fut::ActorFuture for Maintenance
fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>) fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>)
-> Poll<Self::Item, Self::Error> -> Poll<Self::Item, Self::Error>
{ {
// collecto connections // collect connections
act.collect(false); if act.pool_modified.get() {
act.collect(false);
}
// collect wait timers
act.collect_waiters();
// check waiters // check waiters
let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)}; let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)};
@ -781,11 +868,13 @@ pub struct Pool {
to_close: RefCell<Vec<Connection>>, to_close: RefCell<Vec<Connection>>,
to_release: RefCell<Vec<Connection>>, to_release: RefCell<Vec<Connection>>,
task: RefCell<Option<Task>>, task: RefCell<Option<Task>>,
modified: Rc<Cell<bool>>,
} }
impl Pool { impl Pool {
fn new() -> Pool { fn new(modified: Rc<Cell<bool>>) -> Pool {
Pool { Pool {
modified,
keys: RefCell::new(Vec::new()), keys: RefCell::new(Vec::new()),
to_close: RefCell::new(Vec::new()), to_close: RefCell::new(Vec::new()),
to_release: RefCell::new(Vec::new()), to_release: RefCell::new(Vec::new()),
@ -818,6 +907,7 @@ impl Pool {
} }
fn close(&self, conn: Connection) { fn close(&self, conn: Connection) {
self.modified.set(true);
self.to_close.borrow_mut().push(conn); self.to_close.borrow_mut().push(conn);
if let Some(ref task) = *self.task.borrow() { if let Some(ref task) = *self.task.borrow() {
task.notify() task.notify()
@ -825,6 +915,7 @@ impl Pool {
} }
fn release(&self, conn: Connection) { fn release(&self, conn: Connection) {
self.modified.set(true);
self.to_release.borrow_mut().push(conn); self.to_release.borrow_mut().push(conn);
if let Some(ref task) = *self.task.borrow() { if let Some(ref task) = *self.task.borrow() {
task.notify() task.notify()
@ -832,6 +923,7 @@ impl Pool {
} }
fn release_key(&self, key: Key) { fn release_key(&self, key: Key) {
self.modified.set(true);
self.keys.borrow_mut().push(key); self.keys.borrow_mut().push(key);
if let Some(ref task) = *self.task.borrow() { if let Some(ref task) = *self.task.borrow() {
task.notify() task.notify()

View File

@ -69,6 +69,7 @@ pub struct SendRequest {
state: State, state: State,
conn: Addr<Unsync, ClientConnector>, conn: Addr<Unsync, ClientConnector>,
conn_timeout: Duration, conn_timeout: Duration,
wait_time: Duration,
timeout: Option<Timeout>, timeout: Option<Timeout>,
} }
@ -83,7 +84,8 @@ impl SendRequest {
SendRequest{req, conn, SendRequest{req, conn,
state: State::New, state: State::New,
timeout: None, timeout: None,
conn_timeout: Duration::from_secs(1) wait_time: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
} }
} }
@ -93,6 +95,7 @@ impl SendRequest {
state: State::Connection(conn), state: State::Connection(conn),
conn: ClientConnector::from_registry(), conn: ClientConnector::from_registry(),
timeout: None, timeout: None,
wait_time: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1), conn_timeout: Duration::from_secs(1),
} }
} }
@ -115,6 +118,15 @@ impl SendRequest {
self.conn_timeout = timeout; self.conn_timeout = timeout;
self self
} }
/// Set wait time
///
/// If connections pool limits are enabled, wait time indicates max time
/// to wait for available connection. Default value is 5 seconds.
pub fn wait_time(mut self, timeout: Duration) -> Self {
self.wait_time = timeout;
self
}
} }
impl Future for SendRequest { impl Future for SendRequest {
@ -129,6 +141,7 @@ impl Future for SendRequest {
State::New => State::New =>
self.state = State::Connect(self.conn.send(Connect { self.state = State::Connect(self.conn.send(Connect {
uri: self.req.uri().clone(), uri: self.req.uri().clone(),
wait_time: self.wait_time,
conn_timeout: self.conn_timeout, conn_timeout: self.conn_timeout,
})), })),
State::Connect(mut conn) => match conn.poll() { State::Connect(mut conn) => match conn.poll() {