1
0
mirror of https://github.com/actix/examples synced 2025-06-27 01:27:43 +02:00

update all websocket examples to v4

This commit is contained in:
Rob Ede
2022-02-18 01:44:53 +00:00
parent 1b23e3ff3d
commit 4d8573c3fe
40 changed files with 1340 additions and 1682 deletions

View File

@ -0,0 +1,72 @@
//! Simple websocket client.
use std::{io, thread};
use actix_web::web::Bytes;
use awc::ws;
use futures::{SinkExt as _, StreamExt as _};
use tokio::{select, sync::mpsc};
use tokio_stream::wrappers::UnboundedReceiverStream;
#[actix_web::main]
async fn main() {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
log::info!("starting echo WebSocket client");
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let mut cmd_rx = UnboundedReceiverStream::new(cmd_rx);
// run blocking terminal input reader on separate thread
let input_thread = thread::spawn(move || loop {
let mut cmd = String::with_capacity(32);
if io::stdin().read_line(&mut cmd).is_err() {
log::error!("error reading line");
return;
}
cmd_tx.send(cmd).unwrap();
});
let (res, mut ws) = awc::Client::new()
.ws("ws://127.0.0.1:8080/ws")
.connect()
.await
.unwrap();
log::debug!("response: {res:?}");
log::info!("connected; server will echo messages sent");
loop {
select! {
Some(msg) = ws.next() => {
match msg {
Ok(ws::Frame::Text(txt)) => {
// log echoed messages from server
log::info!("Server: {:?}", txt)
}
Ok(ws::Frame::Ping(_)) => {
// respond to ping probes
ws.send(ws::Message::Pong(Bytes::new())).await.unwrap();
}
_ => {}
}
}
Some(cmd) = cmd_rx.next() => {
if cmd.is_empty() {
continue;
}
ws.send(ws::Message::Text(cmd.into())).await.unwrap();
}
else => break
}
}
input_thread.join().unwrap();
}

View File

@ -0,0 +1,42 @@
//! Simple echo websocket server.
//!
//! Open `http://localhost:8080/` in browser to test.
use actix_files::NamedFile;
use actix_web::{
middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
mod server;
use self::server::MyWebSocket;
async fn index() -> impl Responder {
NamedFile::open_async("./static/index.html").await.unwrap()
}
/// WebSocket handshake and start `MyWebSocket` actor.
async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
ws::start(MyWebSocket::new(), &req, stream)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
log::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(|| {
App::new()
// WebSocket UI HTML file
.service(web::resource("/").to(index))
// websocket route
.service(web::resource("/ws").route(web::get().to(echo_ws)))
// enable logger
.wrap(middleware::Logger::default())
})
.workers(2)
.bind(("127.0.0.1", 8080))?
.run()
.await
}

View File

@ -0,0 +1,82 @@
use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web_actors::ws;
/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// websocket connection is long running connection, it easier
/// to handle with an actor
pub struct MyWebSocket {
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
/// otherwise we drop connection.
hb: Instant,
}
impl MyWebSocket {
pub fn new() -> Self {
Self { hb: Instant::now() }
}
/// helper method that sends ping to client every second.
///
/// also this method checks heartbeats from client
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
// heartbeat timed out
println!("Websocket Client heartbeat failed, disconnecting!");
// stop actor
ctx.stop();
// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
}
impl Actor for MyWebSocket {
type Context = ws::WebsocketContext<Self>;
/// Method is called on actor start. We start the heartbeat process here.
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
}
}
/// Handler for `ws::Message`
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
// process websocket messages
println!("WS: {:?}", msg);
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
ctx.close(reason);
ctx.stop();
}
_ => ctx.stop(),
}
}
}