From 769cc5b84e86bcf2665762e63c702a76696b3605 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 29 Mar 2019 15:08:35 -0700 Subject: [PATCH] upgrade ws examples --- basics/Cargo.toml | 2 +- websocket-chat/Cargo.toml | 19 +++--- websocket-chat/src/main.rs | 99 +++++++++++---------------- websocket-tcp-chat/Cargo.toml | 16 +++-- websocket-tcp-chat/src/main.rs | 109 ++++++++++++------------------ websocket-tcp-chat/src/server.rs | 2 +- websocket-tcp-chat/src/session.rs | 12 ++-- 7 files changed, 111 insertions(+), 148 deletions(-) diff --git a/basics/Cargo.toml b/basics/Cargo.toml index 4eb17bc..64c08aa 100644 --- a/basics/Cargo.toml +++ b/basics/Cargo.toml @@ -2,7 +2,7 @@ name = "basics" version = "1.0.0" authors = ["Nikolay Kim "] -workspace = "../" +workspace = ".." edition = "2018" [dependencies] diff --git a/websocket-chat/Cargo.toml b/websocket-chat/Cargo.toml index b6d281f..64c7876 100644 --- a/websocket-chat/Cargo.toml +++ b/websocket-chat/Cargo.toml @@ -2,23 +2,24 @@ name = "websocket-example" version = "0.1.0" authors = ["Nikolay Kim "] -workspace = "../" +workspace = ".." +edition = "2018" [[bin]] name = "websocket-chat-server" path = "src/main.rs" [dependencies] -rand = "*" +actix = "0.8.0-alpha.2" +actix-web = "1.0.0-alpha.1" +actix-web-actors = "1.0.0-alpha.1" +actix-files = "0.1.0-alpha.1" + +rand = "0.6" bytes = "0.4" byteorder = "1.1" -futures = "0.1" +futures = "0.1.25" tokio-io = "0.1" -tokio-core = "0.1" -env_logger = "*" - +env_logger = "0.6" serde = "1.0" serde_json = "1.0" - -actix = "0.7" -actix-web = "0.7" diff --git a/websocket-chat/src/main.rs b/websocket-chat/src/main.rs index 8849aba..45085f7 100644 --- a/websocket-chat/src/main.rs +++ b/websocket-chat/src/main.rs @@ -1,22 +1,9 @@ -#![allow(unused_variables)] -extern crate byteorder; -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate rand; -extern crate serde; -extern crate serde_json; -extern crate tokio_core; -extern crate tokio_io; - -extern crate actix; -extern crate actix_web; - use std::time::{Duration, Instant}; use actix::*; -use actix_web::server::HttpServer; -use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; +use actix_files as fs; +use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web_actors::ws; mod server; @@ -25,22 +12,22 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); -/// This is our websocket route state, this state is shared with all route -/// instances via `HttpContext::state()` -struct WsChatSessionState { - addr: Addr, -} - /// Entry point for our route -fn chat_route(req: &HttpRequest) -> Result { +fn chat_route( + req: HttpRequest, + stream: web::Payload, + srv: web::Data>, +) -> Result { ws::start( - req, WsChatSession { id: 0, hb: Instant::now(), room: "Main".to_owned(), name: None, + addr: srv.get_ref().clone(), }, + &req, + stream, ) } @@ -54,10 +41,12 @@ struct WsChatSession { room: String, /// peer name name: Option, + /// Chat server + addr: Addr, } impl Actor for WsChatSession { - type Context = ws::WebsocketContext; + type Context = ws::WebsocketContext; /// Method is called on actor start. /// We register ws session with ChatServer @@ -71,8 +60,7 @@ impl Actor for WsChatSession { // HttpContext::state() is instance of WsChatSessionState, state is shared // across all routes within application let addr = ctx.address(); - ctx.state() - .addr + self.addr .send(server::Connect { addr: addr.recipient(), }) @@ -88,9 +76,9 @@ impl Actor for WsChatSession { .wait(ctx); } - fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify chat server - ctx.state().addr.do_send(server::Disconnect { id: self.id }); + self.addr.do_send(server::Disconnect { id: self.id }); Running::Stop } } @@ -126,8 +114,7 @@ impl StreamHandler for WsChatSession { // Send ListRooms message to chat server and wait for // response println!("List rooms"); - ctx.state() - .addr + self.addr .send(server::ListRooms) .into_actor(self) .then(|res, _, ctx| { @@ -149,7 +136,7 @@ impl StreamHandler for WsChatSession { "/join" => { if v.len() == 2 { self.room = v[1].to_owned(); - ctx.state().addr.do_send(server::Join { + self.addr.do_send(server::Join { id: self.id, name: self.room.clone(), }); @@ -175,14 +162,14 @@ impl StreamHandler for WsChatSession { m.to_owned() }; // send message to chat server - ctx.state().addr.do_send(server::ClientMessage { + self.addr.do_send(server::ClientMessage { id: self.id, msg: msg, room: self.room.clone(), }) } } - ws::Message::Binary(bin) => println!("Unexpected binary"), + ws::Message::Binary(_) => println!("Unexpected binary"), ws::Message::Close(_) => { ctx.stop(); } @@ -194,7 +181,7 @@ impl WsChatSession { /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext) { + fn hb(&self, ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { @@ -202,7 +189,7 @@ impl WsChatSession { println!("Websocket Client heartbeat failed, disconnecting!"); // notify chat server - ctx.state().addr.do_send(server::Disconnect { id: act.id }); + act.addr.do_send(server::Disconnect { id: act.id }); // stop actor ctx.stop(); @@ -216,38 +203,30 @@ impl WsChatSession { } } -fn main() { - let _ = env_logger::init(); - let sys = actix::System::new("websocket-example"); +fn main() -> std::io::Result<()> { + env_logger::init(); + let sys = System::new("ws-example"); - // Start chat server actor in separate thread - let server = Arbiter::start(|_| server::ChatServer::default()); + // Start chat server actor + let server = server::ChatServer::default().start(); // Create Http server with websocket support HttpServer::new(move || { - // Websocket sessions state - let state = WsChatSessionState { - addr: server.clone(), - }; - - App::with_state(state) + App::new() + .data(server.clone()) // redirect to websocket.html - .resource("/", |r| { - r.method(http::Method::GET).f(|_| { - HttpResponse::Found() - .header("LOCATION", "/static/websocket.html") - .finish() - }) - }) + .service(web::resource("/").route(web::get().to(|| { + HttpResponse::Found() + .header("LOCATION", "/static/websocket.html") + .finish() + }))) // websocket - .resource("/ws/", |r| r.route().f(chat_route)) + .service(web::resource("/ws/").to(chat_route)) // static resources - .handler("/static/", fs::StaticFiles::new("static/").unwrap()) + .service(fs::Files::new("/static/", "static/")) }) - .bind("127.0.0.1:8080") - .unwrap() + .bind("127.0.0.1:8080")? .start(); - println!("Started http server: 127.0.0.1:8080"); - let _ = sys.run(); + sys.run() } diff --git a/websocket-tcp-chat/Cargo.toml b/websocket-tcp-chat/Cargo.toml index c73d567..b4a06eb 100644 --- a/websocket-tcp-chat/Cargo.toml +++ b/websocket-tcp-chat/Cargo.toml @@ -2,7 +2,8 @@ name = "websocket-tcp-example" version = "0.1.0" authors = ["Nikolay Kim "] -workspace = "../" +workspace = ".." +edition = "2018" [[bin]] name = "websocket-tcp-server" @@ -13,18 +14,19 @@ name = "websocket-tcp-client" path = "src/client.rs" [dependencies] -rand = "*" +actix = "0.8.0-alpha.2" +actix-web = "1.0.0-alpha.1" +actix-web-actors = "1.0.0-alpha.1" +actix-files = "0.1.0-alpha.1" + +rand = "0.6" bytes = "0.4" byteorder = "1.1" futures = "0.1" tokio-io = "0.1" tokio-tcp = "0.1" tokio-codec = "0.1" -env_logger = "*" - +env_logger = "0.6" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" - -actix = "0.7" -actix-web = "0.7" diff --git a/websocket-tcp-chat/src/main.rs b/websocket-tcp-chat/src/main.rs index dff734c..cd96c40 100644 --- a/websocket-tcp-chat/src/main.rs +++ b/websocket-tcp-chat/src/main.rs @@ -1,25 +1,14 @@ -#![allow(unused_variables)] -extern crate byteorder; -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate rand; -extern crate serde; -extern crate serde_json; -extern crate tokio_codec; -extern crate tokio_io; -extern crate tokio_tcp; #[macro_use] extern crate serde_derive; - #[macro_use] extern crate actix; -extern crate actix_web; + +use std::time::{Duration, Instant}; use actix::*; -use actix_web::server::HttpServer; -use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; -use std::time::{Duration, Instant}; +use actix_files as fs; +use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; +use actix_web_actors::ws; mod codec; mod server; @@ -30,22 +19,22 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); -/// This is our websocket route state, this state is shared with all route -/// instances via `HttpContext::state()` -struct WsChatSessionState { - addr: Addr, -} - /// Entry point for our route -fn chat_route(req: &HttpRequest) -> Result { +fn chat_route( + req: HttpRequest, + stream: web::Payload, + srv: web::Data>, +) -> Result { ws::start( - req, WsChatSession { id: 0, hb: Instant::now(), room: "Main".to_owned(), name: None, + addr: srv.get_ref().clone(), }, + &req, + stream, ) } @@ -59,26 +48,26 @@ struct WsChatSession { room: String, /// peer name name: Option, + /// Chat server + addr: Addr, } impl Actor for WsChatSession { - type Context = ws::WebsocketContext; + type Context = ws::WebsocketContext; /// Method is called on actor start. /// We register ws session with ChatServer fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared // across all routes within application - - // we'll start heartbeat process on session start. - self.hb(ctx); - let addr = ctx.address(); - ctx.state() - .addr + self.addr .send(server::Connect { addr: addr.recipient(), }) @@ -94,9 +83,9 @@ impl Actor for WsChatSession { .wait(ctx); } - fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify chat server - ctx.state().addr.do_send(server::Disconnect { id: self.id }); + self.addr.do_send(server::Disconnect { id: self.id }); Running::Stop } } @@ -132,8 +121,7 @@ impl StreamHandler for WsChatSession { // Send ListRooms message to chat server and wait for // response println!("List rooms"); - ctx.state() - .addr + self.addr .send(server::ListRooms) .into_actor(self) .then(|res, _, ctx| { @@ -155,7 +143,7 @@ impl StreamHandler for WsChatSession { "/join" => { if v.len() == 2 { self.room = v[1].to_owned(); - ctx.state().addr.do_send(server::Join { + self.addr.do_send(server::Join { id: self.id, name: self.room.clone(), }); @@ -181,14 +169,14 @@ impl StreamHandler for WsChatSession { m.to_owned() }; // send message to chat server - ctx.state().addr.do_send(server::Message { + self.addr.do_send(server::Message { id: self.id, msg: msg, room: self.room.clone(), }) } } - ws::Message::Binary(bin) => println!("Unexpected binary"), + ws::Message::Binary(_) => println!("Unexpected binary"), ws::Message::Close(_) => { ctx.stop(); } @@ -200,7 +188,7 @@ impl WsChatSession { /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ws::WebsocketContext) { + fn hb(&self, ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { @@ -208,7 +196,7 @@ impl WsChatSession { println!("Websocket Client heartbeat failed, disconnecting!"); // notify chat server - ctx.state().addr.do_send(server::Disconnect { id: act.id }); + act.addr.do_send(server::Disconnect { id: act.id }); // stop actor ctx.stop(); @@ -222,45 +210,38 @@ impl WsChatSession { } } -fn main() { +fn main() -> std::io::Result<()> { let _ = env_logger::init(); let sys = actix::System::new("websocket-example"); - // Start chat server actor in separate thread - let server = Arbiter::start(|_| server::ChatServer::default()); + // Start chat server actor + let server = server::ChatServer::default().start(); // Start tcp server in separate thread let srv = server.clone(); - Arbiter::new("tcp-server").do_send::(msgs::Execute::new(move || { + Arbiter::new().exec(move || { session::TcpServer::new("127.0.0.1:12345", srv); - Ok(()) - })); + Ok::<_, ()>(()) + }); // Create Http server with websocket support HttpServer::new(move || { - // Websocket sessions state - let state = WsChatSessionState { - addr: server.clone(), - }; - - App::with_state(state) + App::new() + .data(server.clone()) // redirect to websocket.html - .resource("/", |r| { - r.method(http::Method::GET).f(|_| { - HttpResponse::Found() - .header("LOCATION", "/static/websocket.html") - .finish() - }) - }) + .service(web::resource("/").route(web::get().to(|| { + HttpResponse::Found() + .header("LOCATION", "/static/websocket.html") + .finish() + }))) // websocket - .resource("/ws/", |r| r.route().f(chat_route)) + .service(web::resource("/ws/").to(chat_route)) // static resources - .handler("/static/", fs::StaticFiles::new("static/").unwrap()) + .service(fs::Files::new("/static/", "static/")) }) - .bind("127.0.0.1:8080") - .unwrap() + .bind("127.0.0.1:8080")? .start(); println!("Started http server: 127.0.0.1:8080"); - let _ = sys.run(); + sys.run() } diff --git a/websocket-tcp-chat/src/server.rs b/websocket-tcp-chat/src/server.rs index efc946a..c30b928 100644 --- a/websocket-tcp-chat/src/server.rs +++ b/websocket-tcp-chat/src/server.rs @@ -6,7 +6,7 @@ use actix::prelude::*; use rand::{self, rngs::ThreadRng, Rng}; use std::collections::{HashMap, HashSet}; -use session; +use crate::session; /// Message for chat server communications diff --git a/websocket-tcp-chat/src/session.rs b/websocket-tcp-chat/src/session.rs index 37bd33a..2747879 100644 --- a/websocket-tcp-chat/src/session.rs +++ b/websocket-tcp-chat/src/session.rs @@ -11,8 +11,8 @@ use tokio_tcp::{TcpListener, TcpStream}; use actix::prelude::*; -use codec::{ChatCodec, ChatRequest, ChatResponse}; -use server::{self, ChatServer}; +use crate::codec::{ChatCodec, ChatRequest, ChatResponse}; +use crate::server::{self, ChatServer}; /// Chat server sends this messages to session #[derive(Message)] @@ -62,7 +62,7 @@ impl Actor for ChatSession { .wait(ctx); } - fn stopping(&mut self, ctx: &mut Self::Context) -> Running { + fn stopping(&mut self, _: &mut Self::Context) -> Running { // notify chat server self.addr.do_send(server::Disconnect { id: self.id }); Running::Stop @@ -82,7 +82,7 @@ impl StreamHandler for ChatSession { self.addr .send(server::ListRooms) .into_actor(self) - .then(|res, act, ctx| { + .then(|res, act, _| { match res { Ok(rooms) => { act.framed.write(ChatResponse::Rooms(rooms)); @@ -124,7 +124,7 @@ impl StreamHandler for ChatSession { impl Handler for ChatSession { type Result = (); - fn handle(&mut self, msg: Message, ctx: &mut Context) { + fn handle(&mut self, msg: Message, _: &mut Context) { // send message to peer self.framed.write(ChatResponse::Message(msg.0)); } @@ -175,7 +175,7 @@ pub struct TcpServer { } impl TcpServer { - pub fn new(s: &str, chat: Addr) { + pub fn new(_s: &str, chat: Addr) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr).unwrap();