diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 66011054..057036e6 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -27,9 +27,12 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient::create_with( - stream.framed(codec::ClientChatCodec), - |_, framed| ChatClient{framed: framed}); + let addr: SyncAddress<_> = ChatClient::create(|mut ctx| { + let (reader, writer) = + FramedReader::wrap(stream.framed(codec::ClientChatCodec)); + ChatClient::add_stream(reader, &mut ctx); + ChatClient{framed: writer} + }); // start console loop thread::spawn(move|| { @@ -58,7 +61,7 @@ fn main() { struct ChatClient { - framed: FramedCell, + framed: FramedWriter, } #[derive(Message)] @@ -125,27 +128,24 @@ impl Handler for ChatClient { /// Server communication -impl FramedHandler for ChatClient { +impl StreamHandler> for ChatClient { - fn handle(&mut self, msg: io::Result, ctx: &mut Context) { + fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context) { match msg { - Err(_) => ctx.stop(), - Ok(msg) => match msg { - codec::ChatResponse::Message(ref msg) => { - println!("message: {}", msg); - } - codec::ChatResponse::Joined(ref msg) => { - println!("!!! joined: {}", msg); - } - codec::ChatResponse::Rooms(rooms) => { - println!("\n!!! Available rooms:"); - for room in rooms { - println!("{}", room); - } - println!(""); - } - _ => (), + codec::ChatResponse::Message(ref msg) => { + println!("message: {}", msg); } + codec::ChatResponse::Joined(ref msg) => { + println!("!!! joined: {}", msg); + } + codec::ChatResponse::Rooms(rooms) => { + println!("\n!!! Available rooms:"); + for room in rooms { + println!("{}", room); + } + println!(""); + } + _ => (), } } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 7589dc98..26598c16 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -1,6 +1,6 @@ //! `ClientSession` is an actor, it manages peer tcp connection and //! proxies commands from peer to `ChatServer`. -use std::{io, net}; +use std::net; use std::str::FromStr; use std::time::{Instant, Duration}; use futures::Stream; @@ -28,7 +28,7 @@ pub struct ChatSession { /// joined room room: String, /// Framed wrapper - framed: FramedCell, + framed: FramedWriter, } impl Actor for ChatSession { @@ -63,46 +63,43 @@ impl Actor for ChatSession { } /// To use `Framed` we have to define Io type and Codec -impl FramedHandler for ChatSession { +impl StreamHandler> for ChatSession { /// This is main event loop for client requests - fn handle(&mut self, msg: io::Result, ctx: &mut Context) { + fn handle(&mut self, msg: ChatRequest, ctx: &mut Context) { match msg { - Err(_) => ctx.stop(), - Ok(msg) => match msg { - ChatRequest::List => { - // Send ListRooms message to chat server and wait for response - println!("List rooms"); - self.addr.call(self, server::ListRooms).then(|res, act, ctx| { - match res { - Ok(Ok(rooms)) => { - let _ = act.framed.send(ChatResponse::Rooms(rooms)); - }, + ChatRequest::List => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + self.addr.call(self, server::ListRooms).then(|res, act, ctx| { + match res { + Ok(Ok(rooms)) => { + let _ = act.framed.send(ChatResponse::Rooms(rooms)); + }, _ => println!("Something is wrong"), - } - actix::fut::ok(()) - }).wait(ctx) - // .wait(ctx) pauses all events in context, - // so actor wont receive any new messages until it get list of rooms back - }, - ChatRequest::Join(name) => { - println!("Join to room: {}", name); - self.room = name.clone(); - self.addr.send(server::Join{id: self.id, name: name.clone()}); - let _ = self.framed.send(ChatResponse::Joined(name)); - }, - ChatRequest::Message(message) => { - // send message to chat server - println!("Peer message: {}", message); - self.addr.send( - server::Message{id: self.id, - msg: message, room: - self.room.clone()}) - } - // we update heartbeat time on ping from peer - ChatRequest::Ping => - self.hb = Instant::now(), + } + actix::fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list of rooms back + }, + ChatRequest::Join(name) => { + println!("Join to room: {}", name); + self.room = name.clone(); + self.addr.send(server::Join{id: self.id, name: name.clone()}); + let _ = self.framed.send(ChatResponse::Joined(name)); + }, + ChatRequest::Message(message) => { + // send message to chat server + println!("Peer message: {}", message); + self.addr.send( + server::Message{id: self.id, + msg: message, room: + self.room.clone()}) } + // we update heartbeat time on ping from peer + ChatRequest::Ping => + self.hb = Instant::now(), } } } @@ -121,7 +118,7 @@ impl Handler for ChatSession { impl ChatSession { pub fn new(addr: SyncAddress, - framed: FramedCell) -> ChatSession { + framed: FramedWriter) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} } @@ -194,8 +191,10 @@ impl Handler for TcpServer { // For each incoming connection we create `ChatSession` actor // with out chat server address. let server = self.chat.clone(); - let _: () = ChatSession::create_with(msg.0.framed(ChatCodec), |_, framed| { - ChatSession::new(server, framed) + let _: () = ChatSession::create(|mut ctx| { + let (reader, writer) = FramedReader::wrap(msg.0.framed(ChatCodec)); + ChatSession::add_stream(reader, &mut ctx); + ChatSession::new(server, writer) }); } }