1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

update example to use actix 0.4

This commit is contained in:
Nikolay Kim 2018-01-05 14:01:19 -08:00
parent 5ff35f5b99
commit 5ae646332e
15 changed files with 150 additions and 243 deletions

View File

@ -7,7 +7,6 @@ extern crate env_logger;
extern crate futures; extern crate futures;
use futures::Stream; use futures::Stream;
use actix::*;
use actix_web::*; use actix_web::*;
use actix_web::middleware::RequestSession; use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result}; use futures::future::{FutureResult, result};

View File

@ -27,9 +27,9 @@ impl Actor for DbExecutor {
} }
impl Handler<CreateUser> for DbExecutor { impl Handler<CreateUser> for DbExecutor {
fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) type Result = MessageResult<CreateUser>;
-> Response<Self, CreateUser>
{ fn handle(&mut self, msg: CreateUser, _: &mut Self::Context) -> Self::Result {
use self::schema::users::dsl::*; use self::schema::users::dsl::*;
let uuid = format!("{}", uuid::Uuid::new_v4()); let uuid = format!("{}", uuid::Uuid::new_v4());
@ -48,6 +48,6 @@ impl Handler<CreateUser> for DbExecutor {
.load::<models::User>(&self.0) .load::<models::User>(&self.0)
.expect("Error loading person"); .expect("Error loading person");
Self::reply(items.pop().unwrap()) Ok(items.pop().unwrap())
} }
} }

View File

@ -18,8 +18,6 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::prelude::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
use diesel::prelude::*; use diesel::prelude::*;
@ -72,9 +70,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ let signals = actix::Arbiter::system_registry().get::<ProcessSignals>(); signals.send(Subscribe(_addr.subscriber()));
signals.send(Subscribe(_addr.subscriber())); }
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

Binary file not shown.

View File

