diff --git a/Cargo.lock b/Cargo.lock index 0a1a30fc..907a3d74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -571,6 +571,19 @@ dependencies = [ "syn", ] +[[package]] +name = "actix-ws" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3" +dependencies = [ + "actix-codec", + "actix-http", + "actix-web", + "futures-core", + "tokio 1.19.2", +] + [[package]] name = "actix_derive" version = "0.6.0" @@ -7034,6 +7047,21 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "websocket-echo-actorless-example" +version = "1.0.0" +dependencies = [ + "actix-files", + "actix-web", + "actix-ws", + "awc", + "env_logger 0.9.0", + "futures-util", + "log", + "tokio 1.19.2", + "tokio-stream", +] + [[package]] name = "websocket-example" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 1bf9d217..0233bf75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ members = [ "websockets/chat-broker", "websockets/chat-tcp", "websockets/chat", + "websockets/echo-actorless", "websockets/echo", ] exclude = [ diff --git a/templating/fluent/src/lang_choice.rs b/templating/fluent/src/lang_choice.rs index 028d227c..9f422d3e 100644 --- a/templating/fluent/src/lang_choice.rs +++ b/templating/fluent/src/lang_choice.rs @@ -18,7 +18,6 @@ impl LangChoice { let lang = req .get_header::() .and_then(|lang| lang.preference().into_item()) - .map(|lang| lang.to_string()) .map_or_else(|| "en".to_owned(), |lang| lang.to_string()); Self(lang) diff --git a/websockets/echo-actorless/Cargo.toml b/websockets/echo-actorless/Cargo.toml new file mode 100644 index 00000000..93e2143d --- /dev/null +++ b/websockets/echo-actorless/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "websocket-echo-actorless-example" +version = "1.0.0" +edition = "2021" + +[dependencies] +actix-files = "0.6" +actix-web = "4" +actix-ws = "0.2.5" +awc = "3" + +env_logger = "0.9" +futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] } +log = "0.4" +tokio = { version = "1.13.1", features = ["full"] } +tokio-stream = "0.1.8" diff --git a/websockets/echo-actorless/README.md b/websockets/echo-actorless/README.md new file mode 100644 index 00000000..49f9411f --- /dev/null +++ b/websockets/echo-actorless/README.md @@ -0,0 +1,26 @@ +# Echo WebSocket (actor-less) + +Simple echo websocket server using [`actix-ws`]. + +## Usage + +### Server + +```sh +cd websockets/echo-actorless +cargo run +# starting HTTP server at http://localhost:8080 +``` + +### Web Client + +Go to in a browser. + +### CLI Client + +```sh +# using `websocat` (https://github.com/vi/websocat) +websocat -v --ping-interval=2 ws://127.0.0.1:8080/ws +``` + +[`actix-ws`]: https://crates.io/crates/actix-ws diff --git a/websockets/echo-actorless/src/handler.rs b/websockets/echo-actorless/src/handler.rs new file mode 100644 index 00000000..78954057 --- /dev/null +++ b/websockets/echo-actorless/src/handler.rs @@ -0,0 +1,116 @@ +use std::time::{Duration, Instant}; + +use actix_web::rt; +use actix_ws::Message; +use futures_util::stream::StreamExt as _; +use tokio::select; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); + +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Echo text & binary messages received from the client, respond to ping messages, and monitor +/// connection health to detect network issues and free up resources. +pub async fn echo_heartbeat_ws( + mut session: actix_ws::Session, + mut msg_stream: actix_ws::MessageStream, +) { + log::info!("connected"); + + let mut last_heartbeat = Instant::now(); + + let mut interval = rt::time::interval(HEARTBEAT_INTERVAL); + + loop { + select! { + Some(Ok(msg)) = msg_stream.next() => { + log::debug!("msg: {msg:?}"); + + match msg { + Message::Ping(bytes) => { + last_heartbeat = Instant::now(); + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => { + last_heartbeat = Instant::now(); + } + + Message::Text(text) => { + session.text(text).await.unwrap(); + } + + Message::Binary(bin) => { + session.binary(bin).await.unwrap(); + } + + Message::Close(reason) => { + let _ = session.close(reason).await; + break; + } + + _ => { + let _ = session.close(None).await; + break; + } + } + } + + _ = interval.tick() => { + // if no heartbeat ping/pong received recently, close the connection + if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT { + log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"); + let _ = session.close(None).await; + break; + } + + // send heartbeat ping + let _ = session.ping(b"").await; + + // reset interval duration + interval.reset(); + } + }; + } +} + +/// Echo text & binary messages received from the client and respond to ping messages. +/// +/// This example is just for demonstration of simplicity. In reality, you likely want to include +/// some handling of heartbeats for connection health tracking to free up server resources when +/// connections die or network issues arise. +pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) { + log::info!("connected"); + + while let Some(Ok(msg)) = msg_stream.next().await { + log::debug!("msg: {msg:?}"); + + match msg { + Message::Ping(bytes) => { + let _ = session.pong(&bytes).await; + } + + Message::Pong(_) => {} + + Message::Text(text) => { + session.text(text).await.unwrap(); + } + + Message::Binary(bin) => { + session.binary(bin).await.unwrap(); + } + + Message::Close(reason) => { + let _ = session.close(reason).await; + break; + } + + _ => { + let _ = session.close(None).await; + break; + } + } + } +} diff --git a/websockets/echo-actorless/src/main.rs b/websockets/echo-actorless/src/main.rs new file mode 100644 index 00000000..fcb67604 --- /dev/null +++ b/websockets/echo-actorless/src/main.rs @@ -0,0 +1,60 @@ +//! Simple echo websocket server. +//! +//! Open `http://localhost:8080/` in browser to test. + +use actix_files::NamedFile; +use actix_web::{ + middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder, +}; + +mod handler; + +async fn index() -> impl Responder { + NamedFile::open_async("./static/index.html").await.unwrap() +} + +/// Handshake and start WebSocket handler with heartbeats. +async fn echo_heartbeat_ws(req: HttpRequest, stream: web::Payload) -> Result { + let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + + // spawn websocket handler (and don't await it) so that the response is returned immediately + rt::spawn(handler::echo_heartbeat_ws(session, msg_stream)); + + Ok(res) +} + +/// Handshake and start basic WebSocket handler. +/// +/// This example is just for demonstration of simplicity. In reality, you likely want to include +/// some handling of heartbeats for connection health tracking to free up server resources when +/// connections die or network issues arise. +async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result { + let (res, session, msg_stream) = actix_ws::handle(&req, stream)?; + + // spawn websocket handler (and don't await it) so that the response is returned immediately + rt::spawn(handler::echo_ws(session, msg_stream)); + + Ok(res) +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + log::info!("starting HTTP server at http://localhost:8080"); + + HttpServer::new(|| { + App::new() + // WebSocket UI HTML file + .service(web::resource("/").to(index)) + // websocket routes + .service(web::resource("/ws").route(web::get().to(echo_heartbeat_ws))) + .service(web::resource("/ws-basic").route(web::get().to(echo_ws))) + // enable logger + .wrap(middleware::Logger::default()) + }) + .workers(2) + .bind(("127.0.0.1", 8080))? + .run() + .await +} diff --git a/websockets/echo-actorless/static/index.html b/websockets/echo-actorless/static/index.html new file mode 120000 index 00000000..24f0aba6 --- /dev/null +++ b/websockets/echo-actorless/static/index.html @@ -0,0 +1 @@ +../../echo/static/index.html \ No newline at end of file