1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00
actix-extras/actix-ws/examples/chat.rs

159 lines
4.1 KiB
Rust
Raw Normal View History

2023-11-03 23:49:18 +01:00
use std::{
2024-06-20 02:55:14 +02:00
io,
2023-11-03 23:49:18 +01:00
sync::Arc,
time::{Duration, Instant},
};
2024-06-20 02:37:37 +02:00
use actix_web::{
middleware::Logger, web, web::Html, App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_ws::{AggregatedMessage, Session};
2024-06-20 02:55:14 +02:00
use bytestring::ByteString;
2024-03-02 22:29:40 +01:00
use futures_util::{stream::FuturesUnordered, StreamExt as _};
2023-11-03 23:49:18 +01:00
use tokio::sync::Mutex;
2024-06-20 02:55:14 +02:00
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
2023-11-03 23:49:18 +01:00
/// Cloneable handle to the chat room state.
///
/// Cloning copies only the `Arc`, so every clone refers to the same
/// mutex-guarded [`ChatInner`].
#[derive(Clone)]
struct Chat {
/// Shared state, guarded by an async mutex.
inner: Arc<Mutex<ChatInner>>,
}
/// State held behind the mutex in [`Chat`].
struct ChatInner {
/// WebSocket sessions that have been added to the chat.
sessions: Vec<Session>,
}
impl Chat {
fn new() -> Self {
Chat {
inner: Arc::new(Mutex::new(ChatInner {
sessions: Vec::new(),
})),
}
}
async fn insert(&self, session: Session) {
self.inner.lock().await.sessions.push(session);
}
2024-06-20 02:55:14 +02:00
async fn send(&self, msg: impl Into<ByteString>) {
let msg = msg.into();
2023-11-03 23:49:18 +01:00
let mut inner = self.inner.lock().await;
let mut unordered = FuturesUnordered::new();
for mut session in inner.sessions.drain(..) {
let msg = msg.clone();
2024-06-20 02:55:14 +02:00
2023-11-03 23:49:18 +01:00
unordered.push(async move {
let res = session.text(msg).await;
2024-06-20 02:55:14 +02:00
res.map(|_| session)
.map_err(|_| tracing::debug!("Dropping session"))
2023-11-03 23:49:18 +01:00
});
}
while let Some(res) = unordered.next().await {
if let Ok(session) = res {
inner.sessions.push(session);
}
}
}
}
async fn ws(
req: HttpRequest,
body: web::Payload,
chat: web::Data<Chat>,
) -> Result<HttpResponse, actix_web::Error> {
let (response, mut session, stream) = actix_ws::handle(&req, body)?;
// increase the maximum allowed frame size to 128KiB and aggregate continuation frames
let mut stream = stream.max_frame_size(128 * 1024).aggregate_continuations();
2023-11-03 23:49:18 +01:00
chat.insert(session.clone()).await;
2024-06-20 02:55:14 +02:00
tracing::info!("Inserted session");
2023-11-03 23:49:18 +01:00
let alive = Arc::new(Mutex::new(Instant::now()));
let mut session2 = session.clone();
let alive2 = alive.clone();
2024-06-20 02:55:14 +02:00
actix_web::rt::spawn(async move {
let mut interval = actix_web::rt::time::interval(Duration::from_secs(5));
2023-11-03 23:49:18 +01:00
loop {
interval.tick().await;
if session2.ping(b"").await.is_err() {
break;
}
if Instant::now().duration_since(*alive2.lock().await) > Duration::from_secs(10) {
let _ = session2.close(None).await;
break;
}
}
});
2024-06-20 02:55:14 +02:00
actix_web::rt::spawn(async move {
while let Some(Ok(msg)) = stream.recv().await {
2023-11-03 23:49:18 +01:00
match msg {
AggregatedMessage::Ping(bytes) => {
2023-11-03 23:49:18 +01:00
if session.pong(&bytes).await.is_err() {
return;
}
}
AggregatedMessage::Text(string) => {
tracing::info!("Relaying text, {string}");
chat.send(string).await;
2023-11-03 23:49:18 +01:00
}
AggregatedMessage::Close(reason) => {
2023-11-03 23:49:18 +01:00
let _ = session.close(reason).await;
2024-06-20 02:55:14 +02:00
tracing::info!("Got close, bailing");
2023-11-03 23:49:18 +01:00
return;
}
AggregatedMessage::Pong(_) => {
2023-11-03 23:49:18 +01:00
*alive.lock().await = Instant::now();
}
2023-11-03 23:49:18 +01:00
_ => (),
};
}
let _ = session.close(None).await;
});
2024-06-20 02:55:14 +02:00
tracing::info!("Spawned");
2023-11-03 23:49:18 +01:00
Ok(response)
}
2024-06-20 02:37:37 +02:00
async fn index() -> impl Responder {
Html::new(include_str!("chat.html").to_owned())
2023-11-03 23:49:18 +01:00
}
2024-06-20 02:55:14 +02:00
#[tokio::main(flavor = "current_thread")]
async fn main() -> io::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();
2023-11-03 23:49:18 +01:00
let chat = Chat::new();
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.app_data(web::Data::new(chat.clone()))
.route("/", web::get().to(index))
.route("/ws", web::get().to(ws))
})
.bind("127.0.0.1:8080")?
.run()
.await?;
Ok(())
}