@ -7,9 +7,7 @@ extern crate serde_json;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
#[macro_use] extern crate json; #[macro_use] extern crate json;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
use bytes::BytesMut; use bytes::BytesMut;
@ -23,7 +21,7 @@ struct MyObj {
} }
/// This handler uses `HttpRequest::json()` for loading serde json object. /// This handler uses `HttpRequest::json()` for loading serde json object.
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> { fn index(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
req.json() req.json()
.from_err() // convert all errors into `Error` .from_err() // convert all errors into `Error`
.and_then(|val: MyObj| { .and_then(|val: MyObj| {
@ -98,11 +96,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -6,7 +6,6 @@ extern crate futures;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
use futures::{Future, Stream}; use futures::{Future, Stream};
@ -57,11 +56,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -5,7 +5,6 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
struct MyWebSocket; struct MyWebSocket;
@ -16,8 +15,10 @@ impl Actor for MyWebSocket {
impl StreamHandler<ws::Message> for MyWebSocket {} impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket { impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, _: ws::Message, _: &mut Self::Context) -> Response<Self, ws::Message> { type Result = ();
Self::empty()
fn handle(&mut self, _: ws::Message, _: &mut Self::Context) {
{}
} }
} }
@ -36,11 +37,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -11,7 +11,6 @@ use std::cell::Cell;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
/// Application state /// Application state
@ -40,9 +39,9 @@ impl Actor for MyWebSocket {
impl StreamHandler<ws::Message> for MyWebSocket {} impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket { impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) type Result = ();
-> Response<Self, ws::Message>
{ fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
self.counter += 1; self.counter += 1;
println!("WS({}): {:?}", self.counter, msg); println!("WS({}): {:?}", self.counter, msg);
match msg { match msg {
@ -54,7 +53,6 @@ impl Handler<ws::Message> for MyWebSocket {
} }
_ => (), _ => (),
} }
Self::empty()
} }
} }
@ -77,11 +75,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -4,9 +4,7 @@ extern crate env_logger;
#[macro_use] #[macro_use]
extern crate tera; extern crate tera;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
@ -45,11 +43,9 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
#[cfg(unix)] // Subscribe to unix signals
{ // Subscribe to unix signals let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>(); signals.send(Subscribe(addr.subscriber()));
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -6,9 +6,7 @@ extern crate env_logger;
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
/// somple handle /// somple handle
@ -49,11 +47,8 @@ fn main() {
.start_ssl(&pkcs12).unwrap(); .start_ssl(&pkcs12).unwrap();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8443"); println!("Started http server: 127.0.0.1:8443");
let _ = sys.run(); let _ = sys.run();

View File

@ -1,4 +1,4 @@
extern crate actix; #[macro_use] extern crate actix;
extern crate bytes; extern crate bytes;
extern crate byteorder; extern crate byteorder;
extern crate futures; extern crate futures;
@ -56,13 +56,9 @@ fn main() {
struct ChatClient; struct ChatClient;
#[derive(Message)]
struct ClientCommand(String); struct ClientCommand(String);
impl ResponseType for ClientCommand {
type Item = ();
type Error = ();
}
impl Actor for ChatClient { impl Actor for ChatClient {
type Context = FramedContext<Self>; type Context = FramedContext<Self>;
@ -70,6 +66,13 @@ impl Actor for ChatClient {
// start heartbeats otherwise server will disconnect after 10 seconds // start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx) self.hb(ctx)
} }
fn stopping(&mut self, _: &mut FramedContext<Self>) {
println!("Disconnected");
// Stop application on disconnect
Arbiter::system().send(actix::msgs::SystemExit(0));
}
} }
impl ChatClient { impl ChatClient {
@ -83,14 +86,13 @@ impl ChatClient {
} }
/// Handle stdin commands /// Handle stdin commands
impl Handler<ClientCommand> for ChatClient impl Handler<ClientCommand> for ChatClient {
{ type Result = ();
fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext<Self>)
-> Response<Self, ClientCommand> fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext<Self>) {
{
let m = msg.0.trim(); let m = msg.0.trim();
if m.is_empty() { if m.is_empty() {
return Self::empty() return
} }
// we check for /sss type of messages // we check for /sss type of messages
@ -112,8 +114,6 @@ impl Handler<ClientCommand> for ChatClient
} else { } else {
let _ = ctx.send(codec::ChatRequest::Message(m.to_owned())); let _ = ctx.send(codec::ChatRequest::Message(m.to_owned()));
} }
Self::empty()
} }
} }
@ -122,40 +122,26 @@ impl Handler<ClientCommand> for ChatClient
impl FramedActor for ChatClient { impl FramedActor for ChatClient {
type Io = TcpStream; type Io = TcpStream;
type Codec = codec::ClientChatCodec; type Codec = codec::ClientChatCodec;
}
impl StreamHandler<codec::ChatResponse, io::Error> for ChatClient { fn handle(&mut self, msg: io::Result<codec::ChatResponse>, ctx: &mut FramedContext<Self>) {
fn finished(&mut self, _: &mut FramedContext<Self>) {
println!("Disconnected");
// Stop application on disconnect
Arbiter::system().send(msgs::SystemExit(0));
}
}
impl Handler<codec::ChatResponse, io::Error> for ChatClient {
fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext<Self>)
-> Response<Self, codec::ChatResponse>
{
match msg { match msg {
codec::ChatResponse::Message(ref msg) => { Err(_) => ctx.stop(),
println!("message: {}", msg); Ok(msg) => match msg {
} codec::ChatResponse::Message(ref msg) => {
codec::ChatResponse::Joined(ref msg) => { println!("message: {}", msg);
println!("!!! joined: {}", msg);
}
codec::ChatResponse::Rooms(rooms) => {
println!("\n!!! Available rooms:");
for room in rooms {
println!("{}", room);
} }
println!(""); codec::ChatResponse::Joined(ref msg) => {
println!("!!! joined: {}", msg);
}
codec::ChatResponse::Rooms(rooms) => {
println!("\n!!! Available rooms:");
for room in rooms {
println!("{}", room);
}
println!("");
}
_ => (),
} }
_ => (),
} }
Self::empty()
} }
} }

