1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

update websocket-chat example

This commit is contained in:
Nikolay Kim 2018-02-08 14:03:27 -08:00
parent f8f99ec0c7
commit 6181a84d7b
2 changed files with 32 additions and 24 deletions

View File

@ -13,6 +13,8 @@ use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use futures::Future; use futures::Future;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_io::io::WriteHalf;
use tokio_io::codec::FramedRead;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use actix::prelude::*; use actix::prelude::*;
@ -27,12 +29,12 @@ fn main() {
Arbiter::handle().spawn( Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle()) TcpStream::connect(&addr, Arbiter::handle())
.and_then(|stream| { .and_then(|stream| {
let addr: SyncAddress<_> = ChatClient::create(|mut ctx| { let addr: SyncAddress<_> = ChatClient::create(|ctx| {
let (reader, writer) = let (r, w) = stream.split();
FramedReader::wrap(stream.framed(codec::ClientChatCodec)); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx);
ChatClient::add_stream(reader, &mut ctx); ChatClient{
ChatClient{framed: writer} framed: actix::io::FramedWrite::new(
}); w, codec::ClientChatCodec, ctx)}});
// start console loop // start console loop
thread::spawn(move|| { thread::spawn(move|| {
@ -61,7 +63,7 @@ fn main() {
struct ChatClient { struct ChatClient {
framed: FramedWriter<TcpStream, codec::ClientChatCodec>, framed: actix::io::FramedWrite<WriteHalf<TcpStream>, codec::ClientChatCodec>,
} }
#[derive(Message)] #[derive(Message)]
@ -88,12 +90,14 @@ impl Actor for ChatClient {
impl ChatClient { impl ChatClient {
fn hb(&self, ctx: &mut Context<Self>) { fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| { ctx.run_later(Duration::new(1, 0), |act, ctx| {
act.framed.send(codec::ChatRequest::Ping); act.framed.write(codec::ChatRequest::Ping);
act.hb(ctx); act.hb(ctx);
}); });
} }
} }
impl actix::io::WriteHandler<io::Error> for ChatClient {}
/// Handle stdin commands /// Handle stdin commands
impl Handler<ClientCommand> for ChatClient { impl Handler<ClientCommand> for ChatClient {
type Result = (); type Result = ();
@ -109,11 +113,11 @@ impl Handler<ClientCommand> for ChatClient {
let v: Vec<&str> = m.splitn(2, ' ').collect(); let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] { match v[0] {
"/list" => { "/list" => {
let _ = self.framed.send(codec::ChatRequest::List); self.framed.write(codec::ChatRequest::List);
}, },
"/join" => { "/join" => {
if v.len() == 2 { if v.len() == 2 {
let _ = self.framed.send(codec::ChatRequest::Join(v[1].to_owned())); self.framed.write(codec::ChatRequest::Join(v[1].to_owned()));
} else { } else {
println!("!!! room name is required"); println!("!!! room name is required");
} }
@ -121,14 +125,14 @@ impl Handler<ClientCommand> for ChatClient {
_ => println!("!!! unknown command"), _ => println!("!!! unknown command"),
} }
} else { } else {
let _ = self.framed.send(codec::ChatRequest::Message(m.to_owned())); self.framed.write(codec::ChatRequest::Message(m.to_owned()));
} }
} }
} }
/// Server communication /// Server communication
impl StreamHandler<codec::ChatResponse, FramedError<codec::ClientChatCodec>> for ChatClient { impl StreamHandler<codec::ChatResponse, io::Error> for ChatClient {
fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context<Self>) { fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context<Self>) {
match msg { match msg {

View File

@ -1,10 +1,12 @@
//! `ClientSession` is an actor, it manages peer tcp connection and //! `ClientSession` is an actor, it manages peer tcp connection and
//! proxies commands from peer to `ChatServer`. //! proxies commands from peer to `ChatServer`.
use std::net; use std::{io, net};
use std::str::FromStr; use std::str::FromStr;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::Stream; use futures::Stream;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
use tokio_io::io::WriteHalf;
use tokio_io::codec::FramedRead;
use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::net::{TcpStream, TcpListener};
use actix::prelude::*; use actix::prelude::*;
@ -28,7 +30,7 @@ pub struct ChatSession {
/// joined room /// joined room
room: String, room: String,
/// Framed wrapper /// Framed wrapper
framed: FramedWriter<TcpStream, ChatCodec>, framed: actix::io::FramedWrite<WriteHalf<TcpStream>, ChatCodec>,
} }
impl Actor for ChatSession { impl Actor for ChatSession {
@ -62,8 +64,10 @@ impl Actor for ChatSession {
} }
} }
impl actix::io::WriteHandler<io::Error> for ChatSession {}
/// To use `Framed` we have to define Io type and Codec /// To use `Framed` we have to define Io type and Codec
impl StreamHandler<ChatRequest, FramedError<ChatCodec>> for ChatSession { impl StreamHandler<ChatRequest, io::Error> for ChatSession {
/// This is main event loop for client requests /// This is main event loop for client requests
fn handle(&mut self, msg: ChatRequest, ctx: &mut Context<Self>) { fn handle(&mut self, msg: ChatRequest, ctx: &mut Context<Self>) {
@ -74,7 +78,7 @@ impl StreamHandler<ChatRequest, FramedError<ChatCodec>> for ChatSession {
self.addr.call(self, server::ListRooms).then(|res, act, ctx| { self.addr.call(self, server::ListRooms).then(|res, act, ctx| {
match res { match res {
Ok(Ok(rooms)) => { Ok(Ok(rooms)) => {
let _ = act.framed.send(ChatResponse::Rooms(rooms)); act.framed.write(ChatResponse::Rooms(rooms));
}, },
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
@ -87,7 +91,7 @@ impl StreamHandler<ChatRequest, FramedError<ChatCodec>> for ChatSession {
println!("Join to room: {}", name); println!("Join to room: {}", name);
self.room = name.clone(); self.room = name.clone();
self.addr.send(server::Join{id: self.id, name: name.clone()}); self.addr.send(server::Join{id: self.id, name: name.clone()});
let _ = self.framed.send(ChatResponse::Joined(name)); self.framed.write(ChatResponse::Joined(name));
}, },
ChatRequest::Message(message) => { ChatRequest::Message(message) => {
// send message to chat server // send message to chat server
@ -110,7 +114,7 @@ impl Handler<Message> for ChatSession {
fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) { fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) {
// send message to peer // send message to peer
let _ = self.framed.send(ChatResponse::Message(msg.0)); self.framed.write(ChatResponse::Message(msg.0));
} }
} }
@ -118,7 +122,7 @@ impl Handler<Message> for ChatSession {
impl ChatSession { impl ChatSession {
pub fn new(addr: SyncAddress<ChatServer>, pub fn new(addr: SyncAddress<ChatServer>,
framed: FramedWriter<TcpStream, ChatCodec>) -> ChatSession { framed: actix::io::FramedWrite<WriteHalf<TcpStream>, ChatCodec>) -> ChatSession {
ChatSession {id: 0, addr: addr, hb: Instant::now(), ChatSession {id: 0, addr: addr, hb: Instant::now(),
room: "Main".to_owned(), framed: framed} room: "Main".to_owned(), framed: framed}
} }
@ -140,7 +144,7 @@ impl ChatSession {
ctx.stop(); ctx.stop();
} }
act.framed.send(ChatResponse::Ping); act.framed.write(ChatResponse::Ping);
// if we can not send message to sink, sink is closed (disconnected) // if we can not send message to sink, sink is closed (disconnected)
act.hb(ctx); act.hb(ctx);
}); });
@ -191,10 +195,10 @@ impl Handler<TcpConnect> for TcpServer {
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.
let server = self.chat.clone(); let server = self.chat.clone();
let _: () = ChatSession::create(|mut ctx| { let _: () = ChatSession::create(|ctx| {
let (reader, writer) = FramedReader::wrap(msg.0.framed(ChatCodec)); let (r, w) = msg.0.split();
ChatSession::add_stream(reader, &mut ctx); ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx);
ChatSession::new(server, writer) ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx))
}); });
} }
} }