1
0
mirror of https://github.com/actix/examples synced 2024-11-23 22:41:07 +01:00

upgrade ws examples

This commit is contained in:
Nikolay Kim 2019-03-29 15:08:35 -07:00
parent e4f71e8fd5
commit 769cc5b84e
7 changed files with 111 additions and 148 deletions

View File

@ -2,7 +2,7 @@
name = "basics" name = "basics"
version = "1.0.0" version = "1.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = ".."
edition = "2018" edition = "2018"
[dependencies] [dependencies]

View File

@ -2,23 +2,24 @@
name = "websocket-example" name = "websocket-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = ".."
edition = "2018"
[[bin]] [[bin]]
name = "websocket-chat-server" name = "websocket-chat-server"
path = "src/main.rs" path = "src/main.rs"
[dependencies] [dependencies]
rand = "*" actix = "0.8.0-alpha.2"
actix-web = "1.0.0-alpha.1"
actix-web-actors = "1.0.0-alpha.1"
actix-files = "0.1.0-alpha.1"
rand = "0.6"
bytes = "0.4" bytes = "0.4"
byteorder = "1.1" byteorder = "1.1"
futures = "0.1" futures = "0.1.25"
tokio-io = "0.1" tokio-io = "0.1"
tokio-core = "0.1" env_logger = "0.6"
env_logger = "*"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
actix = "0.7"
actix-web = "0.7"

View File