View File

@ -10,6 +10,7 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
#[macro_use]
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
@ -17,7 +18,6 @@ use std::time::Instant;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
mod codec; mod codec;
@ -58,19 +58,18 @@ impl Actor for WsChatSession {
/// Handle messages from chat server, we simply send it to peer websocket /// Handle messages from chat server, we simply send it to peer websocket
impl Handler<session::Message> for WsChatSession { impl Handler<session::Message> for WsChatSession {
fn handle(&mut self, msg: session::Message, ctx: &mut Self::Context) type Result = ();
-> Response<Self, session::Message>
{ fn handle(&mut self, msg: session::Message, ctx: &mut Self::Context) {
ws::WsWriter::text(ctx, &msg.0); ws::WsWriter::text(ctx, &msg.0);
Self::empty()
} }
} }
/// WebSocket message handler /// WebSocket message handler
impl Handler<ws::Message> for WsChatSession { impl Handler<ws::Message> for WsChatSession {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) type Result = ();
-> Response<Self, ws::Message>
{ fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
println!("WEBSOCKET MESSAGE: {:?}", msg); println!("WEBSOCKET MESSAGE: {:?}", msg);
match msg { match msg {
ws::Message::Ping(msg) => ws::Message::Ping(msg) =>
@ -142,7 +141,6 @@ impl Handler<ws::Message> for WsChatSession {
} }
_ => (), _ => (),
} }
Self::empty()
} }
} }
@ -216,11 +214,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();

View File

