1
0
mirror of https://github.com/actix/examples synced 2024-11-23 14:31:07 +01:00

Upgrade dependencies in websocket-tcp-example (#469)

Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
Nick 2022-02-17 14:54:41 -05:00 committed by GitHub
parent 5c1e25fe52
commit d815a44ccc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 510 additions and 439 deletions

657
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,18 +12,20 @@ name = "websocket-tcp-client"
path = "src/client.rs" path = "src/client.rs"
[dependencies] [dependencies]
actix = "0.10" actix = "0.12"
actix-web = "3" actix-codec = "0.4.1"
actix-web-actors = "3" actix-files = "0.6.0-beta.16"
actix-files = "0.3" actix-web = "4.0.0-rc.3"
actix-codec = "0.3" actix-web-actors = "4.0.0-beta.11"
rand = "0.7"
bytes = "0.5.3"
byteorder = "1.2" byteorder = "1.2"
bytes = "1"
env_logger = "0.9"
futures = "0.3" futures = "0.3"
env_logger = "0.8" log = "0.4"
serde = { version = "1.0", features = ["derive"] } rand = "0.8"
serde_json = "1.0" serde = { version = "1", features = ["derive"] }
tokio = "0.2.4" serde_json = "1"
tokio-util = "0.3" tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.6", features = ["codec"] }
tokio-stream = "0.1.8"

View File

@ -1,139 +1,114 @@
use actix::prelude::*; use std::{io, thread};
use std::str::FromStr;
use std::time::Duration; use futures::{SinkExt, StreamExt};
use std::{io, net, thread}; use tokio::{net::TcpStream, select, sync::mpsc};
use tokio::io::{split, WriteHalf}; use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
mod codec; mod codec;
#[actix_web::main] #[actix_web::main]
async fn main() { async fn main() {
// Connect to server env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
println!("Running chat client"); println!("Running chat client");
let stream = TcpStream::connect(&addr).await.unwrap(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let mut cmd_rx = UnboundedReceiverStream::new(cmd_rx);
let addr = ChatClient::create(|ctx| { // run blocking terminal input reader on separate thread
let (r, w) = split(stream); let input_thread = thread::spawn(move || loop {
ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); let mut cmd = String::with_capacity(32);
ChatClient {
framed: actix::io::FramedWrite::new(w, codec::ClientChatCodec, ctx),
}
});
// start console loop
thread::spawn(move || loop {
let mut cmd = String::new();
if io::stdin().read_line(&mut cmd).is_err() { if io::stdin().read_line(&mut cmd).is_err() {
println!("error"); log::error!("error reading line");
return; return;
} }
addr.do_send(ClientCommand(cmd)); if cmd == "/exit" {
println!("exiting input loop");
return;
}
cmd_tx.send(cmd).unwrap();
}); });
}
struct ChatClient { let io = TcpStream::connect(("127.0.0.1", 12345)).await.unwrap();
framed: actix::io::FramedWrite< let mut framed = actix_codec::Framed::new(io, codec::ClientChatCodec);
codec::ChatRequest,
WriteHalf<TcpStream>,
codec::ClientChatCodec,
>,
}
#[derive(Message)] loop {
#[rtype(result = "()")] select! {
struct ClientCommand(String); Some(msg) = framed.next() => {
match msg {
impl Actor for ChatClient { Ok(codec::ChatResponse::Message(ref msg)) => {
type Context = Context<Self>; println!("message: {}", msg);
fn started(&mut self, ctx: &mut Context<Self>) {
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
fn stopped(&mut self, _: &mut Context<Self>) {
println!("Disconnected");
// Stop application on disconnect
System::current().stop();
}
}
impl ChatClient {
fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.framed.write(codec::ChatRequest::Ping);
act.hb(ctx);
// client should also check for a timeout here, similar to the
// server code
});
}
}
impl actix::io::WriteHandler<io::Error> for ChatClient {}
/// Handle stdin commands
impl Handler<ClientCommand> for ChatClient {
type Result = ();
fn handle(&mut self, msg: ClientCommand, _: &mut Context<Self>) {
let m = msg.0.trim();
if m.is_empty() {
return;
}
// we check for /sss type of messages
if m.starts_with('/') {
let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] {
"/list" => {
self.framed.write(codec::ChatRequest::List);
}
"/join" => {
if v.len() == 2 {
self.framed.write(codec::ChatRequest::Join(v[1].to_owned()));
} else {
println!("!!! room name is required");
} }
Ok(codec::ChatResponse::Joined(ref msg)) => {
println!("!!! joined: {}", msg);
}
Ok(codec::ChatResponse::Rooms(rooms)) => {
println!("!!! Available rooms:");
for room in rooms {
println!("{}", room);
}
}
// respond to pings with a "pong"
Ok(codec::ChatResponse::Ping) => { framed.send(codec::ChatRequest::Ping).await.unwrap(); },
_ => { eprintln!("{:?}", msg); }
} }
_ => println!("!!! unknown command"),
} }
} else {
self.framed.write(codec::ChatRequest::Message(m.to_owned())); Some(cmd) = cmd_rx.next() => {
if cmd.is_empty() {
continue;
}
if cmd == "/exit" {
println!("exiting recv loop");
return;
}
if let Some(req) = parse_client_command(&cmd) {
// submit client command
framed.send(req).await.unwrap();
}
}
else => break
} }
} }
input_thread.join().unwrap();
} }
/// Server communication fn parse_client_command(msg: &str) -> Option<codec::ChatRequest> {
let m = msg.trim();
impl StreamHandler<Result<codec::ChatResponse, io::Error>> for ChatClient { if m.is_empty() {
fn handle( return None;
&mut self, }
msg: Result<codec::ChatResponse, io::Error>,
ctx: &mut Context<Self>, // we check for /sss type of messages
) { if m.starts_with('/') {
match msg { let v: Vec<&str> = m.splitn(2, ' ').collect();
Ok(codec::ChatResponse::Message(ref msg)) => { match v[0] {
println!("message: {}", msg); "/list" => Some(codec::ChatRequest::List),
} "/join" => {
Ok(codec::ChatResponse::Joined(ref msg)) => { if v.len() == 2 {
println!("!!! joined: {}", msg); Some(codec::ChatRequest::Join(v[1].to_owned()))
} } else {
Ok(codec::ChatResponse::Rooms(rooms)) => { println!("!!! room name is required");
println!("\n!!! Available rooms:"); None
for room in rooms {
println!("{}", room);
} }
println!();
} }
_ => ctx.stop(), _ => {
println!("!!! unknown command");
None
}
} }
} else {
Some(codec::ChatRequest::Message(m.to_owned()))
} }
} }

