1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

add basic client connection pooling

This commit is contained in:
Nikolay Kim 2018-03-16 12:04:01 -07:00
parent b15b5e5246
commit 84bf282c17
5 changed files with 232 additions and 35 deletions

View File

@ -12,7 +12,9 @@
* Move brotli encoding to a feature * Move brotli encoding to a feature
* Add option of default handler for `StaticFiles` middleware * Add option of default handler for `StaticFiles` handler #57
* Add basic client connection pooling
## 0.4.8 (2018-03-12) ## 0.4.8 (2018-03-12)

View File

@ -38,7 +38,7 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
# sessions # sessions
session = ["cookie/secure"] session = ["cookie/secure"]
# brotli # brotli encoding
brotli = ["brotli2"] brotli = ["brotli2"]
[dependencies] [dependencies]

View File

@ -1,15 +1,18 @@
use std::{io, time}; use std::{fmt, io, time};
use std::cell::RefCell;
use std::rc::Rc;
use std::net::Shutdown; use std::net::Shutdown;
use std::time::Duration; use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Context, use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised}; Handler, Message, ActorResponse, Supervised};
use actix::registry::ArbiterService; use actix::registry::ArbiterService;
use actix::fut::WrapFuture; use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
use http::{Uri, HttpTryFrom, Error as HttpError}; use http::{Uri, HttpTryFrom, Error as HttpError};
use futures::Poll; use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
@ -104,10 +107,15 @@ pub struct ClientConnector {
connector: SslConnector, connector: SslConnector,
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector, connector: TlsConnector,
pool: Rc<Pool>,
} }
impl Actor for ClientConnector { impl Actor for ClientConnector {
type Context = Context<ClientConnector>; type Context = Context<ClientConnector>;
fn started(&mut self, ctx: &mut Self::Context) {
self.collect(ctx);
}
} }
impl Supervised for ClientConnector {} impl Supervised for ClientConnector {}
@ -120,19 +128,21 @@ impl Default for ClientConnector {
{ {
let builder = SslConnector::builder(SslMethod::tls()).unwrap(); let builder = SslConnector::builder(SslMethod::tls()).unwrap();
ClientConnector { ClientConnector {
connector: builder.build() connector: builder.build(),
pool: Rc::new(Pool::new()),
} }
} }
#[cfg(all(feature="tls", not(feature="alpn")))] #[cfg(all(feature="tls", not(feature="alpn")))]
{ {
let builder = TlsConnector::builder().unwrap(); let builder = TlsConnector::builder().unwrap();
ClientConnector { ClientConnector {
connector: builder.build().unwrap() connector: builder.build().unwrap(),
pool: Rc::new(Pool::new()),
} }
} }
#[cfg(not(any(feature="alpn", feature="tls")))] #[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {} ClientConnector {pool: Rc::new(Pool::new())}
} }
} }
@ -184,6 +194,11 @@ impl ClientConnector {
pub fn with_connector(connector: SslConnector) -> ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector {
ClientConnector { connector } ClientConnector { connector }
} }
fn collect(&mut self, ctx: &mut Context<Self>) {
self.pool.collect();
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
}
} }
impl Handler<Connect> for ClientConnector { impl Handler<Connect> for ClientConnector {
@ -214,10 +229,21 @@ impl Handler<Connect> for ClientConnector {
let host = uri.host().unwrap().to_owned(); let host = uri.host().unwrap().to_owned();
let port = uri.port().unwrap_or_else(|| proto.port()); let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()};
let pool = if proto.is_http() {
if let Some(conn) = self.pool.query(&key) {
return ActorResponse::async(fut::ok(conn))
} else {
Some(Rc::clone(&self.pool))
}
} else {
None
};
ActorResponse::async( ActorResponse::async(
Connector::from_registry() Connector::from_registry()
.send(ResolveConnect::host_and_port(&host, port) .send(ResolveConnect::host_and_port(&key.host, port)
.timeout(conn_timeout)) .timeout(conn_timeout))
.into_actor(self) .into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected) .map_err(|_, _, _| ClientConnectorError::Disconnected)
@ -230,10 +256,12 @@ impl Handler<Connect> for ClientConnector {
fut::Either::A( fut::Either::A(
_act.connector.connect_async(&host, stream) _act.connector.connect_async(&host, stream)
.map_err(ClientConnectorError::SslError) .map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)}) .map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act)) .into_actor(_act))
} else { } else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)})) fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
} }
} }
} }
@ -246,10 +274,12 @@ impl Handler<Connect> for ClientConnector {
fut::Either::A( fut::Either::A(
_act.connector.connect_async(&host, stream) _act.connector.connect_async(&host, stream)
.map_err(ClientConnectorError::SslError) .map_err(ClientConnectorError::SslError)
.map(|stream| Connection{stream: Box::new(stream)}) .map(|stream| Connection::new(
key, pool, Box::new(stream)))
.into_actor(_act)) .into_actor(_act))
} else { } else {
fut::Either::B(fut::ok(Connection{stream: Box::new(stream)})) fut::Either::B(fut::ok(
Connection::new(key, pool, Box::new(stream))))
} }
} }
} }
@ -261,7 +291,7 @@ impl Handler<Connect> for ClientConnector {
if proto.is_secure() { if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported) fut::err(ClientConnectorError::SslIsNotSupported)
} else { } else {
fut::ok(Connection{stream: Box::new(stream)}) fut::ok(Connection::new(key, pool, Box::new(stream)))
} }
} }
} }
@ -288,6 +318,13 @@ impl Protocol {
} }
} }
fn is_http(&self) -> bool {
match *self {
Protocol::Https | Protocol::Http => true,
_ => false,
}
}
fn is_secure(&self) -> bool { fn is_secure(&self) -> bool {
match *self { match *self {
Protocol::Https | Protocol::Wss => true, Protocol::Https | Protocol::Wss => true,
@ -303,18 +340,156 @@ impl Protocol {
} }
} }
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
struct Key {
host: String,
port: u16,
ssl: bool,
}
impl Key {
fn empty() -> Key {
Key{host: String::new(), port: 0, ssl: false}
}
}
#[derive(Debug)]
struct Conn(Instant, Connection);
pub struct Pool {
max_size: usize,
keep_alive: Duration,
max_lifetime: Duration,
pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
to_close: RefCell<Vec<Connection>>,
}
impl Pool {
fn new() -> Pool {
Pool {
max_size: 128,
keep_alive: Duration::from_secs(15),
max_lifetime: Duration::from_secs(75),
pool: RefCell::new(HashMap::new()),
to_close: RefCell::new(Vec::new()),
}
}
fn collect(&self) {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
// check keep-alive
let now = Instant::now();
for conns in pool.values_mut() {
while !conns.is_empty() {
if (now - conns[0].0) > self.keep_alive
|| (now - conns[0].1.ts) > self.max_lifetime
{
let conn = conns.pop_front().unwrap().1;
to_close.push(conn);
} else {
break
}
}
}
// check connections for shutdown
let mut idx = 0;
while idx < to_close.len() {
match AsyncWrite::shutdown(&mut to_close[idx]) {
Ok(Async::NotReady) => idx += 1,
_ => {
to_close.swap_remove(idx);
},
}
}
}
fn query(&self, key: &Key) -> Option<Connection> {
let mut pool = self.pool.borrow_mut();
let mut to_close = self.to_close.borrow_mut();
if let Some(ref mut connections) = pool.get_mut(key) {
let now = Instant::now();
while let Some(conn) = connections.pop_back() {
// check if it still usable
if (now - conn.0) > self.keep_alive
|| (now - conn.1.ts) > self.max_lifetime
{
to_close.push(conn.1);
} else {
let mut conn = conn.1;
let mut buf = [0; 2];
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
to_close.push(conn);
continue
},
Ok(_) | Err(_) => continue,
}
return Some(conn)
}
}
}
None
}
fn release(&self, conn: Connection) {
if (Instant::now() - conn.ts) < self.max_lifetime {
let mut pool = self.pool.borrow_mut();
if !pool.contains_key(&conn.key) {
let key = conn.key.clone();
let mut vec = VecDeque::new();
vec.push_back(Conn(Instant::now(), conn));
pool.insert(key, vec);
} else {
let vec = pool.get_mut(&conn.key).unwrap();
vec.push_back(Conn(Instant::now(), conn));
if vec.len() > self.max_size {
let conn = vec.pop_front().unwrap();
self.to_close.borrow_mut().push(conn.1);
}
}
}
}
}
pub struct Connection { pub struct Connection {
key: Key,
stream: Box<IoStream>, stream: Box<IoStream>,
pool: Option<Rc<Pool>>,
ts: Instant,
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Connection {}:{}", self.key.host, self.key.port)
}
} }
impl Connection { impl Connection {
fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
Connection {
key, pool, stream,
ts: Instant::now(),
}
}
pub fn stream(&mut self) -> &mut IoStream { pub fn stream(&mut self) -> &mut IoStream {
&mut *self.stream &mut *self.stream
} }
pub fn from_stream<T: IoStream>(io: T) -> Connection { pub fn from_stream<T: IoStream>(io: T) -> Connection {
Connection{stream: Box::new(io)} Connection::new(Key::empty(), None, Box::new(io))
}
pub fn release(mut self) {
if let Some(pool) = self.pool.take() {
pool.release(self)
}
} }
} }

