diff --git a/actix-remote-basics/Cargo.toml b/actix-remote-basics/Cargo.toml new file mode 100644 index 0000000..46181cf --- /dev/null +++ b/actix-remote-basics/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "basics" +version = "0.1.0" +authors = ["Nikolay Kim "] +workspace = "../.." + +[dependencies] +futures = "0.1" +log = "0.4" +env_logger = "0.5" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +# cli +structopt = "0.2" +structopt-derive = "0.2" + +actix = "0.5" +actix-remote = "0.0.1" \ No newline at end of file diff --git a/actix-remote-basics/src/main.rs b/actix-remote-basics/src/main.rs new file mode 100644 index 0000000..b275e67 --- /dev/null +++ b/actix-remote-basics/src/main.rs @@ -0,0 +1,109 @@ +//! Start two `basic` instances +//! 1. cargo run --example basic -- 127.0.0.1:7654 +//! 2. ./target/debug/examples/basic 127.0.0.1:7655 127.0.0.1:7654 +//! +//! first instance sends messages, second instance respondes to messages from first instance +//! +#![allow(dead_code, unused_variables)] + +extern crate log; +extern crate env_logger; +extern crate futures; +#[macro_use] extern crate actix; +extern crate actix_remote; +extern crate serde_json; +#[macro_use] extern crate serde_derive; +extern crate structopt; +#[macro_use] extern crate structopt_derive; + +use std::time::Duration; + +use actix_remote::*; +use actix::prelude::*; +use futures::Future; +use structopt::StructOpt; + +mod msgs; +use msgs::TestMessage; + + +struct MyActor { + cnt: usize, + hb: bool, + recipient: Recipient, +} + +impl MyActor { + fn hb(&self, ctx: &mut Context) { + self.recipient.send(TestMessage{msg: "TEST".to_owned()}) + .and_then(|r| { + println!("REMOTE RESULT: {:?}", r); + Ok(()) + }) + .map_err(|_| ()) + .into_actor(self) + .spawn(ctx); + + ctx.run_later(Duration::from_secs(3), |act, ctx| act.hb(ctx)); + } +} + +impl Actor for MyActor { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + if self.hb { + self.hb(ctx); + } + } +} + +impl Handler for MyActor { + type Result = (); + + fn handle(&mut self, msg: TestMessage, _ctx: &mut Context) { + println!("REMOTE MESSAGE: {:?}", msg); + } +} + +#[derive(StructOpt, Debug)] +struct Cli { + /// Network address + addr: String, + /// Network node address + node: Option, +} + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_remote=debug"); + let _ = env_logger::init(); + + // cmd arguments + let args = Cli::from_args(); + let addr = args.addr.to_lowercase().trim().to_owned(); + let node = args.node.map(|n| n.to_lowercase().trim().to_owned()); + + let sys = actix::System::new("remote-example"); + + // send messages from main instance + let hb = node.is_none(); + + // create world + let mut world = World::new(addr).unwrap().add_node(node); + + // get remote recipient + let recipient = world.get_recipient::(); + + let addr = world.start(); + let a: Addr = MyActor::create(move |ctx| { + ctx.run_later(Duration::from_millis(5000), move |_, ctx| { + // register actor as recipient for `TestMessage` message + World::register_recipient( + &addr, ctx.address::>().recipient()); + }); + + MyActor{cnt: 0, hb, recipient} + }); + + let _ = sys.run(); +} \ No newline at end of file diff --git a/actix-remote-basics/src/msgs.rs b/actix-remote-basics/src/msgs.rs new file mode 100644 index 0000000..f1d2aa0 --- /dev/null +++ b/actix-remote-basics/src/msgs.rs @@ -0,0 +1,13 @@ +use actix_remote::*; + + +#[derive(Debug, Message, Serialize, Deserialize)] +pub struct TestMessage { + pub msg: String, +} + +impl RemoteMessage for TestMessage { + fn type_id() -> &'static str { + "TestMessage" + } +} \ No newline at end of file diff --git a/actix/Cargo.toml b/actix/Cargo.toml new file mode 100644 index 0000000..41a4aba --- /dev/null +++ b/actix/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "chat" +version = "0.1.0" +authors = ["Nikolay Kim "] + +[[bin]] +name = "server" +path = "src/main.rs" + +[[bin]] +name = "client" +path = "src/client.rs" + +[dependencies] +rand = "0.3" +bytes = "0.4" +byteorder = "1.1" +futures = "0.1" +tokio-io = "0.1" +tokio-core = "0.1" + +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" + +actix = "^0.5" \ No newline at end of file diff --git a/actix/README.md b/actix/README.md new file mode 100644 index 0000000..11e6c8a --- /dev/null +++ b/actix/README.md @@ -0,0 +1,20 @@ +# Chat example + + +## Server + +Chat server listens for incoming tcp connections. Server can access several types of message: + + * `/list` - list all available rooms + * `/join name` - join room, if room does not exist, create new one + * `some message` - just string, send message to all peers in same room + * client has to send heartbeat `Ping` messages, if server does not receive a heartbeat + message for 10 seconds connection gets dropped + +To start server use command: `cargo run --bin server` + +## Client + +Client connects to server. Reads input from stdin and sends to server. + +To run client use command: `cargo run --bin client` diff --git a/actix/src/client.rs b/actix/src/client.rs new file mode 100644 index 0000000..3b3f612 --- /dev/null +++ b/actix/src/client.rs @@ -0,0 +1,151 @@ +#[macro_use] extern crate actix; +extern crate bytes; +extern crate byteorder; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + +use std::{io, net, process, thread}; +use std::str::FromStr; +use std::time::Duration; +use futures::Future; +use tokio_io::AsyncRead; +use tokio_io::io::WriteHalf; +use tokio_io::codec::FramedRead; +use tokio_core::net::TcpStream; +use actix::prelude::*; + +mod codec; + + +fn main() { + let sys = actix::System::new("chat-client"); + + // Connect to server + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + Arbiter::handle().spawn( + TcpStream::connect(&addr, Arbiter::handle()) + .and_then(|stream| { + let addr: Addr = ChatClient::create(|ctx| { + let (r, w) = stream.split(); + ctx.add_stream(FramedRead::new(r, codec::ClientChatCodec)); + ChatClient{framed: actix::io::FramedWrite::new( + w, codec::ClientChatCodec, ctx)}}); + + // start console loop + thread::spawn(move|| { + loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return + } + + addr.do_send(ClientCommand(cmd)); + } + }); + + futures::future::ok(()) + }) + .map_err(|e| { + println!("Can not connect to server: {}", e); + process::exit(1) + }) + ); + + println!("Running chat client"); + sys.run(); +} + + +struct ChatClient { + framed: actix::io::FramedWrite, codec::ClientChatCodec>, +} + +#[derive(Message)] +struct ClientCommand(String); + +impl Actor for ChatClient { + type Context = Context; + + fn started(&mut self, ctx: &mut Context) { + // start heartbeats otherwise server will disconnect after 10 seconds + self.hb(ctx) + } + + fn stopping(&mut self, _: &mut Context) -> Running { + println!("Disconnected"); + + // Stop application on disconnect + Arbiter::system().do_send(actix::msgs::SystemExit(0)); + + Running::Stop + } +} + +impl ChatClient { + fn hb(&self, ctx: &mut Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + act.framed.write(codec::ChatRequest::Ping); + act.hb(ctx); + }); + } +} + +impl actix::io::WriteHandler for ChatClient {} + +/// Handle stdin commands +impl Handler for ChatClient +{ + type Result = (); + + fn handle(&mut self, msg: ClientCommand, _: &mut Context) { + let m = msg.0.trim(); + + // we check for /sss type of messages + if m.starts_with('/') { + let v: Vec<&str> = m.splitn(2, ' ').collect(); + match v[0] { + "/list" => { + self.framed.write(codec::ChatRequest::List); + }, + "/join" => { + if v.len() == 2 { + self.framed.write(codec::ChatRequest::Join(v[1].to_owned())); + } else { + println!("!!! room name is required"); + } + }, + _ => println!("!!! unknown command"), + } + } else { + self.framed.write(codec::ChatRequest::Message(m.to_owned())); + } + } +} + +/// Server communication +impl StreamHandler for ChatClient { + + fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context) { + match msg { + codec::ChatResponse::Message(ref msg) => { + println!("message: {}", msg); + } + codec::ChatResponse::Joined(ref msg) => { + println!("!!! joined: {}", msg); + } + codec::ChatResponse::Rooms(rooms) => { + println!("\n!!! Available rooms:"); + for room in rooms { + println!("{}", room); + } + println!(); + } + _ => (), + } + } +} \ No newline at end of file diff --git a/actix/src/codec.rs b/actix/src/codec.rs new file mode 100644 index 0000000..76d55b1 --- /dev/null +++ b/actix/src/codec.rs @@ -0,0 +1,124 @@ +#![allow(dead_code)] +use std::io; +use serde_json as json; +use byteorder::{BigEndian , ByteOrder}; +use bytes::{BytesMut, BufMut}; +use tokio_io::codec::{Encoder, Decoder}; + + +/// Client request +#[derive(Serialize, Deserialize, Debug, Message)] +#[serde(tag="cmd", content="data")] +pub enum ChatRequest { + /// List rooms + List, + /// Join rooms + Join(String), + /// Send message + Message(String), + /// Ping + Ping +} + +/// Server response +#[derive(Serialize, Deserialize, Debug, Message)] +#[serde(tag="cmd", content="data")] +pub enum ChatResponse { + Ping, + + /// List of rooms + Rooms(Vec), + + /// Joined + Joined(String), + + /// Message + Message(String), +} + +/// Codec for Client -> Server transport +pub struct ChatCodec; + +impl Decoder for ChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn encode(&mut self, msg: ChatResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} + + +/// Codec for Server -> Client transport +pub struct ClientChatCodec; + +impl Decoder for ClientChatCodec +{ + type Item = ChatResponse; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let size = { + if src.len() < 2 { + return Ok(None) + } + BigEndian::read_u16(src.as_ref()) as usize + }; + + if src.len() >= size + 2 { + src.split_to(2); + let buf = src.split_to(size); + Ok(Some(json::from_slice::(&buf)?)) + } else { + Ok(None) + } + } +} + +impl Encoder for ClientChatCodec +{ + type Item = ChatRequest; + type Error = io::Error; + + fn encode(&mut self, msg: ChatRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { + let msg = json::to_string(&msg).unwrap(); + let msg_ref: &[u8] = msg.as_ref(); + + dst.reserve(msg_ref.len() + 2); + dst.put_u16::(msg_ref.len() as u16); + dst.put(msg_ref); + + Ok(()) + } +} \ No newline at end of file diff --git a/actix/src/main.rs b/actix/src/main.rs new file mode 100644 index 0000000..46dec34 --- /dev/null +++ b/actix/src/main.rs @@ -0,0 +1,89 @@ +#![cfg_attr(feature="cargo-clippy", allow(let_unit_value))] +extern crate rand; +extern crate bytes; +extern crate byteorder; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate serde; +extern crate serde_json; +#[macro_use] extern crate serde_derive; + +#[macro_use] extern crate actix; + +use std::net; +use std::str::FromStr; +use futures::Stream; +use tokio_io::AsyncRead; +use tokio_io::codec::FramedRead; +use tokio_core::net::{TcpListener, TcpStream}; +use actix::prelude::*; + +mod codec; +mod server; +mod session; + +use codec::ChatCodec; +use server::ChatServer; +use session::ChatSession; + + +/// Define tcp server that will accept incoming tcp connection and create +/// chat actors. +struct Server { + chat: Addr, +} + +/// Make actor from `Server` +impl Actor for Server { + /// Every actor has to provide execution `Context` in which it can run. + type Context = Context; +} + +#[derive(Message)] +struct TcpConnect(pub TcpStream, pub net::SocketAddr); + +/// Handle stream of TcpStream's +impl Handler for Server { + /// this is response for message, which is defined by `ResponseType` trait + /// in this case we just return unit. + type Result = (); + + fn handle(&mut self, msg: TcpConnect, _: &mut Context) { + // For each incoming connection we create `ChatSession` actor + // with out chat server address. + let server = self.chat.clone(); + let _: () = ChatSession::create( + move |ctx| { + let (r, w) = msg.0.split(); + ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); + ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) + }); + } +} + + +fn main() { + let sys = actix::System::new("chat-server"); + + // Start chat server actor + let server: Addr = ChatServer::default().start(); + + // Create server listener + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); + + // Our chat server `Server` is an actor, first we need to start it + // and then add stream on incoming tcp connections to it. + // TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) items + // So to be able to handle this events `Server` actor has to implement + // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` + let _: () = Server::create(|ctx| { + ctx.add_message_stream(listener.incoming() + .map_err(|_| ()).map(|(st, addr)| TcpConnect(st, addr))); + Server{chat: server} + }); + + println!("Running chat server on 127.0.0.1:12345"); + sys.run(); +} \ No newline at end of file diff --git a/actix/src/server.rs b/actix/src/server.rs new file mode 100644 index 0000000..b4cfd1d --- /dev/null +++ b/actix/src/server.rs @@ -0,0 +1,203 @@ +//! `ChatServer` is an actor. It maintains list of connection client session. +//! And manages available rooms. Peers send messages to other peers in same +//! room through `ChatServer`. + +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; +use rand::{self, Rng, ThreadRng}; +use actix::prelude::*; + +use session; + +/// Message for chat server communications + +/// New chat session is created +pub struct Connect { + pub addr: Addr, +} + +/// Response type for Connect message +/// +/// Chat server returns unique session id +impl actix::Message for Connect { + type Result = usize; +} + + +/// Session is disconnected +#[derive(Message)] +pub struct Disconnect { + pub id: usize, +} + +/// Send message to specific room +#[derive(Message)] +pub struct Message { + /// Id of the client session + pub id: usize, + /// Peer message + pub msg: String, + /// Room name + pub room: String, +} + +/// List of available rooms +pub struct ListRooms; + +impl actix::Message for ListRooms { + type Result = Vec; +} + +/// Join room, if room does not exists create new one. +#[derive(Message)] +pub struct Join { + /// Client id + pub id: usize, + /// Room name + pub name: String, +} + +/// `ChatServer` manages chat rooms and responsible for coordinating chat session. +/// implementation is super primitive +pub struct ChatServer { + sessions: HashMap>, + rooms: HashMap>, + rng: RefCell, +} + +impl Default for ChatServer { + fn default() -> ChatServer { + // default room + let mut rooms = HashMap::new(); + rooms.insert("Main".to_owned(), HashSet::new()); + + ChatServer { + sessions: HashMap::new(), + rooms: rooms, + rng: RefCell::new(rand::thread_rng()), + } + } +} + +impl ChatServer { + /// Send message to all users in the room + fn send_message(&self, room: &str, message: &str, skip_id: usize) { + if let Some(sessions) = self.rooms.get(room) { + for id in sessions { + if *id != skip_id { + if let Some(addr) = self.sessions.get(id) { + addr.do_send(session::Message(message.to_owned())) + } + } + } + } + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + println!("Someone joined"); + + // notify all users in same room + self.send_message(&"Main".to_owned(), "Someone joined", 0); + + // register session with random id + let id = self.rng.borrow_mut().gen::(); + self.sessions.insert(id, msg.addr); + + // auto join session to Main room + self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); + + // send id back + id + } +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) { + println!("Someone disconnected"); + + let mut rooms: Vec = Vec::new(); + + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for (name, sessions) in &mut self.rooms { + if sessions.remove(&msg.id) { + rooms.push(name.to_owned()); + } + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + } +} + +/// Handler for Message message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Message, _: &mut Context) { + self.send_message(&msg.room, msg.msg.as_str(), msg.id); + } +} + +/// Handler for `ListRooms` message. +impl Handler for ChatServer { + type Result = MessageResult; + + fn handle(&mut self, _: ListRooms, _: &mut Context) -> Self::Result { + let mut rooms = Vec::new(); + + for key in self.rooms.keys() { + rooms.push(key.to_owned()) + } + + MessageResult(rooms) + } +} + +/// Join room, send disconnect message to old room +/// send join message to new room +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Join, _: &mut Context) { + let Join {id, name} = msg; + let mut rooms = Vec::new(); + + // remove session from all rooms + for (n, sessions) in &mut self.rooms { + if sessions.remove(&id) { + rooms.push(n.to_owned()); + } + } + // send message to other users + for room in rooms { + self.send_message(&room, "Someone disconnected", 0); + } + + if self.rooms.get_mut(&name).is_none() { + self.rooms.insert(name.clone(), HashSet::new()); + } + self.send_message(&name, "Someone connected", id); + self.rooms.get_mut(&name).unwrap().insert(id); + } +} \ No newline at end of file diff --git a/actix/src/session.rs b/actix/src/session.rs new file mode 100644 index 0000000..96cbb59 --- /dev/null +++ b/actix/src/session.rs @@ -0,0 +1,148 @@ +//! `ClientSession` is an actor, it manages peer tcp connection and +//! proxies commands from peer to `ChatServer`. +use std::io; +use std::time::{Instant, Duration}; +use tokio_core::net::TcpStream; +use tokio_io::io::WriteHalf; +use actix::prelude::*; + +use server::{self, ChatServer}; +use codec::{ChatRequest, ChatResponse, ChatCodec}; + + +/// Chat server sends this messages to session +#[derive(Message)] +pub struct Message(pub String); + +/// `ChatSession` actor is responsible for tcp peer communications. +pub struct ChatSession { + /// unique session id + id: usize, + /// this is address of chat server + addr: Addr, + /// Client must send ping at least once per 10 seconds, otherwise we drop connection. + hb: Instant, + /// joined room + room: String, + /// Framed wrapper + framed: actix::io::FramedWrite, ChatCodec>, +} + +impl Actor for ChatSession { + type Context = actix::Context; + + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + self.hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + self.addr.send(server::Connect{addr: ctx.address()}) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), + } + actix::fut::ok(()) + }).wait(ctx); + } + + fn stopping(&mut self, _: &mut Self::Context) -> Running { + // notify chat server + self.addr.do_send(server::Disconnect{id: self.id}); + Running::Stop + } +} + +impl actix::io::WriteHandler for ChatSession {} + +/// To use `Framed` with an actor, we have to implement `StreamHandler` trait +impl StreamHandler for ChatSession { + + /// This is main event loop for client requests + fn handle(&mut self, msg: ChatRequest, ctx: &mut Self::Context) { + match msg { + ChatRequest::List => { + // Send ListRooms message to chat server and wait for response + println!("List rooms"); + self.addr.send(server::ListRooms) + .into_actor(self) // <- create actor compatible future + .then(|res, act, _| { + match res { + Ok(rooms) => act.framed.write(ChatResponse::Rooms(rooms)), + _ => println!("Something is wrong"), + } + actix::fut::ok(()) + }).wait(ctx) + // .wait(ctx) pauses all events in context, + // so actor wont receive any new messages until it get list of rooms back + }, + ChatRequest::Join(name) => { + println!("Join to room: {}", name); + self.room = name.clone(); + self.addr.do_send(server::Join{id: self.id, name: name.clone()}); + self.framed.write(ChatResponse::Joined(name)); + }, + ChatRequest::Message(message) => { + // send message to chat server + println!("Peer message: {}", message); + self.addr.do_send( + server::Message{id: self.id, + msg: message, room: + self.room.clone()}) + } + // we update heartbeat time on ping from peer + ChatRequest::Ping => + self.hb = Instant::now(), + } + } +} + +/// Handler for Message, chat server sends this message, we just send string to peer +impl Handler for ChatSession { + type Result = (); + + fn handle(&mut self, msg: Message, _: &mut Self::Context) { + // send message to peer + self.framed.write(ChatResponse::Message(msg.0)); + } +} + +/// Helper methods +impl ChatSession { + + pub fn new(addr: Addr, + framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession + { + ChatSession {id: 0, + addr: addr, + hb: Instant::now(), + room: "Main".to_owned(), + framed: framed} + } + + /// helper method that sends ping to client every second. + /// + /// also this method check heartbeats from client + fn hb(&self, ctx: &mut actix::Context) { + ctx.run_later(Duration::new(1, 0), |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { + // heartbeat timed out + println!("Client heartbeat failed, disconnecting!"); + + // notify chat server + act.addr.do_send(server::Disconnect{id: act.id}); + + // stop actor + ctx.stop(); + } + + act.framed.write(ChatResponse::Ping); + act.hb(ctx); + }); + } +} \ No newline at end of file