diff --git a/websocket-tcp-chat/client.py b/websocket-tcp-chat/client.py index 8a1bd9ae..db8e241b 100755 --- a/websocket-tcp-chat/client.py +++ b/websocket-tcp-chat/client.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""websocket cmd client for wssrv.py example.""" +"""websocket cmd client for actix/websocket-tcp-chat example.""" import argparse import asyncio import signal @@ -7,46 +7,54 @@ import sys import aiohttp +queue = asyncio.Queue() -def start_client(loop, url): + +async def start_client(url, loop): name = input('Please enter your name: ') - # send request - ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + ws = await aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) - # input reader def stdin_callback(): line = sys.stdin.buffer.readline().decode('utf-8') if not line: loop.stop() else: - ws.send_str(name + ': ' + line) - loop.add_reader(sys.stdin.fileno(), stdin_callback) + # Queue.put is a coroutine, so you can't call it directly. + asyncio.ensure_future(queue.put(ws.send_str(name + ': ' + line))) - @asyncio.coroutine - def dispatch(): + loop.add_reader(sys.stdin, stdin_callback) + + async def dispatch(): while True: - msg = yield from ws.receive() - + msg = await ws.receive() if msg.type == aiohttp.WSMsgType.TEXT: print('Text: ', msg.data.strip()) elif msg.type == aiohttp.WSMsgType.BINARY: print('Binary: ', msg.data) elif msg.type == aiohttp.WSMsgType.PING: - ws.pong() + await ws.pong() elif msg.type == aiohttp.WSMsgType.PONG: print('Pong received') else: if msg.type == aiohttp.WSMsgType.CLOSE: - yield from ws.close() + await ws.close() elif msg.type == aiohttp.WSMsgType.ERROR: print('Error during receive %s' % ws.exception()) elif msg.type == aiohttp.WSMsgType.CLOSED: pass - break - yield from dispatch() + await dispatch() + + +async def tick(): + while True: + await (await queue.get()) + + +async def main(url, loop): + await asyncio.wait([start_client(url, loop), tick()]) ARGS = argparse.ArgumentParser( @@ -68,5 +76,5 @@ if __name__ == '__main__': loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGINT, loop.stop) - asyncio.Task(start_client(loop, url)) + asyncio.Task(main(url, loop)) loop.run_forever() diff --git a/websocket-tcp-chat/src/main.rs b/websocket-tcp-chat/src/main.rs index a4874536..10e1f5d1 100644 --- a/websocket-tcp-chat/src/main.rs +++ b/websocket-tcp-chat/src/main.rs @@ -21,6 +21,7 @@ use std::time::Instant; use actix::*; use actix_web::server::HttpServer; use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; +use std::time::Duration; mod codec; mod server; @@ -68,6 +69,10 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared // across all routes within application + + // we'll start heartbeat process on session start. + self.hb(ctx); + let addr = ctx.address(); ctx.state() .addr @@ -183,6 +188,31 @@ impl StreamHandler for WsChatSession { } } +impl WsChatSession { + /// helper method that sends ping to client every second. + /// + /// also this method check heartbeats from client + fn hb(&self, ctx: &mut ws::WebsocketContext) { + ctx.run_interval(Duration::new(1, 0), |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); + + // notify chat server + ctx.state() + .addr + .do_send(server::Disconnect { id: act.id }); + + // stop actor + ctx.stop(); + } + + ctx.ping(""); + }); + } +} + fn main() { let _ = env_logger::init(); let sys = actix::System::new("websocket-example"); @@ -205,15 +235,15 @@ fn main() { }; App::with_state(state) - // redirect to websocket.html + // redirect to websocket.html .resource("/", |r| r.method(http::Method::GET).f(|_| { HttpResponse::Found() .header("LOCATION", "/static/websocket.html") .finish() })) - // websocket + // websocket .resource("/ws/", |r| r.route().f(chat_route)) - // static resources + // static resources .handler("/static/", fs::StaticFiles::new("static/").unwrap()) }).bind("127.0.0.1:8080") .unwrap() diff --git a/websocket-tcp-chat/src/session.rs b/websocket-tcp-chat/src/session.rs index 48985156..37bd33a9 100644 --- a/websocket-tcp-chat/src/session.rs +++ b/websocket-tcp-chat/src/session.rs @@ -149,7 +149,7 @@ impl ChatSession { /// /// also this method check heartbeats from client fn hb(&self, ctx: &mut Context) { - ctx.run_later(Duration::new(1, 0), |act, ctx| { + ctx.run_interval(Duration::new(1, 0), |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { // heartbeat timed out @@ -164,7 +164,6 @@ impl ChatSession { act.framed.write(ChatResponse::Ping); // if we can not send message to sink, sink is closed (disconnected) - act.hb(ctx); }); } }