1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

add websocket example

This commit is contained in:
Nikolay Kim 2017-10-20 17:16:17 -07:00
parent 1db4200621
commit 56c91adce2
13 changed files with 1120 additions and 85 deletions

View File

@ -21,7 +21,7 @@ impl Route for MyRoute {
// get Multipart stream // get Multipart stream
WrapStream::<MyRoute>::actstream(req.multipart(payload)?) WrapStream::<MyRoute>::actstream(req.multipart(payload)?)
.and_then(|item, act, ctx| { .and_then(|item, act, ctx| {
// Multipart stream is a string of Fields and nested Multiparts // Multipart stream is a stream of Fields and nested Multiparts
match item { match item {
multipart::MultipartItem::Field(field) => { multipart::MultipartItem::Field(field) => {
println!("==== FIELD ==== {:?}", field); println!("==== FIELD ==== {:?}", field);

View File

@ -0,0 +1,28 @@
[package]
name = "websocket-example"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
[[bin]]
name = "websocket"
path = "src/main.rs"
[[bin]]
name = "client"
path = "src/client.rs"
[dependencies]
rand = "*"
bytes = "0.4"
byteorder = "1.1"
futures = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
actix = { git = "https://github.com/fafhrd91/actix.git" }
# actix = { path = "../../../actix" }
actix-web = { path = "../../" }

View File

@ -0,0 +1,32 @@
import asyncio
import aiohttp
def req1():
with aiohttp.MultipartWriter() as writer:
writer.append('test')
writer.append_json({'passed': True})
resp = yield from aiohttp.request(
"post", 'http://localhost:8080/multipart',
data=writer, headers=writer.headers)
print(resp)
assert 200 == resp.status
def req2():
with aiohttp.MultipartWriter() as writer:
writer.append('test')
writer.append_json({'passed': True})
writer.append(open('src/main.rs'))
resp = yield from aiohttp.request(
"post", 'http://localhost:8080/multipart',
data=writer, headers=writer.headers)
print(resp)
assert 200 == resp.status
loop = asyncio.get_event_loop()
loop.run_until_complete(req1())
loop.run_until_complete(req2())

View File

@ -0,0 +1,166 @@
extern crate actix;
extern crate bytes;
extern crate byteorder;
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate serde;
extern crate serde_json;
#[macro_use] extern crate serde_derive;
use std::{io, net, process, thread};
use std::str::FromStr;
use std::time::Duration;
use futures::Future;
use tokio_core::net::TcpStream;
use actix::prelude::*;
mod codec;
fn main() {
let sys = actix::System::new("chat-client");
// Connect to server
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle())
.and_then(|stream| {
let addr: SyncAddress<_> = ChatClient.framed(stream, codec::ClientChatCodec);
// start console loop
thread::spawn(move|| {
loop {
let mut cmd = String::new();
if io::stdin().read_line(&mut cmd).is_err() {
println!("error");
return
}
addr.send(ClientCommand(cmd));
}
});
futures::future::ok(())
})
.map_err(|e| {
println!("Can not connect to server: {}", e);
process::exit(1)
})
);
println!("Running chat client");
sys.run();
}
struct ChatClient;
struct ClientCommand(String);
impl Actor for ChatClient {
type Context = FramedContext<Self>;
fn started(&mut self, ctx: &mut FramedContext<Self>) {
// start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx)
}
}
impl ChatClient {
fn hb(&self, ctx: &mut FramedContext<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
if ctx.send(codec::ChatRequest::Ping).is_ok() {
act.hb(ctx);
}
});
}
}
/// Handle stdin commands
impl Handler<ClientCommand> for ChatClient
{
fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext<Self>)
-> Response<Self, ClientCommand>
{
let m = msg.0.trim();
if m.is_empty() {
return Self::empty()
}
// we check for /sss type of messages
if m.starts_with('/') {
let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] {
"/list" => {
let _ = ctx.send(codec::ChatRequest::List);
},
"/join" => {
if v.len() == 2 {
let _ = ctx.send(codec::ChatRequest::Join(v[1].to_owned()));
} else {
println!("!!! room name is required");
}
},
_ => println!("!!! unknown command"),
}
} else {
let _ = ctx.send(codec::ChatRequest::Message(m.to_owned()));
}
Self::empty()
}
}
impl ResponseType<ClientCommand> for ChatClient {
type Item = ();
type Error = ();
}
/// Server communication
impl FramedActor for ChatClient {
type Io = TcpStream;
type Codec = codec::ClientChatCodec;
}
impl StreamHandler<codec::ChatResponse, io::Error> for ChatClient {
fn finished(&mut self, _: &mut FramedContext<Self>) {
println!("Disconnected");
// Stop application on disconnect
Arbiter::system().send(msgs::SystemExit(0));
}
}
impl ResponseType<codec::ChatResponse> for ChatClient {
type Item = ();
type Error = ();
}
impl Handler<codec::ChatResponse, io::Error> for ChatClient {
fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext<Self>)
-> Response<Self, codec::ChatResponse>
{
match msg {
codec::ChatResponse::Message(ref msg) => {
println!("message: {}", msg);
}
codec::ChatResponse::Joined(ref msg) => {
println!("!!! joined: {}", msg);
}
codec::ChatResponse::Rooms(rooms) => {
println!("\n!!! Available rooms:");
for room in rooms {
println!("{}", room);
}
println!("");
}
_ => (),
}
Self::empty()
}
}

View File

@ -0,0 +1,125 @@
#![allow(dead_code)]
use std::io;
use serde_json as json;
use byteorder::{BigEndian , ByteOrder};
use bytes::{BytesMut, BufMut};
use tokio_io::codec::{Encoder, Decoder};
/// Client request
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag="cmd", content="data")]
pub enum ChatRequest {
/// List rooms
List,
/// Join rooms
Join(String),
/// Send message
Message(String),
/// Ping
Ping
}
/// Server response
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag="cmd", content="data")]
pub enum ChatResponse {
Ping,
/// List of rooms
Rooms(Vec<String>),
/// Joined
Joined(String),
/// Message
Message(String),
}
/// Codec for Client -> Server transport
pub struct ChatCodec;
impl Decoder for ChatCodec
{
type Item = ChatRequest;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let size = {
if src.len() < 2 {
return Ok(None)
}
BigEndian::read_u16(src.as_ref()) as usize
};
if src.len() >= size + 2 {
src.split_to(2);
let buf = src.split_to(size);
Ok(Some(json::from_slice::<ChatRequest>(&buf)?))
} else {
Ok(None)
}
}
}
impl Encoder for ChatCodec
{
type Item = ChatResponse;
type Error = io::Error;
fn encode(&mut self, msg: ChatResponse, dst: &mut BytesMut) -> Result<(), Self::Error> {
let msg = json::to_string(&msg).unwrap();
let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + 2);
dst.put_u16::<BigEndian>(msg_ref.len() as u16);
dst.put(msg_ref);
Ok(())
}
}
/// Codec for Server -> Client transport
pub struct ClientChatCodec;
impl Decoder for ClientChatCodec
{
type Item = ChatResponse;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let size = {
if src.len() < 2 {
return Ok(None)
}
BigEndian::read_u16(src.as_ref()) as usize
};
if src.len() >= size + 2 {
src.split_to(2);
let buf = src.split_to(size);
Ok(Some(json::from_slice::<ChatResponse>(&buf)?))
} else {
Ok(None)
}
}
}
impl Encoder for ClientChatCodec
{
type Item = ChatRequest;
type Error = io::Error;
fn encode(&mut self, msg: ChatRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
let msg = json::to_string(&msg).unwrap();
let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + 2);
dst.put_u16::<BigEndian>(msg_ref.len() as u16);
dst.put(msg_ref);
Ok(())
}
}

