1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

remove select macro

This commit is contained in:
Rob Ede 2022-07-12 01:42:58 +01:00
parent 0b93208bb2
commit 25368e6b65
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 97 additions and 41 deletions

View File

@ -11,7 +11,7 @@ awc = "3"
derive_more = "0.99.5" derive_more = "0.99.5"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.17", default-features = false, features = ["std", "sink"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
log = "0.4" log = "0.4"
rand = "0.8" rand = "0.8"
tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] } tokio = { version = "1.13.1", features = ["rt", "macros", "sync", "time"] }

View File

@ -1,14 +1,17 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix_web::rt;
use actix_ws::Message; use actix_ws::Message;
use futures_util::stream::StreamExt as _; use futures_util::{
future::{select, Either},
StreamExt as _,
};
use tokio::{ use tokio::{
select, pin,
sync::{ sync::{
mpsc::{self, UnboundedSender}, mpsc::{self, UnboundedSender},
oneshot, oneshot,
}, },
time::interval,
}; };
use crate::{Command, ConnId, RoomId}; use crate::{Command, ConnId, RoomId};
@ -31,27 +34,41 @@ pub async fn chat_ws(
let mut name = None; let mut name = None;
let mut room = "main".to_owned(); let mut room = "main".to_owned();
let mut last_heartbeat = Instant::now(); let mut last_heartbeat = Instant::now();
let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); let mut interval = interval(HEARTBEAT_INTERVAL);
let (conn_tx, mut conn_rx) = mpsc::unbounded_channel(); let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();
let (res_tx, res_rx) = oneshot::channel(); let (res_tx, res_rx) = oneshot::channel();
// unwrap: chat server is not dropped before the HTTP server
server_tx server_tx
.send(Command::Connect { conn_tx, res_tx }) .send(Command::Connect { conn_tx, res_tx })
.unwrap(); .unwrap();
// unwrap: chat server does not drop our response channel
let conn_id = res_rx.await.unwrap(); let conn_id = res_rx.await.unwrap();
loop { let close_reason = loop {
select! { // most of the futures we process need to be stack-pinned to work with select()
let tick = interval.tick();
pin!(tick);
let msg_rx = conn_rx.recv();
pin!(msg_rx);
let messages = select(msg_stream.next(), msg_rx);
pin!(messages);
match select(messages, tick).await {
// commands & messages received from client // commands & messages received from client
Some(Ok(msg)) = msg_stream.next() => { Either::Left((Either::Left((Some(Ok(msg)), _)), _)) => {
log::debug!("msg: {msg:?}"); log::debug!("msg: {msg:?}");
match msg { match msg {
Message::Ping(bytes) => { Message::Ping(bytes) => {
last_heartbeat = Instant::now(); last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await; // unwrap:
session.pong(&bytes).await.unwrap();
} }
Message::Pong(_) => { Message::Pong(_) => {
@ -59,47 +76,64 @@ pub async fn chat_ws(
} }
Message::Text(text) => { Message::Text(text) => {
process_text_msg(&server_tx, &mut session, &text, conn_id, &mut room, &mut name).await; process_text_msg(
&server_tx,
&mut session,
&text,
conn_id,
&mut room,
&mut name,
)
.await;
} }
Message::Binary(_bin) => { Message::Binary(_bin) => {
log::warn!("unexpected binary message"); log::warn!("unexpected binary message");
} }
Message::Close(reason) => { Message::Close(reason) => break reason,
let _ = session.close(reason).await;
break;
}
_ => { _ => {
let _ = session.close(None).await; break None;
break;
} }
} }
} }
// client WebSocket stream error
Either::Left((Either::Left((Some(Err(err)), _)), _)) => {
log::error!("{}", err);
break None;
}
// client WebSocket stream ended
Either::Left((Either::Left((None, _)), _)) => break None,
// chat messages received from other room participants // chat messages received from other room participants
Some(chat_msg) = conn_rx.recv() => { Either::Left((Either::Right((Some(chat_msg), _)), _)) => {
session.text(chat_msg).await.unwrap(); session.text(chat_msg).await.unwrap();
} }
// heartbeat // all connection's msg senders were dropped
_ = interval.tick() => { Either::Left((Either::Right((None, _)), _)) => unreachable!(),
// heartbeat internal tick
Either::Right((_inst, _)) => {
// if no heartbeat ping/pong received recently, close the connection // if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"); log::info!(
let _ = session.close(None).await; "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
break; );
break None;
} }
// send heartbeat ping // send heartbeat ping
let _ = session.ping(b"").await; let _ = session.ping(b"").await;
// reset interval duration
interval.reset();
} }
}; };
} };
// attempt to close connection gracefully
let _ = session.close(close_reason).await;
} }
async fn process_text_msg( async fn process_text_msg(
@ -110,12 +144,14 @@ async fn process_text_msg(
room: &mut RoomId, room: &mut RoomId,
name: &mut Option<String>, name: &mut Option<String>,
) { ) {
// strip leading and trailing whitespace (spaces, newlines, etc.)
let msg = text.trim(); let msg = text.trim();
// we check for /<cmd> type of messages // we check for /<cmd> type of messages
if msg.starts_with('/') { if msg.starts_with('/') {
let mut cmd_args = msg.splitn(2, ' '); let mut cmd_args = msg.splitn(2, ' ');
// unwrap: we have guaranteed non-zero string length already
match cmd_args.next().unwrap() { match cmd_args.next().unwrap() {
"/list" => { "/list" => {
// Send ListRooms message to chat server and wait for // Send ListRooms message to chat server and wait for
@ -124,6 +160,7 @@ async fn process_text_msg(
let (res_tx, res_rx) = oneshot::channel(); let (res_tx, res_rx) = oneshot::channel();
server_tx.send(Command::List { res_tx }).unwrap(); server_tx.send(Command::List { res_tx }).unwrap();
// unwrap: chat server does not drop our response channel
let rooms = res_rx.await.unwrap(); let rooms = res_rx.await.unwrap();
for room in rooms { for room in rooms {
@ -145,6 +182,7 @@ async fn process_text_msg(
}) })
.unwrap(); .unwrap();
// unwrap: chat server does not drop our response channel
res_rx.await.unwrap(); res_rx.await.unwrap();
session.text(format!("joined {room_id}")).await.unwrap(); session.text(format!("joined {room_id}")).await.unwrap();
@ -171,6 +209,7 @@ async fn process_text_msg(
} }
} }
} else { } else {
// prefix message with our name, if assigned
let msg = match name { let msg = match name {
Some(ref name) => format!("{name}: {msg}"), Some(ref name) => format!("{name}: {msg}"),
None => msg.to_owned(), None => msg.to_owned(),
@ -186,8 +225,10 @@ async fn process_text_msg(
skip: conn, skip: conn,
res_tx, res_tx,
}) })
// unwrap: chat server is not dropped before the HTTP server
.unwrap(); .unwrap();
// unwrap: chat server does not drop our response channel
res_rx.await.unwrap(); res_rx.await.unwrap();
} }
} }

View File

@ -3,10 +3,12 @@
//! Open `http://localhost:8080/` in browser to test. //! Open `http://localhost:8080/` in browser to test.
use actix_files::NamedFile; use actix_files::NamedFile;
use actix_web::{ use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, use tokio::{
sync::mpsc::UnboundedSender,
task::{spawn, spawn_local},
try_join,
}; };
use tokio::sync::mpsc::UnboundedSender;
mod command; mod command;
mod handler; mod handler;
@ -37,7 +39,7 @@ async fn chat_ws(
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately // spawn websocket handler (and don't await it) so that the response is returned immediately
rt::spawn(handler::chat_ws((**server_tx).clone(), session, msg_stream)); spawn_local(handler::chat_ws((**server_tx).clone(), session, msg_stream));
Ok(res) Ok(res)
} }
@ -51,9 +53,9 @@ async fn main() -> std::io::Result<()> {
let (chat_server, server_tx) = ChatServer::new(); let (chat_server, server_tx) = ChatServer::new();
rt::spawn(chat_server.run()); let chat_server = spawn(chat_server.run());
HttpServer::new(move || { let http_server = HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(server_tx.clone())) .app_data(web::Data::new(server_tx.clone()))
// WebSocket UI HTML file // WebSocket UI HTML file
@ -65,6 +67,9 @@ async fn main() -> std::io::Result<()> {
}) })
.workers(2) .workers(2)
.bind(("127.0.0.1", 8080))? .bind(("127.0.0.1", 8080))?
.run() .run();
.await
try_join!(http_server, async move { chat_server.await.unwrap() })?;
Ok(())
} }

