diff --git a/examples/ring.rs b/examples/ring.rs index 91ac2419..8a67191d 100644 --- a/examples/ring.rs +++ b/examples/ring.rs @@ -1,23 +1,20 @@ -/* - * Ring benchmark inspired by Programming Erlang: Software for a - * Concurrent World, by Joe Armstrong, Chapter 8.11.2 - * - * "Write a ring benchmark. Create N processes in a ring. Send a - * message round the ring M times so that a total of N * M messages - * get sent. Time how long this takes for different values of N and M." - */ -use actix::*; +//! Ring benchmark inspired by Programming Erlang: Software for a +//! Concurrent World, by Joe Armstrong, Chapter 8.11.2 +//! +//! "Write a ring benchmark. Create N processes in a ring. Send a +//! message round the ring M times so that a total of N * M messages +//! get sent. Time how long this takes for different values of N and M." -use std::env; -use std::time::SystemTime; +use std::{env, io, time::SystemTime}; + +use actix::prelude::*; /// A payload with a counter +#[derive(Debug, Message)] +#[rtype(result = "()")] struct Payload(usize); -impl Message for Payload { - type Result = (); -} - +#[derive(Debug)] struct Node { id: usize, limit: usize, @@ -37,9 +34,11 @@ impl Handler for Node { "Actor {} reached limit of {} (payload was {})", self.id, self.limit, msg.0 ); + System::current().stop(); return; } + // Some prime in order for different actors to report progress. // Large enough to print about once per second in debug mode. if msg.0 % 498989 == 1 { @@ -51,99 +50,103 @@ impl Handler for Node { 100.0 * msg.0 as f32 / self.limit as f32 ); } + self.next .do_send(Payload(msg.0 + 1)) .expect("Unable to send payload"); } } -fn print_usage_and_exit() -> ! { - eprintln!("Usage; actix-test "); - ::std::process::exit(1); -} +fn main() -> io::Result<()> { + let sys = System::new(); -fn main() -> std::io::Result<()> { - let system = System::new(); + let (n_nodes, n_rounds) = parse_args(); - let args = env::args().collect::>(); - if args.len() < 3 { - print_usage_and_exit(); - } - let n_nodes = if let Ok(arg_num_nodes) = args[1].parse::() { - if arg_num_nodes <= 1 { - eprintln!("Number of nodes must be > 1"); - ::std::process::exit(1); - } - arg_num_nodes - } else { - print_usage_and_exit(); - }; + let now = SystemTime::now(); - let n_times = if let Ok(arg_ntimes) = args[2].parse::() { - arg_ntimes - } else { - print_usage_and_exit() - }; + sys.block_on(async { + println!("Setting up {} nodes", n_nodes); + let limit = n_nodes * n_rounds; - let setup = SystemTime::now(); + let node = Node::create(move |ctx| { + let first_addr = ctx.address(); - println!("Setting up {} nodes", n_nodes); - let limit = n_nodes * n_times; - let node = Node::create(move |ctx| { - let first_addr = ctx.address(); - let mut prev_addr = Node { - id: 1, - limit, - next: first_addr.recipient(), - } - .start(); + let mut prev_addr = Node { + id: 1, + limit, + next: first_addr.recipient(), + } + .start(); - for id in 2..n_nodes { - prev_addr = Node { - id, + for id in 2..n_nodes { + prev_addr = Node { + id, + limit, + next: prev_addr.recipient(), + } + .start(); + } + + Node { + id: n_nodes, limit, next: prev_addr.recipient(), } - .start(); - } + }); - Node { - id: n_nodes, - limit, - next: prev_addr.recipient(), - } + println!( + "Sending start message and waiting for termination after {} messages...", + limit + ); + + node.send(Payload(1)).await.unwrap(); }); - match setup.elapsed() { - Ok(elapsed) => println!( - "Time taken: {}.{:06} seconds", - elapsed.as_secs(), - elapsed.subsec_micros() - ), - Err(e) => println!("An error occured: {:?}", e), - } - println!( - "Sending start message and waiting for termination after {} messages...", - limit - ); - let now = SystemTime::now(); - - let _req = node.send(Payload(1)); - - match system.run() { - Ok(_) => println!("Completed"), - Err(e) => println!("An error occured: {:?}", e), - } + sys.run().unwrap(); match now.elapsed() { Ok(elapsed) => println!( "Time taken: {}.{:06} seconds ({} msg/second)", elapsed.as_secs(), elapsed.subsec_micros(), - (n_nodes * n_times * 1000000) as u128 / elapsed.as_micros() + (n_nodes * n_rounds * 1000000) as u128 / elapsed.as_micros() ), - Err(e) => println!("An error occured: {:?}", e), + Err(e) => println!("An error occurred: {:?}", e), } Ok(()) } + +fn parse_args() -> (usize, usize) { + let mut args = env::args(); + + // skip first arg + args.next(); + + let n_nodes = args + .next() + .and_then(|val| val.parse::().ok()) + .unwrap_or_else(|| print_usage_and_exit()); + + if n_nodes <= 1 { + eprintln!("Number of nodes must be > 1"); + ::std::process::exit(1); + } + + let n_rounds = args + .next() + .and_then(|val| val.parse::().ok()) + .unwrap_or_else(|| print_usage_and_exit()); + + if args.next().is_some() { + print_usage_and_exit(); + } + + (n_nodes, n_rounds) +} + +fn print_usage_and_exit() -> ! { + eprintln!("Usage:"); + eprintln!("cargo run --example ring -- "); + ::std::process::exit(1); +}