mirror of
https://github.com/actix/actix-extras.git
synced 2024-12-12 15:20:24 +01:00
b0d2947a4a
* Add better support for receiving larger payloads This change enables the maximum frame size to be configured when receiving websocket frames. It also adds a new stream type that aggregates continuation frames together into their proper collected representation. It provides no mechanism yet for sending continuations. * actix-ws: Add continuation & size config to changelog * actix-ws: Add Debug, Eq to AggregatedMessage * actix-ws: Add a configurable maximum size to aggregated continuations * refactor: move aggregate types to own module * test: fix chat example * docs: update changelog --------- Co-authored-by: Rob Ede <robjtede@icloud.com>
159 lines
4.1 KiB
Rust
159 lines
4.1 KiB
Rust
use std::{
|
|
io,
|
|
sync::Arc,
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use actix_web::{
|
|
middleware::Logger, web, web::Html, App, HttpRequest, HttpResponse, HttpServer, Responder,
|
|
};
|
|
use actix_ws::{AggregatedMessage, Session};
|
|
use bytestring::ByteString;
|
|
use futures_util::{stream::FuturesUnordered, StreamExt as _};
|
|
use tokio::sync::Mutex;
|
|
use tracing::level_filters::LevelFilter;
|
|
use tracing_subscriber::EnvFilter;
|
|
|
|
#[derive(Clone)]
|
|
struct Chat {
|
|
inner: Arc<Mutex<ChatInner>>,
|
|
}
|
|
|
|
struct ChatInner {
|
|
sessions: Vec<Session>,
|
|
}
|
|
|
|
impl Chat {
|
|
fn new() -> Self {
|
|
Chat {
|
|
inner: Arc::new(Mutex::new(ChatInner {
|
|
sessions: Vec::new(),
|
|
})),
|
|
}
|
|
}
|
|
|
|
async fn insert(&self, session: Session) {
|
|
self.inner.lock().await.sessions.push(session);
|
|
}
|
|
|
|
async fn send(&self, msg: impl Into<ByteString>) {
|
|
let msg = msg.into();
|
|
|
|
let mut inner = self.inner.lock().await;
|
|
let mut unordered = FuturesUnordered::new();
|
|
|
|
for mut session in inner.sessions.drain(..) {
|
|
let msg = msg.clone();
|
|
|
|
unordered.push(async move {
|
|
let res = session.text(msg).await;
|
|
res.map(|_| session)
|
|
.map_err(|_| tracing::debug!("Dropping session"))
|
|
});
|
|
}
|
|
|
|
while let Some(res) = unordered.next().await {
|
|
if let Ok(session) = res {
|
|
inner.sessions.push(session);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn ws(
|
|
req: HttpRequest,
|
|
body: web::Payload,
|
|
chat: web::Data<Chat>,
|
|
) -> Result<HttpResponse, actix_web::Error> {
|
|
let (response, mut session, stream) = actix_ws::handle(&req, body)?;
|
|
|
|
// increase the maximum allowed frame size to 128KiB and aggregate continuation frames
|
|
let mut stream = stream.max_frame_size(128 * 1024).aggregate_continuations();
|
|
|
|
chat.insert(session.clone()).await;
|
|
tracing::info!("Inserted session");
|
|
|
|
let alive = Arc::new(Mutex::new(Instant::now()));
|
|
|
|
let mut session2 = session.clone();
|
|
let alive2 = alive.clone();
|
|
actix_web::rt::spawn(async move {
|
|
let mut interval = actix_web::rt::time::interval(Duration::from_secs(5));
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
if session2.ping(b"").await.is_err() {
|
|
break;
|
|
}
|
|
|
|
if Instant::now().duration_since(*alive2.lock().await) > Duration::from_secs(10) {
|
|
let _ = session2.close(None).await;
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
actix_web::rt::spawn(async move {
|
|
while let Some(Ok(msg)) = stream.recv().await {
|
|
match msg {
|
|
AggregatedMessage::Ping(bytes) => {
|
|
if session.pong(&bytes).await.is_err() {
|
|
return;
|
|
}
|
|
}
|
|
|
|
AggregatedMessage::Text(string) => {
|
|
tracing::info!("Relaying text, {string}");
|
|
chat.send(string).await;
|
|
}
|
|
|
|
AggregatedMessage::Close(reason) => {
|
|
let _ = session.close(reason).await;
|
|
tracing::info!("Got close, bailing");
|
|
return;
|
|
}
|
|
|
|
AggregatedMessage::Pong(_) => {
|
|
*alive.lock().await = Instant::now();
|
|
}
|
|
|
|
_ => (),
|
|
};
|
|
}
|
|
let _ = session.close(None).await;
|
|
});
|
|
tracing::info!("Spawned");
|
|
|
|
Ok(response)
|
|
}
|
|
|
|
async fn index() -> impl Responder {
|
|
Html::new(include_str!("chat.html").to_owned())
|
|
}
|
|
|
|
#[tokio::main(flavor = "current_thread")]
|
|
async fn main() -> io::Result<()> {
|
|
tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
EnvFilter::builder()
|
|
.with_default_directive(LevelFilter::INFO.into())
|
|
.from_env_lossy(),
|
|
)
|
|
.init();
|
|
|
|
let chat = Chat::new();
|
|
|
|
HttpServer::new(move || {
|
|
App::new()
|
|
.wrap(Logger::default())
|
|
.app_data(web::Data::new(chat.clone()))
|
|
.route("/", web::get().to(index))
|
|
.route("/ws", web::get().to(ws))
|
|
})
|
|
.bind("127.0.0.1:8080")?
|
|
.run()
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|