View File

@ -1,69 +1,64 @@
// #![feature(try_trait)] #![allow(unused_variables)]
#![allow(dead_code, unused_variables)] extern crate rand;
extern crate bytes;
extern crate byteorder;
extern crate tokio_io;
extern crate tokio_core;
extern crate serde;
extern crate serde_json;
#[macro_use] extern crate serde_derive;
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate tokio_core;
extern crate env_logger; use std::time::Instant;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
struct MyRoute {req: Option<HttpRequest>} mod codec;
mod server;
mod session;
impl Actor for MyRoute {
/// This is our websocket route state, this state is shared with all route instances
/// via `HttpContext::state()`
struct WsChatSessionState {
addr: SyncAddress<server::ChatServer>,
}
struct WsChatSession {
/// unique session id
id: usize,
/// Client must send ping at least once per 10 seconds, otherwise we drop connection.
hb: Instant,
/// joined room
room: String,
/// peer name
name: Option<String>,
}
impl Actor for WsChatSession {
type Context = HttpContext<Self>; type Context = HttpContext<Self>;
} }
impl Route for MyRoute { /// Entry point for our route
type State = (); impl Route for WsChatSession {
type State = WsChatSessionState;
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> {
println!("PARAMS: {:?} {:?}", req.match_info().get("name"), req.match_info());
if !payload.eof() {
ctx.add_stream(payload);
Reply::stream(MyRoute{req: Some(req)})
} else {
Reply::reply(httpcodes::HTTPOk)
}
}
}
impl ResponseType<PayloadItem> for MyRoute {
type Item = ();
type Error = ();
}
impl StreamHandler<PayloadItem> for MyRoute {}
impl Handler<PayloadItem> for MyRoute {
fn handle(&mut self, msg: PayloadItem, ctx: &mut HttpContext<Self>)
-> Response<Self, PayloadItem>
{
println!("CHUNK: {:?}", msg);
if let Some(req) = self.req.take() {
ctx.start(httpcodes::HTTPOk);
ctx.write_eof();
}
Self::empty()
}
}
struct MyWS {}
impl Actor for MyWS {
type Context = HttpContext<Self>;
}
impl Route for MyWS {
type State = ();
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self>
{ {
// websocket handshakre, it may fail if request is not websocket request
match ws::handshake(&req) { match ws::handshake(&req) {
Ok(resp) => { Ok(resp) => {
ctx.start(resp); ctx.start(resp);
ctx.add_stream(ws::WsStream::new(payload)); ctx.add_stream(ws::WsStream::new(payload));
Reply::stream(MyWS{}) Reply::async(
WsChatSession {
id: 0,
hb: Instant::now(),
room: "Main".to_owned(),
name: None})
} }
Err(err) => { Err(err) => {
Reply::reply(err) Reply::reply(err)
@ -72,22 +67,92 @@ impl Route for MyWS {
} }
} }
impl ResponseType<ws::Message> for MyWS { /// Handle messages from chat server, we simply send it to peer websocket
impl Handler<session::Message> for WsChatSession {
fn handle(&mut self, msg: session::Message, ctx: &mut HttpContext<Self>)
-> Response<Self, session::Message>
{
ws::WsWriter::text(ctx, &msg.0);
Self::empty()
}
}
impl ResponseType<session::Message> for WsChatSession {
type Item = (); type Item = ();
type Error = (); type Error = ();
} }
impl StreamHandler<ws::Message> for MyWS {} /// WebSocket message handler
impl Handler<ws::Message> for WsChatSession {
impl Handler<ws::Message> for MyWS {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>) fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>)
-> Response<Self, ws::Message> -> Response<Self, ws::Message>
{ {
println!("WS: {:?}", msg); println!("WEBSOCKET MESSAGE: {:?}", msg);
match msg { match msg {
ws::Message::Ping(msg) => ws::WsWriter::pong(ctx, msg), ws::Message::Ping(msg) =>
ws::Message::Text(text) => ws::WsWriter::text(ctx, text), ws::WsWriter::pong(ctx, msg),
ws::Message::Binary(bin) => ws::WsWriter::binary(ctx, bin), ws::Message::Pong(msg) =>
self.hb = Instant::now(),
ws::Message::Text(text) => {
let m = text.trim();
// we check for /sss type of messages
if m.starts_with('/') {
let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] {
"/list" => {
// Send ListRooms message to chat server and wait for response
println!("List rooms");
ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| {
match res {
Ok(Ok(rooms)) => {
for room in rooms {
ws::WsWriter::text(ctx, &room);
}
},
_ => println!("Something is wrong"),
}
fut::ok(())
}).wait(ctx)
// .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list
// of rooms back
},
"/join" => {
if v.len() == 2 {
self.room = v[1].to_owned();
ctx.state().addr.send(
server::Join{id: self.id, name: self.room.clone()});
ws::WsWriter::text(ctx, "joined");
} else {
ws::WsWriter::text(ctx, "!!! room name is required");
}
},
"/name" => {
if v.len() == 2 {
self.name = Some(v[1].to_owned());
} else {
ws::WsWriter::text(ctx, "!!! name is required");
}
},
_ => ws::WsWriter::text(
ctx, &format!("!!! unknown command: {:?}", m)),
}
} else {
let msg = if let Some(ref name) = self.name {
format!("{}: {}", name, m)
} else {
m.to_owned()
};
// send message to chat server
ctx.state().addr.send(
server::Message{id: self.id,
msg: msg,
room: self.room.clone()})
}
},
ws::Message::Binary(bin) =>
println!("Unexpected binary"),
ws::Message::Closed | ws::Message::Error => { ws::Message::Closed | ws::Message::Error => {
ctx.stop(); ctx.stop();
} }
@ -97,30 +162,71 @@ impl Handler<ws::Message> for MyWS {
} }
} }
impl StreamHandler<ws::Message> for WsChatSession
{
/// Method is called when stream get polled first time.
/// We register ws session with ChatServer
fn started(&mut self, ctx: &mut Self::Context) {
// register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves
// before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared across all
// routes within application
let subs = ctx.sync_subscriber();
ctx.state().addr.call(
self, server::Connect{addr: subs}).then(
|res, act, ctx| {
match res {
Ok(Ok(res)) => act.id = res,
// something is wrong with chat server
_ => ctx.stop(),
}
fut::ok(())
}).wait(ctx);
}
/// Method is called when stream finishes, even if stream finishes with error.
fn finished(&mut self, ctx: &mut Self::Context) {
// notify chat server
ctx.state().addr.send(server::Disconnect{id: self.id});
ctx.stop()
}
}
impl ResponseType<ws::Message> for WsChatSession {
type Item = ();
type Error = ();
}
fn main() { fn main() {
let _ = env_logger::init(); let sys = actix::System::new("websocket-example");
let sys = actix::System::new("http-example"); // Start chat server actor
let server: SyncAddress<_> = server::ChatServer::default().start();
// Start tcp server
session::TcpServer::new("127.0.0.1:12345", server.clone());
// Websocket sessions state
let state = WsChatSessionState { addr: server };
// Create Http server with websocket support
HttpServer::new( HttpServer::new(
RoutingMap::default() RoutingMap::default()
.app("/blah", Application::default() .app("/", Application::builder(state)
.resource("/test/{name}", |r| { // redirect to websocket.html
r.get::<MyRoute>(); .resource("/", |r|
r.post::<MyRoute>();
})
.route_handler("/static", StaticFiles::new(".", true))
.finish())
.resource("/test", |r| r.post::<MyRoute>())
.resource("/ws/", |r| r.get::<MyWS>())
.resource("/simple/", |r|
r.handler(Method::GET, |req, payload, state| { r.handler(Method::GET, |req, payload, state| {
httpcodes::HTTPOk httpcodes::HTTPOk
})) }))
// websocket
.resource("/ws/", |r| r.get::<WsChatSession>())
// static resources
.route_handler("/static", StaticFiles::new("static/", true))
.finish()) .finish())
.serve::<_, ()>("127.0.0.1:9080").unwrap(); .finish())
.serve::<_, ()>("127.0.0.1:8080").unwrap();
println!("starting");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -0,0 +1,218 @@
//! `ChatServer` is an actor. It maintains list of connection client session.
//! And manages available rooms. Peers send messages to other peers in same
//! room through `ChatServer`.
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use rand::{self, Rng, ThreadRng};
use actix::prelude::*;
use session;
/// Message for chat server communications
/// New chat session is created
pub struct Connect {
pub addr: Box<Subscriber<session::Message> + Send>,
}
/// Session is disconnected
pub struct Disconnect {
pub id: usize,
}
/// Send message to specific room
pub struct Message {
/// Id of the client session
pub id: usize,
/// Peer message
pub msg: String,
/// Room name
pub room: String,
}
/// List of available rooms
pub struct ListRooms;
/// Join room, if room does not exists create new one.
pub struct Join {
/// Client id
pub id: usize,
/// Room name
pub name: String,
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat session.
/// implementation is super primitive
pub struct ChatServer {
sessions: HashMap<usize, Box<Subscriber<session::Message> + Send>>,
rooms: HashMap<String, HashSet<usize>>,
rng: RefCell<ThreadRng>,
}
impl Default for ChatServer {
fn default() -> ChatServer {
// default room
let mut rooms = HashMap::new();
rooms.insert("Main".to_owned(), HashSet::new());
ChatServer {
sessions: HashMap::new(),
rooms: rooms,
rng: RefCell::new(rand::thread_rng()),
}
}
}
impl ChatServer {
/// Send message to all users in the room
fn send_message(&self, room: &str, message: &str, skip_id: usize) {
if let Some(sessions) = self.rooms.get(room) {
for id in sessions {
if *id != skip_id {
if let Some(addr) = self.sessions.get(id) {
let _ = addr.send(session::Message(message.to_owned()));
}
}
}
}
}
}
/// Make actor from `ChatServer`
impl Actor for ChatServer {
/// We are going to use simple Context, we just need ability to communicate
/// with other actors.
type Context = Context<Self>;
}
/// Handler for Connect message.
///
/// Register new session and assign unique id to this session
impl Handler<Connect> for ChatServer {
fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Response<Self, Connect> {
println!("Someone joined");
// notify all users in same room
self.send_message(&"Main".to_owned(), "Someone joined", 0);
// register session with random id
let id = self.rng.borrow_mut().gen::<usize>();
self.sessions.insert(id, msg.addr);
// auto join session to Main room
self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
// send id back
Self::reply(id)
}
}
impl ResponseType<Connect> for ChatServer {
/// Response type for Connect message
///
/// Chat server returns unique session id
type Item = usize;
type Error = ();
}
/// Handler for Disconnect message.
impl Handler<Disconnect> for ChatServer {
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Response<Self, Disconnect> {
println!("Someone disconnected");
let mut rooms: Vec<String> = Vec::new();
// remove address
if self.sessions.remove(&msg.id).is_some() {
// remove session from all rooms
for (name, sessions) in &mut self.rooms {
if sessions.remove(&msg.id) {
rooms.push(name.to_owned());
}
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0);
}
Self::empty()
}
}
impl ResponseType<Disconnect> for ChatServer {
type Item = ();
type Error = ();
}
/// Handler for Message message.
impl Handler<Message> for ChatServer {
fn handle(&mut self, msg: Message, _: &mut Context<Self>) -> Response<Self, Message> {
self.send_message(&msg.room, msg.msg.as_str(), msg.id);
Self::empty()
}
}
impl ResponseType<Message> for ChatServer {
type Item = ();
type Error = ();
}
/// Handler for `ListRooms` message.
impl Handler<ListRooms> for ChatServer {
fn handle(&mut self, _: ListRooms, _: &mut Context<Self>) -> Response<Self, ListRooms> {
let mut rooms = Vec::new();
for key in self.rooms.keys() {
rooms.push(key.to_owned())
}
Self::reply(rooms)
}
}
impl ResponseType<ListRooms> for ChatServer {
type Item = Vec<String>;
type Error = ();
}
/// Join room, send disconnect message to old room
/// send join message to new room
impl Handler<Join> for ChatServer {
fn handle(&mut self, msg: Join, _: &mut Context<Self>) -> Response<Self, Join> {
let Join {id, name} = msg;
let mut rooms = Vec::new();
// remove session from all rooms
for (n, sessions) in &mut self.rooms {
if sessions.remove(&id) {
rooms.push(n.to_owned());
}
}
// send message to other users
for room in rooms {
self.send_message(&room, "Someone disconnected", 0);
}
if self.rooms.get_mut(&name).is_none() {
self.rooms.insert(name.clone(), HashSet::new());
}
self.send_message(&name, "Someone connected", id);
self.rooms.get_mut(&name).unwrap().insert(id);
Self::empty()
}
}
impl ResponseType<Join> for ChatServer {
type Item = ();
type Error = ();
}

View File

@ -0,0 +1,231 @@
//! `ClientSession` is an actor, it manages peer tcp connection and
//! proxies commands from peer to `ChatServer`.
use std::{io, net};
use std::str::FromStr;
use std::time::{Instant, Duration};
use tokio_core::net::{TcpStream, TcpListener};
use actix::*;
use server::{self, ChatServer};
use codec::{ChatRequest, ChatResponse, ChatCodec};
/// Chat server sends this messages to session
pub struct Message(pub String);
/// `ChatSession` actor is responsible for tcp peer communitions.
pub struct ChatSession {
/// unique session id
id: usize,
/// this is address of chat server
addr: SyncAddress<ChatServer>,
/// Client must send ping at least once per 10 seconds, otherwise we drop connection.
hb: Instant,
/// joined room
room: String,
}
impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`.
/// It is convinient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>;
}
/// To use `FramedContext` we have to define Io type and Codec
impl FramedActor for ChatSession {
type Io = TcpStream;
type Codec= ChatCodec;
}
/// Also `FramedContext` requires Actor which is able to handle stream
/// of `<Codec as Decoder>::Item` items.
impl StreamHandler<ChatRequest, io::Error> for ChatSession {
fn started(&mut self, ctx: &mut FramedContext<Self>) {
// we'll start heartbeat process on session start.
self.hb(ctx);
// register self in chat server. `AsyncContext::wait` register
// future within context, but context waits until this future resolves
// before processing any other events.
self.addr.call(self, server::Connect{addr: ctx.sync_subscriber()}).then(|res, act, ctx| {
match res {
Ok(Ok(res)) => act.id = res,
// something is wrong with chat server
_ => ctx.stop(),
}
fut::ok(())
}).wait(ctx);
}
fn finished(&mut self, ctx: &mut FramedContext<Self>) {
// notify chat server
self.addr.send(server::Disconnect{id: self.id});
ctx.stop()
}
}
impl ResponseType<ChatRequest> for ChatSession {
type Item = ();
type Error = ();
}
impl Handler<ChatRequest, io::Error> for ChatSession {
/// We'll stop chat session actor on any error, high likely it is just
/// termination of the tcp stream.
fn error(&mut self, _: io::Error, ctx: &mut FramedContext<Self>) {
ctx.stop()
}
/// This is main event loop for client requests
fn handle(&mut self, msg: ChatRequest, ctx: &mut FramedContext<Self>)
-> Response<Self, ChatRequest>
{
match msg {
ChatRequest::List => {
// Send ListRooms message to chat server and wait for response
println!("List rooms");
self.addr.call(self, server::ListRooms).then(|res, _, ctx| {
match res {
Ok(Ok(rooms)) => {
let _ = ctx.send(ChatResponse::Rooms(rooms));
},
_ => println!("Something is wrong"),
}
fut::ok(())
}).wait(ctx)
// .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list of rooms back
},
ChatRequest::Join(name) => {
println!("Join to room: {}", name);
self.room = name.clone();
self.addr.send(server::Join{id: self.id, name: name.clone()});
let _ = ctx.send(ChatResponse::Joined(name));
},
ChatRequest::Message(message) => {
// send message to chat server
println!("Peer message: {}", message);
self.addr.send(
server::Message{id: self.id,
msg: message, room:
self.room.clone()})
}
// we update heartbeat time on ping from peer
ChatRequest::Ping =>
self.hb = Instant::now(),
}
Self::empty()
}
}
/// Handler for Message, chat server sends this message, we just send string to peer
impl Handler<Message> for ChatSession {
fn handle(&mut self, msg: Message, ctx: &mut FramedContext<Self>)
-> Response<Self, Message>
{
// send message to peer
let _ = ctx.send(ChatResponse::Message(msg.0));
Self::empty()
}
}
impl ResponseType<Message> for ChatSession {
type Item = ();
type Error = ();
}
/// Helper methods
impl ChatSession {
pub fn new(addr: SyncAddress<ChatServer>) -> ChatSession {
ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned()}
}
/// helper method that sends ping to client every second.
///
/// also this method check heartbeats from client
fn hb(&self, ctx: &mut FramedContext<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| {
// check client heartbeats
if Instant::now().duration_since(act.hb) > Duration::new(10, 0) {
// heartbeat timed out
println!("Client heartbeat failed, disconnecting!");
// notify chat server
act.addr.send(server::Disconnect{id: act.id});
// stop actor
ctx.stop();
}
if ctx.send(ChatResponse::Ping).is_ok() {
// if we can not send message to sink, sink is closed (disconnected)
act.hb(ctx);
}
});
}
}
/// Define tcp server that will accept incomint tcp connection and create
/// chat actors.
pub struct TcpServer {
chat: SyncAddress<ChatServer>,
}
impl TcpServer {
pub fn new(s: &str, chat: SyncAddress<ChatServer>) {
// Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap();
// Our chat server `Server` is an actor, first we need to start it
// and then add stream on incoming tcp connections to it.
// TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) items
// So to be able to handle this events `Server` actor has to implement
// stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>`
let _: () = TcpServer::create(|ctx| {
ctx.add_stream(listener.incoming());
TcpServer{chat: chat}
});
}
}
/// Make actor from `Server`
impl Actor for TcpServer {
/// Every actor has to provide execution `Context` in which it can run.
type Context = Context<Self>;
}
/// Handle stream of TcpStream's
impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for TcpServer {}
impl ResponseType<(TcpStream, net::SocketAddr)> for TcpServer {
type Item = ();
type Error = ();
}
impl Handler<(TcpStream, net::SocketAddr), io::Error> for TcpServer {
fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context<Self>)
-> Response<Self, (TcpStream, net::SocketAddr)>
{
// For each incoming connection we create `ChatSession` actor
// with out chat server address.
let server = self.chat.clone();
let _: () = ChatSession::new(server).framed(msg.0, ChatCodec);
// this is response for message, which is defined by `ResponseType` trait
// in this case we just return unit.
Self::empty()
}
}

View File

@ -0,0 +1,90 @@
<!DOCTYPE html>
<meta charset="utf-8" />
<html>
<head>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js">
</script>
<script language="javascript" type="text/javascript">
$(function() {
var conn = null;
function log(msg) {
var control = $('#log');
control.html(control.html() + msg + '<br/>');
control.scrollTop(control.scrollTop() + 1000);
}
function connect() {
disconnect();
var wsUri = (window.location.protocol=='https:'&&'wss://'||'ws://')+window.location.host + '/ws/';
conn = new WebSocket(wsUri);
log('Connecting...');
conn.onopen = function() {
log('Connected.');
update_ui();
};
conn.onmessage = function(e) {
log('Received: ' + e.data);
};
conn.onclose = function() {
log('Disconnected.');
conn = null;
update_ui();
};
}
function disconnect() {
if (conn != null) {
log('Disconnecting...');
conn.close();
conn = null;
update_ui();
}
}
function update_ui() {
var msg = '';
if (conn == null) {
$('#status').text('disconnected');
$('#connect').html('Connect');
} else {
$('#status').text('connected (' + conn.protocol + ')');
$('#connect').html('Disconnect');
}
}
$('#connect').click(function() {
if (conn == null) {
connect();
} else {
disconnect();
}
update_ui();
return false;
});
$('#send').click(function() {
var text = $('#text').val();
log('Sending: ' + text);
conn.send(text);
$('#text').val('').focus();
return false;
});
$('#text').keyup(function(e) {
if (e.keyCode === 13) {
$('#send').click();
return false;
}
});
});
</script>
</head>
<body>
<h3>Chat!</h3>
<div>
<button id="connect">Connect</button>&nbsp;|&nbsp;Status:
<span id="status">disconnected</span>
</div>
<div id="log"
style="width:20em;height:15em;overflow:auto;border:1px solid black">
</div>
<form id="chatform" onsubmit="return false;">
<input id="text" type="text" />
<input id="send" type="button" value="Send" />
</form>
</body>
</html>

View File

@ -2,11 +2,14 @@ use std;
use std::rc::Rc; use std::rc::Rc;
use std::collections::VecDeque; use std::collections::VecDeque;
use futures::{Async, Stream, Poll}; use futures::{Async, Stream, Poll};
use futures::sync::oneshot::Sender;
use bytes::Bytes; use bytes::Bytes;
use actix::{Actor, ActorState, ActorContext, AsyncContext}; use actix::{Actor, ActorState, ActorContext, AsyncContext,
Handler, Subscriber, ResponseType};
use actix::fut::ActorFuture; use actix::fut::ActorFuture;
use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle}; use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle,
Envelope, ToEnvelope, RemoteEnvelope};
use route::{Route, Frame}; use route::{Route, Frame};
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -118,6 +121,25 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
} }
} }
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
#[doc(hidden)]
pub fn subscriber<M: 'static>(&mut self) -> Box<Subscriber<M>>
where A: Handler<M>
{
Box::new(self.address.unsync_address())
}
#[doc(hidden)]
pub fn sync_subscriber<M: 'static + Send>(&mut self) -> Box<Subscriber<M> + Send>
where A: Handler<M>,
A::Item: Send,
A::Error: Send,
{
Box::new(self.address.sync_address())
}
}
#[doc(hidden)] #[doc(hidden)]
impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
{ {
@ -149,22 +171,25 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
} }
// check wait futures // check wait futures
if let Some(ref mut act) = self.act { if self.wait.poll(act, ctx) {
if let Ok(Async::NotReady) = self.wait.poll(act, ctx) {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
}
let mut prep_stop = false; let mut prep_stop = false;
loop { loop {
let mut not_ready = true; let mut not_ready = true;
if let Ok(Async::Ready(_)) = self.address.poll(act, ctx) { if self.address.poll(act, ctx) {
not_ready = false not_ready = false
} }
self.items.poll(act, ctx); self.items.poll(act, ctx);
// check wait futures
if self.wait.poll(act, ctx) {
return Ok(Async::NotReady)
}
// are we done // are we done
if !not_ready { if !not_ready {
continue continue
@ -213,3 +238,17 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
} }
} }
} }
type ToEnvelopeSender<A, M> =
Sender<Result<<A as ResponseType<M>>::Item, <A as ResponseType<M>>::Error>>;
impl<A, M> ToEnvelope<A, M> for HttpContext<A>
where M: Send + 'static,
A: Actor<Context=HttpContext<A>> + Route + Handler<M>,
<A as ResponseType<M>>::Item: Send, <A as ResponseType<M>>::Item: Send
{
fn pack(msg: M, tx: Option<ToEnvelopeSender<A, M>>) -> Envelope<A>
{
RemoteEnvelope::new(msg, tx).into()
}
}