@ -1,22 +1,9 @@
#![allow(unused_variables)]
extern crate byteorder;
extern crate bytes;
extern crate env_logger;
extern crate futures;
extern crate rand;
extern crate serde;
extern crate serde_json;
extern crate tokio_core;
extern crate tokio_io;
extern crate actix;
extern crate actix_web;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use actix::*; use actix::*;
use actix_web::server::HttpServer; use actix_files as fs;
use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
mod server; mod server;
@ -25,22 +12,22 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// This is our websocket route state, this state is shared with all route
/// instances via `HttpContext::state()`
struct WsChatSessionState {
addr: Addr<server::ChatServer>,
}
/// Entry point for our route /// Entry point for our route
fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { fn chat_route(
req: HttpRequest,
stream: web::Payload,
srv: web::Data<Addr<server::ChatServer>>,
) -> Result<HttpResponse, Error> {
ws::start( ws::start(
req,
WsChatSession { WsChatSession {
id: 0, id: 0,
hb: Instant::now(), hb: Instant::now(),
room: "Main".to_owned(), room: "Main".to_owned(),
name: None, name: None,
addr: srv.get_ref().clone(),
}, },
&req,
stream,
) )
} }
@ -54,10 +41,12 @@ struct WsChatSession {
room: String, room: String,
/// peer name /// peer name
name: Option<String>, name: Option<String>,
/// Chat server
addr: Addr<server::ChatServer>,
} }
impl Actor for WsChatSession { impl Actor for WsChatSession {
type Context = ws::WebsocketContext<Self, WsChatSessionState>; type Context = ws::WebsocketContext<Self>;
/// Method is called on actor start. /// Method is called on actor start.
/// We register ws session with ChatServer /// We register ws session with ChatServer
@ -71,8 +60,7 @@ impl Actor for WsChatSession {
// HttpContext::state() is instance of WsChatSessionState, state is shared // HttpContext::state() is instance of WsChatSessionState, state is shared
// across all routes within application // across all routes within application
let addr = ctx.address(); let addr = ctx.address();
ctx.state() self.addr
.addr
.send(server::Connect { .send(server::Connect {
addr: addr.recipient(), addr: addr.recipient(),
}) })
@ -88,9 +76,9 @@ impl Actor for WsChatSession {
.wait(ctx); .wait(ctx);
} }
fn stopping(&mut self, ctx: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
// notify chat server // notify chat server
ctx.state().addr.do_send(server::Disconnect { id: self.id }); self.addr.do_send(server::Disconnect { id: self.id });
Running::Stop Running::Stop
} }
} }
@ -126,8 +114,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
// Send ListRooms message to chat server and wait for // Send ListRooms message to chat server and wait for
// response // response
println!("List rooms"); println!("List rooms");
ctx.state() self.addr
.addr
.send(server::ListRooms) .send(server::ListRooms)
.into_actor(self) .into_actor(self)
.then(|res, _, ctx| { .then(|res, _, ctx| {
@ -149,7 +136,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
"/join" => { "/join" => {
if v.len() == 2 { if v.len() == 2 {
self.room = v[1].to_owned(); self.room = v[1].to_owned();
ctx.state().addr.do_send(server::Join { self.addr.do_send(server::Join {
id: self.id, id: self.id,
name: self.room.clone(), name: self.room.clone(),
}); });
@ -175,14 +162,14 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
m.to_owned() m.to_owned()
}; };
// send message to chat server // send message to chat server
ctx.state().addr.do_send(server::ClientMessage { self.addr.do_send(server::ClientMessage {
id: self.id, id: self.id,
msg: msg, msg: msg,
room: self.room.clone(), room: self.room.clone(),
}) })
} }
} }
ws::Message::Binary(bin) => println!("Unexpected binary"), ws::Message::Binary(_) => println!("Unexpected binary"),
ws::Message::Close(_) => { ws::Message::Close(_) => {
ctx.stop(); ctx.stop();
} }
@ -194,7 +181,7 @@ impl WsChatSession {
/// helper method that sends ping to client every second. /// helper method that sends ping to client every second.
/// ///
/// also this method checks heartbeats from client /// also this method checks heartbeats from client
fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) { fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats // check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
@ -202,7 +189,7 @@ impl WsChatSession {
println!("Websocket Client heartbeat failed, disconnecting!"); println!("Websocket Client heartbeat failed, disconnecting!");
// notify chat server // notify chat server
ctx.state().addr.do_send(server::Disconnect { id: act.id }); act.addr.do_send(server::Disconnect { id: act.id });
// stop actor // stop actor
ctx.stop(); ctx.stop();
@ -216,38 +203,30 @@ impl WsChatSession {
} }
} }
fn main() { fn main() -> std::io::Result<()> {
let _ = env_logger::init(); env_logger::init();
let sys = actix::System::new("websocket-example"); let sys = System::new("ws-example");
// Start chat server actor in separate thread // Start chat server actor
let server = Arbiter::start(|_| server::ChatServer::default()); let server = server::ChatServer::default().start();
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
// Websocket sessions state App::new()
let state = WsChatSessionState { .data(server.clone())
addr: server.clone(),
};
App::with_state(state)
// redirect to websocket.html // redirect to websocket.html
.resource("/", |r| { .service(web::resource("/").route(web::get().to(|| {
r.method(http::Method::GET).f(|_| {
HttpResponse::Found() HttpResponse::Found()
.header("LOCATION", "/static/websocket.html") .header("LOCATION", "/static/websocket.html")
.finish() .finish()
}) })))
})
// websocket // websocket
.resource("/ws/", |r| r.route().f(chat_route)) .service(web::resource("/ws/").to(chat_route))
// static resources // static resources
.handler("/static/", fs::StaticFiles::new("static/").unwrap()) .service(fs::Files::new("/static/", "static/"))
}) })
.bind("127.0.0.1:8080") .bind("127.0.0.1:8080")?
.unwrap()
.start(); .start();
println!("Started http server: 127.0.0.1:8080"); sys.run()
let _ = sys.run();
} }

View File

@ -2,7 +2,8 @@
name = "websocket-tcp-example" name = "websocket-tcp-example"
version = "0.1.0" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../" workspace = ".."
edition = "2018"
[[bin]] [[bin]]
name = "websocket-tcp-server" name = "websocket-tcp-server"
@ -13,18 +14,19 @@ name = "websocket-tcp-client"
path = "src/client.rs" path = "src/client.rs"
[dependencies] [dependencies]
rand = "*" actix = "0.8.0-alpha.2"
actix-web = "1.0.0-alpha.1"
actix-web-actors = "1.0.0-alpha.1"
actix-files = "0.1.0-alpha.1"
rand = "0.6"
bytes = "0.4" bytes = "0.4"
byteorder = "1.1" byteorder = "1.1"
futures = "0.1" futures = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-tcp = "0.1" tokio-tcp = "0.1"
tokio-codec = "0.1" tokio-codec = "0.1"
env_logger = "*" env_logger = "0.6"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "0.7"
actix-web = "0.7"

View File

