use std::cell::RefCell; use std::collections::VecDeque; use std::io; use std::rc::Rc; use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; use bytes::Bytes; use futures::future::{ok, Either, FutureResult}; use futures::task::AtomicTask; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use h2::client::{handshake, Handshake}; use hashbrown::HashMap; use http::uri::Authority; use indexmap::IndexSet; use slab::Slab; use tokio_timer::{sleep, Delay}; use super::connect::Connect; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; #[derive(Clone, Copy, PartialEq)] pub enum Protocol { Http1, Http2, } #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub(crate) struct Key { authority: Authority, } impl From for Key { fn from(authority: Authority) -> Key { Key { authority } } } /// Connections pool pub(crate) struct ConnectionPool( T, Rc>>, ); impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, T: Service, { pub(crate) fn new( connector: T, conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Option, limit: usize, ) -> Self { ConnectionPool( connector, Rc::new(RefCell::new(Inner { conn_lifetime, conn_keep_alive, disconnect_timeout, limit, acquired: 0, waiters: Slab::new(), waiters_queue: IndexSet::new(), available: HashMap::new(), task: AtomicTask::new(), })), ) } } impl Clone for ConnectionPool where T: Clone, Io: AsyncRead + AsyncWrite + 'static, { fn clone(&self) -> Self { ConnectionPool(self.0.clone(), self.1.clone()) } } impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, T: Service, { type Request = Connect; type Response = IoConnection; type Error = ConnectError; type Future = Either< FutureResult, Either, OpenConnection>, >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.0.poll_ready() } fn call(&mut self, req: Connect) -> Self::Future { let key = req.key(); // acquire connection match self.1.as_ref().borrow_mut().acquire(&key) { Acquire::Acquired(io, created) => { // use existing connection Either::A(ok(IoConnection::new( io, created, Some(Acquired(key, Some(self.1.clone()))), ))) } Acquire::NotAvailable => { // connection is not available, wait let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req); Either::B(Either::A(WaitForConnection { rx, key, token, inner: Some(self.1.clone()), })) } Acquire::Available => { // open new connection Either::B(Either::B(OpenConnection::new( key, self.1.clone(), self.0.call(req), ))) } } } } #[doc(hidden)] pub struct WaitForConnection where Io: AsyncRead + AsyncWrite + 'static, { key: Key, token: usize, rx: oneshot::Receiver, ConnectError>>, inner: Option>>>, } impl Drop for WaitForConnection where Io: AsyncRead + AsyncWrite + 'static, { fn drop(&mut self) { if let Some(i) = self.inner.take() { let mut inner = i.as_ref().borrow_mut(); inner.release_waiter(&self.key, self.token); inner.check_availibility(); } } } impl Future for WaitForConnection where Io: AsyncRead + AsyncWrite, { type Item = IoConnection; type Error = ConnectError; fn poll(&mut self) -> Poll { match self.rx.poll() { Ok(Async::Ready(item)) => match item { Err(err) => Err(err), Ok(conn) => { let _ = self.inner.take(); Ok(Async::Ready(conn)) } }, Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => { let _ = self.inner.take(); Err(ConnectError::Disconnected) } } } } #[doc(hidden)] pub struct OpenConnection where Io: AsyncRead + AsyncWrite + 'static, { fut: F, key: Key, h2: Option>, inner: Option>>>, } impl OpenConnection where F: Future, Io: AsyncRead + AsyncWrite + 'static, { fn new(key: Key, inner: Rc>>, fut: F) -> Self { OpenConnection { key, fut, inner: Some(inner), h2: None, } } } impl Drop for OpenConnection where Io: AsyncRead + AsyncWrite + 'static, { fn drop(&mut self) { if let Some(inner) = self.inner.take() { let mut inner = inner.as_ref().borrow_mut(); inner.release(); inner.check_availibility(); } } } impl Future for OpenConnection where F: Future, Io: AsyncRead + AsyncWrite, { type Item = IoConnection; type Error = ConnectError; fn poll(&mut self) -> Poll { if let Some(ref mut h2) = self.h2 { return match h2.poll() { Ok(Async::Ready((snd, connection))) => { tokio_current_thread::spawn(connection.map_err(|_| ())); Ok(Async::Ready(IoConnection::new( ConnectionType::H2(snd), Instant::now(), Some(Acquired(self.key.clone(), self.inner.clone())), ))) } Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => Err(e.into()), }; } match self.fut.poll() { Err(err) => Err(err), Ok(Async::Ready((io, proto))) => { let _ = self.inner.take(); if proto == Protocol::Http1 { Ok(Async::Ready(IoConnection::new( ConnectionType::H1(io), Instant::now(), Some(Acquired(self.key.clone(), self.inner.clone())), ))) } else { self.h2 = Some(handshake(io)); self.poll() } } Ok(Async::NotReady) => Ok(Async::NotReady), } } } // struct OpenWaitingConnection // where // Io: AsyncRead + AsyncWrite + 'static, // { // fut: F, // key: Key, // h2: Option>, // rx: Option, ConnectorError>>>, // inner: Option>>>, // } // impl OpenWaitingConnection // where // F: Future + 'static, // Io: AsyncRead + AsyncWrite + 'static, // { // fn spawn( // key: Key, // rx: oneshot::Sender, ConnectorError>>, // inner: Rc>>, // fut: F, // ) { // tokio_current_thread::spawn(OpenWaitingConnection { // key, // fut, // h2: None, // rx: Some(rx), // inner: Some(inner), // }) // } // } // impl Drop for OpenWaitingConnection // where // Io: AsyncRead + AsyncWrite + 'static, // { // fn drop(&mut self) { // if let Some(inner) = self.inner.take() { // let mut inner = inner.as_ref().borrow_mut(); // inner.release(); // inner.check_availibility(); // } // } // } // impl Future for OpenWaitingConnection // where // F: Future, // Io: AsyncRead + AsyncWrite, // { // type Item = (); // type Error = (); // fn poll(&mut self) -> Poll { // if let Some(ref mut h2) = self.h2 { // return match h2.poll() { // Ok(Async::Ready((snd, connection))) => { // tokio_current_thread::spawn(connection.map_err(|_| ())); // let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( // ConnectionType::H2(snd), // Instant::now(), // Some(Acquired(self.key.clone(), self.inner.clone())), // ))); // Ok(Async::Ready(())) // } // Ok(Async::NotReady) => Ok(Async::NotReady), // Err(e) => { // let _ = self.inner.take(); // if let Some(rx) = self.rx.take() { // let _ = rx.send(Err(e.into())); // } // Err(()) // } // }; // } // match self.fut.poll() { // Err(err) => { // let _ = self.inner.take(); // if let Some(rx) = self.rx.take() { // let _ = rx.send(Err(err)); // } // Err(()) // } // Ok(Async::Ready((_, io, proto))) => { // let _ = self.inner.take(); // if proto == Protocol::Http1 { // let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( // ConnectionType::H1(io), // Instant::now(), // Some(Acquired(self.key.clone(), self.inner.clone())), // ))); // } else { // self.h2 = Some(handshake(io)); // return self.poll(); // } // Ok(Async::Ready(())) // } // Ok(Async::NotReady) => Ok(Async::NotReady), // } // } // } enum Acquire { Acquired(ConnectionType, Instant), Available, NotAvailable, } // #[derive(Debug)] struct AvailableConnection { io: ConnectionType, used: Instant, created: Instant, } pub(crate) struct Inner { conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Option, limit: usize, acquired: usize, available: HashMap>>, waiters: Slab<( Connect, oneshot::Sender, ConnectError>>, )>, waiters_queue: IndexSet<(Key, usize)>, task: AtomicTask, } impl Inner { fn reserve(&mut self) { self.acquired += 1; } fn release(&mut self) { self.acquired -= 1; } fn release_waiter(&mut self, key: &Key, token: usize) { self.waiters.remove(token); self.waiters_queue.remove(&(key.clone(), token)); } fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { self.acquired -= 1; self.available .entry(key.clone()) .or_insert_with(VecDeque::new) .push_back(AvailableConnection { io, created, used: Instant::now(), }); } } impl Inner where Io: AsyncRead + AsyncWrite + 'static, { /// connection is not available, wait fn wait_for( &mut self, connect: Connect, ) -> ( oneshot::Receiver, ConnectError>>, usize, ) { let (tx, rx) = oneshot::channel(); let key = connect.key(); let entry = self.waiters.vacant_entry(); let token = entry.key(); entry.insert((connect, tx)); assert!(!self.waiters_queue.insert((key, token))); (rx, token) } fn acquire(&mut self, key: &Key) -> Acquire { // check limits if self.limit > 0 && self.acquired >= self.limit { return Acquire::NotAvailable; } self.reserve(); // check if open connection is available // cleanup stale connections at the same time if let Some(ref mut connections) = self.available.get_mut(key) { let now = Instant::now(); while let Some(conn) = connections.pop_back() { // check if it still usable if (now - conn.used) > self.conn_keep_alive || (now - conn.created) > self.conn_lifetime { if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = conn.io { tokio_current_thread::spawn(CloseConnection::new( io, timeout, )) } } } else { let mut io = conn.io; let mut buf = [0; 2]; if let ConnectionType::H1(ref mut s) = io { match s.read(&mut buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), Ok(n) if n > 0 => { if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = io { tokio_current_thread::spawn( CloseConnection::new(io, timeout), ) } } continue; } Ok(_) | Err(_) => continue, } } return Acquire::Acquired(io, conn.created); } } } Acquire::Available } fn release_close(&mut self, io: ConnectionType) { self.acquired -= 1; if let Some(timeout) = self.disconnect_timeout { if let ConnectionType::H1(io) = io { tokio_current_thread::spawn(CloseConnection::new(io, timeout)) } } } fn check_availibility(&self) { if !self.waiters_queue.is_empty() && self.acquired < self.limit { self.task.notify() } } } // struct ConnectorPoolSupport // where // Io: AsyncRead + AsyncWrite + 'static, // { // connector: T, // inner: Rc>>, // } // impl Future for ConnectorPoolSupport // where // Io: AsyncRead + AsyncWrite + 'static, // T: Service, // T::Future: 'static, // { // type Item = (); // type Error = (); // fn poll(&mut self) -> Poll { // let mut inner = self.inner.as_ref().borrow_mut(); // inner.task.register(); // // check waiters // loop { // let (key, token) = { // if let Some((key, token)) = inner.waiters_queue.get_index(0) { // (key.clone(), *token) // } else { // break; // } // }; // match inner.acquire(&key) { // Acquire::NotAvailable => break, // Acquire::Acquired(io, created) => { // let (_, tx) = inner.waiters.remove(token); // if let Err(conn) = tx.send(Ok(IoConnection::new( // io, // created, // Some(Acquired(key.clone(), Some(self.inner.clone()))), // ))) { // let (io, created) = conn.unwrap().into_inner(); // inner.release_conn(&key, io, created); // } // } // Acquire::Available => { // let (connect, tx) = inner.waiters.remove(token); // OpenWaitingConnection::spawn( // key.clone(), // tx, // self.inner.clone(), // self.connector.call(connect), // ); // } // } // let _ = inner.waiters_queue.swap_remove_index(0); // } // Ok(Async::NotReady) // } // } struct CloseConnection { io: T, timeout: Delay, } impl CloseConnection where T: AsyncWrite, { fn new(io: T, timeout: Duration) -> Self { CloseConnection { io, timeout: sleep(timeout), } } } impl Future for CloseConnection where T: AsyncWrite, { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { match self.timeout.poll() { Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())), Ok(Async::NotReady) => match self.io.shutdown() { Ok(Async::Ready(_)) | Err(_) => Ok(Async::Ready(())), Ok(Async::NotReady) => Ok(Async::NotReady), }, } } } pub(crate) struct Acquired(Key, Option>>>); impl Acquired where T: AsyncRead + AsyncWrite + 'static, { pub(crate) fn close(&mut self, conn: IoConnection) { if let Some(inner) = self.1.take() { let (io, _) = conn.into_inner(); inner.as_ref().borrow_mut().release_close(io); } } pub(crate) fn release(&mut self, conn: IoConnection) { if let Some(inner) = self.1.take() { let (io, created) = conn.into_inner(); inner .as_ref() .borrow_mut() .release_conn(&self.0, io, created); } } } impl Drop for Acquired { fn drop(&mut self) { if let Some(inner) = self.1.take() { inner.as_ref().borrow_mut().release(); } } }