diff --git a/.travis.yml b/.travis.yml index 53c6eee77..47f612511 100644 --- a/.travis.yml +++ b/.travis.yml @@ -60,7 +60,7 @@ script: cd examples/template_tera && cargo check && cd ../.. cd examples/diesel && cargo check && cd ../.. cd examples/tls && cargo check && cd ../.. - # cd examples/websocket-chat && cargo check && cd ../.. + cd examples/websocket-chat && cargo check && cd ../.. cd examples/websocket && cargo check && cd ../.. fi - | diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index 0eac954dc..5a9d7d3ac 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -25,5 +25,5 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -actix = "^0.4.2" +actix = { git="https://github.com/actix/actix.git" } actix-web = { path="../../" } diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index c57825fc9..ee9ad1298 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -12,6 +12,7 @@ use std::{io, net, process, thread}; use std::str::FromStr; use std::time::Duration; use futures::Future; +use tokio_io::AsyncRead; use tokio_core::net::TcpStream; use actix::prelude::*; @@ -26,7 +27,9 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: SyncAddress<_> = ChatClient.framed(stream, codec::ClientChatCodec); + let addr: SyncAddress<_> = ChatClient::create_with( + stream.framed(codec::ClientChatCodec), + |_, framed| ChatClient{framed: framed}); // start console loop thread::spawn(move|| { @@ -54,20 +57,22 @@ fn main() { } -struct ChatClient; +struct ChatClient { + framed: FramedCell, +} #[derive(Message)] struct ClientCommand(String); impl Actor for ChatClient { - type Context = FramedContext; + type Context = Context; - fn started(&mut self, ctx: &mut FramedContext) { + fn started(&mut self, ctx: &mut Context) { // start heartbeats otherwise server will disconnect after 10 seconds self.hb(ctx) } - fn stopping(&mut self, _: &mut FramedContext) -> bool { + fn stopping(&mut self, _: &mut Context) -> bool { println!("Disconnected"); // Stop application on disconnect @@ -78,11 +83,10 @@ impl Actor for ChatClient { } impl ChatClient { - fn hb(&self, ctx: &mut FramedContext) { + fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - if ctx.send(codec::ChatRequest::Ping).is_ok() { - act.hb(ctx); - } + act.framed.send(codec::ChatRequest::Ping); + act.hb(ctx); }); } } @@ -91,7 +95,7 @@ impl ChatClient { impl Handler for ChatClient { type Result = (); - fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext) { + fn handle(&mut self, msg: ClientCommand, _: &mut Context) { let m = msg.0.trim(); if m.is_empty() { return @@ -102,11 +106,11 @@ impl Handler for ChatClient { let v: Vec<&str> = m.splitn(2, ' ').collect(); match v[0] { "/list" => { - let _ = ctx.send(codec::ChatRequest::List); + let _ = self.framed.send(codec::ChatRequest::List); }, "/join" => { if v.len() == 2 { - let _ = ctx.send(codec::ChatRequest::Join(v[1].to_owned())); + let _ = self.framed.send(codec::ChatRequest::Join(v[1].to_owned())); } else { println!("!!! room name is required"); } @@ -114,18 +118,16 @@ impl Handler for ChatClient { _ => println!("!!! unknown command"), } } else { - let _ = ctx.send(codec::ChatRequest::Message(m.to_owned())); + let _ = self.framed.send(codec::ChatRequest::Message(m.to_owned())); } } } /// Server communication -impl FramedActor for ChatClient { - type Io = TcpStream; - type Codec = codec::ClientChatCodec; +impl FramedActor for ChatClient { - fn handle(&mut self, msg: io::Result, ctx: &mut FramedContext) { + fn handle(&mut self, msg: io::Result, ctx: &mut Context) { match msg { Err(_) => ctx.stop(), Ok(msg) => match msg { diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 8051e0a76..509192f00 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -62,9 +62,9 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let subs = ctx.sync_subscriber(); + let addr: SyncAddress<_> = ctx.address(); ctx.state().addr.call( - self, server::Connect{addr: subs}).then( + self, server::Connect{addr: addr.into_subscriber()}).then( |res, act, ctx| { match res { Ok(Ok(res)) => act.id = res, diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 66062533f..eb3b90e7b 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -4,6 +4,7 @@ use std::{io, net}; use std::str::FromStr; use std::time::{Instant, Duration}; use futures::Stream; +use tokio_io::AsyncRead; use tokio_core::net::{TcpStream, TcpListener}; use actix::prelude::*; @@ -26,12 +27,14 @@ pub struct ChatSession { hb: Instant, /// joined room room: String, + /// Framed wrapper + framed: FramedCell, } impl Actor for ChatSession { /// For tcp communication we are going to use `FramedContext`. /// It is convenient wrapper around `Framed` object from `tokio_io` - type Context = FramedContext; + type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { // we'll start heartbeat process on session start. @@ -41,7 +44,7 @@ impl Actor for ChatSession { // future within context, but context waits until this future resolves // before processing any other events. let addr: SyncAddress<_> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.subscriber()}) + self.addr.call(self, server::Connect{addr: addr.into_subscriber()}) .then(|res, act, ctx| { match res { Ok(Ok(res)) => act.id = res, @@ -59,23 +62,21 @@ impl Actor for ChatSession { } } -/// To use `FramedContext` we have to define Io type and Codec -impl FramedActor for ChatSession { - type Io = TcpStream; - type Codec= ChatCodec; +/// To use `Framed` we have to define Io type and Codec +impl FramedActor for ChatSession { /// This is main event loop for client requests - fn handle(&mut self, msg: io::Result, ctx: &mut FramedContext) { + fn handle(&mut self, msg: io::Result, ctx: &mut Context) { match msg { Err(_) => ctx.stop(), Ok(msg) => match msg { ChatRequest::List => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - self.addr.call(self, server::ListRooms).then(|res, _, ctx| { + self.addr.call(self, server::ListRooms).then(|res, act, ctx| { match res { Ok(Ok(rooms)) => { - let _ = ctx.send(ChatResponse::Rooms(rooms)); + let _ = act.framed.send(ChatResponse::Rooms(rooms)); }, _ => println!("Something is wrong"), } @@ -88,7 +89,7 @@ impl FramedActor for ChatSession { println!("Join to room: {}", name); self.room = name.clone(); self.addr.send(server::Join{id: self.id, name: name.clone()}); - let _ = ctx.send(ChatResponse::Joined(name)); + let _ = self.framed.send(ChatResponse::Joined(name)); }, ChatRequest::Message(message) => { // send message to chat server @@ -110,23 +111,25 @@ impl FramedActor for ChatSession { impl Handler for ChatSession { type Result = (); - fn handle(&mut self, msg: Message, ctx: &mut FramedContext) { + fn handle(&mut self, msg: Message, ctx: &mut Context) { // send message to peer - let _ = ctx.send(ChatResponse::Message(msg.0)); + let _ = self.framed.send(ChatResponse::Message(msg.0)); } } /// Helper methods impl ChatSession { - pub fn new(addr: SyncAddress) -> ChatSession { - ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned()} + pub fn new(addr: SyncAddress, + framed: FramedCell) -> ChatSession { + ChatSession {id: 0, addr: addr, hb: Instant::now(), + room: "Main".to_owned(), framed: framed} } /// helper method that sends ping to client every second. /// /// also this method check heartbeats from client - fn hb(&self, ctx: &mut FramedContext) { + fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { @@ -140,10 +143,9 @@ impl ChatSession { ctx.stop(); } - if ctx.send(ChatResponse::Ping).is_ok() { - // if we can not send message to sink, sink is closed (disconnected) - act.hb(ctx); - } + act.framed.send(ChatResponse::Ping); + // if we can not send message to sink, sink is closed (disconnected) + act.hb(ctx); }); } } @@ -192,6 +194,8 @@ impl Handler for TcpServer { // For each incoming connection we create `ChatSession` actor // with out chat server address. let server = self.chat.clone(); - let _: () = ChatSession::new(server).framed(msg.0, ChatCodec); + let _: () = ChatSession::create_with(msg.0.framed(ChatCodec), |_, framed| { + ChatSession::new(server, framed) + }); } } diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml index d26e749d6..fd7b683d6 100644 --- a/examples/websocket/Cargo.toml +++ b/examples/websocket/Cargo.toml @@ -15,6 +15,6 @@ path = "src/client.rs" env_logger = "*" futures = "0.1" tokio-core = "0.1" -#actix = "^0.4.2" +#actix = "^0.4.6" actix = { git = "https://github.com/actix/actix.git" } actix-web = { path="../../" } diff --git a/src/server/srv.rs b/src/server/srv.rs index 0e8f64852..46e8c4318 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -477,8 +477,10 @@ impl HttpServer, A, H, U> // start server let signals = self.subscribe_to_signals(); let addr: SyncAddress<_> = HttpServer::create(move |ctx| { - ctx.add_stream(stream.map( - move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); + ctx.add_message_stream( + stream + .map_err(|_| ()) + .map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); self }); signals.map(|signals| signals.send( @@ -542,6 +544,22 @@ impl Handler>> for HttpServer } } +impl Handler> for HttpServer + where T: IoStream, + H: HttpHandler + 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, + A: 'static, +{ + type Result = (); + + fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { + Arbiter::handle().spawn( + HttpChannel::new( + Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); + } +} + impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static,