mirror of
https://github.com/fafhrd91/actix-web
synced 2025-06-26 15:07:42 +02:00
Add WsResponseBuilder
to build web socket session response (#1920)
Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
committed by
GitHub
parent
fa7f3e6908
commit
4c9ca7196d
@ -1,11 +1,9 @@
|
||||
use actix::prelude::*;
|
||||
use actix_web::{
|
||||
http::{header, StatusCode},
|
||||
web, App, HttpRequest, HttpResponse,
|
||||
};
|
||||
use actix_web_actors::*;
|
||||
use actix_http::ws::Codec;
|
||||
use actix_web::{web, App, HttpRequest};
|
||||
use actix_web_actors::ws;
|
||||
use bytes::Bytes;
|
||||
use futures_util::{SinkExt as _, StreamExt as _};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
|
||||
struct Ws;
|
||||
|
||||
@ -15,37 +13,34 @@ impl Actor for Ws {
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Ws {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
match msg.unwrap() {
|
||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||
ws::Message::Text(text) => ctx.text(text),
|
||||
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||
ws::Message::Close(reason) => ctx.close(reason),
|
||||
_ => {}
|
||||
match msg {
|
||||
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
||||
Ok(ws::Message::Text(text)) => ctx.text(text),
|
||||
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
|
||||
Ok(ws::Message::Close(reason)) => ctx.close(reason),
|
||||
_ => ctx.close(Some(ws::CloseCode::Error.into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_simple() {
|
||||
let mut srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move { ws::start(Ws, &req, stream) },
|
||||
))
|
||||
});
|
||||
const MAX_FRAME_SIZE: usize = 10_000;
|
||||
const DEFAULT_FRAME_SIZE: usize = 10;
|
||||
|
||||
async fn common_test_code(mut srv: actix_test::TestServer, frame_size: usize) {
|
||||
// client service
|
||||
let mut framed = srv.ws().await.unwrap();
|
||||
framed.send(ws::Message::Text("text".into())).await.unwrap();
|
||||
|
||||
framed.send(ws::Message::Text("text".into())).await.unwrap();
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
|
||||
|
||||
let bytes = Bytes::from(vec![0; frame_size]);
|
||||
framed
|
||||
.send(ws::Message::Binary("text".into()))
|
||||
.send(ws::Message::Binary(bytes.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text")));
|
||||
assert_eq!(item, ws::Frame::Binary(bytes));
|
||||
|
||||
framed.send(ws::Message::Ping("text".into())).await.unwrap();
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
@ -55,55 +50,137 @@ async fn test_simple() {
|
||||
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into())));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_with_credentials() {
|
||||
let mut srv = actix_test::start(|| {
|
||||
async fn simple_builder() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
if req.headers().contains_key("Authorization") {
|
||||
ws::start(Ws, &req, stream)
|
||||
} else {
|
||||
Ok(HttpResponse::new(StatusCode::UNAUTHORIZED))
|
||||
}
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream).start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
// client service without credentials
|
||||
match srv.ws().await {
|
||||
Ok(_) => panic!("WebSocket client without credentials should panic"),
|
||||
Err(awc::error::WsClientError::InvalidResponseStatus(status)) => {
|
||||
assert_eq!(status, StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
Err(e) => panic!("Invalid error from WebSocket client: {}", e),
|
||||
}
|
||||
|
||||
let headers = srv.client_headers().unwrap();
|
||||
headers.insert(
|
||||
header::AUTHORIZATION,
|
||||
header::HeaderValue::from_static("Bearer Something"),
|
||||
);
|
||||
|
||||
// client service with credentials
|
||||
let client = srv.ws();
|
||||
|
||||
let mut framed = client.await.unwrap();
|
||||
|
||||
framed.send(ws::Message::Text("text".into())).await.unwrap();
|
||||
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
|
||||
|
||||
framed
|
||||
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let item = framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into())));
|
||||
common_test_code(srv, DEFAULT_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_with_frame_size() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.frame_size(MAX_FRAME_SIZE)
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, MAX_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_with_frame_size_exceeded() {
|
||||
const MAX_FRAME_SIZE: usize = 64;
|
||||
|
||||
let mut srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.frame_size(MAX_FRAME_SIZE)
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
// client service
|
||||
let mut framed = srv.ws().await.unwrap();
|
||||
|
||||
// create a request with a frame size larger than expected
|
||||
let bytes = Bytes::from(vec![0; MAX_FRAME_SIZE + 1]);
|
||||
framed.send(ws::Message::Binary(bytes)).await.unwrap();
|
||||
|
||||
let frame = framed.next().await.unwrap().unwrap();
|
||||
let close_reason = match frame {
|
||||
ws::Frame::Close(Some(reason)) => reason,
|
||||
_ => panic!("close frame expected"),
|
||||
};
|
||||
assert_eq!(close_reason.code, ws::CloseCode::Error);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_with_codec() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.codec(Codec::new())
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, DEFAULT_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_with_protocols() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.protocols(&["A", "B"])
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, DEFAULT_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_with_codec_and_frame_size() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.codec(Codec::new())
|
||||
.frame_size(MAX_FRAME_SIZE)
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, DEFAULT_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn builder_full() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move {
|
||||
ws::WsResponseBuilder::new(Ws, &req, stream)
|
||||
.frame_size(MAX_FRAME_SIZE)
|
||||
.codec(Codec::new())
|
||||
.protocols(&["A", "B"])
|
||||
.start()
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, MAX_FRAME_SIZE).await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn simple_start() {
|
||||
let srv = actix_test::start(|| {
|
||||
App::new().service(web::resource("/").to(
|
||||
|req: HttpRequest, stream: web::Payload| async move { ws::start(Ws, &req, stream) },
|
||||
))
|
||||
});
|
||||
|
||||
common_test_code(srv, DEFAULT_FRAME_SIZE).await;
|
||||
}
|
||||
|
Reference in New Issue
Block a user