1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-25 16:32:43 +01:00
actix-extras/src/client/pool.rs

642 lines
19 KiB
Rust
Raw Normal View History

2018-11-12 08:12:54 +01:00
use std::cell::RefCell;
2019-01-27 20:40:26 +01:00
use std::collections::VecDeque;
2018-11-12 08:12:54 +01:00
use std::io;
use std::rc::Rc;
use std::time::{Duration, Instant};
2018-12-11 03:08:33 +01:00
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::Service;
2019-01-29 05:41:09 +01:00
use bytes::Bytes;
2018-11-12 08:12:54 +01:00
use futures::future::{ok, Either, FutureResult};
use futures::task::AtomicTask;
2019-01-27 20:40:26 +01:00
use futures::unsync::oneshot;
2018-11-12 08:12:54 +01:00
use futures::{Async, Future, Poll};
2019-01-29 05:41:09 +01:00
use h2::client::{handshake, Handshake};
2019-01-27 20:40:26 +01:00
use hashbrown::HashMap;
2018-11-12 08:12:54 +01:00
use http::uri::Authority;
use indexmap::IndexSet;
use slab::Slab;
use tokio_timer::{sleep, Delay};
use super::connect::Connect;
2019-01-29 05:41:09 +01:00
use super::connection::{ConnectionType, IoConnection};
2018-11-12 08:12:54 +01:00
use super::error::ConnectorError;
2019-01-29 05:41:09 +01:00
#[derive(Clone, Copy, PartialEq)]
pub enum Protocol {
Http1,
Http2,
}
2018-11-12 08:12:54 +01:00
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(crate) struct Key {
authority: Authority,
}
impl From<Authority> for Key {
fn from(authority: Authority) -> Key {
Key { authority }
}
}
/// Connections pool
pub(crate) struct ConnectionPool<T, Io: AsyncRead + AsyncWrite + 'static>(
T,
Rc<RefCell<Inner<Io>>>,
);
impl<T, Io> ConnectionPool<T, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
2019-01-29 05:41:09 +01:00
T: Service<Connect, Response = (Connect, Io, Protocol), Error = ConnectorError>,
2018-11-12 08:12:54 +01:00
{
pub(crate) fn new(
connector: T,
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Option<Duration>,
limit: usize,
) -> Self {
ConnectionPool(
connector,
Rc::new(RefCell::new(Inner {
conn_lifetime,
conn_keep_alive,
disconnect_timeout,
limit,
acquired: 0,
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: HashMap::new(),
task: AtomicTask::new(),
})),
)
}
}
impl<T, Io> Clone for ConnectionPool<T, Io>
where
T: Clone,
Io: AsyncRead + AsyncWrite + 'static,
{
fn clone(&self) -> Self {
ConnectionPool(self.0.clone(), self.1.clone())
}
}
2018-11-30 20:57:57 +01:00
impl<T, Io> Service<Connect> for ConnectionPool<T, Io>
2018-11-12 08:12:54 +01:00
where
Io: AsyncRead + AsyncWrite + 'static,
2019-01-29 05:41:09 +01:00
T: Service<Connect, Response = (Connect, Io, Protocol), Error = ConnectorError>,
2018-11-12 08:12:54 +01:00
{
2018-11-15 20:10:23 +01:00
type Response = IoConnection<Io>;
2018-11-12 08:12:54 +01:00
type Error = ConnectorError;
type Future = Either<
2018-11-15 20:10:23 +01:00
FutureResult<IoConnection<Io>, ConnectorError>,
2018-11-12 08:12:54 +01:00
Either<WaitForConnection<Io>, OpenConnection<T::Future, Io>>,
>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
2018-11-30 20:57:57 +01:00
fn call(&mut self, req: Connect) -> Self::Future {
2018-11-12 08:12:54 +01:00
let key = req.key();
// acquire connection
match self.1.as_ref().borrow_mut().acquire(&key) {
Acquire::Acquired(io, created) => {
// use existing connection
2018-11-15 20:10:23 +01:00
Either::A(ok(IoConnection::new(
2018-11-12 08:12:54 +01:00
io,
created,
2019-01-29 05:41:09 +01:00
Some(Acquired(key, Some(self.1.clone()))),
2018-11-12 08:12:54 +01:00
)))
}
Acquire::NotAvailable => {
// connection is not available, wait
let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req);
Either::B(Either::A(WaitForConnection {
rx,
key,
token,
inner: Some(self.1.clone()),
}))
}
Acquire::Available => {
// open new connection
Either::B(Either::B(OpenConnection::new(
key,
self.1.clone(),
self.0.call(req),
)))
}
}
}
}
#[doc(hidden)]
pub struct WaitForConnection<Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
key: Key,
token: usize,
2018-11-15 20:10:23 +01:00
rx: oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
2018-11-12 08:12:54 +01:00
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
impl<Io> Drop for WaitForConnection<Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fn drop(&mut self) {
if let Some(i) = self.inner.take() {
let mut inner = i.as_ref().borrow_mut();
inner.release_waiter(&self.key, self.token);
inner.check_availibility();
}
}
}
impl<Io> Future for WaitForConnection<Io>
where
Io: AsyncRead + AsyncWrite,
{
2018-11-15 20:10:23 +01:00
type Item = IoConnection<Io>;
2018-11-12 08:12:54 +01:00
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::Ready(item)) => match item {
Err(err) => Err(err),
Ok(conn) => {
let _ = self.inner.take();
Ok(Async::Ready(conn))
}
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => {
let _ = self.inner.take();
Err(ConnectorError::Disconnected)
}
}
}
}
#[doc(hidden)]
pub struct OpenConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fut: F,
key: Key,
2019-01-29 05:41:09 +01:00
h2: Option<Handshake<Io, Bytes>>,
2018-11-12 08:12:54 +01:00
inner: Option<Rc<RefCell<Inner<Io>>>>,
}
impl<F, Io> OpenConnection<F, Io>
where
2019-01-29 05:41:09 +01:00
F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
2018-11-12 08:12:54 +01:00
Io: AsyncRead + AsyncWrite + 'static,
{
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>, fut: F) -> Self {
OpenConnection {
key,
fut,
inner: Some(inner),
2019-01-29 05:41:09 +01:00
h2: None,
2018-11-12 08:12:54 +01:00
}
}
}
impl<F, Io> Drop for OpenConnection<F, Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let mut inner = inner.as_ref().borrow_mut();
inner.release();
inner.check_availibility();
}
}
}
impl<F, Io> Future for OpenConnection<F, Io>
where
2019-01-29 05:41:09 +01:00
F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
2018-11-12 08:12:54 +01:00
Io: AsyncRead + AsyncWrite,
{
2018-11-15 20:10:23 +01:00
type Item = IoConnection<Io>;
2018-11-12 08:12:54 +01:00
type Error = ConnectorError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
2019-01-29 05:41:09 +01:00
if let Some(ref mut h2) = self.h2 {
return match h2.poll() {
Ok(Async::Ready((snd, connection))) => {
tokio_current_thread::spawn(connection.map_err(|_| ()));
Ok(Async::Ready(IoConnection::new(
ConnectionType::H2(snd),
Instant::now(),
Some(Acquired(self.key.clone(), self.inner.clone())),
)))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e.into()),
};
2018-11-12 08:12:54 +01:00
}
match self.fut.poll() {
2019-01-29 05:41:09 +01:00
Err(err) => Err(err.into()),
Ok(Async::Ready((_, io, proto))) => {
2018-11-12 08:12:54 +01:00
let _ = self.inner.take();
2019-01-29 05:41:09 +01:00
if proto == Protocol::Http1 {
Ok(Async::Ready(IoConnection::new(
ConnectionType::H1(io),
2018-11-12 08:12:54 +01:00
Instant::now(),
2019-01-29 05:41:09 +01:00
Some(Acquired(self.key.clone(), self.inner.clone())),
)))
} else {
self.h2 = Some(handshake(io));
return self.poll();
2018-11-12 08:12:54 +01:00
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
2019-01-29 05:41:09 +01:00
// struct OpenWaitingConnection<F, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fut: F,
// key: Key,
// h2: Option<Handshake<Io, Bytes>>,
// rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>>,
// inner: Option<Rc<RefCell<Inner<Io>>>>,
// }
// impl<F, Io> OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError> + 'static,
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fn spawn(
// key: Key,
// rx: oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
// inner: Rc<RefCell<Inner<Io>>>,
// fut: F,
// ) {
// tokio_current_thread::spawn(OpenWaitingConnection {
// key,
// fut,
// h2: None,
// rx: Some(rx),
// inner: Some(inner),
// })
// }
// }
// impl<F, Io> Drop for OpenWaitingConnection<F, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// fn drop(&mut self) {
// if let Some(inner) = self.inner.take() {
// let mut inner = inner.as_ref().borrow_mut();
// inner.release();
// inner.check_availibility();
// }
// }
// }
// impl<F, Io> Future for OpenWaitingConnection<F, Io>
// where
// F: Future<Item = (Connect, Io, Protocol), Error = ConnectorError>,
// Io: AsyncRead + AsyncWrite,
// {
// type Item = ();
// type Error = ();
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// if let Some(ref mut h2) = self.h2 {
// return match h2.poll() {
// Ok(Async::Ready((snd, connection))) => {
// tokio_current_thread::spawn(connection.map_err(|_| ()));
// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
// ConnectionType::H2(snd),
// Instant::now(),
// Some(Acquired(self.key.clone(), self.inner.clone())),
// )));
// Ok(Async::Ready(()))
// }
// Ok(Async::NotReady) => Ok(Async::NotReady),
// Err(e) => {
// let _ = self.inner.take();
// if let Some(rx) = self.rx.take() {
// let _ = rx.send(Err(e.into()));
// }
// Err(())
// }
// };
// }
// match self.fut.poll() {
// Err(err) => {
// let _ = self.inner.take();
// if let Some(rx) = self.rx.take() {
// let _ = rx.send(Err(err));
// }
// Err(())
// }
// Ok(Async::Ready((_, io, proto))) => {
// let _ = self.inner.take();
// if proto == Protocol::Http1 {
// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new(
// ConnectionType::H1(io),
// Instant::now(),
// Some(Acquired(self.key.clone(), self.inner.clone())),
// )));
// } else {
// self.h2 = Some(handshake(io));
// return self.poll();
// }
// Ok(Async::Ready(()))
// }
// Ok(Async::NotReady) => Ok(Async::NotReady),
// }
// }
// }
2018-11-12 08:12:54 +01:00
enum Acquire<T> {
2019-01-29 05:41:09 +01:00
Acquired(ConnectionType<T>, Instant),
2018-11-12 08:12:54 +01:00
Available,
NotAvailable,
}
2019-01-29 05:41:09 +01:00
// #[derive(Debug)]
struct AvailableConnection<Io> {
io: ConnectionType<Io>,
used: Instant,
created: Instant,
}
2018-11-14 07:53:30 +01:00
pub(crate) struct Inner<Io> {
2018-11-12 08:12:54 +01:00
conn_lifetime: Duration,
conn_keep_alive: Duration,
disconnect_timeout: Option<Duration>,
limit: usize,
acquired: usize,
available: HashMap<Key, VecDeque<AvailableConnection<Io>>>,
waiters: Slab<(
Connect,
2018-11-15 20:10:23 +01:00
oneshot::Sender<Result<IoConnection<Io>, ConnectorError>>,
2018-11-12 08:12:54 +01:00
)>,
waiters_queue: IndexSet<(Key, usize)>,
task: AtomicTask,
}
2018-11-14 07:53:30 +01:00
impl<Io> Inner<Io> {
fn reserve(&mut self) {
self.acquired += 1;
}
fn release(&mut self) {
self.acquired -= 1;
}
fn release_waiter(&mut self, key: &Key, token: usize) {
self.waiters.remove(token);
self.waiters_queue.remove(&(key.clone(), token));
}
2019-01-29 05:41:09 +01:00
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
2018-11-14 07:53:30 +01:00
self.acquired -= 1;
self.available
.entry(key.clone())
.or_insert_with(VecDeque::new)
.push_back(AvailableConnection {
io,
created,
used: Instant::now(),
});
}
}
2018-11-12 08:12:54 +01:00
impl<Io> Inner<Io>
where
Io: AsyncRead + AsyncWrite + 'static,
{
/// connection is not available, wait
fn wait_for(
&mut self,
connect: Connect,
) -> (
2018-11-15 20:10:23 +01:00
oneshot::Receiver<Result<IoConnection<Io>, ConnectorError>>,
2018-11-12 08:12:54 +01:00
usize,
) {
let (tx, rx) = oneshot::channel();
let key = connect.key();
let entry = self.waiters.vacant_entry();
let token = entry.key();
entry.insert((connect, tx));
assert!(!self.waiters_queue.insert((key, token)));
(rx, token)
}
fn acquire(&mut self, key: &Key) -> Acquire<Io> {
// check limits
if self.limit > 0 && self.acquired >= self.limit {
return Acquire::NotAvailable;
}
self.reserve();
// check if open connection is available
// cleanup stale connections at the same time
if let Some(ref mut connections) = self.available.get_mut(key) {
let now = Instant::now();
while let Some(conn) = connections.pop_back() {
// check if it still usable
if (now - conn.used) > self.conn_keep_alive
|| (now - conn.created) > self.conn_lifetime
{
if let Some(timeout) = self.disconnect_timeout {
2019-01-29 05:41:09 +01:00
if let ConnectionType::H1(io) = conn.io {
tokio_current_thread::spawn(CloseConnection::new(
io, timeout,
))
}
2018-11-12 08:12:54 +01:00
}
} else {
let mut io = conn.io;
let mut buf = [0; 2];
2019-01-29 05:41:09 +01:00
if let ConnectionType::H1(ref mut s) = io {
match s.read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
if let Some(timeout) = self.disconnect_timeout {
if let ConnectionType::H1(io) = io {
tokio_current_thread::spawn(
CloseConnection::new(io, timeout),
)
}
}
continue;
2018-11-12 08:12:54 +01:00
}
2019-01-29 05:41:09 +01:00
Ok(_) | Err(_) => continue,
2018-11-12 08:12:54 +01:00
}
}
return Acquire::Acquired(io, conn.created);
}
}
}
Acquire::Available
}
2019-01-29 05:41:09 +01:00
fn release_close(&mut self, io: ConnectionType<Io>) {
2018-11-12 08:12:54 +01:00
self.acquired -= 1;
if let Some(timeout) = self.disconnect_timeout {
2019-01-29 05:41:09 +01:00
if let ConnectionType::H1(io) = io {
tokio_current_thread::spawn(CloseConnection::new(io, timeout))
}
2018-11-12 08:12:54 +01:00
}
}
fn check_availibility(&self) {
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
self.task.notify()
}
}
}
2019-01-29 05:41:09 +01:00
// struct ConnectorPoolSupport<T, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// {
// connector: T,
// inner: Rc<RefCell<Inner<Io>>>,
// }
// impl<T, Io> Future for ConnectorPoolSupport<T, Io>
// where
// Io: AsyncRead + AsyncWrite + 'static,
// T: Service<Connect, Response = (Connect, Io, Protocol), Error = ConnectorError>,
// T::Future: 'static,
// {
// type Item = ();
// type Error = ();
// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// let mut inner = self.inner.as_ref().borrow_mut();
// inner.task.register();
// // check waiters
// loop {
// let (key, token) = {
// if let Some((key, token)) = inner.waiters_queue.get_index(0) {
// (key.clone(), *token)
// } else {
// break;
// }
// };
// match inner.acquire(&key) {
// Acquire::NotAvailable => break,
// Acquire::Acquired(io, created) => {
// let (_, tx) = inner.waiters.remove(token);
// if let Err(conn) = tx.send(Ok(IoConnection::new(
// io,
// created,
// Some(Acquired(key.clone(), Some(self.inner.clone()))),
// ))) {
// let (io, created) = conn.unwrap().into_inner();
// inner.release_conn(&key, io, created);
// }
// }
// Acquire::Available => {
// let (connect, tx) = inner.waiters.remove(token);
// OpenWaitingConnection::spawn(
// key.clone(),
// tx,
// self.inner.clone(),
// self.connector.call(connect),
// );
// }
// }
// let _ = inner.waiters_queue.swap_remove_index(0);
// }
// Ok(Async::NotReady)
// }
// }
2018-11-12 08:12:54 +01:00
struct CloseConnection<T> {
io: T,
timeout: Delay,
}
impl<T> CloseConnection<T>
where
T: AsyncWrite,
{
fn new(io: T, timeout: Duration) -> Self {
CloseConnection {
io,
timeout: sleep(timeout),
}
}
}
impl<T> Future for CloseConnection<T>
where
T: AsyncWrite,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.timeout.poll() {
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
Ok(Async::NotReady) => match self.io.shutdown() {
Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady),
},
}
}
}
2018-11-14 07:53:30 +01:00
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
2018-11-12 08:12:54 +01:00
impl<T> Acquired<T>
where
T: AsyncRead + AsyncWrite + 'static,
{
2018-11-15 20:10:23 +01:00
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
2018-11-12 08:12:54 +01:00
if let Some(inner) = self.1.take() {
let (io, _) = conn.into_inner();
inner.as_ref().borrow_mut().release_close(io);
}
}
2018-11-15 20:10:23 +01:00
pub(crate) fn release(&mut self, conn: IoConnection<T>) {
2018-11-12 08:12:54 +01:00
if let Some(inner) = self.1.take() {
let (io, created) = conn.into_inner();
inner
.as_ref()
.borrow_mut()
.release_conn(&self.0, io, created);
}
}
}
2018-11-14 07:53:30 +01:00
impl<T> Drop for Acquired<T> {
2018-11-12 08:12:54 +01:00
fn drop(&mut self) {
if let Some(inner) = self.1.take() {
inner.as_ref().borrow_mut().release();
}
}
}