From 4aa663e7943ae597c0e3759b8017a5e8a176a67f Mon Sep 17 00:00:00 2001 From: Guoli Lyu Date: Thu, 11 Jul 2019 16:48:14 +0800 Subject: [PATCH] feat: update ws client example to actix-web 1.0 (#143) --- websocket/Cargo.toml | 8 +++-- websocket/src/client.rs | 72 +++++++++++++++++++++++++++++------------ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml index 71838883..891ef14e 100644 --- a/websocket/Cargo.toml +++ b/websocket/Cargo.toml @@ -9,15 +9,17 @@ workspace = ".." name = "websocket-server" path = "src/main.rs" -#[[bin]] -#name = "websocket-client" -#path = "src/client.rs" +[[bin]] +name = "websocket-client" +path = "src/client.rs" [dependencies] actix = "0.8.2" +actix-codec = "0.1.2" actix-web = "1.0.0" actix-web-actors = "1.0.0" actix-files = "0.1.1" +awc = "0.2.1" env_logger = "0.6" futures = "0.1" bytes = "0.4" \ No newline at end of file diff --git a/websocket/src/client.rs b/websocket/src/client.rs index 11f827bb..04a683ad 100644 --- a/websocket/src/client.rs +++ b/websocket/src/client.rs @@ -2,26 +2,39 @@ use std::time::Duration; use std::{io, thread}; +use actix::io::SinkWrite; use actix::*; -use actix_web::client::{Client, ClientWriter, Message, ProtocolError}; -use futures::Future; +use actix_codec::{AsyncRead, AsyncWrite, Framed}; +use awc::{ + error::WsProtocolError, + ws::{Codec, Frame, Message}, + Client, +}; +use futures::{ + lazy, + stream::{SplitSink, Stream}, + Future, +}; fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); let _ = env_logger::init(); let sys = actix::System::new("ws-example"); - Arbiter::spawn( - Client::new("http://127.0.0.1:8080/ws/") + Arbiter::spawn(lazy(|| { + Client::new() + .ws("http://127.0.0.1:8080/ws/") .connect() .map_err(|e| { println!("Error: {}", e); () }) - .map(|(reader, writer)| { + .map(|(response, framed)| { + println!("{:?}", response); + let (sink, stream) = framed.split(); let addr = ChatClient::create(|ctx| { - ChatClient::add_stream(reader, ctx); - ChatClient(writer) + ChatClient::add_stream(stream, ctx); + ChatClient(SinkWrite::new(sink, ctx)) }); // start console loop @@ -35,18 +48,23 @@ fn main() { }); () - }), - ); + }) + })); let _ = sys.run(); } -struct ChatClient(ClientWriter); +struct ChatClient(SinkWrite>>) +where + T: AsyncRead + AsyncWrite; #[derive(Message)] struct ClientCommand(String); -impl Actor for ChatClient { +impl Actor for ChatClient +where + T: AsyncRead + AsyncWrite, +{ type Context = Context; fn started(&mut self, ctx: &mut Context) { @@ -62,10 +80,13 @@ impl Actor for ChatClient { } } -impl ChatClient { +impl ChatClient +where + T: AsyncRead + AsyncWrite, +{ fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.0.ping(""); + act.0.write(Message::Ping(String::new())).unwrap(); act.hb(ctx); // client should also check for a timeout here, similar to the @@ -75,24 +96,30 @@ impl ChatClient { } /// Handle stdin commands -impl Handler for ChatClient { +impl Handler for ChatClient +where + T: AsyncRead + AsyncWrite, +{ type Result = (); - fn handle(&mut self, msg: ClientCommand, ctx: &mut Context) { - self.0.text(msg.0) + fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context) { + self.0.write(Message::Text(msg.0)).unwrap(); } } /// Handle server websocket messages -impl StreamHandler for ChatClient { - fn handle(&mut self, msg: Message, ctx: &mut Context) { +impl StreamHandler for ChatClient +where + T: AsyncRead + AsyncWrite, +{ + fn handle(&mut self, msg: Frame, _ctx: &mut Context) { match msg { - Message::Text(txt) => println!("Server: {:?}", txt), + Frame::Text(txt) => println!("Server: {:?}", txt), _ => (), } } - fn started(&mut self, ctx: &mut Context) { + fn started(&mut self, _ctx: &mut Context) { println!("Connected"); } @@ -101,3 +128,8 @@ impl StreamHandler for ChatClient { ctx.stop() } } + +impl actix::io::WriteHandler for ChatClient where + T: AsyncRead + AsyncWrite +{ +}