@ -1,25 +1,14 @@
#![allow(unused_variables)]
extern crate byteorder;
extern crate bytes;
extern crate env_logger;
extern crate futures;
extern crate rand;
extern crate serde;
extern crate serde_json;
extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_tcp;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
#[macro_use] #[macro_use]
extern crate actix; extern crate actix;
extern crate actix_web;
use std::time::{Duration, Instant};
use actix::*; use actix::*;
use actix_web::server::HttpServer; use actix_files as fs;
use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use std::time::{Duration, Instant}; use actix_web_actors::ws;
mod codec; mod codec;
mod server; mod server;
@ -30,22 +19,22 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long before lack of client response causes a timeout /// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// This is our websocket route state, this state is shared with all route
/// instances via `HttpContext::state()`
struct WsChatSessionState {
addr: Addr<server::ChatServer>,
}
/// Entry point for our route /// Entry point for our route
fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> { fn chat_route(
req: HttpRequest,
stream: web::Payload,
srv: web::Data<Addr<server::ChatServer>>,
) -> Result<HttpResponse, Error> {
ws::start( ws::start(
req,
WsChatSession { WsChatSession {
id: 0, id: 0,
hb: Instant::now(), hb: Instant::now(),
room: "Main".to_owned(), room: "Main".to_owned(),
name: None, name: None,
addr: srv.get_ref().clone(),
}, },
&req,
stream,
) )
} }
@ -59,26 +48,26 @@ struct WsChatSession {
room: String, room: String,
/// peer name /// peer name
name: Option<String>, name: Option<String>,
/// Chat server
addr: Addr<server::ChatServer>,
} }
impl Actor for WsChatSession { impl Actor for WsChatSession {
type Context = ws::WebsocketContext<Self, WsChatSessionState>; type Context = ws::WebsocketContext<Self>;
/// Method is called on actor start. /// Method is called on actor start.
/// We register ws session with ChatServer /// We register ws session with ChatServer
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
// we'll start heartbeat process on session start.
self.hb(ctx);
// register self in chat server. `AsyncContext::wait` register // register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves // future within context, but context waits until this future resolves
// before processing any other events. // before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared // HttpContext::state() is instance of WsChatSessionState, state is shared
// across all routes within application // across all routes within application
// we'll start heartbeat process on session start.
self.hb(ctx);
let addr = ctx.address(); let addr = ctx.address();
ctx.state() self.addr
.addr
.send(server::Connect { .send(server::Connect {
addr: addr.recipient(), addr: addr.recipient(),
}) })
@ -94,9 +83,9 @@ impl Actor for WsChatSession {
.wait(ctx); .wait(ctx);
} }
fn stopping(&mut self, ctx: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
// notify chat server // notify chat server
ctx.state().addr.do_send(server::Disconnect { id: self.id }); self.addr.do_send(server::Disconnect { id: self.id });
Running::Stop Running::Stop
} }
} }
@ -132,8 +121,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
// Send ListRooms message to chat server and wait for // Send ListRooms message to chat server and wait for
// response // response
println!("List rooms"); println!("List rooms");
ctx.state() self.addr
.addr
.send(server::ListRooms) .send(server::ListRooms)
.into_actor(self) .into_actor(self)
.then(|res, _, ctx| { .then(|res, _, ctx| {
@ -155,7 +143,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
"/join" => { "/join" => {
if v.len() == 2 { if v.len() == 2 {
self.room = v[1].to_owned(); self.room = v[1].to_owned();
ctx.state().addr.do_send(server::Join { self.addr.do_send(server::Join {
id: self.id, id: self.id,
name: self.room.clone(), name: self.room.clone(),
}); });
@ -181,14 +169,14 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
m.to_owned() m.to_owned()
}; };
// send message to chat server // send message to chat server
ctx.state().addr.do_send(server::Message { self.addr.do_send(server::Message {
id: self.id, id: self.id,
msg: msg, msg: msg,
room: self.room.clone(), room: self.room.clone(),
}) })
} }
} }
ws::Message::Binary(bin) => println!("Unexpected binary"), ws::Message::Binary(_) => println!("Unexpected binary"),
ws::Message::Close(_) => { ws::Message::Close(_) => {
ctx.stop(); ctx.stop();
} }
@ -200,7 +188,7 @@ impl WsChatSession {
/// helper method that sends ping to client every second. /// helper method that sends ping to client every second.
/// ///
/// also this method checks heartbeats from client /// also this method checks heartbeats from client
fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) { fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
// check client heartbeats // check client heartbeats
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
@ -208,7 +196,7 @@ impl WsChatSession {
println!("Websocket Client heartbeat failed, disconnecting!"); println!("Websocket Client heartbeat failed, disconnecting!");
// notify chat server // notify chat server
ctx.state().addr.do_send(server::Disconnect { id: act.id }); act.addr.do_send(server::Disconnect { id: act.id });
// stop actor // stop actor
ctx.stop(); ctx.stop();
@ -222,45 +210,38 @@ impl WsChatSession {
} }
} }
fn main() { fn main() -> std::io::Result<()> {
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("websocket-example"); let sys = actix::System::new("websocket-example");
// Start chat server actor in separate thread // Start chat server actor
let server = Arbiter::start(|_| server::ChatServer::default()); let server = server::ChatServer::default().start();
// Start tcp server in separate thread // Start tcp server in separate thread
let srv = server.clone(); let srv = server.clone();
Arbiter::new("tcp-server").do_send::<msgs::Execute>(msgs::Execute::new(move || { Arbiter::new().exec(move || {
session::TcpServer::new("127.0.0.1:12345", srv); session::TcpServer::new("127.0.0.1:12345", srv);
Ok(()) Ok::<_, ()>(())
})); });
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
// Websocket sessions state App::new()
let state = WsChatSessionState { .data(server.clone())
addr: server.clone(),
};
App::with_state(state)
// redirect to websocket.html // redirect to websocket.html
.resource("/", |r| { .service(web::resource("/").route(web::get().to(|| {
r.method(http::Method::GET).f(|_| {
HttpResponse::Found() HttpResponse::Found()
.header("LOCATION", "/static/websocket.html") .header("LOCATION", "/static/websocket.html")
.finish() .finish()
}) })))
})
// websocket // websocket
.resource("/ws/", |r| r.route().f(chat_route)) .service(web::resource("/ws/").to(chat_route))
// static resources // static resources
.handler("/static/", fs::StaticFiles::new("static/").unwrap()) .service(fs::Files::new("/static/", "static/"))
}) })
.bind("127.0.0.1:8080") .bind("127.0.0.1:8080")?
.unwrap()
.start(); .start();
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); sys.run()
} }