@ -13,7 +13,7 @@ use session;
/// New chat session is created /// New chat session is created
pub struct Connect { pub struct Connect {
pub addr: Box<Subscriber<session::Message> + Send>, pub addr: Box<actix::Subscriber<session::Message> + Send>,
} }
/// Response type for Connect message /// Response type for Connect message
@ -25,16 +25,13 @@ impl ResponseType for Connect {
} }
/// Session is disconnected /// Session is disconnected
#[derive(Message)]
pub struct Disconnect { pub struct Disconnect {
pub id: usize, pub id: usize,
} }
impl ResponseType for Disconnect {
type Item = ();
type Error = ();
}
/// Send message to specific room /// Send message to specific room
#[derive(Message)]
pub struct Message { pub struct Message {
/// Id of the client session /// Id of the client session
pub id: usize, pub id: usize,
@ -44,11 +41,6 @@ pub struct Message {
pub room: String, pub room: String,
} }
impl ResponseType for Message {
type Item = ();
type Error = ();
}
/// List of available rooms /// List of available rooms
pub struct ListRooms; pub struct ListRooms;
@ -58,6 +50,7 @@ impl ResponseType for ListRooms {
} }
/// Join room, if room does not exists create new one. /// Join room, if room does not exists create new one.
#[derive(Message)]
pub struct Join { pub struct Join {
/// Client id /// Client id
pub id: usize, pub id: usize,
@ -65,15 +58,10 @@ pub struct Join {
pub name: String, pub name: String,
} }
impl ResponseType for Join {
type Item = ();
type Error = ();
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// `ChatServer` manages chat rooms and responsible for coordinating chat session.
/// implementation is super primitive /// implementation is super primitive
pub struct ChatServer { pub struct ChatServer {
sessions: HashMap<usize, Box<Subscriber<session::Message> + Send>>, sessions: HashMap<usize, Box<actix::Subscriber<session::Message> + Send>>,
rooms: HashMap<String, HashSet<usize>>, rooms: HashMap<String, HashSet<usize>>,
rng: RefCell<ThreadRng>, rng: RefCell<ThreadRng>,
} }
@ -118,8 +106,9 @@ impl Actor for ChatServer {
/// ///
/// Register new session and assign unique id to this session /// Register new session and assign unique id to this session
impl Handler<Connect> for ChatServer { impl Handler<Connect> for ChatServer {
type Result = MessageResult<Connect>;
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Response<Self, Connect> { fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
println!("Someone joined"); println!("Someone joined");
// notify all users in same room // notify all users in same room
@ -133,14 +122,15 @@ impl Handler<Connect> for ChatServer {
self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id); self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
// send id back // send id back
Self::reply(id) Ok(id)
} }
} }
/// Handler for Disconnect message. /// Handler for Disconnect message.
impl Handler<Disconnect> for ChatServer { impl Handler<Disconnect> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Response<Self, Disconnect> { fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
println!("Someone disconnected"); println!("Someone disconnected");
let mut rooms: Vec<String> = Vec::new(); let mut rooms: Vec<String> = Vec::new();
@ -158,40 +148,39 @@ impl Handler<Disconnect> for ChatServer {
for room in rooms { for room in rooms {
self.send_message(&room, "Someone disconnected", 0); self.send_message(&room, "Someone disconnected", 0);
} }
Self::empty()
} }
} }
/// Handler for Message message. /// Handler for Message message.
impl Handler<Message> for ChatServer { impl Handler<Message> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Message, _: &mut Context<Self>) -> Response<Self, Message> { fn handle(&mut self, msg: Message, _: &mut Context<Self>) {
self.send_message(&msg.room, msg.msg.as_str(), msg.id); self.send_message(&msg.room, msg.msg.as_str(), msg.id);
Self::empty()
} }
} }
/// Handler for `ListRooms` message. /// Handler for `ListRooms` message.
impl Handler<ListRooms> for ChatServer { impl Handler<ListRooms> for ChatServer {
type Result = MessageResult<ListRooms>;
fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Response<Self, ListRooms> { fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Self::Result {
let mut rooms = Vec::new(); let mut rooms = Vec::new();
for key in self.rooms.keys() { for key in self.rooms.keys() {
rooms.push(key.to_owned()) rooms.push(key.to_owned())
} }
Self::reply(rooms) Ok(rooms)
} }
} }
/// Join room, send disconnect message to old room /// Join room, send disconnect message to old room
/// send join message to new room /// send join message to new room
impl Handler<Join> for ChatServer { impl Handler<Join> for ChatServer {
type Result = ();
fn handle(&mut self, msg: Join, _: &mut Context<Self>) -> Response<Self, Join> { fn handle(&mut self, msg: Join, _: &mut Context<Self>) {
let Join {id, name} = msg; let Join {id, name} = msg;
let mut rooms = Vec::new(); let mut rooms = Vec::new();
@ -211,7 +200,5 @@ impl Handler<Join> for ChatServer {
} }
self.send_message(&name, "Someone connected", id); self.send_message(&name, "Someone connected", id);
self.rooms.get_mut(&name).unwrap().insert(id); self.rooms.get_mut(&name).unwrap().insert(id);
Self::empty()
} }
} }

View File

