diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 057036e68..5da1f37f6 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -13,6 +13,8 @@ use std::str::FromStr; use std::time::Duration; use futures::Future; use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; use tokio_core::net::TcpStream; use actix::prelude::*; @@ -27,12 +29,12 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient::create(|mut ctx| { - let (reader, writer) = - FramedReader::wrap(stream.framed(codec::ClientChatCodec)); - ChatClient::add_stream(reader, &mut ctx); - ChatClient{framed: writer} - }); + let addr: SyncAddress<_> = ChatClient::create(|ctx| { + let (r, w) = stream.split(); + ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); + ChatClient{ + framed: actix::io::FramedWrite::new( + w, codec::ClientChatCodec, ctx)}}); // start console loop thread::spawn(move|| { @@ -61,7 +63,7 @@ fn main() { struct ChatClient { - framed: FramedWriter, + framed: actix::io::FramedWrite, codec::ClientChatCodec>, } #[derive(Message)] @@ -88,12 +90,14 @@ impl Actor for ChatClient { impl ChatClient { fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.framed.send(codec::ChatRequest::Ping); + act.framed.write(codec::ChatRequest::Ping); act.hb(ctx); }); } } +impl actix::io::WriteHandler for ChatClient {} + /// Handle stdin commands impl Handler for ChatClient { type Result = (); @@ -109,11 +113,11 @@ impl Handler for ChatClient { let v: Vec<&str> = m.splitn(2, ' ').collect(); match v[0] { "/list" => { - let _ = self.framed.send(codec::ChatRequest::List); + self.framed.write(codec::ChatRequest::List); }, "/join" => { if v.len() == 2 { - let _ = self.framed.send(codec::ChatRequest::Join(v[1].to_owned())); + self.framed.write(codec::ChatRequest::Join(v[1].to_owned())); } else { println!("!!! room name is required"); } @@ -121,14 +125,14 @@ impl Handler for ChatClient { _ => println!("!!! unknown command"), } } else { - let _ = self.framed.send(codec::ChatRequest::Message(m.to_owned())); + self.framed.write(codec::ChatRequest::Message(m.to_owned())); } } } /// Server communication -impl StreamHandler> for ChatClient { +impl StreamHandler for ChatClient { fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context) { match msg { diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 26598c164..5bc799ca7 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -1,10 +1,12 @@ //! `ClientSession` is an actor, it manages peer tcp connection and //! proxies commands from peer to `ChatServer`. -use std::net; +use std::{io, net}; use std::str::FromStr; use std::time::{Instant, Duration}; use futures::Stream; use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; use tokio_core::net::{TcpStream, TcpListener}; use actix::prelude::*; @@ -28,7 +30,7 @@ pub struct ChatSession { /// joined room room: String, /// Framed wrapper - framed: FramedWriter, + framed: actix::io::FramedWrite, ChatCodec>, } impl Actor for ChatSession { @@ -62,8 +64,10 @@ impl Actor for ChatSession { } } +impl actix::io::WriteHandler for ChatSession {} + /// To use `Framed` we have to define Io type and Codec -impl StreamHandler> for ChatSession { +impl StreamHandler for ChatSession { /// This is main event loop for client requests fn handle(&mut self, msg: ChatRequest, ctx: &mut Context) { @@ -74,7 +78,7 @@ impl StreamHandler> for ChatSession { self.addr.call(self, server::ListRooms).then(|res, act, ctx| { match res { Ok(Ok(rooms)) => { - let _ = act.framed.send(ChatResponse::Rooms(rooms)); + act.framed.write(ChatResponse::Rooms(rooms)); }, _ => println!("Something is wrong"), } @@ -87,7 +91,7 @@ impl StreamHandler> for ChatSession { println!("Join to room: {}", name); self.room = name.clone(); self.addr.send(server::Join{id: self.id, name: name.clone()}); - let _ = self.framed.send(ChatResponse::Joined(name)); + self.framed.write(ChatResponse::Joined(name)); }, ChatRequest::Message(message) => { // send message to chat server @@ -110,7 +114,7 @@ impl Handler for ChatSession { fn handle(&mut self, msg: Message, ctx: &mut Context) { // send message to peer - let _ = self.framed.send(ChatResponse::Message(msg.0)); + self.framed.write(ChatResponse::Message(msg.0)); } } @@ -118,7 +122,7 @@ impl Handler for ChatSession { impl ChatSession { pub fn new(addr: SyncAddress, - framed: FramedWriter) -> ChatSession { + framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} } @@ -140,7 +144,7 @@ impl ChatSession { ctx.stop(); } - act.framed.send(ChatResponse::Ping); + act.framed.write(ChatResponse::Ping); // if we can not send message to sink, sink is closed (disconnected) act.hb(ctx); }); @@ -191,10 +195,10 @@ impl Handler for TcpServer { // For each incoming connection we create `ChatSession` actor // with out chat server address. let server = self.chat.clone(); - let _: () = ChatSession::create(|mut ctx| { - let (reader, writer) = FramedReader::wrap(msg.0.framed(ChatCodec)); - ChatSession::add_stream(reader, &mut ctx); - ChatSession::new(server, writer) + let _: () = ChatSession::create(|ctx| { + let (r, w) = msg.0.split(); + ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); + ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) }); } }