From 0bfe07b3712ed03abd81017216cb8c44e57c497f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 20 Oct 2017 23:12:36 -0700 Subject: [PATCH] process inactive tasks --- examples/websocket-chat/Cargo.toml | 3 +- examples/websocket-chat/README.md | 28 +++++++++++ examples/websocket-chat/client.py | 72 +++++++++++++++++++++++++++++ examples/websocket-chat/src/main.rs | 7 ++- examples/websocket/client.py | 72 +++++++++++++++++++++++++++++ src/httpcodes.rs | 2 + src/server.rs | 44 +++++++++++++----- src/task.rs | 34 ++++++-------- src/ws.rs | 10 +++- 9 files changed, 238 insertions(+), 34 deletions(-) create mode 100644 examples/websocket-chat/README.md create mode 100755 examples/websocket-chat/client.py create mode 100755 examples/websocket/client.py diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index 3c4de88d2..5303915d3 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" authors = ["Nikolay Kim "] [[bin]] -name = "websocket" +name = "server" path = "src/main.rs" [[bin]] @@ -18,6 +18,7 @@ byteorder = "1.1" futures = "0.1" tokio-io = "0.1" tokio-core = "0.1" +env_logger = "*" serde = "1.0" serde_json = "1.0" diff --git a/examples/websocket-chat/README.md b/examples/websocket-chat/README.md new file mode 100644 index 000000000..cf7bfa754 --- /dev/null +++ b/examples/websocket-chat/README.md @@ -0,0 +1,28 @@ +# Websocket chat example + +This is extension of the +[actix chat example](https://github.com/fafhrd91/actix/tree/master/examples/chat) + + +## Server + +Chat server listens for incoming tcp connections. Server can access several types of message: + + * `\list` - list all available rooms + * `\join name` - join room, if room does not exist, create new one + * `some message` - just string, send messsage to all peers in same room + * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat + message for 10 seconds connection gets droppped + +To start server use command: `cargo run --bin server` + +## Client + +Client connects to server. Reads input from stdin and sends to server. + +To run client use command: `cargo run --bin client` + + +## WebSocket Browser Client + +Open url: http://localhost:8080/ diff --git a/examples/websocket-chat/client.py b/examples/websocket-chat/client.py new file mode 100755 index 000000000..8a1bd9aee --- /dev/null +++ b/examples/websocket-chat/client.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""websocket cmd client for wssrv.py example.""" +import argparse +import asyncio +import signal +import sys + +import aiohttp + + +def start_client(loop, url): + name = input('Please enter your name: ') + + # send request + ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + + # input reader + def stdin_callback(): + line = sys.stdin.buffer.readline().decode('utf-8') + if not line: + loop.stop() + else: + ws.send_str(name + ': ' + line) + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + @asyncio.coroutine + def dispatch(): + while True: + msg = yield from ws.receive() + + if msg.type == aiohttp.WSMsgType.TEXT: + print('Text: ', msg.data.strip()) + elif msg.type == aiohttp.WSMsgType.BINARY: + print('Binary: ', msg.data) + elif msg.type == aiohttp.WSMsgType.PING: + ws.pong() + elif msg.type == aiohttp.WSMsgType.PONG: + print('Pong received') + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + yield from ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print('Error during receive %s' % ws.exception()) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + + break + + yield from dispatch() + + +ARGS = argparse.ArgumentParser( + description="websocket console client for wssrv.py example.") +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=8080, type=int, help='Port number') + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + url = 'http://{}:{}/ws/'.format(args.host, args.port) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + asyncio.Task(start_client(loop, url)) + loop.run_forever() diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index c85454a92..2a0b0ad63 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -4,6 +4,7 @@ extern crate bytes; extern crate byteorder; extern crate tokio_io; extern crate tokio_core; +extern crate env_logger; extern crate serde; extern crate serde_json; #[macro_use] extern crate serde_derive; @@ -200,6 +201,7 @@ impl ResponseType for WsChatSession { fn main() { + let _ = env_logger::init(); let sys = actix::System::new("websocket-example"); // Start chat server actor @@ -218,7 +220,10 @@ fn main() { // redirect to websocket.html .resource("/", |r| r.handler(Method::GET, |req, payload, state| { - httpcodes::HTTPOk + httpcodes::HTTPFound + .builder() + .header("LOCATION", "/static/websocket.html") + .body(Body::Empty) })) // websocket .resource("/ws/", |r| r.get::()) diff --git a/examples/websocket/client.py b/examples/websocket/client.py new file mode 100755 index 000000000..8a1bd9aee --- /dev/null +++ b/examples/websocket/client.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""websocket cmd client for wssrv.py example.""" +import argparse +import asyncio +import signal +import sys + +import aiohttp + + +def start_client(loop, url): + name = input('Please enter your name: ') + + # send request + ws = yield from aiohttp.ClientSession().ws_connect(url, autoclose=False, autoping=False) + + # input reader + def stdin_callback(): + line = sys.stdin.buffer.readline().decode('utf-8') + if not line: + loop.stop() + else: + ws.send_str(name + ': ' + line) + loop.add_reader(sys.stdin.fileno(), stdin_callback) + + @asyncio.coroutine + def dispatch(): + while True: + msg = yield from ws.receive() + + if msg.type == aiohttp.WSMsgType.TEXT: + print('Text: ', msg.data.strip()) + elif msg.type == aiohttp.WSMsgType.BINARY: + print('Binary: ', msg.data) + elif msg.type == aiohttp.WSMsgType.PING: + ws.pong() + elif msg.type == aiohttp.WSMsgType.PONG: + print('Pong received') + else: + if msg.type == aiohttp.WSMsgType.CLOSE: + yield from ws.close() + elif msg.type == aiohttp.WSMsgType.ERROR: + print('Error during receive %s' % ws.exception()) + elif msg.type == aiohttp.WSMsgType.CLOSED: + pass + + break + + yield from dispatch() + + +ARGS = argparse.ArgumentParser( + description="websocket console client for wssrv.py example.") +ARGS.add_argument( + '--host', action="store", dest='host', + default='127.0.0.1', help='Host name') +ARGS.add_argument( + '--port', action="store", dest='port', + default=8080, type=int, help='Port number') + +if __name__ == '__main__': + args = ARGS.parse_args() + if ':' in args.host: + args.host, port = args.host.split(':', 1) + args.port = int(port) + + url = 'http://{}:{}/ws/'.format(args.host, args.port) + + loop = asyncio.get_event_loop() + loop.add_signal_handler(signal.SIGINT, loop.stop) + asyncio.Task(start_client(loop, url)) + loop.run_forever() diff --git a/src/httpcodes.rs b/src/httpcodes.rs index 54d70ea5c..f575ad528 100644 --- a/src/httpcodes.rs +++ b/src/httpcodes.rs @@ -13,6 +13,8 @@ pub const HTTPOk: StaticResponse = StaticResponse(StatusCode::OK); pub const HTTPCreated: StaticResponse = StaticResponse(StatusCode::CREATED); pub const HTTPNoContent: StaticResponse = StaticResponse(StatusCode::NO_CONTENT); +pub const HTTPFound: StaticResponse = StaticResponse(StatusCode::FOUND); + pub const HTTPBadRequest: StaticResponse = StaticResponse(StatusCode::BAD_REQUEST); pub const HTTPNotFound: StaticResponse = StaticResponse(StatusCode::NOT_FOUND); pub const HTTPUnauthorized: StaticResponse = StaticResponse(StatusCode::UNAUTHORIZED); diff --git a/src/server.rs b/src/server.rs index cf6ae8bce..d3c11f453 100644 --- a/src/server.rs +++ b/src/server.rs @@ -115,7 +115,7 @@ impl Handler<(T, A), io::Error> for HttpServer reader: Reader::new(), error: false, items: VecDeque::new(), - inactive: Vec::new(), + inactive: VecDeque::new(), keepalive: true, keepalive_timer: None, }); @@ -143,7 +143,7 @@ pub struct HttpChannel { reader: Reader, error: bool, items: VecDeque, - inactive: Vec, + inactive: VecDeque, keepalive: bool, keepalive_timer: Option, } @@ -192,20 +192,22 @@ impl Future for HttpChannel }; match self.items[idx].task.poll_io(&mut self.stream, req) { - Ok(Async::Ready(val)) => { + Ok(Async::Ready(ready)) => { let mut item = self.items.pop_front().unwrap(); // overide keep-alive state if self.keepalive { self.keepalive = item.task.keepalive(); } - if !val { + if !ready { item.eof = true; - self.inactive.push(item); + self.inactive.push_back(item); } // no keep-alive - if !self.keepalive && self.items.is_empty() { + if ready && !self.keepalive && + self.items.is_empty() && self.inactive.is_empty() + { return Ok(Async::Ready(())) } continue @@ -217,11 +219,11 @@ impl Future for HttpChannel return Err(()) } } - } else if !self.items[idx].finished { + } else if !self.items[idx].finished && !self.items[idx].error { match self.items[idx].task.poll() { + Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => self.items[idx].finished = true, - Ok(Async::NotReady) => (), Err(_) => self.items[idx].error = true, } @@ -229,6 +231,26 @@ impl Future for HttpChannel idx += 1; } + // check inactive tasks + let mut idx = 0; + while idx < self.inactive.len() { + if idx == 0 && self.inactive[idx].error && self.inactive[idx].finished { + let _ = self.inactive.pop_front(); + continue + } + + if !self.inactive[idx].finished && !self.inactive[idx].error { + match self.inactive[idx].task.poll() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => + self.inactive[idx].finished = true, + Err(_) => + self.inactive[idx].error = true, + } + } + idx += 1; + } + // read incoming data if !self.error && self.items.len() < MAX_PIPELINED_MESSAGES { match self.reader.parse(&mut self.stream) { @@ -251,7 +273,7 @@ impl Future for HttpChannel self.keepalive_timer.take(); // on parse error, stop reading stream but - // complete tasks + // tasks need to be completed self.error = true; if let ReaderError::Error(err) = err { @@ -265,7 +287,7 @@ impl Future for HttpChannel } Ok(Async::NotReady) => { // start keep-alive timer, this is also slow request timeout - if self.items.is_empty() { + if self.items.is_empty() && self.inactive.is_empty() { if self.keepalive { if self.keepalive_timer.is_none() { trace!("Start keep-alive timer"); @@ -287,7 +309,7 @@ impl Future for HttpChannel } // check for parse error - if self.items.is_empty() && self.error { + if self.items.is_empty() && self.inactive.is_empty() && self.error { return Ok(Async::Ready(())) } } diff --git a/src/task.rs b/src/task.rs index e7f51be9c..836c00481 100644 --- a/src/task.rs +++ b/src/task.rs @@ -250,27 +250,23 @@ impl Task { Frame::Message(response) => { self.prepare(info, response); } - Frame::Payload(chunk) => { - match chunk { - Some(chunk) => { - if self.prepared { - // TODO: add warning, write after EOF - self.encoder.encode(&mut self.buffer, chunk.as_ref()); - } else { - // might be response for EXCEPT - self.buffer.extend(chunk) - } - } - None => { - // TODO: add error "not eof"" - if !self.encoder.encode(&mut self.buffer, [].as_ref()) { - debug!("last payload item, but it is not EOF "); - return Err(()) - } - break - } + Frame::Payload(Some(chunk)) => { + if self.prepared { + // TODO: add warning, write after EOF + self.encoder.encode(&mut self.buffer, chunk.as_ref()); + } else { + // might be response for EXCEPT + self.buffer.extend(chunk) } }, + Frame::Payload(None) => { + // TODO: add error "not eof"" + if !self.encoder.encode(&mut self.buffer, [].as_ref()) { + debug!("last payload item, but it is not EOF "); + return Err(()) + } + break + }, } } } diff --git a/src/ws.rs b/src/ws.rs index c8baf885c..b88711713 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -199,9 +199,12 @@ impl Stream for WsStream { } Async::Ready(Some(Err(_))) => { self.closed = true; + break; } Async::Ready(None) => { done = true; + self.closed = true; + break; } Async::NotReady => break, } @@ -218,8 +221,11 @@ impl Stream for WsStream { OpCode::Continue => continue, OpCode::Bad => return Ok(Async::Ready(Some(Message::Error))), - OpCode::Close => - return Ok(Async::Ready(Some(Message::Closed))), + OpCode::Close => { + self.closed = true; + self.error_sent = true; + return Ok(Async::Ready(Some(Message::Closed))) + }, OpCode::Ping => return Ok(Async::Ready(Some( Message::Ping(String::from_utf8_lossy(&payload).into())))),