From 84bf282c1721d89689fdf4df7a79f5ffef0c1da5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 16 Mar 2018 12:04:01 -0700 Subject: [PATCH] add basic client connection pooling --- CHANGES.md | 4 +- Cargo.toml | 2 +- src/client/connector.rs | 203 +++++++++++++++++++++++++++++++++++++--- src/client/pipeline.rs | 56 +++++++---- src/server/srv.rs | 2 + 5 files changed, 232 insertions(+), 35 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1d73093b4..e3a6e17d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,7 +12,9 @@ * Move brotli encoding to a feature -* Add option of default handler for `StaticFiles` middleware +* Add option of default handler for `StaticFiles` handler #57 + +* Add basic client connection pooling ## 0.4.8 (2018-03-12) diff --git a/Cargo.toml b/Cargo.toml index a2d4fca79..eb8fe5cae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"] # sessions session = ["cookie/secure"] -# brotli +# brotli encoding brotli = ["brotli2"] [dependencies] diff --git a/src/client/connector.rs b/src/client/connector.rs index 6d649fa6b..bce8aa985 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,15 +1,18 @@ -use std::{io, time}; +use std::{fmt, io, time}; +use std::cell::RefCell; +use std::rc::Rc; use std::net::Shutdown; -use std::time::Duration; +use std::time::{Duration, Instant}; +use std::collections::{HashMap, VecDeque}; -use actix::{fut, Actor, ActorFuture, Context, +use actix::{fut, Actor, ActorFuture, Context, AsyncContext, Handler, Message, ActorResponse, Supervised}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; use http::{Uri, HttpTryFrom, Error as HttpError}; -use futures::Poll; +use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature="alpn")] @@ -104,10 +107,15 @@ pub struct ClientConnector { connector: SslConnector, #[cfg(all(feature="tls", not(feature="alpn")))] connector: TlsConnector, + pool: Rc, } impl Actor for ClientConnector { type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.collect(ctx); + } } impl Supervised for ClientConnector {} @@ -120,19 +128,21 @@ impl Default for ClientConnector { { let builder = SslConnector::builder(SslMethod::tls()).unwrap(); ClientConnector { - connector: builder.build() + connector: builder.build(), + pool: Rc::new(Pool::new()), } } #[cfg(all(feature="tls", not(feature="alpn")))] { let builder = TlsConnector::builder().unwrap(); ClientConnector { - connector: builder.build().unwrap() + connector: builder.build().unwrap(), + pool: Rc::new(Pool::new()), } } #[cfg(not(any(feature="alpn", feature="tls")))] - ClientConnector {} + ClientConnector {pool: Rc::new(Pool::new())} } } @@ -184,6 +194,11 @@ impl ClientConnector { pub fn with_connector(connector: SslConnector) -> ClientConnector { ClientConnector { connector } } + + fn collect(&mut self, ctx: &mut Context) { + self.pool.collect(); + ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx)); + } } impl Handler for ClientConnector { @@ -214,10 +229,21 @@ impl Handler for ClientConnector { let host = uri.host().unwrap().to_owned(); let port = uri.port().unwrap_or_else(|| proto.port()); + let key = Key {host, port, ssl: proto.is_secure()}; + + let pool = if proto.is_http() { + if let Some(conn) = self.pool.query(&key) { + return ActorResponse::async(fut::ok(conn)) + } else { + Some(Rc::clone(&self.pool)) + } + } else { + None + }; ActorResponse::async( Connector::from_registry() - .send(ResolveConnect::host_and_port(&host, port) + .send(ResolveConnect::host_and_port(&key.host, port) .timeout(conn_timeout)) .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) @@ -230,10 +256,12 @@ impl Handler for ClientConnector { fut::Either::A( _act.connector.connect_async(&host, stream) .map_err(ClientConnectorError::SslError) - .map(|stream| Connection{stream: Box::new(stream)}) + .map(|stream| Connection::new( + key, pool, Box::new(stream))) .into_actor(_act)) } else { - fut::Either::B(fut::ok(Connection{stream: Box::new(stream)})) + fut::Either::B(fut::ok( + Connection::new(key, pool, Box::new(stream)))) } } } @@ -246,10 +274,12 @@ impl Handler for ClientConnector { fut::Either::A( _act.connector.connect_async(&host, stream) .map_err(ClientConnectorError::SslError) - .map(|stream| Connection{stream: Box::new(stream)}) + .map(|stream| Connection::new( + key, pool, Box::new(stream))) .into_actor(_act)) } else { - fut::Either::B(fut::ok(Connection{stream: Box::new(stream)})) + fut::Either::B(fut::ok( + Connection::new(key, pool, Box::new(stream)))) } } } @@ -261,7 +291,7 @@ impl Handler for ClientConnector { if proto.is_secure() { fut::err(ClientConnectorError::SslIsNotSupported) } else { - fut::ok(Connection{stream: Box::new(stream)}) + fut::ok(Connection::new(key, pool, Box::new(stream))) } } } @@ -288,6 +318,13 @@ impl Protocol { } } + fn is_http(&self) -> bool { + match *self { + Protocol::Https | Protocol::Http => true, + _ => false, + } + } + fn is_secure(&self) -> bool { match *self { Protocol::Https | Protocol::Wss => true, @@ -303,18 +340,156 @@ impl Protocol { } } +#[derive(Hash, Eq, PartialEq, Clone, Debug)] +struct Key { + host: String, + port: u16, + ssl: bool, +} + +impl Key { + fn empty() -> Key { + Key{host: String::new(), port: 0, ssl: false} + } +} + +#[derive(Debug)] +struct Conn(Instant, Connection); + +pub struct Pool { + max_size: usize, + keep_alive: Duration, + max_lifetime: Duration, + pool: RefCell>>, + to_close: RefCell>, +} + +impl Pool { + fn new() -> Pool { + Pool { + max_size: 128, + keep_alive: Duration::from_secs(15), + max_lifetime: Duration::from_secs(75), + pool: RefCell::new(HashMap::new()), + to_close: RefCell::new(Vec::new()), + } + } + + fn collect(&self) { + let mut pool = self.pool.borrow_mut(); + let mut to_close = self.to_close.borrow_mut(); + + // check keep-alive + let now = Instant::now(); + for conns in pool.values_mut() { + while !conns.is_empty() { + if (now - conns[0].0) > self.keep_alive + || (now - conns[0].1.ts) > self.max_lifetime + { + let conn = conns.pop_front().unwrap().1; + to_close.push(conn); + } else { + break + } + } + } + + // check connections for shutdown + let mut idx = 0; + while idx < to_close.len() { + match AsyncWrite::shutdown(&mut to_close[idx]) { + Ok(Async::NotReady) => idx += 1, + _ => { + to_close.swap_remove(idx); + }, + } + } + } + + fn query(&self, key: &Key) -> Option { + let mut pool = self.pool.borrow_mut(); + let mut to_close = self.to_close.borrow_mut(); + + if let Some(ref mut connections) = pool.get_mut(key) { + let now = Instant::now(); + while let Some(conn) = connections.pop_back() { + // check if it still usable + if (now - conn.0) > self.keep_alive + || (now - conn.1.ts) > self.max_lifetime + { + to_close.push(conn.1); + } else { + let mut conn = conn.1; + let mut buf = [0; 2]; + match conn.stream().read(&mut buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), + Ok(n) if n > 0 => { + to_close.push(conn); + continue + }, + Ok(_) | Err(_) => continue, + } + return Some(conn) + } + } + } + None + } + + fn release(&self, conn: Connection) { + if (Instant::now() - conn.ts) < self.max_lifetime { + let mut pool = self.pool.borrow_mut(); + if !pool.contains_key(&conn.key) { + let key = conn.key.clone(); + let mut vec = VecDeque::new(); + vec.push_back(Conn(Instant::now(), conn)); + pool.insert(key, vec); + } else { + let vec = pool.get_mut(&conn.key).unwrap(); + vec.push_back(Conn(Instant::now(), conn)); + if vec.len() > self.max_size { + let conn = vec.pop_front().unwrap(); + self.to_close.borrow_mut().push(conn.1); + } + } + } + } +} + pub struct Connection { + key: Key, stream: Box, + pool: Option>, + ts: Instant, +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Connection {}:{}", self.key.host, self.key.port) + } } impl Connection { + fn new(key: Key, pool: Option>, stream: Box) -> Self { + Connection { + key, pool, stream, + ts: Instant::now(), + } + } + pub fn stream(&mut self) -> &mut IoStream { &mut *self.stream } pub fn from_stream(io: T) -> Connection { - Connection{stream: Box::new(io)} + Connection::new(Key::empty(), None, Box::new(io)) + } + + pub fn release(mut self) { + if let Some(pool) = self.pool.take() { + pool.release(self) + } } } diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index f17420818..f4b556bc1 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -171,7 +171,8 @@ impl Future for SendRequest { }; let pl = Box::new(Pipeline { - body, conn, writer, + body, writer, + conn: Some(conn), parser: Some(HttpResponseParser::default()), parser_buf: BytesMut::new(), disconnected: false, @@ -208,7 +209,7 @@ impl Future for SendRequest { pub(crate) struct Pipeline { body: IoBody, - conn: Connection, + conn: Option, writer: HttpClientWriter, parser: Option, parser_buf: BytesMut, @@ -249,30 +250,45 @@ impl RunningState { impl Pipeline { + fn release_conn(&mut self) { + if let Some(conn) = self.conn.take() { + conn.release() + } + } + #[inline] - pub fn parse(&mut self) -> Poll { - match self.parser.as_mut().unwrap().parse(&mut self.conn, &mut self.parser_buf) { - Ok(Async::Ready(resp)) => { - // check content-encoding - if self.should_decompress { - if let Some(enc) = resp.headers().get(CONTENT_ENCODING) { - if let Ok(enc) = enc.to_str() { - match ContentEncoding::from(enc) { - ContentEncoding::Auto | ContentEncoding::Identity => (), - enc => self.decompress = Some(PayloadStream::new(enc)), + fn parse(&mut self) -> Poll { + if let Some(ref mut conn) = self.conn { + match self.parser.as_mut().unwrap().parse(conn, &mut self.parser_buf) { + Ok(Async::Ready(resp)) => { + // check content-encoding + if self.should_decompress { + if let Some(enc) = resp.headers().get(CONTENT_ENCODING) { + if let Ok(enc) = enc.to_str() { + match ContentEncoding::from(enc) { + ContentEncoding::Auto | ContentEncoding::Identity => (), + enc => self.decompress = Some(PayloadStream::new(enc)), + } } } } - } - Ok(Async::Ready(resp)) + Ok(Async::Ready(resp)) + } + val => val, } - val => val, + } else { + Ok(Async::NotReady) } } #[inline] pub fn poll(&mut self) -> Poll, PayloadError> { + if self.conn.is_none() { + return Ok(Async::Ready(None)) + } + let conn: &mut Connection = unsafe{ mem::transmute(self.conn.as_mut().unwrap())}; + let mut need_run = false; // need write? @@ -286,7 +302,7 @@ impl Pipeline { if self.parser.is_some() { loop { match self.parser.as_mut().unwrap() - .parse_payload(&mut self.conn, &mut self.parser_buf)? + .parse_payload(conn, &mut self.parser_buf)? { Async::Ready(Some(b)) => { if let Some(ref mut decompress) = self.decompress { @@ -314,6 +330,7 @@ impl Pipeline { if let Some(mut decompress) = self.decompress.take() { let res = decompress.feed_eof(); if let Some(b) = res? { + self.release_conn(); return Ok(Async::Ready(Some(b))) } } @@ -321,13 +338,14 @@ impl Pipeline { if need_run { Ok(Async::NotReady) } else { + self.release_conn(); Ok(Async::Ready(None)) } } #[inline] - pub fn poll_write(&mut self) -> Poll<(), Error> { - if self.write_state == RunningState::Done { + fn poll_write(&mut self) -> Poll<(), Error> { + if self.write_state == RunningState::Done || self.conn.is_none() { return Ok(Async::Ready(())) } @@ -416,7 +434,7 @@ impl Pipeline { } // flush io but only if we need to - match self.writer.poll_completed(&mut self.conn, false) { + match self.writer.poll_completed(self.conn.as_mut().unwrap(), false) { Ok(Async::Ready(_)) => { if self.disconnected { self.write_state = RunningState::Done; diff --git a/src/server/srv.rs b/src/server/srv.rs index f24e3b977..69ff9012f 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -745,6 +745,8 @@ fn start_accept_thread( msg = err.into_inner(); workers.swap_remove(next); if workers.is_empty() { + error!("No workers"); + thread::sleep(sleep); break } else if workers.len() <= next { next = 0;