View File

@ -6,7 +6,7 @@ use actix::prelude::*;
use rand::{self, rngs::ThreadRng, Rng}; use rand::{self, rngs::ThreadRng, Rng};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use session; use crate::session;
/// Message for chat server communications /// Message for chat server communications

View File

@ -11,8 +11,8 @@ use tokio_tcp::{TcpListener, TcpStream};
use actix::prelude::*; use actix::prelude::*;
use codec::{ChatCodec, ChatRequest, ChatResponse}; use crate::codec::{ChatCodec, ChatRequest, ChatResponse};
use server::{self, ChatServer}; use crate::server::{self, ChatServer};
/// Chat server sends this messages to session /// Chat server sends this messages to session
#[derive(Message)] #[derive(Message)]
@ -62,7 +62,7 @@ impl Actor for ChatSession {
.wait(ctx); .wait(ctx);
} }
fn stopping(&mut self, ctx: &mut Self::Context) -> Running { fn stopping(&mut self, _: &mut Self::Context) -> Running {
// notify chat server // notify chat server
self.addr.do_send(server::Disconnect { id: self.id }); self.addr.do_send(server::Disconnect { id: self.id });
Running::Stop Running::Stop
@ -82,7 +82,7 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
self.addr self.addr
.send(server::ListRooms) .send(server::ListRooms)
.into_actor(self) .into_actor(self)
.then(|res, act, ctx| { .then(|res, act, _| {
match res { match res {
Ok(rooms) => { Ok(rooms) => {
act.framed.write(ChatResponse::Rooms(rooms)); act.framed.write(ChatResponse::Rooms(rooms));
@ -124,7 +124,7 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
impl Handler<Message> for ChatSession { impl Handler<Message> for ChatSession {
type Result = (); type Result = ();
fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) { fn handle(&mut self, msg: Message, _: &mut Context<Self>) {
// send message to peer // send message to peer
self.framed.write(ChatResponse::Message(msg.0)); self.framed.write(ChatResponse::Message(msg.0));
} }
@ -175,7 +175,7 @@ pub struct TcpServer {
} }
impl TcpServer { impl TcpServer {
pub fn new(s: &str, chat: Addr<ChatServer>) { pub fn new(_s: &str, chat: Addr<ChatServer>) {
// Create server listener // Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
let listener = TcpListener::bind(&addr).unwrap(); let listener = TcpListener::bind(&addr).unwrap();