View File

@ -171,7 +171,8 @@ impl Future for SendRequest {
}; };
let pl = Box::new(Pipeline { let pl = Box::new(Pipeline {
body, conn, writer, body, writer,
conn: Some(conn),
parser: Some(HttpResponseParser::default()), parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(), parser_buf: BytesMut::new(),
disconnected: false, disconnected: false,
@ -208,7 +209,7 @@ impl Future for SendRequest {
pub(crate) struct Pipeline { pub(crate) struct Pipeline {
body: IoBody, body: IoBody,
conn: Connection, conn: Option<Connection>,
writer: HttpClientWriter, writer: HttpClientWriter,
parser: Option<HttpResponseParser>, parser: Option<HttpResponseParser>,
parser_buf: BytesMut, parser_buf: BytesMut,
@ -249,30 +250,45 @@ impl RunningState {
impl Pipeline { impl Pipeline {
fn release_conn(&mut self) {
if let Some(conn) = self.conn.take() {
conn.release()
}
}
#[inline] #[inline]
pub fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> { fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) { if let Some(ref mut conn) = self.conn {
Ok(Async::Ready(resp)) => { match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) {
// check content-encoding Ok(Async::Ready(resp)) => {
if self.should_decompress { // check content-encoding
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) { if self.should_decompress {
if let Ok(enc) = enc.to_str() { if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
match ContentEncoding::from(enc) { if let Ok(enc) = enc.to_str() {
ContentEncoding::Auto | ContentEncoding::Identity => (), match ContentEncoding::from(enc) {
enc => self.decompress = Some(PayloadStream::new(enc)), ContentEncoding::Auto | ContentEncoding::Identity => (),
enc => self.decompress = Some(PayloadStream::new(enc)),
}
} }
} }
} }
}
Ok(Async::Ready(resp)) Ok(Async::Ready(resp))
}
val => val,
} }
val => val, } else {
Ok(Async::NotReady)
} }
} }
#[inline] #[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> { pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if self.conn.is_none() {
return Ok(Async::Ready(None))
}
let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())};
let mut need_run = false; let mut need_run = false;
// need write? // need write?
@ -286,7 +302,7 @@ impl Pipeline {
if self.parser.is_some() { if self.parser.is_some() {
loop { loop {
match self.parser.as_mut().unwrap() match self.parser.as_mut().unwrap()
.parse_payload(&mut self.conn, &mut self.parser_buf)? .parse_payload(conn, &mut self.parser_buf)?
{ {
Async::Ready(Some(b)) => { Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress { if let Some(ref mut decompress) = self.decompress {
@ -314,6 +330,7 @@ impl Pipeline {
if let Some(mut decompress) = self.decompress.take() { if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof(); let res = decompress.feed_eof();
if let Some(b) = res? { if let Some(b) = res? {
self.release_conn();
return Ok(Async::Ready(Some(b))) return Ok(Async::Ready(Some(b)))
} }
} }
@ -321,13 +338,14 @@ impl Pipeline {
if need_run { if need_run {
Ok(Async::NotReady) Ok(Async::NotReady)
} else { } else {
self.release_conn();
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} }
} }
#[inline] #[inline]
pub fn poll_write(&mut self) -> Poll<(), Error> { fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done { if self.write_state == RunningState::Done || self.conn.is_none() {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
@ -416,7 +434,7 @@ impl Pipeline {
} }
// flush io but only if we need to // flush io but only if we need to
match self.writer.poll_completed(&mut self.conn, false) { match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
if self.disconnected { if self.disconnected {
self.write_state = RunningState::Done; self.write_state = RunningState::Done;

View File

@ -745,6 +745,8 @@ fn start_accept_thread(
msg = err.into_inner(); msg = err.into_inner();
workers.swap_remove(next); workers.swap_remove(next);
if workers.is_empty() { if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break break
} else if workers.len() <= next { } else if workers.len() <= next {
next = 0; next = 0;