diff --git a/tools/wsload/src/wsclient.rs b/tools/wsload/src/wsclient.rs index 2d8db7fb7..ab5cbe765 100644 --- a/tools/wsload/src/wsclient.rs +++ b/tools/wsload/src/wsclient.rs @@ -35,8 +35,9 @@ fn main() { -s, --size=[NUMBER] 'size of PUBLISH packet payload to send in KB' -w, --warm-up=[SECONDS] 'seconds before counter values are considered for reporting' -r, --sample-rate=[SECONDS] 'seconds between average reports' - -c, --concurrency=[NUMBER] 'number of websockt connections to open and use concurrently for sending' - -t, --threads=[NUMBER] 'number of threads to use'", + -c, --concurrency=[NUMBER] 'number of websocket connections to open and use concurrently for sending' + -t, --threads=[NUMBER] 'number of threads to use' + --max-payload=[NUMBER] 'max size of payload before reconnect KB'", ) .get_matches(); @@ -50,9 +51,13 @@ fn main() { let threads = parse_u64_default(matches.value_of("threads"), num_cpus::get() as u64); let concurrency = parse_u64_default(matches.value_of("concurrency"), 1); let payload_size: usize = match matches.value_of("size") { - Some(s) => parse_u64_default(Some(s), 0) as usize * 1024, + Some(s) => parse_u64_default(Some(s), 1) as usize * 1024, None => 1024, }; + let max_payload_size: usize = match matches.value_of("max-payload") { + Some(s) => parse_u64_default(Some(s), 0) as usize * 1024, + None => 0, + }; let warmup_seconds = parse_u64_default(matches.value_of("warm-up"), 2) as u64; let sample_rate = parse_u64_default(matches.value_of("sample-rate"), 1) as usize; @@ -64,7 +69,10 @@ fn main() { let sys = actix::System::new("ws-client"); - let mut report = true; + let _: () = Perf{counters: perf_counters.clone(), + payload: payload.len(), + sample_rate_secs: sample_rate}.start(); + for t in 0..threads { let pl = payload.clone(); let ws = ws_url.clone(); @@ -72,40 +80,41 @@ fn main() { let addr = Arbiter::new(format!("test {}", t)); addr.do_send(actix::msgs::Execute::new(move || -> Result<(), ()> { - let mut reps = report; for _ in 0..concurrency { let pl2 = pl.clone(); let perf2 = perf.clone(); + let ws2 = ws.clone(); Arbiter::handle().spawn( - ws::Client::new(&ws).connect() + ws::Client::new(&ws) + .write_buffer_capacity(0) + .connect() .map_err(|e| { println!("Error: {}", e); - Arbiter::system().do_send(actix::msgs::SystemExit(0)); + //Arbiter::system().do_send(actix::msgs::SystemExit(0)); () }) .map(move |(reader, writer)| { let addr: Addr = ChatClient::create(move |ctx| { ChatClient::add_stream(reader, ctx); - ChatClient{conn: writer, + ChatClient{url: ws2, + conn: writer, payload: pl2, - report: reps, bin: bin, ts: time::precise_time_ns(), perf_counters: perf2, - sample_rate_secs: sample_rate, + sent: 0, + max_payload_size: max_payload_size, } }); }) ); - reps = false; } Ok(()) })); - report = false; } - let _ = sys.run(); + let res = sys.run(); } fn parse_u64_default(input: Option<&str>, default: u64) -> u64 { @@ -113,43 +122,33 @@ fn parse_u64_default(input: Option<&str>, default: u64) -> u64 { .unwrap_or(default) } -struct ChatClient{ - conn: ws::ClientWriter, - payload: Arc, - ts: u64, - bin: bool, - report: bool, - perf_counters: Arc, +struct Perf { + counters: Arc, + payload: usize, sample_rate_secs: usize, } -impl Actor for ChatClient { +impl Actor for Perf { type Context = Context; fn started(&mut self, ctx: &mut Context) { - self.send_text(); - if self.report { - self.sample_rate(ctx); - } - } - - fn stopping(&mut self, _: &mut Context) -> Running { - Arbiter::system().do_send(actix::msgs::SystemExit(0)); - Running::Stop + self.sample_rate(ctx); } } -impl ChatClient { +impl Perf { fn sample_rate(&self, ctx: &mut Context) { ctx.run_later(Duration::new(self.sample_rate_secs as u64, 0), |act, ctx| { - let req_count = act.perf_counters.pull_request_count(); + let req_count = act.counters.pull_request_count(); if req_count != 0 { - let latency = act.perf_counters.pull_latency_ns(); - let latency_max = act.perf_counters.pull_latency_max_ns(); + let conns = act.counters.pull_connections_count(); + let latency = act.counters.pull_latency_ns(); + let latency_max = act.counters.pull_latency_max_ns(); println!( - "rate: {}, throughput: {:?} kb, latency: {}, latency max: {}", + "rate: {}, conns: {}, throughput: {:?} kb, latency: {}, latency max: {}", req_count / act.sample_rate_secs, - (((req_count * act.payload.len()) as f64) / 1024.0) / + conns / act.sample_rate_secs, + (((req_count * act.payload) as f64) / 1024.0) / act.sample_rate_secs as f64, time::Duration::nanoseconds((latency / req_count as u64) as i64), time::Duration::nanoseconds(latency_max as i64) @@ -159,13 +158,71 @@ impl ChatClient { act.sample_rate(ctx); }); } +} - fn send_text(&mut self) { - self.ts = time::precise_time_ns(); - if self.bin { - self.conn.binary(&self.payload); +struct ChatClient{ + url: String, + conn: ws::ClientWriter, + payload: Arc, + ts: u64, + bin: bool, + perf_counters: Arc, + sent: usize, + max_payload_size: usize, +} + +impl Actor for ChatClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + self.send_text(); + self.perf_counters.register_connection(); + } +} + +impl ChatClient { + + fn send_text(&mut self) -> bool { + self.sent += self.payload.len(); + + if self.max_payload_size > 0 && self.sent > self.max_payload_size { + let ws = self.url.clone(); + let pl = self.payload.clone(); + let bin = self.bin; + let perf_counters = self.perf_counters.clone(); + let max_payload_size = self.max_payload_size; + + Arbiter::handle().spawn( + ws::Client::new(&self.url).connect() + .map_err(|e| { + println!("Error: {}", e); + Arbiter::system().do_send(actix::msgs::SystemExit(0)); + () + }) + .map(move |(reader, writer)| { + let addr: Addr = ChatClient::create(move |ctx| { + ChatClient::add_stream(reader, ctx); + ChatClient{url: ws, + conn: writer, + payload: pl, + bin: bin, + ts: time::precise_time_ns(), + perf_counters: perf_counters, + sent: 0, + max_payload_size: max_payload_size, + } + }); + }) + ); + false } else { - self.conn.text(&self.payload); + self.ts = time::precise_time_ns(); + if self.bin { + self.conn.binary(&self.payload); + } else { + self.conn.text(&self.payload); + } + true } } } @@ -183,7 +240,9 @@ impl StreamHandler for ChatClient { if txt == self.payload.as_ref().as_str() { self.perf_counters.register_request(); self.perf_counters.register_latency(time::precise_time_ns() - self.ts); - self.send_text(); + if !self.send_text() { + ctx.stop(); + } } else { println!("not eaqual"); } @@ -196,6 +255,7 @@ impl StreamHandler for ChatClient { pub struct PerfCounters { req: AtomicUsize, + conn: AtomicUsize, lat: AtomicUsize, lat_max: AtomicUsize } @@ -204,6 +264,7 @@ impl PerfCounters { pub fn new() -> PerfCounters { PerfCounters { req: AtomicUsize::new(0), + conn: AtomicUsize::new(0), lat: AtomicUsize::new(0), lat_max: AtomicUsize::new(0), } @@ -213,6 +274,10 @@ impl PerfCounters { self.req.swap(0, Ordering::SeqCst) } + pub fn pull_connections_count(&self) -> usize { + self.conn.swap(0, Ordering::SeqCst) + } + pub fn pull_latency_ns(&self) -> u64 { self.lat.swap(0, Ordering::SeqCst) as u64 } @@ -225,6 +290,10 @@ impl PerfCounters { self.req.fetch_add(1, Ordering::SeqCst); } + pub fn register_connection(&self) { + self.conn.fetch_add(1, Ordering::SeqCst); + } + pub fn register_latency(&self, nanos: u64) { let nanos = nanos as usize; self.lat.fetch_add(nanos, Ordering::SeqCst);