diff --git a/Cargo.toml b/Cargo.toml index 639c61960..a527e263b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ tokio-io = "0.1" tokio-proto = "0.1" [dependencies.actix] -# path = "../actix" +#path = "../actix" git = "https://github.com/fafhrd91/actix.git" #version = "0.2" default-features = false diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index ea983aac4..21d890434 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -21,7 +21,7 @@ impl Route for MyRoute { // get Multipart stream WrapStream::::actstream(req.multipart(payload)?) .and_then(|item, act, ctx| { - // Multipart stream is a string of Fields and nested Multiparts + // Multipart stream is a stream of Fields and nested Multiparts match item { multipart::MultipartItem::Field(field) => { println!("==== FIELD ==== {:?}", field); diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml new file mode 100644 index 000000000..3c4de88d2 --- /dev/null +++ b/examples/websocket/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "websocket-example" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "websocket" +path = "src/main.rs" + +[[bin]] +name = "client" +path = "src/client.rs" + +[dependencies] +rand = "*" +bytes = "0.4" +byteorder = "1.1" +futures = "0.1" +tokio-io = "0.1" +tokio-core = "0.1" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +actix = { git = "https://github.com/fafhrd91/actix.git" } +# actix = { path = "../../../actix" } +actix-web = { path = "../../" } diff --git a/examples/websocket/client.py b/examples/websocket/client.py new file mode 100644 index 000000000..35f97c1a6 --- /dev/null +++ b/examples/websocket/client.py @@ -0,0 +1,32 @@ +import asyncio +import aiohttp + + +def req1(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + + resp = yield from aiohttp.request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + +def req2(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + writer.append(open('src/main.rs')) + + resp = yield from aiohttp.request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + +loop = asyncio.get_event_loop() +loop.run_until_complete(req1()) +loop.run_until_complete(req2()) diff --git a/examples/websocket/src/client.rs b/examples/websocket/src/client.rs new file mode 100644 index 000000000..0b96a1e2d --- /dev/null +++ b/examples/websocket/src/client.rs @@ -0,0 +1,166 @@ +extern crate actix; +extern crate bytes; +extern crate byteorder; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + +use std::{io, net, process, thread}; +use std::str::FromStr; +use std::time::Duration; +use futures::Future; +use tokio_core::net::TcpStream; +use actix::prelude::*; + +mod codec; + + +fn main() { + let sys = actix::System::new("chat-client"); + + // Connect to server + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + Arbiter::handle().spawn( + TcpStream::connect(&addr, Arbiter::handle()) + .and_then(|stream| { + let addr: SyncAddress<_> = ChatClient.framed(stream, codec::ClientChatCodec); + + // start console loop + thread::spawn(move|| { + loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return + } + + addr.send(ClientCommand(cmd)); + } + }); + + futures::future::ok(()) + }) + .map_err(|e| { + println!("Can not connect to server: {}", e); + process::exit(1) + }) + ); + + println!("Running chat client"); + sys.run(); +} + + +struct ChatClient; + +struct ClientCommand(String); + +impl Actor for ChatClient { + type Context = FramedContext; + + fn started(&mut self, ctx: &mut FramedContext) { + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } +} + +impl ChatClient { + fn hb(&self, ctx: &mut FramedContext) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + if ctx.send(codec::ChatRequest::Ping).is_ok() { + act.hb(ctx); + } + }); + } +} + +/// Handle stdin commands +impl Handler for ChatClient +{ + fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext) + -> Response + { + let m = msg.0.trim(); + if m.is_empty() { + return Self::empty() + } + + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + let _ = ctx.send(codec::ChatRequest::List); + }, + "/join" => { + if v.len() == 2 { + let _ = ctx.send(codec::ChatRequest::Join(v[1].to_owned())); + } else { + println!("!!! room name is required"); + } + }, + _ => println!("!!! unknown command"), + } + } else { + let _ = ctx.send(codec::ChatRequest::Message(m.to_owned())); + } + + Self::empty() + } +} + +impl ResponseType for ChatClient { + type Item = (); + type Error = (); +} + +/// Server communication + +impl FramedActor for ChatClient { + type Io = TcpStream; + type Codec = codec::ClientChatCodec; +} + +impl StreamHandler for ChatClient { + + fn finished(&mut self, _: &mut FramedContext) { + println!("Disconnected"); + + // Stop application on disconnect + Arbiter::system().send(msgs::SystemExit(0)); + } +} + +impl ResponseType for ChatClient { + type Item = (); + type Error = (); +} + +impl Handler for ChatClient { + + fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext) + -> Response + { + match msg { + codec::ChatResponse::Message(ref msg) => { + println!("message: {}", msg); + } + codec::ChatResponse::Joined(ref msg) => { + println!("!!! joined: {}", msg); + } + codec::ChatResponse::Rooms(rooms) => { + println!("\n!!! Available rooms:"); + for room in rooms { + println!("{}", room); + } + println!(""); + } + _ => (), + } + + Self::empty() + } +} diff --git a/examples/websocket/src/codec.rs b/examples/websocket/src/codec.rs new file mode 100644 index 000000000..47e02a376 --- /dev/null +++ b/examples/websocket/src/codec.rs @@ -0,0 +1,125 @@ +#![allow(dead_code)] +use std::io; +use serde_json as json; +use byteorder::{BigEndian , ByteOrder}; +use bytes::{BytesMut, BufMut}; +use tokio_io::codec::{Encoder, Decoder}; + + +/// Client request +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag="cmd", content="data")] +pub enum ChatRequest { + /// List rooms + List, + /// Join rooms + Join(String), + /// Send message + Message(String), + /// Ping + Ping +} + +/// Server response +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag="cmd", content="data")] +pub enum ChatResponse { + Ping, + + /// List of rooms + Rooms(Vec), + + /// Joined + Joined(String), + + /// Message + Message(String), +} + + +/// Codec for Client -> Server transport +pub struct ChatCodec; + +impl Decoder for ChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn encode(&mut self, msg: ChatResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} + + +/// Codec for Server -> Client transport +pub struct ClientChatCodec; + +impl Decoder for ClientChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ClientChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn encode(&mut self, msg: ChatRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs index 1bf366010..c85454a92 100644 --- a/examples/websocket/src/main.rs +++ b/examples/websocket/src/main.rs @@ -1,69 +1,64 @@ -// #![feature(try_trait)] -#![allow(dead_code, unused_variables)] +#![allow(unused_variables)] +extern crate rand; +extern crate bytes; +extern crate byteorder; +extern crate tokio_io; +extern crate tokio_core; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + extern crate actix; extern crate actix_web; -extern crate tokio_core; -extern crate env_logger; + +use std::time::Instant; use actix::*; use actix_web::*; -struct MyRoute {req: Option} +mod codec; +mod server; +mod session; -impl Actor for MyRoute { + +/// This is our websocket route state, this state is shared with all route instances +/// via `HttpContext::state()` +struct WsChatSessionState { + addr: SyncAddress, +} + +struct WsChatSession { + /// unique session id + id: usize, + /// Client must send ping at least once per 10 seconds, otherwise we drop connection. + hb: Instant, + /// joined room + room: String, + /// peer name + name: Option, +} + +impl Actor for WsChatSession { type Context = HttpContext; } -impl Route for MyRoute { - type State = (); - - fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { - println!("PARAMS: {:?} {:?}", req.match_info().get("name"), req.match_info()); - if !payload.eof() { - ctx.add_stream(payload); - Reply::stream(MyRoute{req: Some(req)}) - } else { - Reply::reply(httpcodes::HTTPOk) - } - } -} - -impl ResponseType for MyRoute { - type Item = (); - type Error = (); -} - -impl StreamHandler for MyRoute {} - -impl Handler for MyRoute { - fn handle(&mut self, msg: PayloadItem, ctx: &mut HttpContext) - -> Response - { - println!("CHUNK: {:?}", msg); - if let Some(req) = self.req.take() { - ctx.start(httpcodes::HTTPOk); - ctx.write_eof(); - } - Self::empty() - } -} - -struct MyWS {} - -impl Actor for MyWS { - type Context = HttpContext; -} - -impl Route for MyWS { - type State = (); +/// Entry point for our route +impl Route for WsChatSession { + type State = WsChatSessionState; fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { + // websocket handshakre, it may fail if request is not websocket request match ws::handshake(&req) { Ok(resp) => { ctx.start(resp); ctx.add_stream(ws::WsStream::new(payload)); - Reply::stream(MyWS{}) + Reply::async( + WsChatSession { + id: 0, + hb: Instant::now(), + room: "Main".to_owned(), + name: None}) } Err(err) => { Reply::reply(err) @@ -72,22 +67,92 @@ impl Route for MyWS { } } -impl ResponseType for MyWS { +/// Handle messages from chat server, we simply send it to peer websocket +impl Handler for WsChatSession { + fn handle(&mut self, msg: session::Message, ctx: &mut HttpContext) + -> Response + { + ws::WsWriter::text(ctx, &msg.0); + Self::empty() + } +} + +impl ResponseType for WsChatSession { type Item = (); type Error = (); } -impl StreamHandler for MyWS {} - -impl Handler for MyWS { +/// WebSocket message handler +impl Handler for WsChatSession { fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext) -> Response { - println!("WS: {:?}", msg); + println!("WEBSOCKET MESSAGE: {:?}", msg); match msg { - ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, msg), - ws::Message::Text(text) => ws::WsWriter::text(ctx, text), - ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin), + ws::Message::Ping(msg) => + ws::WsWriter::pong(ctx, msg), + ws::Message::Pong(msg) => + self.hb = Instant::now(), + ws::Message::Text(text) => { + let m = text.trim(); + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { + match res { + Ok(Ok(rooms)) => { + for room in rooms { + ws::WsWriter::text(ctx, &room); + } + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list + // of rooms back + }, + "/join" => { + if v.len() == 2 { + self.room = v[1].to_owned(); + ctx.state().addr.send( + server::Join{id: self.id, name: self.room.clone()}); + + ws::WsWriter::text(ctx, "joined"); + } else { + ws::WsWriter::text(ctx, "!!! room name is required"); + } + }, + "/name" => { + if v.len() == 2 { + self.name = Some(v[1].to_owned()); + } else { + ws::WsWriter::text(ctx, "!!! name is required"); + } + }, + _ => ws::WsWriter::text( + ctx, &format!("!!! unknown command: {:?}", m)), + } + } else { + let msg = if let Some(ref name) = self.name { + format!("{}: {}", name, m) + } else { + m.to_owned() + }; + // send message to chat server + ctx.state().addr.send( + server::Message{id: self.id, + msg: msg, + room: self.room.clone()}) + } + }, + ws::Message::Binary(bin) => + println!("Unexpected binary"), ws::Message::Closed | ws::Message::Error => { ctx.stop(); } @@ -97,30 +162,71 @@ impl Handler for MyWS { } } +impl StreamHandler for WsChatSession +{ + /// Method is called when stream get polled first time. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // HttpContext::state() is instance of WsChatSessionState, state is shared across all + // routes within application + let subs = ctx.sync_subscriber(); + ctx.state().addr.call( + self, server::Connect{addr: subs}).then( + |res, act, ctx| { + match res { + Ok(Ok(res)) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }).wait(ctx); + } + + /// Method is called when stream finishes, even if stream finishes with error. + fn finished(&mut self, ctx: &mut Self::Context) { + // notify chat server + ctx.state().addr.send(server::Disconnect{id: self.id}); + ctx.stop() + } +} + +impl ResponseType for WsChatSession { + type Item = (); + type Error = (); +} + fn main() { - let _ = env_logger::init(); + let sys = actix::System::new("websocket-example"); - let sys = actix::System::new("http-example"); + // Start chat server actor + let server: SyncAddress<_> = server::ChatServer::default().start(); + // Start tcp server + session::TcpServer::new("127.0.0.1:12345", server.clone()); + + // Websocket sessions state + let state = WsChatSessionState { addr: server }; + + // Create Http server with websocket support HttpServer::new( RoutingMap::default() - .app("/blah", Application::default() - .resource("/test/{name}", |r| { - r.get::(); - r.post::(); - }) - .route_handler("/static", StaticFiles::new(".", true)) + .app("/", Application::builder(state) + // redirect to websocket.html + .resource("/", |r| + r.handler(Method::GET, |req, payload, state| { + httpcodes::HTTPOk + })) + // websocket + .resource("/ws/", |r| r.get::()) + // static resources + .route_handler("/static", StaticFiles::new("static/", true)) .finish()) - .resource("/test", |r| r.post::()) - .resource("/ws/", |r| r.get::()) - .resource("/simple/", |r| - r.handler(Method::GET, |req, payload, state| { - httpcodes::HTTPOk - })) .finish()) - .serve::<_, ()>("127.0.0.1:9080").unwrap(); + .serve::<_, ()>("127.0.0.1:8080").unwrap(); - println!("starting"); let _ = sys.run(); } diff --git a/examples/websocket/src/server.rs b/examples/websocket/src/server.rs new file mode 100644 index 000000000..e20735f1e --- /dev/null +++ b/examples/websocket/src/server.rs @@ -0,0 +1,218 @@ +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; +use rand::{self, Rng, ThreadRng}; +use actix::prelude::*; + +use session; + +/// Message for chat server communications + +/// New chat session is created +pub struct Connect { + pub addr: Box + Send>, +} + +/// Session is disconnected +pub struct Disconnect { + pub id: usize, +} + +/// Send message to specific room +pub struct Message { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +/// List of available rooms +pub struct ListRooms; + +/// Join room, if room does not exists create new one. +pub struct Join { + /// Client id + pub id: usize, + /// Room name + pub name: String, +} + +/// `ChatServer` manages chat rooms and responsible for coordinating chat session. +/// implementation is super primitive +pub struct ChatServer { + sessions: HashMap + Send>>, + rooms: HashMap>, + rng: RefCell, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let mut rooms = HashMap::new(); + rooms.insert("Main".to_owned(), HashSet::new()); + + ChatServer { + sessions: HashMap::new(), + rooms: rooms, + rng: RefCell::new(rand::thread_rng()), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_message(&self, room: &str, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(addr) = self.sessions.get(id) { + let _ = addr.send(session::Message(message.to_owned())); + } + } + } + } + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Response { + println!("Someone joined"); + + // notify all users in same room + self.send_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.borrow_mut().gen::(); + self.sessions.insert(id, msg.addr); + + // auto join session to Main room + self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + + // send id back + Self::reply(id) + } +} + +impl ResponseType for ChatServer { + /// Response type for Connect message + /// + /// Chat server returns unique session id + type Item = usize; + type Error = (); +} + + +/// Handler for Disconnect message. +impl Handler for ChatServer { + + fn handle(&mut self, msg: Disconnect, _: &mut Context) -> Response { + println!("Someone disconnected"); + + let mut rooms: Vec = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + rooms.push(name.to_owned()); + } + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + + Self::empty() + } +} + +impl ResponseType for ChatServer { + type Item = (); + type Error = (); +} + +/// Handler for Message message. +impl Handler for ChatServer { + + fn handle(&mut self, msg: Message, _: &mut Context) -> Response { + self.send_message(&msg.room, msg.msg.as_str(), msg.id); + + Self::empty() + } +} + +impl ResponseType for ChatServer { + type Item = (); + type Error = (); +} + +/// Handler for `ListRooms` message. +impl Handler for ChatServer { + + fn handle(&mut self, _: ListRooms, _: &mut Context) -> Response { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + Self::reply(rooms) + } +} + +impl ResponseType for ChatServer { + type Item = Vec; + type Error = (); +} + +/// Join room, send disconnect message to old room +/// send join message to new room +impl Handler for ChatServer { + + fn handle(&mut self, msg: Join, _: &mut Context) -> Response { + let Join {id, name} = msg; + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + + if self.rooms.get_mut(&name).is_none() { + self.rooms.insert(name.clone(), HashSet::new()); + } + self.send_message(&name, "Someone connected", id); + self.rooms.get_mut(&name).unwrap().insert(id); + + Self::empty() + } +} + +impl ResponseType for ChatServer { + type Item = (); + type Error = (); +} diff --git a/examples/websocket/src/session.rs b/examples/websocket/src/session.rs new file mode 100644 index 000000000..41cf1ea4c --- /dev/null +++ b/examples/websocket/src/session.rs @@ -0,0 +1,231 @@ +//! `ClientSession` is an actor, it manages peer tcp connection and +//! proxies commands from peer to `ChatServer`. +use std::{io, net}; +use std::str::FromStr; +use std::time::{Instant, Duration}; +use tokio_core::net::{TcpStream, TcpListener}; + +use actix::*; + +use server::{self, ChatServer}; +use codec::{ChatRequest, ChatResponse, ChatCodec}; + + +/// Chat server sends this messages to session +pub struct Message(pub String); + + +/// `ChatSession` actor is responsible for tcp peer communitions. +pub struct ChatSession { + /// unique session id + id: usize, + /// this is address of chat server + addr: SyncAddress, + /// Client must send ping at least once per 10 seconds, otherwise we drop connection. + hb: Instant, + /// joined room + room: String, +} + +impl Actor for ChatSession { + /// For tcp communication we are going to use `FramedContext`. + /// It is convinient wrapper around `Framed` object from `tokio_io` + type Context = FramedContext; +} + +/// To use `FramedContext` we have to define Io type and Codec +impl FramedActor for ChatSession { + type Io = TcpStream; + type Codec= ChatCodec; +} + +/// Also `FramedContext` requires Actor which is able to handle stream +/// of `::Item` items. +impl StreamHandler for ChatSession { + + fn started(&mut self, ctx: &mut FramedContext) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + self.addr.call(self, server::Connect{addr: ctx.sync_subscriber()}).then(|res, act, ctx| { + match res { + Ok(Ok(res)) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + fut::ok(()) + }).wait(ctx); + } + + fn finished(&mut self, ctx: &mut FramedContext) { + // notify chat server + self.addr.send(server::Disconnect{id: self.id}); + + ctx.stop() + } +} + +impl ResponseType for ChatSession { + type Item = (); + type Error = (); +} + +impl Handler for ChatSession { + + /// We'll stop chat session actor on any error, high likely it is just + /// termination of the tcp stream. + fn error(&mut self, _: io::Error, ctx: &mut FramedContext) { + ctx.stop() + } + + /// This is main event loop for client requests + fn handle(&mut self, msg: ChatRequest, ctx: &mut FramedContext) + -> Response + { + match msg { + ChatRequest::List => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + self.addr.call(self, server::ListRooms).then(|res, _, ctx| { + match res { + Ok(Ok(rooms)) => { + let _ = ctx.send(ChatResponse::Rooms(rooms)); + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list of rooms back + }, + ChatRequest::Join(name) => { + println!("Join to room: {}", name); + self.room = name.clone(); + self.addr.send(server::Join{id: self.id, name: name.clone()}); + let _ = ctx.send(ChatResponse::Joined(name)); + }, + ChatRequest::Message(message) => { + // send message to chat server + println!("Peer message: {}", message); + self.addr.send( + server::Message{id: self.id, + msg: message, room: + self.room.clone()}) + } + // we update heartbeat time on ping from peer + ChatRequest::Ping => + self.hb = Instant::now(), + } + + Self::empty() + } +} + +/// Handler for Message, chat server sends this message, we just send string to peer +impl Handler for ChatSession { + + fn handle(&mut self, msg: Message, ctx: &mut FramedContext) + -> Response + { + // send message to peer + let _ = ctx.send(ChatResponse::Message(msg.0)); + + Self::empty() + } +} + +impl ResponseType for ChatSession { + type Item = (); + type Error = (); +} + + +/// Helper methods +impl ChatSession { + + pub fn new(addr: SyncAddress) -> ChatSession { + ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned()} + } + + /// helper method that sends ping to client every second. + /// + /// also this method check heartbeats from client + fn hb(&self, ctx: &mut FramedContext) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { + // heartbeat timed out + println!("Client heartbeat failed, disconnecting!"); + + // notify chat server + act.addr.send(server::Disconnect{id: act.id}); + + // stop actor + ctx.stop(); + } + + if ctx.send(ChatResponse::Ping).is_ok() { + // if we can not send message to sink, sink is closed (disconnected) + act.hb(ctx); + } + }); + } +} + + +/// Define tcp server that will accept incomint tcp connection and create +/// chat actors. +pub struct TcpServer { + chat: SyncAddress, +} + +impl TcpServer { + pub fn new(s: &str, chat: SyncAddress) { + // Create server listener + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); + + // Our chat server `Server` is an actor, first we need to start it + // and then add stream on incoming tcp connections to it. + // TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) items + // So to be able to handle this events `Server` actor has to implement + // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` + let _: () = TcpServer::create(|ctx| { + ctx.add_stream(listener.incoming()); + TcpServer{chat: chat} + }); + } +} + +/// Make actor from `Server` +impl Actor for TcpServer { + /// Every actor has to provide execution `Context` in which it can run. + type Context = Context; +} + +/// Handle stream of TcpStream's +impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for TcpServer {} + +impl ResponseType<(TcpStream, net::SocketAddr)> for TcpServer { + type Item = (); + type Error = (); +} + +impl Handler<(TcpStream, net::SocketAddr), io::Error> for TcpServer { + + fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context) + -> Response + { + // For each incoming connection we create `ChatSession` actor + // with out chat server address. + let server = self.chat.clone(); + let _: () = ChatSession::new(server).framed(msg.0, ChatCodec); + + // this is response for message, which is defined by `ResponseType` trait + // in this case we just return unit. + Self::empty() + } +} diff --git a/examples/websocket/static/websocket.html b/examples/websocket/static/websocket.html new file mode 100644 index 000000000..e59e13f12 --- /dev/null +++ b/examples/websocket/static/websocket.html @@ -0,0 +1,90 @@ + + + + + + + + +

