1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-23 23:51:06 +01:00

docs: clean up ws examples

This commit is contained in:
Rob Ede 2024-06-20 01:55:14 +01:00
parent e7ee2a06ab
commit 7e21fd753e
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 52 additions and 41 deletions

View File

@ -22,10 +22,9 @@ futures-core = "0.3.17"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.6"
actix-web = "4.8" actix-web = "4.8"
anyhow = "1"
futures-util = { version = "0.3.17", default-features = false, features = ["std"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
log = "0.4"
pretty_env_logger = "0.5" pretty_env_logger = "0.5"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync", "rt", "macros"] }
tracing = "0.1.30"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -19,22 +19,14 @@
## Usage ## Usage
```toml
# Cargo.toml
anyhow = "1"
actix-web = "4"
actix-ws-ng = "0.3"
```
```rust ```rust
// main.rs use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder};
use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_ws::Message; use actix_ws::Message;
async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> { async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result<impl Responder> {
let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
actix_rt::spawn(async move { actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = msg_stream.next().await { while let Some(Ok(msg)) = msg_stream.next().await {
match msg { match msg {
Message::Ping(bytes) => { Message::Ping(bytes) => {
@ -42,7 +34,7 @@ async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error>
return; return;
} }
} }
Message::Text(s) => println!("Got text, {}", s), Message::Text(msg) => println!("Got text: {msg}"),
_ => break, _ => break,
} }
} }
@ -54,7 +46,7 @@ async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error>
} }
#[actix_web::main] #[actix_web::main]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> std::io::Result<()> {
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.wrap(Logger::default()) .wrap(Logger::default())

View File

@ -1,4 +1,5 @@
use std::{ use std::{
io,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -7,9 +8,11 @@ use actix_web::{
middleware::Logger, web, web::Html, App, HttpRequest, HttpResponse, HttpServer, Responder, middleware::Logger, web, web::Html, App, HttpRequest, HttpResponse, HttpServer, Responder,
}; };
use actix_ws::{Message, Session}; use actix_ws::{Message, Session};
use bytestring::ByteString;
use futures_util::{stream::FuturesUnordered, StreamExt as _}; use futures_util::{stream::FuturesUnordered, StreamExt as _};
use log::info;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
#[derive(Clone)] #[derive(Clone)]
struct Chat { struct Chat {
@ -33,15 +36,19 @@ impl Chat {
self.inner.lock().await.sessions.push(session); self.inner.lock().await.sessions.push(session);
} }
async fn send(&self, msg: String) { async fn send(&self, msg: impl Into<ByteString>) {
let msg = msg.into();
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
let mut unordered = FuturesUnordered::new(); let mut unordered = FuturesUnordered::new();
for mut session in inner.sessions.drain(..) { for mut session in inner.sessions.drain(..) {
let msg = msg.clone(); let msg = msg.clone();
unordered.push(async move { unordered.push(async move {
let res = session.text(msg).await; let res = session.text(msg).await;
res.map(|_| session).map_err(|_| info!("Dropping session")) res.map(|_| session)
.map_err(|_| tracing::debug!("Dropping session"))
}); });
} }
@ -61,14 +68,15 @@ async fn ws(
let (response, mut session, mut stream) = actix_ws::handle(&req, body)?; let (response, mut session, mut stream) = actix_ws::handle(&req, body)?;
chat.insert(session.clone()).await; chat.insert(session.clone()).await;
info!("Inserted session"); tracing::info!("Inserted session");
let alive = Arc::new(Mutex::new(Instant::now())); let alive = Arc::new(Mutex::new(Instant::now()));
let mut session2 = session.clone(); let mut session2 = session.clone();
let alive2 = alive.clone(); let alive2 = alive.clone();
actix_rt::spawn(async move { actix_web::rt::spawn(async move {
let mut interval = actix_rt::time::interval(Duration::from_secs(5)); let mut interval = actix_web::rt::time::interval(Duration::from_secs(5));
loop { loop {
interval.tick().await; interval.tick().await;
if session2.ping(b"").await.is_err() { if session2.ping(b"").await.is_err() {
@ -82,7 +90,7 @@ async fn ws(
} }
}); });
actix_rt::spawn(async move { actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = stream.next().await { while let Some(Ok(msg)) = stream.next().await {
match msg { match msg {
Message::Ping(bytes) => { Message::Ping(bytes) => {
@ -90,19 +98,18 @@ async fn ws(
return; return;
} }
} }
Message::Text(s) => { Message::Text(msg) => {
info!("Relaying text, {}", s); tracing::info!("Relaying msg: {msg}");
let s: &str = s.as_ref(); chat.send(msg).await;
chat.send(s.into()).await;
} }
Message::Close(reason) => { Message::Close(reason) => {
let _ = session.close(reason).await; let _ = session.close(reason).await;
info!("Got close, bailing"); tracing::info!("Got close, bailing");
return; return;
} }
Message::Continuation(_) => { Message::Continuation(_) => {
let _ = session.close(None).await; let _ = session.close(None).await;
info!("Got continuation, bailing"); tracing::info!("Got continuation, bailing");
return; return;
} }
Message::Pong(_) => { Message::Pong(_) => {
@ -113,7 +120,7 @@ async fn ws(
} }
let _ = session.close(None).await; let _ = session.close(None).await;
}); });
info!("Spawned"); tracing::info!("Spawned");
Ok(response) Ok(response)
} }
@ -122,10 +129,16 @@ async fn index() -> impl Responder {
Html::new(include_str!("chat.html").to_owned()) Html::new(include_str!("chat.html").to_owned())
} }
#[actix_rt::main] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), anyhow::Error> { async fn main() -> io::Result<()> {
std::env::set_var("RUST_LOG", "info"); tracing_subscriber::fmt()
pretty_env_logger::init(); .with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();
let chat = Chat::new(); let chat = Chat::new();
HttpServer::new(move || { HttpServer::new(move || {

View File

@ -27,14 +27,14 @@ pub use self::{
/// Begin handling websocket traffic /// Begin handling websocket traffic
/// ///
/// ```no_run /// ```no_run
/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; /// use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder};
/// use actix_ws::Message; /// use actix_ws::Message;
/// use futures_util::StreamExt as _; /// use futures_util::StreamExt as _;
/// ///
/// async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> { /// async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result<impl Responder> {
/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; /// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
/// ///
/// actix_rt::spawn(async move { /// actix_web::rt::spawn(async move {
/// while let Some(Ok(msg)) = msg_stream.next().await { /// while let Some(Ok(msg)) = msg_stream.next().await {
/// match msg { /// match msg {
/// Message::Ping(bytes) => { /// Message::Ping(bytes) => {
@ -42,7 +42,7 @@ pub use self::{
/// return; /// return;
/// } /// }
/// } /// }
/// Message::Text(s) => println!("Got text, {}", s), /// Message::Text(msg) => println!("Got text: {msg}"),
/// _ => break, /// _ => break,
/// } /// }
/// } /// }
@ -53,8 +53,8 @@ pub use self::{
/// Ok(response) /// Ok(response)
/// } /// }
/// ///
/// #[actix_rt::main] /// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), anyhow::Error> { /// async fn main() -> std::io::Result<()> {
/// HttpServer::new(move || { /// HttpServer::new(move || {
/// App::new() /// App::new()
/// .wrap(Logger::default()) /// .wrap(Logger::default())

View File

@ -33,6 +33,13 @@ update-readmes:
cd ./actix-identity && cargo rdme --force cd ./actix-identity && cargo rdme --force
npx -y prettier --write $(fd README.md) npx -y prettier --write $(fd README.md)
# Test workspace code.
test:
cargo {{ toolchain }} nextest run --workspace --all-features
# Test workspace code and docs.
test-all: (test) (test-docs)
# Test workspace docs. # Test workspace docs.
[group("test")] [group("test")]
[group("docs")] [group("docs")]