View File

@ -2,6 +2,7 @@
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
io,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
@ -62,7 +63,8 @@ impl ChatServer {
for conn_id in sessions { for conn_id in sessions {
if *conn_id != skip_id { if *conn_id != skip_id {
if let Some(tx) = self.sessions.get(conn_id) { if let Some(tx) = self.sessions.get(conn_id) {
tx.send(msg.clone()).unwrap(); // errors if client disconnected abruptly and hasn't been timed-out yet
let _ = tx.send(msg.clone());
} }
} }
} }
@ -152,12 +154,17 @@ impl ChatServer {
self.send_message(&room, "Someone connected", conn_id).await; self.send_message(&room, "Someone connected", conn_id).await;
} }
pub async fn run(mut self) { pub async fn run(mut self) -> io::Result<()> {
loop { loop {
match self.rx.recv().await.unwrap() { let cmd = match self.rx.recv().await {
Some(cmd) => cmd,
None => break,
};
match cmd {
Command::Connect { conn_tx, res_tx } => { Command::Connect { conn_tx, res_tx } => {
let conn_id = self.connect(conn_tx).await; let conn_id = self.connect(conn_tx).await;
res_tx.send(conn_id).unwrap(); let _ = res_tx.send(conn_id);
} }
Command::Disconnect { conn } => { Command::Disconnect { conn } => {
@ -165,12 +172,12 @@ impl ChatServer {
} }
Command::List { res_tx } => { Command::List { res_tx } => {
res_tx.send(self.list_rooms()).unwrap(); let _ = res_tx.send(self.list_rooms());
} }
Command::Join { conn, room, res_tx } => { Command::Join { conn, room, res_tx } => {
self.join_room(conn, room).await; self.join_room(conn, room).await;
res_tx.send(()).unwrap(); let _ = res_tx.send(());
} }
Command::Message { Command::Message {
@ -180,9 +187,11 @@ impl ChatServer {
res_tx, res_tx,
} => { } => {
self.send_message(&room, msg, skip).await; self.send_message(&room, msg, skip).await;
res_tx.send(()).unwrap(); let _ = res_tx.send(());
} }
} }
} }
Ok(())
} }
} }

View File

@ -73,6 +73,7 @@ pub async fn echo_heartbeat_ws(
// client WebSocket stream error // client WebSocket stream error
Either::Left((Some(Err(err)), _)) => { Either::Left((Some(Err(err)), _)) => {
log::error!("{}", err); log::error!("{}", err);
break None;
} }
// client WebSocket stream ended // client WebSocket stream ended