From 7e21fd753e9a38f4bbffc9974860e606cfdd748e Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 20 Jun 2024 01:55:14 +0100 Subject: [PATCH] docs: clean up ws examples --- actix-ws/Cargo.toml | 7 +++--- actix-ws/README.md | 18 ++++---------- actix-ws/examples/chat.rs | 49 +++++++++++++++++++++++++-------------- actix-ws/src/lib.rs | 12 +++++----- justfile | 7 ++++++ 5 files changed, 52 insertions(+), 41 deletions(-) diff --git a/actix-ws/Cargo.toml b/actix-ws/Cargo.toml index d489fa0cd..537f75aa7 100644 --- a/actix-ws/Cargo.toml +++ b/actix-ws/Cargo.toml @@ -22,10 +22,9 @@ futures-core = "0.3.17" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.6" actix-web = "4.8" -anyhow = "1" futures-util = { version = "0.3.17", default-features = false, features = ["std"] } -log = "0.4" pretty_env_logger = "0.5" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "rt", "macros"] } +tracing = "0.1.30" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/actix-ws/README.md b/actix-ws/README.md index 72390f889..b3744c110 100644 --- a/actix-ws/README.md +++ b/actix-ws/README.md @@ -19,22 +19,14 @@ ## Usage -```toml -# Cargo.toml -anyhow = "1" -actix-web = "4" -actix-ws-ng = "0.3" -``` - ```rust -// main.rs -use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder}; use actix_ws::Message; -async fn ws(req: HttpRequest, body: web::Payload) -> Result { +async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result { let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; - actix_rt::spawn(async move { + actix_web::rt::spawn(async move { while let Some(Ok(msg)) = msg_stream.next().await { match msg { Message::Ping(bytes) => { @@ -42,7 +34,7 @@ async fn ws(req: HttpRequest, body: web::Payload) -> Result return; } } - Message::Text(s) => println!("Got text, {}", s), + Message::Text(msg) => println!("Got text: {msg}"), _ => break, } } @@ -54,7 +46,7 @@ async fn ws(req: HttpRequest, body: web::Payload) -> Result } #[actix_web::main] -async fn main() -> Result<(), anyhow::Error> { +async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() .wrap(Logger::default()) diff --git a/actix-ws/examples/chat.rs b/actix-ws/examples/chat.rs index cea98e2a7..67c1d1573 100644 --- a/actix-ws/examples/chat.rs +++ b/actix-ws/examples/chat.rs @@ -1,4 +1,5 @@ use std::{ + io, sync::Arc, time::{Duration, Instant}, }; @@ -7,9 +8,11 @@ use actix_web::{ middleware::Logger, web, web::Html, App, HttpRequest, HttpResponse, HttpServer, Responder, }; use actix_ws::{Message, Session}; +use bytestring::ByteString; use futures_util::{stream::FuturesUnordered, StreamExt as _}; -use log::info; use tokio::sync::Mutex; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::EnvFilter; #[derive(Clone)] struct Chat { @@ -33,15 +36,19 @@ impl Chat { self.inner.lock().await.sessions.push(session); } - async fn send(&self, msg: String) { + async fn send(&self, msg: impl Into) { + let msg = msg.into(); + let mut inner = self.inner.lock().await; let mut unordered = FuturesUnordered::new(); for mut session in inner.sessions.drain(..) { let msg = msg.clone(); + unordered.push(async move { let res = session.text(msg).await; - res.map(|_| session).map_err(|_| info!("Dropping session")) + res.map(|_| session) + .map_err(|_| tracing::debug!("Dropping session")) }); } @@ -61,14 +68,15 @@ async fn ws( let (response, mut session, mut stream) = actix_ws::handle(&req, body)?; chat.insert(session.clone()).await; - info!("Inserted session"); + tracing::info!("Inserted session"); let alive = Arc::new(Mutex::new(Instant::now())); let mut session2 = session.clone(); let alive2 = alive.clone(); - actix_rt::spawn(async move { - let mut interval = actix_rt::time::interval(Duration::from_secs(5)); + actix_web::rt::spawn(async move { + let mut interval = actix_web::rt::time::interval(Duration::from_secs(5)); + loop { interval.tick().await; if session2.ping(b"").await.is_err() { @@ -82,7 +90,7 @@ async fn ws( } }); - actix_rt::spawn(async move { + actix_web::rt::spawn(async move { while let Some(Ok(msg)) = stream.next().await { match msg { Message::Ping(bytes) => { @@ -90,19 +98,18 @@ async fn ws( return; } } - Message::Text(s) => { - info!("Relaying text, {}", s); - let s: &str = s.as_ref(); - chat.send(s.into()).await; + Message::Text(msg) => { + tracing::info!("Relaying msg: {msg}"); + chat.send(msg).await; } Message::Close(reason) => { let _ = session.close(reason).await; - info!("Got close, bailing"); + tracing::info!("Got close, bailing"); return; } Message::Continuation(_) => { let _ = session.close(None).await; - info!("Got continuation, bailing"); + tracing::info!("Got continuation, bailing"); return; } Message::Pong(_) => { @@ -113,7 +120,7 @@ async fn ws( } let _ = session.close(None).await; }); - info!("Spawned"); + tracing::info!("Spawned"); Ok(response) } @@ -122,10 +129,16 @@ async fn index() -> impl Responder { Html::new(include_str!("chat.html").to_owned()) } -#[actix_rt::main] -async fn main() -> Result<(), anyhow::Error> { - std::env::set_var("RUST_LOG", "info"); - pretty_env_logger::init(); +#[tokio::main(flavor = "current_thread")] +async fn main() -> io::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .init(); + let chat = Chat::new(); HttpServer::new(move || { diff --git a/actix-ws/src/lib.rs b/actix-ws/src/lib.rs index 67ce756f2..a33e226a5 100644 --- a/actix-ws/src/lib.rs +++ b/actix-ws/src/lib.rs @@ -27,14 +27,14 @@ pub use self::{ /// Begin handling websocket traffic /// /// ```no_run -/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; +/// use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder}; /// use actix_ws::Message; /// use futures_util::StreamExt as _; /// -/// async fn ws(req: HttpRequest, body: web::Payload) -> Result { +/// async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result { /// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; /// -/// actix_rt::spawn(async move { +/// actix_web::rt::spawn(async move { /// while let Some(Ok(msg)) = msg_stream.next().await { /// match msg { /// Message::Ping(bytes) => { @@ -42,7 +42,7 @@ pub use self::{ /// return; /// } /// } -/// Message::Text(s) => println!("Got text, {}", s), +/// Message::Text(msg) => println!("Got text: {msg}"), /// _ => break, /// } /// } @@ -53,8 +53,8 @@ pub use self::{ /// Ok(response) /// } /// -/// #[actix_rt::main] -/// async fn main() -> Result<(), anyhow::Error> { +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> std::io::Result<()> { /// HttpServer::new(move || { /// App::new() /// .wrap(Logger::default()) diff --git a/justfile b/justfile index 99b848840..0da3db2f1 100644 --- a/justfile +++ b/justfile @@ -33,6 +33,13 @@ update-readmes: cd ./actix-identity && cargo rdme --force npx -y prettier --write $(fd README.md) +# Test workspace code. +test: + cargo {{ toolchain }} nextest run --workspace --all-features + +# Test workspace code and docs. +test-all: (test) (test-docs) + # Test workspace docs. [group("test")] [group("docs")]