View File

@ -77,7 +77,7 @@ impl StaticFiles {
let entry = entry.unwrap(); let entry = entry.unwrap();
// show file url as relative to static path // show file url as relative to static path
let file_url = format!( let file_url = format!(
"{}{}", self.prefix, "{}/{}", self.prefix,
entry.path().strip_prefix(&self.directory).unwrap().to_string_lossy()); entry.path().strip_prefix(&self.directory).unwrap().to_string_lossy());
// if file is a directory, add '/' to the end of the name // if file is a directory, add '/' to the end of the name

View File

@ -269,10 +269,10 @@ pub struct WsWriter;
impl WsWriter { impl WsWriter {
/// Send text frame /// Send text frame
pub fn text<A>(ctx: &mut HttpContext<A>, text: String) pub fn text<A>(ctx: &mut HttpContext<A>, text: &str)
where A: Actor<Context=HttpContext<A>> + Route where A: Actor<Context=HttpContext<A>> + Route
{ {
let mut frame = wsframe::Frame::message(Vec::from(text.as_str()), OpCode::Text, true); let mut frame = wsframe::Frame::message(Vec::from(text), OpCode::Text, true);
let mut buf = Vec::new(); let mut buf = Vec::new();
frame.format(&mut buf).unwrap(); frame.format(&mut buf).unwrap();