1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

update websocket-chat example

This commit is contained in:
Nikolay Kim 2018-02-03 08:25:31 -08:00
parent 671ab35cf6
commit d568161852
2 changed files with 61 additions and 62 deletions

View File

@ -27,9 +27,12 @@ fn main() {
Arbiter::handle().spawn( Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle()) TcpStream::connect(&addr, Arbiter::handle())
.and_then(|stream| { .and_then(|stream| {
let addr: SyncAddress<_> = ChatClient::create_with( let addr: SyncAddress<_> = ChatClient::create(|mut ctx| {
stream.framed(codec::ClientChatCodec), let (reader, writer) =
|_, framed| ChatClient{framed: framed}); FramedReader::wrap(stream.framed(codec::ClientChatCodec));
ChatClient::add_stream(reader, &mut ctx);
ChatClient{framed: writer}
});
// start console loop // start console loop
thread::spawn(move|| { thread::spawn(move|| {
@ -58,7 +61,7 @@ fn main() {
struct ChatClient { struct ChatClient {
framed: FramedCell<TcpStream, codec::ClientChatCodec>, framed: FramedWriter<TcpStream, codec::ClientChatCodec>,
} }
#[derive(Message)] #[derive(Message)]
@ -125,27 +128,24 @@ impl Handler<ClientCommand> for ChatClient {
/// Server communication /// Server communication
impl FramedHandler<TcpStream, codec::ClientChatCodec> for ChatClient { impl StreamHandler<codec::ChatResponse, FramedError<codec::ClientChatCodec>> for ChatClient {
fn handle(&mut self, msg: io::Result<codec::ChatResponse>, ctx: &mut Context<Self>) { fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context<Self>) {
match msg { match msg {
Err(_) => ctx.stop(), codec::ChatResponse::Message(ref msg) => {
Ok(msg) => match msg { println!("message: {}", msg);
codec::ChatResponse::Message(ref msg) => {
println!("message: {}", msg);
}
codec::ChatResponse::Joined(ref msg) => {
println!("!!! joined: {}", msg);
}
codec::ChatResponse::Rooms(rooms) => {
println!("\n!!! Available rooms:");
for room in rooms {
println!("{}", room);
}
println!("");
}
_ => (),
} }
codec::ChatResponse::Joined(ref msg) => {
println!("!!! joined: {}", msg);
}
codec::ChatResponse::Rooms(rooms) => {
println!("\n!!! Available rooms:");
for room in rooms {
println!("{}", room);
}
println!("");
}
_ => (),
} }
} }
} }

View File

@ -1,6 +1,6 @@
//! `ClientSession` is an actor, it manages peer tcp connection and //! `ClientSession` is an actor, it manages peer tcp connection and
//! proxies commands from peer to `ChatServer`. //! proxies commands from peer to `ChatServer`.
use std::{io, net}; use std::net;
use std::str::FromStr; use std::str::FromStr;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::Stream; use futures::Stream;
@ -28,7 +28,7 @@ pub struct ChatSession {
/// joined room /// joined room
room: String, room: String,
/// Framed wrapper /// Framed wrapper
framed: FramedCell<TcpStream, ChatCodec>, framed: FramedWriter<TcpStream, ChatCodec>,
} }
impl Actor for ChatSession { impl Actor for ChatSession {
@ -63,46 +63,43 @@ impl Actor for ChatSession {
} }
/// To use `Framed` we have to define Io type and Codec /// To use `Framed` we have to define Io type and Codec
impl FramedHandler<TcpStream, ChatCodec> for ChatSession { impl StreamHandler<ChatRequest, FramedError<ChatCodec>> for ChatSession {
/// This is main event loop for client requests /// This is main event loop for client requests
fn handle(&mut self, msg: io::Result<ChatRequest>, ctx: &mut Context<Self>) { fn handle(&mut self, msg: ChatRequest, ctx: &mut Context<Self>) {
match msg { match msg {
Err(_) => ctx.stop(), ChatRequest::List => {
Ok(msg) => match msg { // Send ListRooms message to chat server and wait for response
ChatRequest::List => { println!("List rooms");
// Send ListRooms message to chat server and wait for response self.addr.call(self, server::ListRooms).then(|res, act, ctx| {
println!("List rooms"); match res {
self.addr.call(self, server::ListRooms).then(|res, act, ctx| { Ok(Ok(rooms)) => {
match res { let _ = act.framed.send(ChatResponse::Rooms(rooms));
Ok(Ok(rooms)) => { },
let _ = act.framed.send(ChatResponse::Rooms(rooms));
},
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
actix::fut::ok(()) actix::fut::ok(())
}).wait(ctx) }).wait(ctx)
// .wait(ctx) pauses all events in context, // .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list of rooms back // so actor wont receive any new messages until it get list of rooms back
}, },
ChatRequest::Join(name) => { ChatRequest::Join(name) => {
println!("Join to room: {}", name); println!("Join to room: {}", name);
self.room = name.clone(); self.room = name.clone();
self.addr.send(server::Join{id: self.id, name: name.clone()}); self.addr.send(server::Join{id: self.id, name: name.clone()});
let _ = self.framed.send(ChatResponse::Joined(name)); let _ = self.framed.send(ChatResponse::Joined(name));
}, },
ChatRequest::Message(message) => { ChatRequest::Message(message) => {
// send message to chat server // send message to chat server
println!("Peer message: {}", message); println!("Peer message: {}", message);
self.addr.send( self.addr.send(
server::Message{id: self.id, server::Message{id: self.id,
msg: message, room: msg: message, room:
self.room.clone()}) self.room.clone()})
}
// we update heartbeat time on ping from peer
ChatRequest::Ping =>
self.hb = Instant::now(),
} }
// we update heartbeat time on ping from peer
ChatRequest::Ping =>
self.hb = Instant::now(),
} }
} }
} }
@ -121,7 +118,7 @@ impl Handler<Message> for ChatSession {
impl ChatSession { impl ChatSession {
pub fn new(addr: SyncAddress<ChatServer>, pub fn new(addr: SyncAddress<ChatServer>,
framed: FramedCell<TcpStream, ChatCodec>) -> ChatSession { framed: FramedWriter<TcpStream, ChatCodec>) -> ChatSession {
ChatSession {id: 0, addr: addr, hb: Instant::now(), ChatSession {id: 0, addr: addr, hb: Instant::now(),
room: "Main".to_owned(), framed: framed} room: "Main".to_owned(), framed: framed}
} }
@ -194,8 +191,10 @@ impl Handler<TcpConnect> for TcpServer {
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.
let server = self.chat.clone(); let server = self.chat.clone();
let _: () = ChatSession::create_with(msg.0.framed(ChatCodec), |_, framed| { let _: () = ChatSession::create(|mut ctx| {
ChatSession::new(server, framed) let (reader, writer) = FramedReader::wrap(msg.0.framed(ChatCodec));
ChatSession::add_stream(reader, &mut ctx);
ChatSession::new(server, writer)
}); });
} }
} }