From d18f9c590531b201ead5c1f77c36a0fcb6c5f475 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 11 Apr 2018 16:11:11 -0700 Subject: [PATCH] add clinet connector stats --- Cargo.toml | 2 +- src/client/connector.rs | 72 ++++++++++++++++++++++++++++++++++++----- src/client/mod.rs | 2 +- 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c4c04e36d..8c663f58a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "0.5.0" +version = "0.5.1" authors = ["Nikolay Kim "] description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust." readme = "README.md" diff --git a/src/client/connector.rs b/src/client/connector.rs index 30eccd2f5..af433be23 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -6,7 +6,8 @@ use std::time::{Duration, Instant}; use std::collections::{HashMap, VecDeque}; use actix::{fut, Actor, ActorFuture, Arbiter, Context, AsyncContext, - Handler, Message, ActorResponse, Supervised, ContextFutureSpawner}; + Recipient, Syn, Handler, Message, ActorResponse, + Supervised, ContextFutureSpawner}; use actix::registry::ArbiterService; use actix::fut::WrapFuture; use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect}; @@ -31,6 +32,15 @@ use tokio_tls::TlsConnectorExt; use {HAS_OPENSSL, HAS_TLS}; use server::IoStream; +/// Client connector usage stats +#[derive(Default, Message)] +pub struct ClientConnectorStats { + pub waits: usize, + pub reused: usize, + pub opened: usize, + pub closed: usize, + pub errors: usize, +} #[derive(Debug)] /// `Connect` type represents a message that can be sent to @@ -160,6 +170,9 @@ pub struct ClientConnector { #[cfg(all(feature="tls", not(feature="alpn")))] connector: TlsConnector, + stats: ClientConnectorStats, + subscriber: Option>, + pool: Rc, pool_modified: Rc>, @@ -202,6 +215,8 @@ impl Default for ClientConnector { { let builder = TlsConnector::builder().unwrap(); ClientConnector { + stats: ClientConnectorStats::default(), + subscriber: None, pool: Rc::new(Pool::new(Rc::clone(&_modified))), pool_modified: _modified, connector: builder.build().unwrap(), @@ -220,7 +235,9 @@ impl Default for ClientConnector { } #[cfg(not(any(feature="alpn", feature="tls")))] - ClientConnector {pool: Rc::new(Pool::new(Rc::clone(&_modified))), + ClientConnector {stats: ClientConnectorStats::default(), + subscriber: None, + pool: Rc::new(Pool::new(Rc::clone(&_modified))), pool_modified: _modified, conn_lifetime: Duration::from_secs(15), conn_keep_alive: Duration::from_secs(75), @@ -286,6 +303,8 @@ impl ClientConnector { let modified = Rc::new(Cell::new(false)); ClientConnector { connector, + stats: ClientConnectorStats::default(), + subscriber: None, pool: Rc::new(Pool::new(Rc::clone(&modified))), pool_modified: modified, conn_lifetime: Duration::from_secs(15), @@ -339,6 +358,12 @@ impl ClientConnector { self } + /// Subscribe for connector stats. Only one subscriber is supported. + pub fn stats(mut self, subs: Recipient) -> Self { + self.subscriber = Some(subs); + self + } + fn acquire(&mut self, key: &Key) -> Acquire { // check limits if self.limit > 0 { @@ -372,6 +397,7 @@ impl ClientConnector { if (now - conn.0) > self.conn_keep_alive || (now - conn.1.ts) > self.conn_lifetime { + self.stats.closed += 1; self.to_close.push(conn.1); } else { let mut conn = conn.1; @@ -379,6 +405,7 @@ impl ClientConnector { match conn.stream().read(&mut buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), Ok(n) if n > 0 => { + self.stats.closed += 1; self.to_close.push(conn); continue }, @@ -433,6 +460,7 @@ impl ClientConnector { for conn in to_close { self.release_key(&conn.key); self.to_close.push(conn); + self.stats.closed += 1; } } @@ -459,6 +487,7 @@ impl ClientConnector { { let conn = conns.pop_front().unwrap().1; self.to_close.push(conn); + self.stats.closed += 1; } else { break } @@ -485,6 +514,12 @@ impl ClientConnector { self.collect(true); // re-schedule next collect period ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); + + // send stats + let stats = mem::replace(&mut self.stats, ClientConnectorStats::default()); + if let Some(ref mut subscr) = self.subscriber { + let _ = subscr.do_send(stats); + } } fn collect_waiters(&mut self) { @@ -609,6 +644,7 @@ impl Handler for ClientConnector { // check pause state if self.paused.is_some() { let rx = self.wait_for(key, wait_timeout, conn_timeout); + self.stats.waits += 1; return ActorResponse::async( rx.map_err(|_| ClientConnectorError::Disconnected) .into_actor(self) @@ -616,7 +652,6 @@ impl Handler for ClientConnector { Ok(conn) => fut::ok(conn), Err(err) => fut::err(err), })); - } // acquire connection @@ -625,11 +660,13 @@ impl Handler for ClientConnector { Acquire::Acquired(mut conn) => { // use existing connection conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool)))); + self.stats.reused += 1; return ActorResponse::async(fut::ok(conn)) }, Acquire::NotAvailable => { // connection is not available, wait let rx = self.wait_for(key, wait_timeout, conn_timeout); + self.stats.waits += 1; return ActorResponse::async( rx.map_err(|_| ClientConnectorError::Disconnected) .into_actor(self) @@ -654,11 +691,15 @@ impl Handler for ClientConnector { .timeout(conn_timeout)) .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) - .and_then(move |res, _act, _| { + .and_then(move |res, act, _| { #[cfg(feature="alpn")] match res { - Err(err) => fut::Either::B(fut::err(err.into())), + Err(err) => { + act.stats.opened += 1; + fut::Either::B(fut::err(err.into())) + }, Ok(stream) => { + act.stats.opened += 1; if proto.is_secure() { fut::Either::A( _act.connector.connect_async(&conn.0.host, stream) @@ -676,8 +717,12 @@ impl Handler for ClientConnector { #[cfg(all(feature="tls", not(feature="alpn")))] match res { - Err(err) => fut::Either::B(fut::err(err.into())), + Err(err) => { + act.stats.opened += 1; + fut::Either::B(fut::err(err.into())) + }, Ok(stream) => { + act.stats.opened += 1; if proto.is_secure() { fut::Either::A( _act.connector.connect_async(&conn.0.host, stream) @@ -695,8 +740,12 @@ impl Handler for ClientConnector { #[cfg(not(any(feature="alpn", feature="tls")))] match res { - Err(err) => fut::err(err.into()), + Err(err) => { + act.stats.opened += 1; + fut::err(err.into()) + }, Ok(stream) => { + act.stats.opened += 1; if proto.is_secure() { fut::err(ClientConnectorError::SslIsNotSupported) } else { @@ -746,6 +795,7 @@ impl fut::ActorFuture for Maintenance match act.acquire(key) { Acquire::Acquired(mut conn) => { // use existing connection + act.stats.reused += 1; conn.pool = Some( AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)))); let _ = waiter.tx.send(Ok(conn)); @@ -763,14 +813,16 @@ impl fut::ActorFuture for Maintenance .send(ResolveConnect::host_and_port(&conn.0.host, conn.0.port) .timeout(waiter.conn_timeout))) .map_err(|_, _, _| ()) - .and_then(move |res, _act, _| { + .and_then(move |res, act, _| { #[cfg(feature="alpn")] match res { Err(err) => { + act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(())) }, Ok(stream) => { + act.stats.opened += 1; if conn.0.ssl { fut::Either::A( _act.connector.connect_async(&key.host, stream) @@ -801,10 +853,12 @@ impl fut::ActorFuture for Maintenance #[cfg(all(feature="tls", not(feature="alpn")))] match res { Err(err) => { + act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(())) }, Ok(stream) => { + act.stats.opened += 1; if conn.0.ssl { fut::Either::A( _act.connector.connect_async(&conn.0.host, stream) @@ -835,10 +889,12 @@ impl fut::ActorFuture for Maintenance #[cfg(not(any(feature="alpn", feature="tls")))] match res { Err(err) => { + act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into())); fut::err(()) }, Ok(stream) => { + act.stats.opened += 1; if conn.0.ssl { let _ = waiter.tx.send( Err(ClientConnectorError::SslIsNotSupported)); diff --git a/src/client/mod.rs b/src/client/mod.rs index afe4e4595..4608e6a92 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -38,7 +38,7 @@ pub use self::request::{ClientRequest, ClientRequestBuilder}; pub use self::response::ClientResponse; pub use self::connector::{ Connect, Pause, Resume, - Connection, ClientConnector, ClientConnectorError}; + Connection, ClientConnector, ClientConnectorError, ClientConnectorStats}; pub(crate) use self::writer::HttpClientWriter; pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};