@ -6,20 +6,16 @@ use std::time::{Instant, Duration};
use futures::Stream; use futures::Stream;
use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::net::{TcpStream, TcpListener};
use actix::*; use actix::prelude::*;
use server::{self, ChatServer}; use server::{self, ChatServer};
use codec::{ChatRequest, ChatResponse, ChatCodec}; use codec::{ChatRequest, ChatResponse, ChatCodec};
/// Chat server sends this messages to session /// Chat server sends this messages to session
#[derive(Message)]
pub struct Message(pub String); pub struct Message(pub String);
impl ResponseType for Message {
type Item = ();
type Error = ();
}
/// `ChatSession` actor is responsible for tcp peer communitions. /// `ChatSession` actor is responsible for tcp peer communitions.
pub struct ChatSession { pub struct ChatSession {
/// unique session id /// unique session id
@ -36,104 +32,87 @@ impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`. /// For tcp communication we are going to use `FramedContext`.
/// It is convinient wrapper around `Framed` object from `tokio_io` /// It is convinient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>; type Context = FramedContext<Self>;
}
/// To use `FramedContext` we have to define Io type and Codec fn started(&mut self, ctx: &mut Self::Context) {
impl FramedActor for ChatSession {
type Io = TcpStream;
type Codec= ChatCodec;
}
/// Also `FramedContext` requires Actor which is able to handle stream
/// of `<Codec as Decoder>::Item` items.
impl StreamHandler<ChatRequest, io::Error> for ChatSession {
fn started(&mut self, ctx: &mut FramedContext<Self>) {
// we'll start heartbeat process on session start. // we'll start heartbeat process on session start.
self.hb(ctx); self.hb(ctx);
// register self in chat server. `AsyncContext::wait` register // register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves // future within context, but context waits until this future resolves
// before processing any other events. // before processing any other events.
self.addr.call(self, server::Connect{addr: ctx.sync_subscriber()}).then(|res, act, ctx| { let addr: SyncAddress<_> = ctx.address();
match res { self.addr.call(self, server::Connect{addr: addr.subscriber()})
Ok(Ok(res)) => act.id = res, .then(|res, act, ctx| {
// something is wrong with chat server match res {
_ => ctx.stop(), Ok(Ok(res)) => act.id = res,
} // something is wrong with chat server
fut::ok(()) _ => ctx.stop(),
}).wait(ctx); }
actix::fut::ok(())
}).wait(ctx);
} }
fn finished(&mut self, ctx: &mut FramedContext<Self>) { fn stopping(&mut self, ctx: &mut Self::Context) {
// notify chat server // notify chat server
self.addr.send(server::Disconnect{id: self.id}); self.addr.send(server::Disconnect{id: self.id});
ctx.stop() ctx.stop()
} }
} }
impl Handler<ChatRequest, io::Error> for ChatSession { /// To use `FramedContext` we have to define Io type and Codec
impl FramedActor for ChatSession {
/// We'll stop chat session actor on any error, high likely it is just type Io = TcpStream;
/// termination of the tcp stream. type Codec= ChatCodec;
fn error(&mut self, _: io::Error, ctx: &mut FramedContext<Self>) {
ctx.stop()
}
/// This is main event loop for client requests /// This is main event loop for client requests
fn handle(&mut self, msg: ChatRequest, ctx: &mut FramedContext<Self>) fn handle(&mut self, msg: io::Result<ChatRequest>, ctx: &mut FramedContext<Self>) {
-> Response<Self, ChatRequest>
{
match msg { match msg {
ChatRequest::List => { Err(_) => ctx.stop(),
// Send ListRooms message to chat server and wait for response Ok(msg) => match msg {
println!("List rooms"); ChatRequest::List => {
self.addr.call(self, server::ListRooms).then(|res, _, ctx| { // Send ListRooms message to chat server and wait for response
match res { println!("List rooms");
Ok(Ok(rooms)) => { self.addr.call(self, server::ListRooms).then(|res, _, ctx| {
let _ = ctx.send(ChatResponse::Rooms(rooms)); match res {
}, Ok(Ok(rooms)) => {
let _ = ctx.send(ChatResponse::Rooms(rooms));
},
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
fut::ok(()) actix::fut::ok(())
}).wait(ctx) }).wait(ctx)
// .wait(ctx) pauses all events in context, // .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list of rooms back // so actor wont receive any new messages until it get list of rooms back
}, },
ChatRequest::Join(name) => { ChatRequest::Join(name) => {
println!("Join to room: {}", name); println!("Join to room: {}", name);
self.room = name.clone(); self.room = name.clone();
self.addr.send(server::Join{id: self.id, name: name.clone()}); self.addr.send(server::Join{id: self.id, name: name.clone()});
let _ = ctx.send(ChatResponse::Joined(name)); let _ = ctx.send(ChatResponse::Joined(name));
}, },
ChatRequest::Message(message) => { ChatRequest::Message(message) => {
// send message to chat server // send message to chat server
println!("Peer message: {}", message); println!("Peer message: {}", message);
self.addr.send( self.addr.send(
server::Message{id: self.id, server::Message{id: self.id,
msg: message, room: msg: message, room:
self.room.clone()}) self.room.clone()})
}
// we update heartbeat time on ping from peer
ChatRequest::Ping =>
self.hb = Instant::now(),
} }
// we update heartbeat time on ping from peer
ChatRequest::Ping =>
self.hb = Instant::now(),
} }
Self::empty()
} }
} }
/// Handler for Message, chat server sends this message, we just send string to peer /// Handler for Message, chat server sends this message, we just send string to peer
impl Handler<Message> for ChatSession { impl Handler<Message> for ChatSession {
type Result = ();
fn handle(&mut self, msg: Message, ctx: &mut FramedContext<Self>) fn handle(&mut self, msg: Message, ctx: &mut FramedContext<Self>) {
-> Response<Self, Message>
{
// send message to peer // send message to peer
let _ = ctx.send(ChatResponse::Message(msg.0)); let _ = ctx.send(ChatResponse::Message(msg.0));
Self::empty()
} }
} }
@ -188,7 +167,9 @@ impl TcpServer {
// So to be able to handle this events `Server` actor has to implement // So to be able to handle this events `Server` actor has to implement
// stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>`
let _: () = TcpServer::create(|ctx| { let _: () = TcpServer::create(|ctx| {
ctx.add_stream(listener.incoming().map(|(t, a)| TcpConnect(t, a))); ctx.add_message_stream(listener.incoming()
.map_err(|_| ())
.map(|(t, a)| TcpConnect(t, a)));
TcpServer{chat: chat} TcpServer{chat: chat}
}); });
} }
@ -200,27 +181,19 @@ impl Actor for TcpServer {
type Context = Context<Self>; type Context = Context<Self>;
} }
#[derive(Message)]
struct TcpConnect(TcpStream, net::SocketAddr); struct TcpConnect(TcpStream, net::SocketAddr);
impl ResponseType for TcpConnect {
type Item = ();
type Error = ();
}
/// Handle stream of TcpStream's /// Handle stream of TcpStream's
impl StreamHandler<TcpConnect, io::Error> for TcpServer {} impl StreamHandler<TcpConnect> for TcpServer {}
impl Handler<TcpConnect, io::Error> for TcpServer { impl Handler<TcpConnect> for TcpServer {
type Result = ();
fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) -> Response<Self, TcpConnect> fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) {
{
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.
let server = self.chat.clone(); let server = self.chat.clone();
let _: () = ChatSession::new(server).framed(msg.0, ChatCodec); let _: () = ChatSession::new(server).framed(msg.0, ChatCodec);
// this is response for message, which is defined by `ResponseType` trait
// in this case we just return unit.
Self::empty()
} }
} }

View File

@ -10,7 +10,6 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
#[cfg(unix)]
use actix::actors::signal::{ProcessSignals, Subscribe}; use actix::actors::signal::{ProcessSignals, Subscribe};
/// do websocket handshake and start `MyWebSocket` actor /// do websocket handshake and start `MyWebSocket` actor
@ -38,9 +37,9 @@ impl StreamHandler<ws::Message> for MyWebSocket {
} }
impl Handler<ws::Message> for MyWebSocket { impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>) type Result = ();
-> Response<Self, ws::Message>
{ fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>) {
// process websocket messages // process websocket messages
println!("WS: {:?}", msg); println!("WS: {:?}", msg);
match msg { match msg {
@ -52,7 +51,6 @@ impl Handler<ws::Message> for MyWebSocket {
} }
_ => (), _ => (),
} }
Self::empty()
} }
} }
@ -74,11 +72,8 @@ fn main() {
.start(); .start();
// Subscribe to unix signals // Subscribe to unix signals
#[cfg(unix)] let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
{ signals.send(Subscribe(_addr.subscriber()));
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(_addr.subscriber()));
}
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();