Chat!

+
+  | Status: + disconnected +
+
+
+
+ + +
+ + diff --git a/src/context.rs b/src/context.rs index ff3d0e05b..abb2c97d8 100644 --- a/src/context.rs +++ b/src/context.rs @@ -2,11 +2,14 @@ use std; use std::rc::Rc; use std::collections::VecDeque; use futures::{Async, Stream, Poll}; +use futures::sync::oneshot::Sender; use bytes::Bytes; -use actix::{Actor, ActorState, ActorContext, AsyncContext}; +use actix::{Actor, ActorState, ActorContext, AsyncContext, + Handler, Subscriber, ResponseType}; use actix::fut::ActorFuture; -use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle}; +use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle, + Envelope, ToEnvelope, RemoteEnvelope}; use route::{Route, Frame}; use httpresponse::HttpResponse; @@ -118,6 +121,25 @@ impl HttpContext where A: Actor + Route { } } +impl HttpContext where A: Actor + Route { + + #[doc(hidden)] + pub fn subscriber(&mut self) -> Box> + where A: Handler + { + Box::new(self.address.unsync_address()) + } + + #[doc(hidden)] + pub fn sync_subscriber(&mut self) -> Box + Send> + where A: Handler, + A::Item: Send, + A::Error: Send, + { + Box::new(self.address.sync_address()) + } +} + #[doc(hidden)] impl Stream for HttpContext where A: Actor + Route { @@ -149,22 +171,25 @@ impl Stream for HttpContext where A: Actor + Route } // check wait futures - if let Some(ref mut act) = self.act { - if let Ok(Async::NotReady) = self.wait.poll(act, ctx) { - return Ok(Async::NotReady) - } + if self.wait.poll(act, ctx) { + return Ok(Async::NotReady) } let mut prep_stop = false; loop { let mut not_ready = true; - if let Ok(Async::Ready(_)) = self.address.poll(act, ctx) { + if self.address.poll(act, ctx) { not_ready = false } self.items.poll(act, ctx); + // check wait futures + if self.wait.poll(act, ctx) { + return Ok(Async::NotReady) + } + // are we done if !not_ready { continue @@ -213,3 +238,17 @@ impl Stream for HttpContext where A: Actor + Route } } } + +type ToEnvelopeSender = + Sender>::Item, >::Error>>; + +impl ToEnvelope for HttpContext + where M: Send + 'static, + A: Actor> + Route + Handler, + >::Item: Send, >::Item: Send +{ + fn pack(msg: M, tx: Option>) -> Envelope + { + RemoteEnvelope::new(msg, tx).into() + } +} diff --git a/src/staticfiles.rs b/src/staticfiles.rs index 4c6a5673b..2b6ea65b4 100644 --- a/src/staticfiles.rs +++ b/src/staticfiles.rs @@ -77,7 +77,7 @@ impl StaticFiles { let entry = entry.unwrap(); // show file url as relative to static path let file_url = format!( - "{}{}", self.prefix, + "{}/{}", self.prefix, entry.path().strip_prefix(&self.directory).unwrap().to_string_lossy()); // if file is a directory, add '/' to the end of the name diff --git a/src/ws.rs b/src/ws.rs index d141ce336..5560a46b0 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -269,10 +269,10 @@ pub struct WsWriter; impl WsWriter { /// Send text frame - pub fn text(ctx: &mut HttpContext, text: String) + pub fn text(ctx: &mut HttpContext, text: &str) where A: Actor> + Route { - let mut frame = wsframe::Frame::message(Vec::from(text.as_str()), OpCode::Text, true); + let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true); let mut buf = Vec::new(); frame.format(&mut buf).unwrap();