1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

proper handling for client connection release

This commit is contained in:
Nikolay Kim 2018-07-17 17:23:03 +06:00
parent 1af5aa3a3e
commit d43902ee7c

View File

@ -570,6 +570,131 @@ impl ClientConnector {
.push_back(waiter);
rx
}
fn connect_waiter(&mut self, key: Key, waiter: Waiter, ctx: &mut Context<Self>) {
let conn = AcquiredConn(key, Some(self.acq_tx.clone()));
fut::WrapFuture::<ClientConnector>::actfuture(
self.resolver.as_ref().unwrap().send(
ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
.timeout(waiter.conn_timeout),
),
).map_err(|_, _, _| ())
.and_then(move |res, act, _| {
#[cfg(feature = "alpn")]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
act.connector
.connect_async(&key.host, stream)
.then(move |res| {
match res {
Err(e) => {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslError(e),
));
}
Ok(stream) => {
let _ =
waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
}
}
Ok(())
})
.into_actor(act),
)
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(all(feature = "tls", not(feature = "alpn")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
act.connector
.connect_async(&conn.0.host, stream)
.then(|res| {
match res {
Err(e) => {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslError(e),
));
}
Ok(stream) => {
let _ =
waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
}
}
Ok(())
})
.into_actor(act),
)
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(not(any(feature = "alpn", feature = "tls")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::err(())
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
let _ = waiter
.tx
.send(Err(ClientConnectorError::SslIsNotSupported));
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
};
fut::ok(())
}
}
})
.spawn(ctx);
}
}
impl Handler<Pause> for ClientConnector {
@ -777,28 +902,78 @@ impl Handler<Connect> for ClientConnector {
}
impl StreamHandler<AcquiredConnOperation, ()> for ClientConnector {
fn handle(&mut self, msg: AcquiredConnOperation, _: &mut Context<Self>) {
fn handle(&mut self, msg: AcquiredConnOperation, ctx: &mut Context<Self>) {
let now = Instant::now();
// check if we have queued up waiters
let waiter = {
let key = match msg {
AcquiredConnOperation::Close(ref conn) => &conn.key,
AcquiredConnOperation::Release(ref conn) => &conn.key,
AcquiredConnOperation::ReleaseKey(ref key) => key,
};
if let Some(ref mut waiters) = self.waiters.as_mut().unwrap().get_mut(key) {
loop {
if let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() {
continue;
}
break Some(waiter);
} else {
break None;
}
}
} else {
None
}
};
match msg {
AcquiredConnOperation::Close(conn) => {
self.release_key(&conn.key);
if let Some(waiter) = waiter {
// create new connection
self.connect_waiter(conn.key.clone(), waiter, ctx);
} else {
self.release_key(&conn.key);
}
self.to_close.push(conn);
self.stats.closed += 1;
}
AcquiredConnOperation::Release(conn) => {
self.release_key(&conn.key);
AcquiredConnOperation::Release(mut conn) => {
let alive = (Instant::now() - conn.ts) < self.conn_lifetime;
// check connection lifetime and the return to available pool
if (Instant::now() - conn.ts) < self.conn_lifetime {
self.available
.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
if let Some(waiter) = waiter {
// check connection lifetime and the return to available pool
if alive {
// use existing connection
self.stats.reused += 1;
conn.pool = Some(AcquiredConn(
conn.key.clone(),
Some(self.acq_tx.clone()),
));
let _ = waiter.tx.send(Ok(conn));
} else {
// create new connection
self.connect_waiter(conn.key.clone(), waiter, ctx);
}
} else {
self.release_key(&conn.key);
if alive {
self.available
.entry(conn.key.clone())
.or_insert_with(VecDeque::new)
.push_back(Conn(Instant::now(), conn));
}
}
}
AcquiredConnOperation::ReleaseKey(key) => {
self.release_key(&key);
if let Some(waiter) = waiter {
// create new connection
self.connect_waiter(key, waiter, ctx);
} else {
self.release_key(&key);
}
}
}
@ -861,130 +1036,8 @@ impl fut::ActorFuture for Maintenance {
break;
}
Acquire::Available => {
let key = key.clone();
let conn = AcquiredConn(key.clone(), Some(act.acq_tx.clone()));
fut::WrapFuture::<ClientConnector>::actfuture(
act.resolver.as_ref().unwrap().send(
ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
.timeout(waiter.conn_timeout),
),
).map_err(|_, _, _| ())
.and_then(move |res, act, _| {
#[cfg(feature = "alpn")]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
act.connector
.connect_async(&key.host, stream)
.then(move |res| {
match res {
Err(e) => {
let _ = waiter.tx.send(
Err(ClientConnectorError::SslError(e)));
}
Ok(stream) => {
let _ = waiter.tx.send(
Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)),
);
}
}
Ok(())
})
.into_actor(act),
)
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(all(feature = "tls", not(feature = "alpn")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
act.connector
.connect_async(&conn.0.host, stream)
.then(|res| {
match res {
Err(e) => {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslError(e),
));
}
Ok(stream) => {
let _ = waiter.tx.send(
Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)),
);
}
}
Ok(())
})
.into_actor(act),
)
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
fut::Either::B(fut::ok(()))
}
}
}
#[cfg(not(any(feature = "alpn", feature = "tls")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::err(())
}
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
let _ = waiter.tx.send(Err(
ClientConnectorError::SslIsNotSupported,
));
} else {
let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(),
Some(conn),
Box::new(stream),
)));
};
fut::ok(())
}
}
})
.spawn(ctx);
// create new connection
act.connect_waiter(key.clone(), waiter, ctx);
}
}
}