1
0
mirror of https://github.com/actix/actix synced 2025-01-22 14:15:58 +01:00

Fix broken Ring example (#306)

Co-authored-by: Yuki Okushi <huyuumi.dev@gmail.com>
This commit is contained in:
Martin Hellspong 2020-01-21 23:18:00 +01:00 committed by Yuki Okushi
parent 0b28a7e5a2
commit 5e8fdd0408

View File

@ -19,6 +19,7 @@ impl Message for Payload {
} }
struct Node { struct Node {
id: usize,
limit: usize, limit: usize,
next: Recipient<Payload>, next: Recipient<Payload>,
} }
@ -32,10 +33,24 @@ impl Handler<Payload> for Node {
fn handle(&mut self, msg: Payload, _: &mut Context<Self>) { fn handle(&mut self, msg: Payload, _: &mut Context<Self>) {
if msg.0 >= self.limit { if msg.0 >= self.limit {
println!("Reached limit of {} (payload was {})", self.limit, msg.0); println!(
"Actor {} reached limit of {} (payload was {})",
self.id, self.limit, msg.0
);
System::current().stop(); System::current().stop();
return; return;
} }
// Some prime in order for different actors to report progress.
// Large enough to print about once per second in debug mode.
if msg.0 % 498989 == 1 {
println!(
"Actor {} received message {} of {} ({:.2}%)",
self.id,
msg.0,
self.limit,
100.0 * msg.0 as f32 / self.limit as f32
);
}
self.next self.next
.do_send(Payload(msg.0 + 1)) .do_send(Payload(msg.0 + 1))
.expect("Unable to send payload"); .expect("Unable to send payload");
@ -47,8 +62,9 @@ fn print_usage_and_exit() -> ! {
::std::process::exit(1); ::std::process::exit(1);
} }
#[actix_rt::main] fn main() -> std::io::Result<()> {
async fn main() { let system = System::new("ring");
let args = env::args().collect::<Vec<_>>(); let args = env::args().collect::<Vec<_>>();
if args.len() < 3 { if args.len() < 3 {
print_usage_and_exit(); print_usage_and_exit();
@ -69,33 +85,36 @@ async fn main() {
print_usage_and_exit() print_usage_and_exit()
}; };
let now = SystemTime::now(); let setup = SystemTime::now();
println!("Setting up nodes"); println!("Setting up {} nodes", n_nodes);
let limit = n_nodes * n_times;
let node = Node::create(move |ctx| { let node = Node::create(move |ctx| {
let first_addr = ctx.address(); let first_addr = ctx.address();
let mut prev_addr = Node { let mut prev_addr = Node {
limit: n_nodes * n_times, id: 1,
limit,
next: first_addr.recipient(), next: first_addr.recipient(),
} }
.start(); .start();
for _ in 2..n_nodes { for id in 2..n_nodes {
prev_addr = Node { prev_addr = Node {
limit: n_nodes * n_times, id,
limit,
next: prev_addr.recipient(), next: prev_addr.recipient(),
} }
.start(); .start();
} }
Node { Node {
limit: n_nodes * n_times, id: n_nodes,
limit,
next: prev_addr.recipient(), next: prev_addr.recipient(),
} }
}); });
node.send(Payload(0)).await.unwrap();
match now.elapsed() { match setup.elapsed() {
Ok(elapsed) => println!( Ok(elapsed) => println!(
"Time taken: {}.{:06} seconds", "Time taken: {}.{:06} seconds",
elapsed.as_secs(), elapsed.as_secs(),
@ -103,4 +122,28 @@ async fn main() {
), ),
Err(e) => println!("An error occured: {:?}", e), Err(e) => println!("An error occured: {:?}", e),
} }
println!(
"Sending start message and waiting for termination after {} messages...",
limit
);
let now = SystemTime::now();
let _req = node.send(Payload(1));
match system.run() {
Ok(_) => println!("Completed"),
Err(e) => println!("An error occured: {:?}", e),
}
match now.elapsed() {
Ok(elapsed) => println!(
"Time taken: {}.{:06} seconds ({} msg/second)",
elapsed.as_secs(),
elapsed.subsec_micros(),
(n_nodes * n_times * 1000000) as u128 / elapsed.as_micros()
),
Err(e) => println!("An error occured: {:?}", e),
}
Ok(())
} }