diff --git a/Cargo.lock b/Cargo.lock index 6557e85..90eb320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8916,10 +8916,12 @@ dependencies = [ "actix-files", "actix-web", "actix-ws", + "awc", "env_logger", "futures-util", "log", "tokio", + "tokio-stream", ] [[package]] diff --git a/websockets/echo-actorless/Cargo.toml b/websockets/echo-actorless/Cargo.toml index 2edfdd9..3059e5f 100644 --- a/websockets/echo-actorless/Cargo.toml +++ b/websockets/echo-actorless/Cargo.toml @@ -3,11 +3,21 @@ name = "websockets-echo-actorless-example" version = "1.0.0" edition = "2021" +[[bin]] +name = "websocket-server" +path = "src/main.rs" + +[[bin]] +name = "websocket-client" +path = "src/client.rs" + [dependencies] actix-files.workspace = true actix-web.workspace = true actix-ws.workspace = true +awc.workspace = true env_logger.workspace = true -futures-util.workspace = true +futures-util = { workspace = true, features = ["sink"] } log.workspace = true tokio = { workspace = true, features = ["rt", "time", "macros"] } +tokio-stream.workspace = true diff --git a/websockets/echo-actorless/README.md b/websockets/echo-actorless/README.md index 59e41cd..bcf29ae 100644 --- a/websockets/echo-actorless/README.md +++ b/websockets/echo-actorless/README.md @@ -8,7 +8,7 @@ Simple echo websocket server using [`actix-ws`]. ```sh cd websockets/echo-actorless -cargo run +cargo run --bin websocket-server # starting HTTP server at http://localhost:8080 ``` @@ -16,6 +16,13 @@ cargo run Go to in a browser. +### rust client + +```sh +cd websockets/echo-actorless +cargo run --bin websocket-client +``` + ### CLI Client ```sh diff --git a/websockets/echo-actorless/src/client.rs b/websockets/echo-actorless/src/client.rs new file mode 100644 index 0000000..6bcfd95 --- /dev/null +++ b/websockets/echo-actorless/src/client.rs @@ -0,0 +1,72 @@ +//! Simple websocket client. + +use std::{io, thread}; + +use actix_web::web::Bytes; +use awc::ws; +use futures_util::{SinkExt as _, StreamExt as _}; +use tokio::{select, sync::mpsc}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +#[actix_web::main] +async fn main() { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + log::info!("starting echo WebSocket client"); + + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let mut cmd_rx = UnboundedReceiverStream::new(cmd_rx); + + // run blocking terminal input reader on separate thread + let input_thread = thread::spawn(move || loop { + let mut cmd = String::with_capacity(32); + + if io::stdin().read_line(&mut cmd).is_err() { + log::error!("error reading line"); + return; + } + + cmd_tx.send(cmd).unwrap(); + }); + + let (res, mut ws) = awc::Client::new() + .ws("ws://127.0.0.1:8080/ws") + .connect() + .await + .unwrap(); + + log::debug!("response: {res:?}"); + log::info!("connected; server will echo messages sent"); + + loop { + select! { + Some(msg) = ws.next() => { + match msg { + Ok(ws::Frame::Text(txt)) => { + // log echoed messages from server + log::info!("Server: {txt:?}") + } + + Ok(ws::Frame::Ping(_)) => { + // respond to ping probes + ws.send(ws::Message::Pong(Bytes::new())).await.unwrap(); + } + + _ => {} + } + } + + Some(cmd) = cmd_rx.next() => { + if cmd.is_empty() { + continue; + } + + ws.send(ws::Message::Text(cmd.into())).await.unwrap(); + } + + else => break + } + } + + input_thread.join().unwrap(); +} diff --git a/websockets/echo/Cargo.toml b/websockets/echo/Cargo.toml index 980ce36..e649145 100644 --- a/websockets/echo/Cargo.toml +++ b/websockets/echo/Cargo.toml @@ -22,4 +22,4 @@ env_logger.workspace = true futures-util = { workspace = true, features = ["sink"] } log.workspace = true tokio = { workspace = true, features = ["full"] } -tokio-stream = "0.1.8" +tokio-stream.workspace = true diff --git a/websockets/echo/src/client.rs b/websockets/echo/src/client.rs deleted file mode 100644 index 6bcfd95..0000000 --- a/websockets/echo/src/client.rs +++ /dev/null @@ -1,72 +0,0 @@ -//! Simple websocket client. - -use std::{io, thread}; - -use actix_web::web::Bytes; -use awc::ws; -use futures_util::{SinkExt as _, StreamExt as _}; -use tokio::{select, sync::mpsc}; -use tokio_stream::wrappers::UnboundedReceiverStream; - -#[actix_web::main] -async fn main() { - env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); - - log::info!("starting echo WebSocket client"); - - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let mut cmd_rx = UnboundedReceiverStream::new(cmd_rx); - - // run blocking terminal input reader on separate thread - let input_thread = thread::spawn(move || loop { - let mut cmd = String::with_capacity(32); - - if io::stdin().read_line(&mut cmd).is_err() { - log::error!("error reading line"); - return; - } - - cmd_tx.send(cmd).unwrap(); - }); - - let (res, mut ws) = awc::Client::new() - .ws("ws://127.0.0.1:8080/ws") - .connect() - .await - .unwrap(); - - log::debug!("response: {res:?}"); - log::info!("connected; server will echo messages sent"); - - loop { - select! { - Some(msg) = ws.next() => { - match msg { - Ok(ws::Frame::Text(txt)) => { - // log echoed messages from server - log::info!("Server: {txt:?}") - } - - Ok(ws::Frame::Ping(_)) => { - // respond to ping probes - ws.send(ws::Message::Pong(Bytes::new())).await.unwrap(); - } - - _ => {} - } - } - - Some(cmd) = cmd_rx.next() => { - if cmd.is_empty() { - continue; - } - - ws.send(ws::Message::Text(cmd.into())).await.unwrap(); - } - - else => break - } - } - - input_thread.join().unwrap(); -} diff --git a/websockets/echo/src/client.rs b/websockets/echo/src/client.rs new file mode 120000 index 0000000..eaf2039 --- /dev/null +++ b/websockets/echo/src/client.rs @@ -0,0 +1 @@ +../../echo-actorless/src/client.rs \ No newline at end of file