1
0
mirror of https://github.com/actix/examples synced 2024-11-27 16:02:57 +01:00

add actorless websocket echo example

This commit is contained in:
Rob Ede 2022-07-11 01:44:46 +01:00
parent 4ff98dbaa5
commit a4a060994d
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
8 changed files with 248 additions and 1 deletions

28
Cargo.lock generated
View File

@ -571,6 +571,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "actix-ws"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3"
dependencies = [
"actix-codec",
"actix-http",
"actix-web",
"futures-core",
"tokio 1.19.2",
]
[[package]] [[package]]
name = "actix_derive" name = "actix_derive"
version = "0.6.0" version = "0.6.0"
@ -7034,6 +7047,21 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
] ]
[[package]]
name = "websocket-echo-actorless-example"
version = "1.0.0"
dependencies = [
"actix-files",
"actix-web",
"actix-ws",
"awc",
"env_logger 0.9.0",
"futures-util",
"log",
"tokio 1.19.2",
"tokio-stream",
]
[[package]] [[package]]
name = "websocket-example" name = "websocket-example"
version = "1.0.0" version = "1.0.0"

View File

@ -56,6 +56,7 @@ members = [
"websockets/chat-broker", "websockets/chat-broker",
"websockets/chat-tcp", "websockets/chat-tcp",
"websockets/chat", "websockets/chat",
"websockets/echo-actorless",
"websockets/echo", "websockets/echo",
] ]
exclude = [ exclude = [

View File

@ -18,7 +18,6 @@ impl LangChoice {
let lang = req let lang = req
.get_header::<AcceptLanguage>() .get_header::<AcceptLanguage>()
.and_then(|lang| lang.preference().into_item()) .and_then(|lang| lang.preference().into_item())
.map(|lang| lang.to_string())
.map_or_else(|| "en".to_owned(), |lang| lang.to_string()); .map_or_else(|| "en".to_owned(), |lang| lang.to_string());
Self(lang) Self(lang)

View File

@ -0,0 +1,16 @@
[package]
name = "websocket-echo-actorless-example"
version = "1.0.0"
edition = "2021"
[dependencies]
actix-files = "0.6"
actix-web = "4"
actix-ws = "0.2.5"
awc = "3"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["std", "sink"] }
log = "0.4"
tokio = { version = "1.13.1", features = ["full"] }
tokio-stream = "0.1.8"

View File

@ -0,0 +1,26 @@
# Echo WebSocket (actor-less)
Simple echo websocket server using [`actix-ws`].
## Usage
### Server
```sh
cd websockets/echo-actorless
cargo run
# starting HTTP server at http://localhost:8080
```
### Web Client
Go to <http://localhost:8080/> in a browser.
### CLI Client
```sh
# using `websocat` (https://github.com/vi/websocat)
websocat -v --ping-interval=2 ws://127.0.0.1:8080/ws
```
[`actix-ws`]: https://crates.io/crates/actix-ws

View File

@ -0,0 +1,116 @@
use std::time::{Duration, Instant};
use actix_web::rt;
use actix_ws::Message;
use futures_util::stream::StreamExt as _;
use tokio::select;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Echo text & binary messages received from the client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn echo_heartbeat_ws(
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
) {
log::info!("connected");
let mut last_heartbeat = Instant::now();
let mut interval = rt::time::interval(HEARTBEAT_INTERVAL);
loop {
select! {
Some(Ok(msg)) = msg_stream.next() => {
log::debug!("msg: {msg:?}");
match msg {
Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {
last_heartbeat = Instant::now();
}
Message::Text(text) => {
session.text(text).await.unwrap();
}
Message::Binary(bin) => {
session.binary(bin).await.unwrap();
}
Message::Close(reason) => {
let _ = session.close(reason).await;
break;
}
_ => {
let _ = session.close(None).await;
break;
}
}
}
_ = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
log::info!("client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting");
let _ = session.close(None).await;
break;
}
// send heartbeat ping
let _ = session.ping(b"").await;
// reset interval duration
interval.reset();
}
};
}
}
/// Echo text & binary messages received from the client and respond to ping messages.
///
/// This example is just for demonstration of simplicity. In reality, you likely want to include
/// some handling of heartbeats for connection health tracking to free up server resources when
/// connections die or network issues arise.
pub async fn echo_ws(mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream) {
log::info!("connected");
while let Some(Ok(msg)) = msg_stream.next().await {
log::debug!("msg: {msg:?}");
match msg {
Message::Ping(bytes) => {
let _ = session.pong(&bytes).await;
}
Message::Pong(_) => {}
Message::Text(text) => {
session.text(text).await.unwrap();
}
Message::Binary(bin) => {
session.binary(bin).await.unwrap();
}
Message::Close(reason) => {
let _ = session.close(reason).await;
break;
}
_ => {
let _ = session.close(None).await;
break;
}
}
}
}

View File

@ -0,0 +1,60 @@
//! Simple echo websocket server.
//!
//! Open `http://localhost:8080/` in browser to test.
use actix_files::NamedFile;
use actix_web::{
middleware, rt, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
mod handler;
async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap()
}
/// Handshake and start WebSocket handler with heartbeats.
async fn echo_heartbeat_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately
rt::spawn(handler::echo_heartbeat_ws(session, msg_stream));
Ok(res)
}
/// Handshake and start basic WebSocket handler.
///
/// This example is just for demonstration of simplicity. In reality, you likely want to include
/// some handling of heartbeats for connection health tracking to free up server resources when
/// connections die or network issues arise.
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
// spawn websocket handler (and don't await it) so that the response is returned immediately
rt::spawn(handler::echo_ws(session, msg_stream));
Ok(res)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
log::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(|| {
App::new()
// WebSocket UI HTML file
.service(web::resource("/").to(index))
// websocket routes
.service(web::resource("/ws").route(web::get().to(echo_heartbeat_ws)))
.service(web::resource("/ws-basic").route(web::get().to(echo_ws)))
// enable logger
.wrap(middleware::Logger::default())
})
.workers(2)
.bind(("127.0.0.1", 8080))?
.run()
.await
}

View File

@ -0,0 +1 @@
../../echo/static/index.html