From fd1725272528ca41abd4ac00a6a2d7e614a19dc3 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 11 Jul 2022 20:19:20 +0100 Subject: [PATCH] remove select macro from echo example --- websockets/chat-broker/src/session.rs | 2 +- websockets/chat-tcp/src/main.rs | 2 +- websockets/chat-tcp/src/server.rs | 8 ++++---- websockets/chat-tcp/src/session.rs | 2 +- websockets/chat/src/main.rs | 2 +- websockets/chat/src/server.rs | 10 +++++----- websockets/echo-actorless/Cargo.toml | 2 +- websockets/echo-actorless/README.md | 4 ++-- websockets/echo-actorless/src/handler.rs | 22 +++++++++++++++------- 9 files changed, 31 insertions(+), 23 deletions(-) diff --git a/websockets/chat-broker/src/session.rs b/websockets/chat-broker/src/session.rs index 30f7a41..5755254 100644 --- a/websockets/chat-broker/src/session.rs +++ b/websockets/chat-broker/src/session.rs @@ -78,7 +78,7 @@ impl Actor for WsChatSession { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { - self.join_room("Main", ctx); + self.join_room("main", ctx); } fn stopped(&mut self, _ctx: &mut Self::Context) { diff --git a/websockets/chat-tcp/src/main.rs b/websockets/chat-tcp/src/main.rs index f006e0b..27cfccf 100644 --- a/websockets/chat-tcp/src/main.rs +++ b/websockets/chat-tcp/src/main.rs @@ -29,7 +29,7 @@ async fn chat_route( WsChatSession { id: 0, hb: Instant::now(), - room: "Main".to_owned(), + room: "main".to_owned(), name: None, addr: srv.get_ref().clone(), }, diff --git a/websockets/chat-tcp/src/server.rs b/websockets/chat-tcp/src/server.rs index 6e699dc..eb3c1fa 100644 --- a/websockets/chat-tcp/src/server.rs +++ b/websockets/chat-tcp/src/server.rs @@ -66,7 +66,7 @@ impl Default for ChatServer { fn default() -> ChatServer { // default room let mut rooms = HashMap::new(); - rooms.insert("Main".to_owned(), HashSet::new()); + rooms.insert("main".to_owned(), HashSet::new()); ChatServer { sessions: HashMap::new(), @@ -108,14 +108,14 @@ impl Handler for ChatServer { println!("Someone joined"); // notify all users in same room - self.send_message("Main", "Someone joined", 0); + self.send_message("main", "Someone joined", 0); // register session with random id let id = self.rng.gen::(); self.sessions.insert(id, msg.addr); - // auto join session to Main room - self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + // auto join session to main room + self.rooms.get_mut("main").unwrap().insert(id); // send id back id diff --git a/websockets/chat-tcp/src/session.rs b/websockets/chat-tcp/src/session.rs index 42f4a04..43f8af5 100644 --- a/websockets/chat-tcp/src/session.rs +++ b/websockets/chat-tcp/src/session.rs @@ -147,7 +147,7 @@ impl ChatSession { id: 0, addr, hb: Instant::now(), - room: "Main".to_owned(), + room: "main".to_owned(), framed, } } diff --git a/websockets/chat/src/main.rs b/websockets/chat/src/main.rs index 932810f..88c35ab 100644 --- a/websockets/chat/src/main.rs +++ b/websockets/chat/src/main.rs @@ -30,7 +30,7 @@ async fn chat_route( session::WsChatSession { id: 0, hb: Instant::now(), - room: "Main".to_owned(), + room: "main".to_owned(), name: None, addr: srv.get_ref().clone(), }, diff --git a/websockets/chat/src/server.rs b/websockets/chat/src/server.rs index 87be846..c5531e3 100644 --- a/websockets/chat/src/server.rs +++ b/websockets/chat/src/server.rs @@ -79,7 +79,7 @@ impl ChatServer { pub fn new(visitor_count: Arc) -> ChatServer { // default room let mut rooms = HashMap::new(); - rooms.insert("Main".to_owned(), HashSet::new()); + rooms.insert("main".to_owned(), HashSet::new()); ChatServer { sessions: HashMap::new(), @@ -122,20 +122,20 @@ impl Handler for ChatServer { println!("Someone joined"); // notify all users in same room - self.send_message("Main", "Someone joined", 0); + self.send_message("main", "Someone joined", 0); // register session with random id let id = self.rng.gen::(); self.sessions.insert(id, msg.addr); - // auto join session to Main room + // auto join session to main room self.rooms - .entry("Main".to_owned()) + .entry("main".to_owned()) .or_insert_with(HashSet::new) .insert(id); let count = self.visitor_count.fetch_add(1, Ordering::SeqCst); - self.send_message("Main", &format!("Total visitors {count}"), 0); + self.send_message("main", &format!("Total visitors {count}"), 0); // send id back id diff --git a/websockets/echo-actorless/Cargo.toml b/websockets/echo-actorless/Cargo.toml index 93e2143..1ef89f1 100644 --- a/websockets/echo-actorless/Cargo.toml +++ b/websockets/echo-actorless/Cargo.toml @@ -11,6 +11,6 @@ awc = "3" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } +futures-lite = "1.3" log = "0.4" tokio = { version = "1.13.1", features = ["full"] } -tokio-stream = "0.1.8" diff --git a/websockets/echo-actorless/README.md b/websockets/echo-actorless/README.md index 49f9411..59e41cd 100644 --- a/websockets/echo-actorless/README.md +++ b/websockets/echo-actorless/README.md @@ -12,9 +12,9 @@ cargo run # starting HTTP server at http://localhost:8080 ``` -### Web Client +### Browser Client -Go to in a browser. +Go to in a browser. ### CLI Client diff --git a/websockets/echo-actorless/src/handler.rs b/websockets/echo-actorless/src/handler.rs index 7895405..ea4c5e9 100644 --- a/websockets/echo-actorless/src/handler.rs +++ b/websockets/echo-actorless/src/handler.rs @@ -2,8 +2,8 @@ use std::time::{Duration, Instant}; use actix_web::rt; use actix_ws::Message; -use futures_util::stream::StreamExt as _; -use tokio::select; +use futures_lite::future; +use futures_util::{future::Either, stream::StreamExt as _, FutureExt as _}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -20,12 +20,16 @@ pub async fn echo_heartbeat_ws( log::info!("connected"); let mut last_heartbeat = Instant::now(); - let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); loop { - select! { - Some(Ok(msg)) = msg_stream.next() => { + match future::or( + msg_stream.next().map(Either::Left), + interval.tick().map(Either::Right), + ) + .await + { + Either::Left(Some(Ok(msg))) => { log::debug!("msg: {msg:?}"); match msg { @@ -58,10 +62,14 @@ pub async fn echo_heartbeat_ws( } } - _ = interval.tick() => { + Either::Left(_) => {} + + Either::Right(_) => { // if no heartbeat ping/pong received recently, close the connection if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { - log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"); + log::info!( + "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting" + ); let _ = session.close(None).await; break; }