1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

add clinet connector stats

This commit is contained in:
Nikolay Kim 2018-04-11 16:11:11 -07:00
parent 76fcdc13a3
commit d18f9c5905
3 changed files with 66 additions and 10 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.5.0"
version = "0.5.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"

View File

@ -6,7 +6,8 @@ use std::time::{Duration, Instant};
use std::collections::{HashMap, VecDeque};
use actix::{fut, Actor, ActorFuture, Arbiter, Context, AsyncContext,
Handler, Message, ActorResponse, Supervised, ContextFutureSpawner};
Recipient, Syn, Handler, Message, ActorResponse,
Supervised, ContextFutureSpawner};
use actix::registry::ArbiterService;
use actix::fut::WrapFuture;
use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
@ -31,6 +32,15 @@ use tokio_tls::TlsConnectorExt;
use {HAS_OPENSSL, HAS_TLS};
use server::IoStream;
/// Client connector usage stats
#[derive(Default, Message)]
pub struct ClientConnectorStats {
pub waits: usize,
pub reused: usize,
pub opened: usize,
pub closed: usize,
pub errors: usize,
}
#[derive(Debug)]
/// `Connect` type represents a message that can be sent to
@ -160,6 +170,9 @@ pub struct ClientConnector {
#[cfg(all(feature="tls", not(feature="alpn")))]
connector: TlsConnector,
stats: ClientConnectorStats,
subscriber: Option<Recipient<Syn, ClientConnectorStats>>,
pool: Rc<Pool>,
pool_modified: Rc<Cell<bool>>,
@ -202,6 +215,8 @@ impl Default for ClientConnector {
{
let builder = TlsConnector::builder().unwrap();
ClientConnector {
stats: ClientConnectorStats::default(),
subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&_modified))),
pool_modified: _modified,
connector: builder.build().unwrap(),
@ -220,7 +235,9 @@ impl Default for ClientConnector {
}
#[cfg(not(any(feature="alpn", feature="tls")))]
ClientConnector {pool: Rc::new(Pool::new(Rc::clone(&_modified))),
ClientConnector {stats: ClientConnectorStats::default(),
subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&_modified))),
pool_modified: _modified,
conn_lifetime: Duration::from_secs(15),
conn_keep_alive: Duration::from_secs(75),
@ -286,6 +303,8 @@ impl ClientConnector {
let modified = Rc::new(Cell::new(false));
ClientConnector {
connector,
stats: ClientConnectorStats::default(),
subscriber: None,
pool: Rc::new(Pool::new(Rc::clone(&modified))),
pool_modified: modified,
conn_lifetime: Duration::from_secs(15),
@ -339,6 +358,12 @@ impl ClientConnector {
self
}
/// Subscribe for connector stats. Only one subscriber is supported.
pub fn stats(mut self, subs: Recipient<Syn, ClientConnectorStats>) -> Self {
self.subscriber = Some(subs);
self
}
fn acquire(&mut self, key: &Key) -> Acquire {
// check limits
if self.limit > 0 {
@ -372,6 +397,7 @@ impl ClientConnector {
if (now - conn.0) > self.conn_keep_alive
|| (now - conn.1.ts) > self.conn_lifetime
{
self.stats.closed += 1;
self.to_close.push(conn.1);
} else {
let mut conn = conn.1;
@ -379,6 +405,7 @@ impl ClientConnector {
match conn.stream().read(&mut buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Ok(n) if n > 0 => {
self.stats.closed += 1;
self.to_close.push(conn);
continue
},
@ -433,6 +460,7 @@ impl ClientConnector {
for conn in to_close {
self.release_key(&conn.key);
self.to_close.push(conn);
self.stats.closed += 1;
}
}
@ -459,6 +487,7 @@ impl ClientConnector {
{
let conn = conns.pop_front().unwrap().1;
self.to_close.push(conn);
self.stats.closed += 1;
} else {
break
}
@ -485,6 +514,12 @@ impl ClientConnector {
self.collect(true);
// re-schedule next collect period
ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx));
// send stats
let stats = mem::replace(&mut self.stats, ClientConnectorStats::default());
if let Some(ref mut subscr) = self.subscriber {
let _ = subscr.do_send(stats);
}
}
fn collect_waiters(&mut self) {
@ -609,6 +644,7 @@ impl Handler<Connect> for ClientConnector {
// check pause state
if self.paused.is_some() {
let rx = self.wait_for(key, wait_timeout, conn_timeout);
self.stats.waits += 1;
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
@ -616,7 +652,6 @@ impl Handler<Connect> for ClientConnector {
Ok(conn) => fut::ok(conn),
Err(err) => fut::err(err),
}));
}
// acquire connection
@ -625,11 +660,13 @@ impl Handler<Connect> for ClientConnector {
Acquire::Acquired(mut conn) => {
// use existing connection
conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool))));
self.stats.reused += 1;
return ActorResponse::async(fut::ok(conn))
},
Acquire::NotAvailable => {
// connection is not available, wait
let rx = self.wait_for(key, wait_timeout, conn_timeout);
self.stats.waits += 1;
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
@ -654,11 +691,15 @@ impl Handler<Connect> for ClientConnector {
.timeout(conn_timeout))
.into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(move |res, _act, _| {
.and_then(move |res, act, _| {
#[cfg(feature="alpn")]
match res {
Err(err) => fut::Either::B(fut::err(err.into())),
Err(err) => {
act.stats.opened += 1;
fut::Either::B(fut::err(err.into()))
},
Ok(stream) => {
act.stats.opened += 1;
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&conn.0.host, stream)
@ -676,8 +717,12 @@ impl Handler<Connect> for ClientConnector {
#[cfg(all(feature="tls", not(feature="alpn")))]
match res {
Err(err) => fut::Either::B(fut::err(err.into())),
Err(err) => {
act.stats.opened += 1;
fut::Either::B(fut::err(err.into()))
},
Ok(stream) => {
act.stats.opened += 1;
if proto.is_secure() {
fut::Either::A(
_act.connector.connect_async(&conn.0.host, stream)
@ -695,8 +740,12 @@ impl Handler<Connect> for ClientConnector {
#[cfg(not(any(feature="alpn", feature="tls")))]
match res {
Err(err) => fut::err(err.into()),
Err(err) => {
act.stats.opened += 1;
fut::err(err.into())
},
Ok(stream) => {
act.stats.opened += 1;
if proto.is_secure() {
fut::err(ClientConnectorError::SslIsNotSupported)
} else {
@ -746,6 +795,7 @@ impl fut::ActorFuture for Maintenance
match act.acquire(key) {
Acquire::Acquired(mut conn) => {
// use existing connection
act.stats.reused += 1;
conn.pool = Some(
AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))));
let _ = waiter.tx.send(Ok(conn));
@ -763,14 +813,16 @@ impl fut::ActorFuture for Maintenance
.send(ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
.timeout(waiter.conn_timeout)))
.map_err(|_, _, _| ())
.and_then(move |res, _act, _| {
.and_then(move |res, act, _| {
#[cfg(feature="alpn")]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
},
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
_act.connector.connect_async(&key.host, stream)
@ -801,10 +853,12 @@ impl fut::ActorFuture for Maintenance
#[cfg(all(feature="tls", not(feature="alpn")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::Either::B(fut::err(()))
},
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
fut::Either::A(
_act.connector.connect_async(&conn.0.host, stream)
@ -835,10 +889,12 @@ impl fut::ActorFuture for Maintenance
#[cfg(not(any(feature="alpn", feature="tls")))]
match res {
Err(err) => {
act.stats.errors += 1;
let _ = waiter.tx.send(Err(err.into()));
fut::err(())
},
Ok(stream) => {
act.stats.opened += 1;
if conn.0.ssl {
let _ = waiter.tx.send(
Err(ClientConnectorError::SslIsNotSupported));

View File

@ -38,7 +38,7 @@ pub use self::request::{ClientRequest, ClientRequestBuilder};
pub use self::response::ClientResponse;
pub use self::connector::{
Connect, Pause, Resume,
Connection, ClientConnector, ClientConnectorError};
Connection, ClientConnector, ClientConnectorError, ClientConnectorStats};
pub(crate) use self::writer::HttpClientWriter;
pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};