diff --git a/websocket-chat/src/main.rs b/websocket-chat/src/main.rs index 21dc598c..a6917a0e 100644 --- a/websocket-chat/src/main.rs +++ b/websocket-chat/src/main.rs @@ -9,11 +9,10 @@ extern crate serde_json; extern crate tokio_core; extern crate tokio_io; -#[macro_use] extern crate actix; extern crate actix_web; -use std::time::Instant; +use std::time::{Instant, Duration}; use actix::*; use actix_web::server::HttpServer; @@ -21,6 +20,11 @@ use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; mod server; +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + /// This is our websocket route state, this state is shared with all route /// instances via `HttpContext::state()` struct WsChatSessionState { @@ -43,8 +47,8 @@ fn chat_route(req: &HttpRequest) -> Result for WsChatSession { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { println!("WEBSOCKET MESSAGE: {:?}", msg); match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Pong(msg) => self.hb = Instant::now(), + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } ws::Message::Text(text) => { let m = text.trim(); // we check for /sss type of messages @@ -173,11 +182,40 @@ impl StreamHandler for WsChatSession { ws::Message::Binary(bin) => println!("Unexpected binary"), ws::Message::Close(_) => { ctx.stop(); - } + }, + _ => (), } } } +impl WsChatSession { + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + ctx.state() + .addr + .do_send(server::Disconnect { id: act.id }); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(""); + }); + } +} + fn main() { let _ = env_logger::init(); let sys = actix::System::new("websocket-example"); diff --git a/websocket-tcp-chat/src/client.rs b/websocket-tcp-chat/src/client.rs index 9b9e0d56..28313e29 100644 --- a/websocket-tcp-chat/src/client.rs +++ b/websocket-tcp-chat/src/client.rs @@ -97,6 +97,9 @@ impl ChatClient { ctx.run_later(Duration::new(1, 0), |act, ctx| { act.framed.write(codec::ChatRequest::Ping); act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code }); } } diff --git a/websocket-tcp-chat/src/main.rs b/websocket-tcp-chat/src/main.rs index 10e1f5d1..eefd1506 100644 --- a/websocket-tcp-chat/src/main.rs +++ b/websocket-tcp-chat/src/main.rs @@ -16,17 +16,20 @@ extern crate serde_derive; extern crate actix; extern crate actix_web; -use std::time::Instant; - use actix::*; use actix_web::server::HttpServer; use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; -use std::time::Duration; +use std::time::{Instant, Duration}; mod codec; mod server; mod session; +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + /// This is our websocket route state, this state is shared with all route /// instances via `HttpContext::state()` struct WsChatSessionState { @@ -49,8 +52,8 @@ fn chat_route(req: &HttpRequest) -> Result for WsChatSession { fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { println!("WEBSOCKET MESSAGE: {:?}", msg); match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), - ws::Message::Pong(msg) => self.hb = Instant::now(), + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } ws::Message::Text(text) => { let m = text.trim(); // we check for /sss type of messages @@ -183,7 +188,8 @@ impl StreamHandler for WsChatSession { ws::Message::Binary(bin) => println!("Unexpected binary"), ws::Message::Close(_) => { ctx.stop(); - } + }, + _ => (), } } } @@ -191,11 +197,11 @@ impl StreamHandler for WsChatSession { impl WsChatSession { /// helper method that sends ping to client every second. /// - /// also this method check heartbeats from client + /// also this method checks heartbeats from client fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(Duration::new(1, 0), |act, ctx| { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats - if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out println!("Websocket Client heartbeat failed, disconnecting!"); @@ -206,6 +212,9 @@ impl WsChatSession { // stop actor ctx.stop(); + + // don't try to send a ping + return; } ctx.ping(""); diff --git a/websocket/src/client.rs b/websocket/src/client.rs index fa3f2319..7cb5fa3b 100644 --- a/websocket/src/client.rs +++ b/websocket/src/client.rs @@ -74,6 +74,9 @@ impl ChatClient { ctx.run_later(Duration::new(1, 0), |act, ctx| { act.0.ping(""); act.hb(ctx); + + // client should also check for a timeout here, similar to the + // server code }); } } diff --git a/websocket/src/main.rs b/websocket/src/main.rs index c7c55940..da8399f0 100644 --- a/websocket/src/main.rs +++ b/websocket/src/main.rs @@ -8,22 +8,38 @@ extern crate actix; extern crate actix_web; extern crate env_logger; +use std::time::{Instant, Duration}; + use actix::prelude::*; use actix_web::{ fs, http, middleware, server, ws, App, Error, HttpRequest, HttpResponse, }; +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); + /// do websocket handshake and start `MyWebSocket` actor fn ws_index(r: &HttpRequest) -> Result { - ws::start(r, MyWebSocket) + ws::start(r, MyWebSocket::new()) } /// websocket connection is long running connection, it easier /// to handle with an actor -struct MyWebSocket; +struct MyWebSocket { + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + hb: Instant, +} impl Actor for MyWebSocket { type Context = ws::WebsocketContext; + + /// Method is called on actor start. We start the heartbeat process here. + fn started(&mut self, ctx: &mut Self::Context) { + self.hb(ctx); + } } /// Handler for `ws::Message` @@ -32,7 +48,10 @@ impl StreamHandler for MyWebSocket { // process websocket messages println!("WS: {:?}", msg); match msg { - ws::Message::Ping(msg) => ctx.pong(&msg), + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); + } ws::Message::Text(text) => ctx.text(text), ws::Message::Binary(bin) => ctx.binary(bin), ws::Message::Close(_) => { @@ -43,6 +62,33 @@ impl StreamHandler for MyWebSocket { } } +impl MyWebSocket { + fn new() -> Self { + Self { hb: Instant::now() } + } + + /// helper method that sends ping to client every second. + /// + /// also this method checks heartbeats from client + fn hb(&self, ctx: &mut ::Context) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // stop actor + ctx.stop(); + + // don't try to send a ping + return; + } + + ctx.ping(""); + }); + } +} + fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init();