2018-11-12 08:12:54 +01:00
|
|
|
use std::cell::RefCell;
|
2019-01-27 20:40:26 +01:00
|
|
|
use std::collections::VecDeque;
|
2019-11-15 10:54:11 +01:00
|
|
|
use std::future::Future;
|
2021-02-16 09:27:14 +01:00
|
|
|
use std::ops::Deref;
|
2019-11-15 10:54:11 +01:00
|
|
|
use std::pin::Pin;
|
2020-08-09 22:49:43 +02:00
|
|
|
use std::rc::Rc;
|
2021-02-16 09:27:14 +01:00
|
|
|
use std::sync::Arc;
|
2019-11-15 10:54:11 +01:00
|
|
|
use std::task::{Context, Poll};
|
2018-11-12 08:12:54 +01:00
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
2021-01-04 00:47:04 +01:00
|
|
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
|
|
|
use actix_rt::time::{sleep, Sleep};
|
2018-12-11 03:08:33 +01:00
|
|
|
use actix_service::Service;
|
2021-01-15 03:11:10 +01:00
|
|
|
use ahash::AHashMap;
|
2021-02-07 02:00:40 +01:00
|
|
|
use futures_core::future::LocalBoxFuture;
|
2019-04-20 03:03:44 +02:00
|
|
|
use http::uri::Authority;
|
2020-02-20 03:03:53 +01:00
|
|
|
use pin_project::pin_project;
|
2021-02-16 09:27:14 +01:00
|
|
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
2018-11-12 08:12:54 +01:00
|
|
|
|
2020-03-07 03:09:31 +01:00
|
|
|
use super::config::ConnectorConfig;
|
2021-02-16 09:27:14 +01:00
|
|
|
use super::connection::{ConnectionType, H2Connection, IoConnection};
|
2019-03-13 22:41:40 +01:00
|
|
|
use super::error::ConnectError;
|
2020-03-07 03:09:31 +01:00
|
|
|
use super::h2proto::handshake;
|
2019-04-20 03:03:44 +02:00
|
|
|
use super::Connect;
|
2018-11-12 08:12:54 +01:00
|
|
|
|
2019-01-29 05:41:09 +01:00
|
|
|
/// Protocol version reported by the connector for a freshly established Io.
///
/// The pool uses it to decide whether the Io is wrapped directly as an HTTP/1
/// connection or has to go through the HTTP/2 handshake first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    /// HTTP/1.x connection.
    Http1,
    /// HTTP/2 connection.
    Http2,
}
|
|
|
|
|
2018-11-12 08:12:54 +01:00
|
|
|
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
|
|
|
pub(crate) struct Key {
|
|
|
|
authority: Authority,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<Authority> for Key {
    /// Wrap an [`Authority`] so it can be used as a pool key.
    fn from(authority: Authority) -> Self {
        Self { authority }
    }
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key
|
|
|
|
pub(crate) struct ConnectionPool<S, Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2021-02-16 09:27:14 +01:00
|
|
|
connector: Rc<S>,
|
|
|
|
inner: ConnectionPoolInner<Io>,
|
|
|
|
}
|
2020-07-10 23:35:22 +02:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
/// wrapper type for check the ref count of Rc.
|
|
|
|
struct ConnectionPoolInner<Io>(Rc<ConnectionPoolInnerPriv<Io>>)
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static;
|
2020-07-10 23:35:22 +02:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
impl<Io> ConnectionPoolInner<Io>
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
|
|
|
/// spawn a async for graceful shutdown h1 Io type with a timeout.
|
|
|
|
fn close(&self, conn: ConnectionType<Io>) {
|
|
|
|
if let Some(timeout) = self.config.disconnect_timeout {
|
|
|
|
if let ConnectionType::H1(io) = conn {
|
|
|
|
actix_rt::spawn(CloseConnection::new(io, timeout));
|
|
|
|
}
|
|
|
|
}
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
impl<Io> Clone for ConnectionPoolInner<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
|
|
|
fn clone(&self) -> Self {
|
2021-02-16 09:27:14 +01:00
|
|
|
Self(Rc::clone(&self.0))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<Io> Deref for ConnectionPoolInner<Io>
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
|
|
|
type Target = ConnectionPoolInnerPriv<Io>;
|
|
|
|
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
|
|
&*self.0
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
impl<Io> Drop for ConnectionPoolInner<Io>
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
2020-08-09 22:49:43 +02:00
|
|
|
fn drop(&mut self) {
|
2021-02-16 09:27:14 +01:00
|
|
|
// When strong count is one it means the pool is dropped
|
|
|
|
// remove and drop all Io types.
|
|
|
|
if Rc::strong_count(&self.0) == 1 {
|
|
|
|
self.permits.close();
|
|
|
|
std::mem::take(&mut *self.available.borrow_mut())
|
|
|
|
.into_iter()
|
|
|
|
.for_each(|(_, conns)| {
|
|
|
|
conns.into_iter().for_each(|pooled| self.close(pooled.conn))
|
|
|
|
});
|
|
|
|
}
|
2020-08-09 22:49:43 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
struct ConnectionPoolInnerPriv<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
|
|
|
config: ConnectorConfig,
|
|
|
|
available: RefCell<AHashMap<Key, VecDeque<PooledConnection<Io>>>>,
|
|
|
|
permits: Arc<Semaphore>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S, Io> ConnectionPool<S, Io>
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
|
|
|
/// construct a new connection pool.
|
|
|
|
///
|
|
|
|
/// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed
|
|
|
|
/// for on flight connections.
|
|
|
|
///
|
|
|
|
/// The pool can only have equal to `limit` amount of requests spawning/using Io type
|
|
|
|
/// concurrently.
|
|
|
|
///
|
|
|
|
/// Any requests beyond limit would be wait in fifo order and get notified in async
|
|
|
|
/// manner by [`tokio::sync::Semaphore`]
|
|
|
|
pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self {
|
|
|
|
let permits = Arc::new(Semaphore::new(config.limit));
|
|
|
|
let available = RefCell::new(AHashMap::default());
|
|
|
|
let connector = Rc::new(connector);
|
|
|
|
|
|
|
|
let inner = ConnectionPoolInner(Rc::new(ConnectionPoolInnerPriv {
|
|
|
|
config,
|
|
|
|
available,
|
|
|
|
permits,
|
|
|
|
}));
|
|
|
|
|
|
|
|
Self { connector, inner }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S, Io> Clone for ConnectionPool<S, Io>
|
|
|
|
where
|
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
|
|
|
{
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
Self {
|
|
|
|
connector: self.connector.clone(),
|
|
|
|
inner: self.inner.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
|
|
|
|
where
|
|
|
|
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
2019-11-18 13:42:27 +01:00
|
|
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2018-11-15 20:10:23 +01:00
|
|
|
type Response = IoConnection<Io>;
|
2019-03-13 22:41:40 +01:00
|
|
|
type Error = ConnectError;
|
2019-11-18 13:42:27 +01:00
|
|
|
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
|
2018-11-12 08:12:54 +01:00
|
|
|
|
2021-02-07 02:00:40 +01:00
|
|
|
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
2021-02-16 09:27:14 +01:00
|
|
|
self.connector.poll_ready(cx)
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
|
2021-02-07 02:00:40 +01:00
|
|
|
fn call(&self, req: Connect) -> Self::Future {
|
2021-02-16 09:27:14 +01:00
|
|
|
let connector = self.connector.clone();
|
|
|
|
let inner = self.inner.clone();
|
2019-11-18 13:42:27 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
Box::pin(async move {
|
2019-12-05 18:35:43 +01:00
|
|
|
let key = if let Some(authority) = req.uri.authority() {
|
2019-11-18 13:42:27 +01:00
|
|
|
authority.clone().into()
|
|
|
|
} else {
|
2020-05-07 19:26:48 +02:00
|
|
|
return Err(ConnectError::Unresolved);
|
2019-11-18 13:42:27 +01:00
|
|
|
};
|
2018-11-12 08:12:54 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
// acquire an owned permit and carry it with connection
|
|
|
|
let permit = inner
|
|
|
|
.permits
|
|
|
|
.clone()
|
|
|
|
.acquire_owned()
|
|
|
|
.await
|
|
|
|
// TODO: use specific error for semaphore acquire error
|
|
|
|
.map_err(|_| ConnectError::NoRecords)?;
|
|
|
|
|
|
|
|
// check if there is idle connection for given key.
|
|
|
|
let mut map = inner.available.borrow_mut();
|
|
|
|
|
|
|
|
let mut conn = None;
|
|
|
|
if let Some(conns) = map.get_mut(&key) {
|
|
|
|
let now = Instant::now();
|
|
|
|
while let Some(mut c) = conns.pop_front() {
|
|
|
|
// check the lifetime and drop connection that live for too long.
|
|
|
|
if (now - c.used) > inner.config.conn_keep_alive
|
|
|
|
|| (now - c.created) > inner.config.conn_lifetime
|
|
|
|
{
|
|
|
|
inner.close(c.conn);
|
|
|
|
// check if the connection is still usable.
|
|
|
|
} else {
|
|
|
|
if let ConnectionType::H1(ref mut io) = c.conn {
|
|
|
|
let check = ConnectionCheckFuture { io };
|
|
|
|
match check.await {
|
|
|
|
ConnectionState::Break => {
|
|
|
|
inner.close(c.conn);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
ConnectionState::Skip => continue,
|
|
|
|
ConnectionState::Live => conn = Some(c),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
conn = Some(c);
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
2019-11-18 13:42:27 +01:00
|
|
|
}
|
2021-02-16 09:27:14 +01:00
|
|
|
};
|
2019-11-18 13:42:27 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
// drop map early to end the borrow_mut of RefCell.
|
|
|
|
drop(map);
|
2020-03-07 03:09:31 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
// construct acquired. It's used to put Io type back to pool/ close the Io type.
|
|
|
|
// permit is carried with the whole lifecycle of Acquired.
|
|
|
|
let acquired = Some(Acquired { key, inner, permit });
|
|
|
|
|
|
|
|
// match the connection and spawn new one if did not get anything.
|
|
|
|
match conn {
|
|
|
|
Some(conn) => Ok(IoConnection::new(conn.conn, conn.created, acquired)),
|
|
|
|
None => {
|
|
|
|
let (io, proto) = connector.call(req).await?;
|
2019-11-18 13:42:27 +01:00
|
|
|
|
|
|
|
if proto == Protocol::Http1 {
|
|
|
|
Ok(IoConnection::new(
|
|
|
|
ConnectionType::H1(io),
|
|
|
|
Instant::now(),
|
2021-02-16 09:27:14 +01:00
|
|
|
acquired,
|
2019-11-18 13:42:27 +01:00
|
|
|
))
|
|
|
|
} else {
|
2021-02-16 09:27:14 +01:00
|
|
|
let config = &acquired.as_ref().unwrap().inner.config;
|
|
|
|
let (sender, connection) = handshake(io, config).await?;
|
2019-11-18 13:42:27 +01:00
|
|
|
Ok(IoConnection::new(
|
2021-02-07 04:51:36 +01:00
|
|
|
ConnectionType::H2(H2Connection::new(sender, connection)),
|
2019-11-18 13:42:27 +01:00
|
|
|
Instant::now(),
|
2021-02-16 09:27:14 +01:00
|
|
|
acquired,
|
2019-11-18 13:42:27 +01:00
|
|
|
))
|
|
|
|
}
|
|
|
|
}
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
2021-02-16 09:27:14 +01:00
|
|
|
})
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
/// Future that checks whether a pooled Io is still usable.
///
/// It borrows the Io mutably for the duration of the check.
struct ConnectionCheckFuture<'io, Io> {
    io: &'io mut Io,
}
|
|
|
|
|
|
|
|
/// Outcome of probing a pooled Io with a read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    /// The read is pending: the connection can be handed out for reuse.
    Live,
    /// The read returned data nobody asked for: the connection must be closed.
    Break,
    /// The read finished with no data or with an error: skip to the next candidate.
    Skip,
}
|
|
|
|
|
|
|
|
impl<Io> Future for ConnectionCheckFuture<'_, Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncRead + Unpin,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2021-02-16 09:27:14 +01:00
|
|
|
type Output = ConnectionState;
|
|
|
|
|
|
|
|
// this future is only used to get access to Context.
|
|
|
|
// It should never return Poll::Pending.
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
|
|
let this = self.get_mut();
|
|
|
|
let mut buf = [0; 2];
|
|
|
|
let mut read_buf = ReadBuf::new(&mut buf);
|
|
|
|
|
|
|
|
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
|
|
|
|
// io is pending and new data would wake up it.
|
|
|
|
Poll::Pending => ConnectionState::Live,
|
|
|
|
// io have data inside. drop it.
|
|
|
|
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => {
|
|
|
|
ConnectionState::Break
|
|
|
|
}
|
|
|
|
// otherwise skip to next.
|
|
|
|
_ => ConnectionState::Skip,
|
|
|
|
};
|
|
|
|
|
|
|
|
Poll::Ready(state)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct PooledConnection<Io> {
|
|
|
|
conn: ConnectionType<Io>,
|
|
|
|
used: Instant,
|
|
|
|
created: Instant,
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
#[pin_project]
|
|
|
|
struct CloseConnection<Io> {
|
|
|
|
io: Io,
|
|
|
|
#[pin]
|
|
|
|
timeout: Sleep,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<Io> CloseConnection<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2021-02-16 09:27:14 +01:00
|
|
|
fn new(io: Io, timeout: Duration) -> Self {
|
|
|
|
CloseConnection {
|
|
|
|
io,
|
|
|
|
timeout: sleep(timeout),
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
impl<Io> Future for CloseConnection<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2021-02-16 09:27:14 +01:00
|
|
|
type Output = ();
|
|
|
|
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
|
|
|
let this = self.project();
|
|
|
|
|
|
|
|
match this.timeout.poll(cx) {
|
|
|
|
Poll::Ready(_) => Poll::Ready(()),
|
|
|
|
Poll::Pending => Pin::new(this.io).poll_shutdown(cx).map(|_| ()),
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
pub(crate) struct Acquired<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2021-02-16 09:27:14 +01:00
|
|
|
Io: AsyncWrite + Unpin + 'static,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
|
|
|
key: Key,
|
2021-02-16 09:27:14 +01:00
|
|
|
inner: ConnectionPoolInner<Io>,
|
|
|
|
permit: OwnedSemaphorePermit,
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
impl<Io> Acquired<Io>
|
2018-11-12 08:12:54 +01:00
|
|
|
where
|
2019-11-18 13:42:27 +01:00
|
|
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
2018-11-12 08:12:54 +01:00
|
|
|
{
|
2021-02-16 09:27:14 +01:00
|
|
|
// close the Io type.
|
|
|
|
pub(crate) fn close(&mut self, conn: IoConnection<Io>) {
|
|
|
|
let (conn, _) = conn.into_inner();
|
|
|
|
self.inner.close(conn);
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
2019-11-18 13:42:27 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
// put the Io type back to pool.
|
|
|
|
pub(crate) fn release(&mut self, conn: IoConnection<Io>) {
|
|
|
|
let (io, created) = conn.into_inner();
|
|
|
|
let Acquired { key, inner, .. } = self;
|
|
|
|
inner
|
|
|
|
.available
|
|
|
|
.borrow_mut()
|
|
|
|
.entry(key.clone())
|
|
|
|
.or_insert_with(VecDeque::new)
|
|
|
|
.push_back(PooledConnection {
|
|
|
|
conn: io,
|
|
|
|
created,
|
|
|
|
used: Instant::now(),
|
|
|
|
});
|
2018-11-12 08:12:54 +01:00
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
// a no op bind. used to stop clippy warning without adding allow attribute.
|
|
|
|
let _permit = &mut self.permit;
|
2018-11-12 08:12:54 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-16 09:27:14 +01:00
|
|
|
#[cfg(test)]
mod test {
    use super::*;

    use std::cell::Cell;
    use std::io;

    use http::Uri;

    use crate::client::connection::IoConnection;

    /// Mock stream whose reads are always pending, so the pool's liveness check
    /// treats it as a usable idle connection.
    ///
    /// The shared counter is decremented whenever a stream is dropped.
    struct TestStream(Rc<Cell<usize>>);

    impl Drop for TestStream {
        fn drop(&mut self) {
            let alive = self.0.get();
            self.0.set(alive - 1);
        }
    }

    impl AsyncRead for TestStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
            _: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            Poll::Pending
        }
    }

    impl AsyncWrite for TestStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
            _: &[u8],
        ) -> Poll<io::Result<usize>> {
            unimplemented!()
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            unimplemented!()
        }

        fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Connector that hands out `TestStream`s and counts how many are alive.
    struct TestPoolConnector {
        generated: Rc<Cell<usize>>,
    }

    impl Service<Connect> for TestPoolConnector {
        type Response = (TestStream, Protocol);
        type Error = ConnectError;
        type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

        fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            unimplemented!("poll_ready is not used in test")
        }

        fn call(&self, _: Connect) -> Self::Future {
            // the counter is bumped eagerly, at call time.
            let count = self.generated.get();
            self.generated.set(count + 1);

            let generated = Rc::clone(&self.generated);
            Box::pin(async move { Ok((TestStream(generated), Protocol::Http1)) })
        }
    }

    /// Build a connector together with a second handle to its counter.
    fn counting_connector() -> (TestPoolConnector, Rc<Cell<usize>>) {
        let generated = Rc::new(Cell::new(0));
        let observer = Rc::clone(&generated);
        (TestPoolConnector { generated }, observer)
    }

    /// Build a connect request for a static uri without a pre-resolved address.
    fn connect_to(uri: &'static str) -> Connect {
        Connect {
            uri: Uri::from_static(uri),
            addr: None,
        }
    }

    /// Return a connection to the pool it was acquired from.
    fn release<T>(conn: IoConnection<T>)
    where
        T: AsyncRead + AsyncWrite + Unpin + 'static,
    {
        let (io, created, mut acquired) = conn.into_parts();
        acquired.release(IoConnection::new(io, created, None));
    }

    /// Yield to the runtime twice so spawned tasks get a chance to run.
    async fn yield_twice() {
        for _ in 0..2 {
            actix_rt::task::yield_now().await;
        }
    }

    #[actix_rt::test]
    async fn test_pool_limit() {
        let connector = TestPoolConnector {
            generated: Rc::new(Cell::new(0)),
        };

        let config = ConnectorConfig {
            limit: 1,
            ..Default::default()
        };

        let pool = super::ConnectionPool::new(connector, config);

        let req = connect_to("http://localhost");

        let first = pool.call(req.clone()).await.unwrap();

        let waiting = Rc::new(Cell::new(true));

        let waiting_flag = Rc::clone(&waiting);
        actix_rt::spawn(async move {
            actix_rt::time::sleep(Duration::from_millis(100)).await;
            waiting_flag.set(false);
            drop(first);
        });

        assert!(waiting.get());

        let started = Instant::now();
        let second = pool.call(req).await.unwrap();

        release(second);
        assert!(!waiting.get());
        assert!(started.elapsed() >= Duration::from_millis(100));
    }

    #[actix_rt::test]
    async fn test_pool_keep_alive() {
        let (connector, alive) = counting_connector();

        let config = ConnectorConfig {
            conn_keep_alive: Duration::from_secs(1),
            ..Default::default()
        };

        let pool = super::ConnectionPool::new(connector, config);

        let req = connect_to("http://localhost");

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        actix_rt::time::sleep(Duration::from_millis(1500)).await;
        actix_rt::task::yield_now().await;

        let conn = pool.call(req).await.unwrap();
        // Note: spawned recycle connection is not ran yet.
        // This is tokio current thread runtime specific behavior.
        assert_eq!(2, alive.get());

        // yield task so the old connection is properly dropped.
        actix_rt::task::yield_now().await;
        assert_eq!(1, alive.get());

        release(conn);
    }

    #[actix_rt::test]
    async fn test_pool_lifetime() {
        let (connector, alive) = counting_connector();

        let config = ConnectorConfig {
            conn_lifetime: Duration::from_secs(1),
            ..Default::default()
        };

        let pool = super::ConnectionPool::new(connector, config);

        let req = connect_to("http://localhost");

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        actix_rt::time::sleep(Duration::from_millis(1500)).await;
        actix_rt::task::yield_now().await;

        let conn = pool.call(req).await.unwrap();
        // Note: spawned recycle connection is not ran yet.
        // This is tokio current thread runtime specific behavior.
        assert_eq!(2, alive.get());

        // yield task so the old connection is properly dropped.
        actix_rt::task::yield_now().await;
        assert_eq!(1, alive.get());

        release(conn);
    }

    #[actix_rt::test]
    async fn test_pool_authority_key() {
        let (connector, alive) = counting_connector();

        let config = ConnectorConfig::default();

        let pool = super::ConnectionPool::new(connector, config);

        let req = connect_to("https://crates.io");

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        let conn = pool.call(req).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        // a different authority must not reuse the first connection.
        let req = connect_to("https://google.com");

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(2, alive.get());
        release(conn);
        let conn = pool.call(req).await.unwrap();
        assert_eq!(2, alive.get());
        release(conn);
    }

    #[actix_rt::test]
    async fn test_pool_drop() {
        let (connector, alive) = counting_connector();

        let config = ConnectorConfig::default();

        let pool = Rc::new(super::ConnectionPool::new(connector, config));

        let req = connect_to("https://crates.io");

        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(1, alive.get());
        release(conn);

        let req = connect_to("https://google.com");
        let conn = pool.call(req.clone()).await.unwrap();
        assert_eq!(2, alive.get());
        release(conn);

        // these are clones of the `Rc` handle that wraps the pool.
        let clone1 = Rc::clone(&pool);
        let clone2 = Rc::clone(&clone1);

        drop(clone2);
        yield_twice().await;
        assert_eq!(2, alive.get());

        drop(clone1);
        yield_twice().await;
        assert_eq!(2, alive.get());

        // dropping the last handle releases every pooled stream.
        drop(pool);
        yield_twice().await;
        assert_eq!(0, alive.get());
    }
}
|