View File

@ -3,8 +3,8 @@ use std::io;
use actix::prelude::*; use actix::prelude::*;
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use actix_web::web::{BufMut, BytesMut};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json as json; use serde_json as json;

View File

@ -2,7 +2,7 @@ use std::time::{Duration, Instant};
use actix::*; use actix::*;
use actix_files as fs; use actix_files as fs;
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web::{http::header, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws; use actix_web_actors::ws;
mod codec; mod codec;
@ -221,7 +221,7 @@ impl WsChatSession {
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
env_logger::init(); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
// Start chat server actor // Start chat server actor
let server = server::ChatServer::default().start(); let server = server::ChatServer::default().start();
@ -230,16 +230,15 @@ async fn main() -> std::io::Result<()> {
let srv = server.clone(); let srv = server.clone();
session::tcp_server("127.0.0.1:12345", srv); session::tcp_server("127.0.0.1:12345", srv);
println!("Started http server: 127.0.0.1:8080"); log::info!("starting HTTP+WebSocket server at http://localhost:8080");
// Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.data(server.clone()) .app_data(web::Data::new(server.clone()))
// redirect to websocket.html // redirect to websocket.html
.service(web::resource("/").route(web::get().to(|| { .service(web::resource("/").route(web::get().to(|| async {
HttpResponse::Found() HttpResponse::Found()
.header("LOCATION", "/static/websocket.html") .insert_header((header::LOCATION, "/static/websocket.html"))
.finish() .finish()
}))) })))
// websocket // websocket
@ -247,7 +246,8 @@ async fn main() -> std::io::Result<()> {
// static resources // static resources
.service(fs::Files::new("/static/", "static/")) .service(fs::Files::new("/static/", "static/"))
}) })
.bind("127.0.0.1:8080")? .bind(("127.0.0.1", 8080))?
.workers(1)
.run() .run()
.await .await
} }

View File

@ -1,18 +1,23 @@
//! `ClientSession` is an actor, it manages peer tcp connection and //! `ClientSession` is an actor, it manages peer tcp connection and
//! proxies commands from peer to `ChatServer`. //! proxies commands from peer to `ChatServer`.
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::{io, net};
use futures::StreamExt; use std::{
use tokio::io::{split, WriteHalf}; io, net,
use tokio::net::{TcpListener, TcpStream}; str::FromStr,
time::{Duration, Instant},
};
use actix::{prelude::*, spawn};
use tokio::{
io::{split, WriteHalf},
net::{TcpListener, TcpStream},
};
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use actix::prelude::*; use crate::{
codec::{ChatCodec, ChatRequest, ChatResponse},
use crate::codec::{ChatCodec, ChatRequest, ChatResponse}; server::{self, ChatServer},
use crate::server::{self, ChatServer}; };
/// Chat server sends this messages to session /// Chat server sends this messages to session
#[derive(Message)] #[derive(Message)]
@ -170,32 +175,22 @@ impl ChatSession {
} }
} }
/// Define tcp server that will accept incoming tcp connection and create /// Define TCP server that will accept incoming TCP connection and create
/// chat actors. /// chat actors.
pub fn tcp_server(_s: &str, server: Addr<ChatServer>) { pub fn tcp_server(_s: &str, server: Addr<ChatServer>) {
// Create server listener // Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
actix_web::rt::spawn(async move { spawn(async move {
let server = server.clone(); let listener = TcpListener::bind(&addr).await.unwrap();
let mut listener = TcpListener::bind(&addr).await.unwrap();
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await { while let Ok((stream, _)) = listener.accept().await {
match stream { let server = server.clone();
Ok(stream) => { ChatSession::create(|ctx| {
let server = server.clone(); let (r, w) = split(stream);
ChatSession::create(|ctx| { ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx);
let (r, w) = split(stream); ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx))
ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); });
ChatSession::new(
server,
actix::io::FramedWrite::new(w, ChatCodec, ctx),
)
});
}
Err(_) => return,
}
} }
}); });
} }