1
0
mirror of https://github.com/actix/actix synced 2025-01-22 14:15:58 +01:00

fix ring example

This commit is contained in:
Rob Ede 2021-02-04 22:33:39 +00:00
parent f7c6ef58ad
commit c83468359d
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6

View File

@ -1,23 +1,20 @@
/* //! Ring benchmark inspired by Programming Erlang: Software for a
* Ring benchmark inspired by Programming Erlang: Software for a //! Concurrent World, by Joe Armstrong, Chapter 8.11.2
* Concurrent World, by Joe Armstrong, Chapter 8.11.2 //!
* //! "Write a ring benchmark. Create N processes in a ring. Send a
* "Write a ring benchmark. Create N processes in a ring. Send a //! message round the ring M times so that a total of N * M messages
* message round the ring M times so that a total of N * M messages //! get sent. Time how long this takes for different values of N and M."
* get sent. Time how long this takes for different values of N and M."
*/
use actix::*;
use std::env; use std::{env, io, time::SystemTime};
use std::time::SystemTime;
use actix::prelude::*;
/// A payload with a counter /// A payload with a counter
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct Payload(usize); struct Payload(usize);
impl Message for Payload { #[derive(Debug)]
type Result = ();
}
struct Node { struct Node {
id: usize, id: usize,
limit: usize, limit: usize,
@ -37,9 +34,11 @@ impl Handler<Payload> for Node {
"Actor {} reached limit of {} (payload was {})", "Actor {} reached limit of {} (payload was {})",
self.id, self.limit, msg.0 self.id, self.limit, msg.0
); );
System::current().stop(); System::current().stop();
return; return;
} }
// Some prime in order for different actors to report progress. // Some prime in order for different actors to report progress.
// Large enough to print about once per second in debug mode. // Large enough to print about once per second in debug mode.
if msg.0 % 498989 == 1 { if msg.0 % 498989 == 1 {
@ -51,99 +50,103 @@ impl Handler<Payload> for Node {
100.0 * msg.0 as f32 / self.limit as f32 100.0 * msg.0 as f32 / self.limit as f32
); );
} }
self.next self.next
.do_send(Payload(msg.0 + 1)) .do_send(Payload(msg.0 + 1))
.expect("Unable to send payload"); .expect("Unable to send payload");
} }
} }
fn print_usage_and_exit() -> ! { fn main() -> io::Result<()> {
eprintln!("Usage; actix-test <num-nodes> <num-times-message-around-ring>"); let sys = System::new();
::std::process::exit(1);
}
fn main() -> std::io::Result<()> { let (n_nodes, n_rounds) = parse_args();
let system = System::new();
let args = env::args().collect::<Vec<_>>(); let now = SystemTime::now();
if args.len() < 3 {
print_usage_and_exit();
}
let n_nodes = if let Ok(arg_num_nodes) = args[1].parse::<usize>() {
if arg_num_nodes <= 1 {
eprintln!("Number of nodes must be > 1");
::std::process::exit(1);
}
arg_num_nodes
} else {
print_usage_and_exit();
};
let n_times = if let Ok(arg_ntimes) = args[2].parse::<usize>() { sys.block_on(async {
arg_ntimes println!("Setting up {} nodes", n_nodes);
} else { let limit = n_nodes * n_rounds;
print_usage_and_exit()
};
let setup = SystemTime::now(); let node = Node::create(move |ctx| {
let first_addr = ctx.address();
println!("Setting up {} nodes", n_nodes); let mut prev_addr = Node {
let limit = n_nodes * n_times; id: 1,
let node = Node::create(move |ctx| { limit,
let first_addr = ctx.address(); next: first_addr.recipient(),
let mut prev_addr = Node { }
id: 1, .start();
limit,
next: first_addr.recipient(),
}
.start();
for id in 2..n_nodes { for id in 2..n_nodes {
prev_addr = Node { prev_addr = Node {
id, id,
limit,
next: prev_addr.recipient(),
}
.start();
}
Node {
id: n_nodes,
limit, limit,
next: prev_addr.recipient(), next: prev_addr.recipient(),
} }
.start(); });
}
Node { println!(
id: n_nodes, "Sending start message and waiting for termination after {} messages...",
limit, limit
next: prev_addr.recipient(), );
}
node.send(Payload(1)).await.unwrap();
}); });
match setup.elapsed() { sys.run().unwrap();
Ok(elapsed) => println!(
"Time taken: {}.{:06} seconds",
elapsed.as_secs(),
elapsed.subsec_micros()
),
Err(e) => println!("An error occured: {:?}", e),
}
println!(
"Sending start message and waiting for termination after {} messages...",
limit
);
let now = SystemTime::now();
let _req = node.send(Payload(1));
match system.run() {
Ok(_) => println!("Completed"),
Err(e) => println!("An error occured: {:?}", e),
}
match now.elapsed() { match now.elapsed() {
Ok(elapsed) => println!( Ok(elapsed) => println!(
"Time taken: {}.{:06} seconds ({} msg/second)", "Time taken: {}.{:06} seconds ({} msg/second)",
elapsed.as_secs(), elapsed.as_secs(),
elapsed.subsec_micros(), elapsed.subsec_micros(),
(n_nodes * n_times * 1000000) as u128 / elapsed.as_micros() (n_nodes * n_rounds * 1000000) as u128 / elapsed.as_micros()
), ),
Err(e) => println!("An error occured: {:?}", e), Err(e) => println!("An error occurred: {:?}", e),
} }
Ok(()) Ok(())
} }
fn parse_args() -> (usize, usize) {
let mut args = env::args();
// skip first arg
args.next();
let n_nodes = args
.next()
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or_else(|| print_usage_and_exit());
if n_nodes <= 1 {
eprintln!("Number of nodes must be > 1");
::std::process::exit(1);
}
let n_rounds = args
.next()
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or_else(|| print_usage_and_exit());
if args.next().is_some() {
print_usage_and_exit();
}
(n_nodes, n_rounds)
}
fn print_usage_and_exit() -> ! {
eprintln!("Usage:");
eprintln!("cargo run --example ring -- <num-nodes> <num-times-message-around-ring>");
::std::process::exit(1);
}