1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

fix client connector wait queue

This commit is contained in:
Nikolay Kim 2018-07-18 01:23:56 +06:00
parent 373f2e5028
commit 85672d1379
2 changed files with 176 additions and 259 deletions

View File

@ -46,8 +46,6 @@ pub struct ClientConnectorStats {
pub errors: usize, pub errors: usize,
/// Number of connection timeouts /// Number of connection timeouts
pub timeouts: usize, pub timeouts: usize,
/// Number of released connections
pub released: usize,
} }
#[derive(Debug)] #[derive(Debug)]
@ -413,14 +411,14 @@ impl ClientConnector {
} }
if self.limit_per_host > 0 { if self.limit_per_host > 0 {
if let Some(per_host) = self.acquired_per_host.get(key) { if let Some(per_host) = self.acquired_per_host.get(key) {
if self.limit_per_host >= *per_host { if *per_host >= self.limit_per_host {
return Acquire::NotAvailable; return Acquire::NotAvailable;
} }
} }
} }
} else if self.limit_per_host > 0 { } else if self.limit_per_host > 0 {
if let Some(per_host) = self.acquired_per_host.get(key) { if let Some(per_host) = self.acquired_per_host.get(key) {
if self.limit_per_host >= *per_host { if *per_host >= self.limit_per_host {
return Acquire::NotAvailable; return Acquire::NotAvailable;
} }
} }
@ -469,7 +467,9 @@ impl ClientConnector {
} }
fn release_key(&mut self, key: &Key) { fn release_key(&mut self, key: &Key) {
self.acquired -= 1; if self.acquired > 0 {
self.acquired -= 1;
}
let per_host = if let Some(per_host) = self.acquired_per_host.get(key) { let per_host = if let Some(per_host) = self.acquired_per_host.get(key) {
*per_host *per_host
} else { } else {
@ -514,23 +514,23 @@ impl ClientConnector {
let mut next = None; let mut next = None;
for waiters in self.waiters.as_mut().unwrap().values_mut() { for waiters in self.waiters.as_mut().unwrap().values_mut() {
let mut idx = 0; let mut new_waiters = VecDeque::new();
while idx < waiters.len() { while let Some(waiter) = waiters.pop_front() {
if waiters[idx].wait <= now { if waiter.wait <= now {
self.stats.timeouts += 1; self.stats.timeouts += 1;
let waiter = waiters.swap_remove_back(idx).unwrap();
let _ = waiter.tx.send(Err(ClientConnectorError::Timeout)); let _ = waiter.tx.send(Err(ClientConnectorError::Timeout));
} else { } else {
if let Some(n) = next { if let Some(n) = next {
if waiters[idx].wait < n { if waiter.wait < n {
next = Some(waiters[idx].wait); next = Some(waiter.wait);
} }
} else { } else {
next = Some(waiters[idx].wait); next = Some(waiter.wait);
} }
idx += 1; new_waiters.push_back(waiter);
} }
} }
*waiters = new_waiters;
} }
if next.is_some() { if next.is_some() {
@ -573,20 +573,56 @@ impl ClientConnector {
rx rx
} }
fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context<Self>) { fn check_availibility(&mut self, ctx: &mut Context<ClientConnector>) {
let conn = AcquiredConn(key, Some(self.acq_tx.clone())); // check waiters
let mut act_waiters = self.waiters.take().unwrap();
for (key, ref mut waiters) in &mut act_waiters {
while let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() {
continue;
}
match self.acquire(key) {
Acquire::Acquired(mut conn) => {
// use existing connection
self.stats.reused += 1;
conn.pool =
Some(AcquiredConn(key.clone(), Some(self.acq_tx.clone())));
let _ = waiter.tx.send(Ok(conn));
}
Acquire::NotAvailable => {
waiters.push_front(waiter);
break;
}
Acquire::Available => {
// create new connection
self.connect_waiter(key.clone(), waiter, ctx);
}
}
}
}
self.waiters = Some(act_waiters);
}
fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context<Self>) {
let conn = AcquiredConn(key.clone(), Some(self.acq_tx.clone()));
let key2 = key.clone();
fut::WrapFuture::<ClientConnector>::actfuture( fut::WrapFuture::<ClientConnector>::actfuture(
self.resolver.as_ref().unwrap().send( self.resolver.as_ref().unwrap().send(
ResolveConnect::host_and_port(&conn.0.host, conn.0.port) ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
.timeout(waiter.conn_timeout), .timeout(waiter.conn_timeout),
), ),
).map_err(|_, _, _| ()) ).map_err(move |_, act, _| {
act.release_key(&key2);
()
})
.and_then(move |res, act, _| { .and_then(move |res, act, _| {
#[cfg(feature = "alpn")] #[cfg(feature = "alpn")]
match res { match res {
Err(err) => { Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into())); let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(())) fut::Either::B(fut::err(()))
} }
@ -596,7 +632,8 @@ impl ClientConnector {
fut::Either::A( fut::Either::A(
act.connector act.connector
.connect_async(&key.host, stream) .connect_async(&key.host, stream)
.then(move |res| { .into_actor(act)
.then(move |res, act, _| {
match res { match res {
Err(e) => { Err(e) => {
let _ = waiter.tx.send(Err( let _ = waiter.tx.send(Err(
@ -612,9 +649,8 @@ impl ClientConnector {
))); )));
} }
} }
Ok(()) fut::ok(())
}) }),
.into_actor(act),
) )
} else { } else {
let _ = waiter.tx.send(Ok(Connection::new( let _ = waiter.tx.send(Ok(Connection::new(
@ -630,7 +666,6 @@ impl ClientConnector {
#[cfg(all(feature = "tls", not(feature = "alpn")))] #[cfg(all(feature = "tls", not(feature = "alpn")))]
match res { match res {
Err(err) => { Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into())); let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(())) fut::Either::B(fut::err(()))
} }
@ -640,7 +675,8 @@ impl ClientConnector {
fut::Either::A( fut::Either::A(
act.connector act.connector
.connect_async(&conn.0.host, stream) .connect_async(&conn.0.host, stream)
.then(|res| { .into_actor(act)
.then(move |res, _, _| {
match res { match res {
Err(e) => { Err(e) => {
let _ = waiter.tx.send(Err( let _ = waiter.tx.send(Err(
@ -656,9 +692,8 @@ impl ClientConnector {
))); )));
} }
} }
Ok(()) fut::ok(())
}) }),
.into_actor(act),
) )
} else { } else {
let _ = waiter.tx.send(Ok(Connection::new( let _ = waiter.tx.send(Ok(Connection::new(
@ -674,7 +709,6 @@ impl ClientConnector {
#[cfg(not(any(feature = "alpn", feature = "tls")))] #[cfg(not(any(feature = "alpn", feature = "tls")))]
match res { match res {
Err(err) => { Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into())); let _ = waiter.tx.send(Err(err.into()));
fut::err(()) fut::err(())
} }
@ -725,7 +759,7 @@ impl Handler<Resume> for ClientConnector {
impl Handler<Connect> for ClientConnector { impl Handler<Connect> for ClientConnector {
type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>; type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Connect, ctx: &mut Self::Context) -> Self::Result {
let uri = &msg.uri; let uri = &msg.uri;
let wait_timeout = msg.wait_timeout; let wait_timeout = msg.wait_timeout;
let conn_timeout = msg.conn_timeout; let conn_timeout = msg.conn_timeout;
@ -761,239 +795,142 @@ impl Handler<Connect> for ClientConnector {
// check pause state // check pause state
if self.paused.is_paused() { if self.paused.is_paused() {
let rx = self.wait_for(key, wait_timeout, conn_timeout); let rx = self.wait_for(key.clone(), wait_timeout, conn_timeout);
self.stats.waits += 1; self.stats.waits += 1;
return ActorResponse::async( return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected) rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self) .into_actor(self)
.and_then(|res, _, _| match res { .and_then(move |res, act, ctx| match res {
Ok(conn) => fut::ok(conn), Ok(conn) => fut::ok(conn),
Err(err) => fut::err(err), Err(err) => {
match err {
ClientConnectorError::Timeout => (),
_ => {
act.release_key(&key);
}
}
act.stats.errors += 1;
act.check_availibility(ctx);
fut::err(err)
}
}),
);
}
// do not re-use websockets connection
if !proto.is_http() {
let (tx, rx) = oneshot::channel();
let wait = Instant::now() + wait_timeout;
let waiter = Waiter {
tx,
wait,
conn_timeout,
};
self.connect_waiter(key.clone(), waiter, ctx);
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
.and_then(move |res, act, ctx| match res {
Ok(conn) => fut::ok(conn),
Err(err) => {
act.stats.errors += 1;
act.release_key(&key);
act.check_availibility(ctx);
fut::err(err)
}
}), }),
); );
} }
// acquire connection // acquire connection
let pool = if proto.is_http() { match self.acquire(&key) {
match self.acquire(&key) { Acquire::Acquired(mut conn) => {
Acquire::Acquired(mut conn) => { // use existing connection
// use existing connection conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone())));
conn.pool = Some(AcquiredConn(key, Some(self.acq_tx.clone()))); self.stats.reused += 1;
self.stats.reused += 1; ActorResponse::async(fut::ok(conn))
return ActorResponse::async(fut::ok(conn));
}
Acquire::NotAvailable => {
// connection is not available, wait
let rx = self.wait_for(key, wait_timeout, conn_timeout);
self.stats.waits += 1;
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
.and_then(|res, _, _| match res {
Ok(conn) => fut::ok(conn),
Err(err) => fut::err(err),
}),
);
}
Acquire::Available => Some(self.acq_tx.clone()),
} }
} else { Acquire::NotAvailable => {
None // connection is not available, wait
}; let rx = self.wait_for(key.clone(), wait_timeout, conn_timeout);
let conn = AcquiredConn(key, pool); self.stats.waits += 1;
{ ActorResponse::async(
ActorResponse::async( rx.map_err(|_| ClientConnectorError::Disconnected)
self.resolver .into_actor(self)
.as_ref() .and_then(move |res, act, ctx| match res {
.unwrap() Ok(conn) => fut::ok(conn),
.send(
ResolveConnect::host_and_port(&conn.0.host, port)
.timeout(conn_timeout),
)
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(move |res, act, _| {
#[cfg(feature = "alpn")]
match res {
Err(err) => { Err(err) => {
act.stats.opened += 1; match err {
fut::Either::B(fut::err(err.into())) ClientConnectorError::Timeout => (),
} _ => {
Ok(stream) => { act.release_key(&key);
act.stats.opened += 1; }
if proto.is_secure() {
fut::Either::A(
act.connector
.connect_async(&conn.0.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| {
Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)
})
.into_actor(act),
)
} else {
fut::Either::B(fut::ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)))
} }
act.stats.errors += 1;
act.check_availibility(ctx);
fut::err(err)
} }
} }),
)
}
Acquire::Available => {
let (tx, rx) = oneshot::channel();
let wait = Instant::now() + wait_timeout;
let waiter = Waiter {
tx,
wait,
conn_timeout,
};
self.connect_waiter(key.clone(), waiter, ctx);
#[cfg(all(feature = "tls", not(feature = "alpn")))] ActorResponse::async(
match res { rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
.and_then(move |res, act, ctx| match res {
Ok(conn) => fut::ok(conn),
Err(err) => { Err(err) => {
act.stats.opened += 1; act.stats.errors += 1;
fut::Either::B(fut::err(err.into())) act.release_key(&key);
act.check_availibility(ctx);
fut::err(err)
} }
Ok(stream) => { }),
act.stats.opened += 1; )
if proto.is_secure() { }
fut::Either::A(
act.connector
.connect_async(&conn.0.host, stream)
.map_err(ClientConnectorError::SslError)
.map(|stream| {
Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)
})
.into_actor(act),
)
} else {
fut::Either::B(fut::ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)))
}
}
}
#[cfg(not(any(feature = "alpn", feature = "tls")))]
match res {
Err(err) => {
act.stats.opened += 1;
fut::err(err.into())
}
Ok(stream) => {
act.stats.opened += 1;
if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported)
} else {
fut::ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
))
}
}
}
}),
)
} }
} }
} }
impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector { impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
fn handle(&mut self, msg: AcquiredConnOperation, ctx: &mut Context<Self>) { fn handle(&mut self, msg: AcquiredConnOperation, ctx: &mut Context<Self>) {
let now = Instant::now();
self.stats.released += 1;
// check if we have queued up waiters
let waiter = {
let key = match msg {
AcquiredConnOperation::Close(ref conn) => &conn.key,
AcquiredConnOperation::Release(ref conn) => &conn.key,
AcquiredConnOperation::ReleaseKey(ref key) => key,
};
if let Some(ref mut waiters) = self.waiters.as_mut().unwrap().get_mut(key) {
loop {
if let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() {
continue;
}
break Some(waiter);
} else {
break None;
}
}
} else {
None
}
};
match msg { match msg {
AcquiredConnOperation::Close(conn) => { AcquiredConnOperation::Close(conn) => {
if let Some(waiter) = waiter { self.release_key(&conn.key);
// create new connection
self.connect_waiter(conn.key.clone(), waiter, ctx);
} else {
self.release_key(&conn.key);
}
self.to_close.push(conn); self.to_close.push(conn);
self.stats.closed += 1; self.stats.closed += 1;
} }
AcquiredConnOperation::Release(mut conn) => { AcquiredConnOperation::Release(conn) => {
let alive = (Instant::now() - conn.ts) < self.conn_lifetime; self.release_key(&conn.key);
if (Instant::now() - conn.ts) < self.conn_lifetime {
if let Some(waiter) = waiter { self.available
// check connection lifetime and the return to available pool .entry(conn.key.clone())
if alive { .or_insert_with(VecDeque::new)
// use existing connection .push_back(Conn(Instant::now(), conn));
self.stats.reused += 1;
conn.pool = Some(AcquiredConn(
conn.key.clone(),
Some(self.acq_tx.clone()),
));
let _ = waiter.tx.send(Ok(conn));
} else {
// create new connection
self.connect_waiter(conn.key.clone(), waiter, ctx);
}
} else { } else {
self.release_key(&conn.key); self.to_close.push(conn);
if alive { self.stats.closed += 1;
self.available
.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
}
} }
} }
AcquiredConnOperation::ReleaseKey(key) => { AcquiredConnOperation::ReleaseKey(key) => {
if let Some(waiter) = waiter { // closed
// create new connection self.stats.closed += 1;
self.connect_waiter(key, waiter, ctx); self.release_key(&key);
} else {
self.release_key(&key);
}
} }
} }
// check keep-alive self.check_availibility(ctx);
for conns in self.available.values_mut() {
while !conns.is_empty() {
if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive
|| (now - conns[0].1.ts) > self.conn_lifetime
{
let conn = conns.pop_front().unwrap().1;
self.to_close.push(conn);
self.stats.closed += 1;
} else {
break;
}
}
}
} }
} }
@ -1018,35 +955,8 @@ impl fut::ActorFuture for Maintenance {
act.collect_waiters(); act.collect_waiters();
// check waiters // check waiters
let mut act_waiters = act.waiters.take().unwrap(); act.check_availibility(ctx);
for (key, ref mut waiters) in &mut act_waiters {
while let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() {
continue;
}
match act.acquire(key) {
Acquire::Acquired(mut conn) => {
// use existing connection
act.stats.reused += 1;
conn.pool =
Some(AcquiredConn(key.clone(), Some(act.acq_tx.clone())));
let _ = waiter.tx.send(Ok(conn));
}
Acquire::NotAvailable => {
waiters.push_front(waiter);
break;
}
Acquire::Available => {
// create new connection
act.connect_waiter(key.clone(), waiter, ctx);
}
}
}
}
act.waiters = Some(act_waiters);
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
@ -1181,14 +1091,14 @@ impl Connection {
Connection::new(Key::empty(), None, Box::new(io)) Connection::new(Key::empty(), None, Box::new(io))
} }
/// Close connection pool /// Close connection
pub fn close(mut self) { pub fn close(mut self) {
if let Some(mut pool) = self.pool.take() { if let Some(mut pool) = self.pool.take() {
pool.close(self) pool.close(self)
} }
} }
/// Release this connection from the connection pool /// Release this connection to the connection pool
pub fn release(mut self) { pub fn release(mut self) {
if let Some(mut pool) = self.pool.take() { if let Some(mut pool) = self.pool.take() {
pool.release(self) pool.release(self)

View File

@ -17,7 +17,7 @@ use context::{ActorHttpContext, Frame};
use error::Error; use error::Error;
use error::PayloadError; use error::PayloadError;
use header::ContentEncoding; use header::ContentEncoding;
use http::Method; use http::{Method, Uri};
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use server::input::PayloadStream; use server::input::PayloadStream;
use server::WriterState; use server::WriterState;
@ -203,7 +203,8 @@ impl Future for SendRequest {
should_decompress: self.req.response_decompress(), should_decompress: self.req.response_decompress(),
write_state: RunningState::Running, write_state: RunningState::Running,
timeout: Some(Delay::new(Instant::now() + timeout)), timeout: Some(Delay::new(Instant::now() + timeout)),
close: self.req.method() == &Method::HEAD, meth: self.req.method().clone(),
path: self.req.uri().clone(),
}); });
self.state = State::Send(pl); self.state = State::Send(pl);
} }
@ -249,7 +250,8 @@ pub struct Pipeline {
should_decompress: bool, should_decompress: bool,
write_state: RunningState, write_state: RunningState,
timeout: Option<Delay>, timeout: Option<Delay>,
close: bool, meth: Method,
path: Uri,
} }
enum IoBody { enum IoBody {
@ -283,7 +285,7 @@ impl RunningState {
impl Pipeline { impl Pipeline {
fn release_conn(&mut self) { fn release_conn(&mut self) {
if let Some(conn) = self.conn.take() { if let Some(conn) = self.conn.take() {
if self.close { if self.meth == Method::HEAD {
conn.close() conn.close()
} else { } else {
conn.release() conn.release()
@ -529,6 +531,11 @@ impl Pipeline {
impl Drop for Pipeline { impl Drop for Pipeline {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(conn) = self.conn.take() { if let Some(conn) = self.conn.take() {
debug!(
"Client http transaction is not completed, dropping connection: {:?} {:?}",
self.meth,
self.path,
);
conn.close() conn.close()
} }
} }