1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

update example

This commit is contained in:
Nikolay Kim 2018-01-28 01:04:58 -08:00
parent 7c7743c145
commit 55b2fb7f77
7 changed files with 68 additions and 44 deletions

View File

@ -60,7 +60,7 @@ script:
cd examples/template_tera && cargo check && cd ../.. cd examples/template_tera && cargo check && cd ../..
cd examples/diesel && cargo check && cd ../.. cd examples/diesel && cargo check && cd ../..
cd examples/tls && cargo check && cd ../.. cd examples/tls && cargo check && cd ../..
# cd examples/websocket-chat && cargo check && cd ../.. cd examples/websocket-chat && cargo check && cd ../..
cd examples/websocket && cargo check && cd ../.. cd examples/websocket && cargo check && cd ../..
fi fi
- | - |

View File

@ -25,5 +25,5 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
actix = "^0.4.2" actix = { git="https://github.com/actix/actix.git" }
actix-web = { path="../../" } actix-web = { path="../../" }

View File

@ -12,6 +12,7 @@ use std::{io, net, process, thread};
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use futures::Future; use futures::Future;
use tokio_io::AsyncRead;
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use actix::prelude::*; use actix::prelude::*;
@ -26,7 +27,9 @@ fn main() {
Arbiter::handle().spawn( Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle()) TcpStream::connect(&addr, Arbiter::handle())
.and_then(|stream| { .and_then(|stream| {
let addr: SyncAddress<_> = ChatClient.framed(stream, codec::ClientChatCodec); let addr: SyncAddress<_> = ChatClient::create_with(
stream.framed(codec::ClientChatCodec),
|_, framed| ChatClient{framed: framed});
// start console loop // start console loop
thread::spawn(move|| { thread::spawn(move|| {
@ -54,20 +57,22 @@ fn main() {
} }
struct ChatClient; struct ChatClient {
framed: FramedCell<TcpStream, codec::ClientChatCodec>,
}
#[derive(Message)] #[derive(Message)]
struct ClientCommand(String); struct ClientCommand(String);
impl Actor for ChatClient { impl Actor for ChatClient {
type Context = FramedContext<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut FramedContext<Self>) { fn started(&mut self, ctx: &mut Context<Self>) {
// start heartbeats otherwise server will disconnect after 10 seconds // start heartbeats otherwise server will disconnect after 10 seconds
self.hb(ctx) self.hb(ctx)
} }
fn stopping(&mut self, _: &mut FramedContext<Self>) -> bool { fn stopping(&mut self, _: &mut Context<Self>) -> bool {
println!("Disconnected"); println!("Disconnected");
// Stop application on disconnect // Stop application on disconnect
@ -78,11 +83,10 @@ impl Actor for ChatClient {
} }
impl ChatClient { impl ChatClient {
fn hb(&self, ctx: &mut FramedContext<Self>) { fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| { ctx.run_later(Duration::new(1, 0), |act, ctx| {
if ctx.send(codec::ChatRequest::Ping).is_ok() { act.framed.send(codec::ChatRequest::Ping);
act.hb(ctx); act.hb(ctx);
}
}); });
} }
} }
@ -91,7 +95,7 @@ impl ChatClient {
impl Handler<ClientCommand> for ChatClient { impl Handler<ClientCommand> for ChatClient {
type Result = (); type Result = ();
fn handle(&mut self, msg: ClientCommand, ctx: &mut FramedContext<Self>) { fn handle(&mut self, msg: ClientCommand, _: &mut Context<Self>) {
let m = msg.0.trim(); let m = msg.0.trim();
if m.is_empty() { if m.is_empty() {
return return
@ -102,11 +106,11 @@ impl Handler<ClientCommand> for ChatClient {
let v: Vec<&str> = m.splitn(2, ' ').collect(); let v: Vec<&str> = m.splitn(2, ' ').collect();
match v[0] { match v[0] {
"/list" => { "/list" => {
let _ = ctx.send(codec::ChatRequest::List); let _ = self.framed.send(codec::ChatRequest::List);
}, },
"/join" => { "/join" => {
if v.len() == 2 { if v.len() == 2 {
let _ = ctx.send(codec::ChatRequest::Join(v[1].to_owned())); let _ = self.framed.send(codec::ChatRequest::Join(v[1].to_owned()));
} else { } else {
println!("!!! room name is required"); println!("!!! room name is required");
} }
@ -114,18 +118,16 @@ impl Handler<ClientCommand> for ChatClient {
_ => println!("!!! unknown command"), _ => println!("!!! unknown command"),
} }
} else { } else {
let _ = ctx.send(codec::ChatRequest::Message(m.to_owned())); let _ = self.framed.send(codec::ChatRequest::Message(m.to_owned()));
} }
} }
} }
/// Server communication /// Server communication
impl FramedActor for ChatClient { impl FramedActor<TcpStream, codec::ClientChatCodec> for ChatClient {
type Io = TcpStream;
type Codec = codec::ClientChatCodec;
fn handle(&mut self, msg: io::Result<codec::ChatResponse>, ctx: &mut FramedContext<Self>) { fn handle(&mut self, msg: io::Result<codec::ChatResponse>, ctx: &mut Context<Self>) {
match msg { match msg {
Err(_) => ctx.stop(), Err(_) => ctx.stop(),
Ok(msg) => match msg { Ok(msg) => match msg {

View File

@ -62,9 +62,9 @@ impl Actor for WsChatSession {
// before processing any other events. // before processing any other events.
// HttpContext::state() is instance of WsChatSessionState, state is shared across all // HttpContext::state() is instance of WsChatSessionState, state is shared across all
// routes within application // routes within application
let subs = ctx.sync_subscriber(); let addr: SyncAddress<_> = ctx.address();
ctx.state().addr.call( ctx.state().addr.call(
self, server::Connect{addr: subs}).then( self, server::Connect{addr: addr.into_subscriber()}).then(
|res, act, ctx| { |res, act, ctx| {
match res { match res {
Ok(Ok(res)) => act.id = res, Ok(Ok(res)) => act.id = res,

View File

@ -4,6 +4,7 @@ use std::{io, net};
use std::str::FromStr; use std::str::FromStr;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::Stream; use futures::Stream;
use tokio_io::AsyncRead;
use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::net::{TcpStream, TcpListener};
use actix::prelude::*; use actix::prelude::*;
@ -26,12 +27,14 @@ pub struct ChatSession {
hb: Instant, hb: Instant,
/// joined room /// joined room
room: String, room: String,
/// Framed wrapper
framed: FramedCell<TcpStream, ChatCodec>,
} }
impl Actor for ChatSession { impl Actor for ChatSession {
/// For tcp communication we are going to use `FramedContext`. /// For tcp communication we are going to use `FramedContext`.
/// It is convenient wrapper around `Framed` object from `tokio_io` /// It is convenient wrapper around `Framed` object from `tokio_io`
type Context = FramedContext<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
// we'll start heartbeat process on session start. // we'll start heartbeat process on session start.
@ -41,7 +44,7 @@ impl Actor for ChatSession {
// future within context, but context waits until this future resolves // future within context, but context waits until this future resolves
// before processing any other events. // before processing any other events.
let addr: SyncAddress<_> = ctx.address(); let addr: SyncAddress<_> = ctx.address();
self.addr.call(self, server::Connect{addr: addr.subscriber()}) self.addr.call(self, server::Connect{addr: addr.into_subscriber()})
.then(|res, act, ctx| { .then(|res, act, ctx| {
match res { match res {
Ok(Ok(res)) => act.id = res, Ok(Ok(res)) => act.id = res,
@ -59,23 +62,21 @@ impl Actor for ChatSession {
} }
} }
/// To use `FramedContext` we have to define Io type and Codec /// To use `Framed` we have to define Io type and Codec
impl FramedActor for ChatSession { impl FramedActor<TcpStream, ChatCodec> for ChatSession {
type Io = TcpStream;
type Codec= ChatCodec;
/// This is main event loop for client requests /// This is main event loop for client requests
fn handle(&mut self, msg: io::Result<ChatRequest>, ctx: &mut FramedContext<Self>) { fn handle(&mut self, msg: io::Result<ChatRequest>, ctx: &mut Context<Self>) {
match msg { match msg {
Err(_) => ctx.stop(), Err(_) => ctx.stop(),
Ok(msg) => match msg { Ok(msg) => match msg {
ChatRequest::List => { ChatRequest::List => {
// Send ListRooms message to chat server and wait for response // Send ListRooms message to chat server and wait for response
println!("List rooms"); println!("List rooms");
self.addr.call(self, server::ListRooms).then(|res, _, ctx| { self.addr.call(self, server::ListRooms).then(|res, act, ctx| {
match res { match res {
Ok(Ok(rooms)) => { Ok(Ok(rooms)) => {
let _ = ctx.send(ChatResponse::Rooms(rooms)); let _ = act.framed.send(ChatResponse::Rooms(rooms));
}, },
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
@ -88,7 +89,7 @@ impl FramedActor for ChatSession {
println!("Join to room: {}", name); println!("Join to room: {}", name);
self.room = name.clone(); self.room = name.clone();
self.addr.send(server::Join{id: self.id, name: name.clone()}); self.addr.send(server::Join{id: self.id, name: name.clone()});
let _ = ctx.send(ChatResponse::Joined(name)); let _ = self.framed.send(ChatResponse::Joined(name));
}, },
ChatRequest::Message(message) => { ChatRequest::Message(message) => {
// send message to chat server // send message to chat server
@ -110,23 +111,25 @@ impl FramedActor for ChatSession {
impl Handler<Message> for ChatSession { impl Handler<Message> for ChatSession {
type Result = (); type Result = ();
fn handle(&mut self, msg: Message, ctx: &mut FramedContext<Self>) { fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) {
// send message to peer // send message to peer
let _ = ctx.send(ChatResponse::Message(msg.0)); let _ = self.framed.send(ChatResponse::Message(msg.0));
} }
} }
/// Helper methods /// Helper methods
impl ChatSession { impl ChatSession {
pub fn new(addr: SyncAddress<ChatServer>) -> ChatSession { pub fn new(addr: SyncAddress<ChatServer>,
ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned()} framed: FramedCell<TcpStream, ChatCodec>) -> ChatSession {
ChatSession {id: 0, addr: addr, hb: Instant::now(),
room: "Main".to_owned(), framed: framed}
} }
/// helper method that sends ping to client every second. /// helper method that sends ping to client every second.
/// ///
/// also this method check heartbeats from client /// also this method check heartbeats from client
fn hb(&self, ctx: &mut FramedContext<Self>) { fn hb(&self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::new(1, 0), |act, ctx| { ctx.run_later(Duration::new(1, 0), |act, ctx| {
// check client heartbeats // check client heartbeats
if Instant::now().duration_since(act.hb) > Duration::new(10, 0) { if Instant::now().duration_since(act.hb) > Duration::new(10, 0) {
@ -140,10 +143,9 @@ impl ChatSession {
ctx.stop(); ctx.stop();
} }
if ctx.send(ChatResponse::Ping).is_ok() { act.framed.send(ChatResponse::Ping);
// if we can not send message to sink, sink is closed (disconnected) // if we can not send message to sink, sink is closed (disconnected)
act.hb(ctx); act.hb(ctx);
}
}); });
} }
} }
@ -192,6 +194,8 @@ impl Handler<TcpConnect> for TcpServer {
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.
let server = self.chat.clone(); let server = self.chat.clone();
let _: () = ChatSession::new(server).framed(msg.0, ChatCodec); let _: () = ChatSession::create_with(msg.0.framed(ChatCodec), |_, framed| {
ChatSession::new(server, framed)
});
} }
} }

View File

@ -15,6 +15,6 @@ path = "src/client.rs"
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
tokio-core = "0.1" tokio-core = "0.1"
#actix = "^0.4.2" #actix = "^0.4.6"
actix = { git = "https://github.com/actix/actix.git" } actix = { git = "https://github.com/actix/actix.git" }
actix-web = { path="../../" } actix-web = { path="../../" }

View File

@ -477,8 +477,10 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
// start server // start server
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = HttpServer::create(move |ctx| { let addr: SyncAddress<_> = HttpServer::create(move |ctx| {
ctx.add_stream(stream.map( ctx.add_message_stream(
move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); stream
.map_err(|_| ())
.map(move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false}));
self self
}); });
signals.map(|signals| signals.send( signals.map(|signals| signals.send(
@ -542,6 +544,22 @@ impl<T, A, H, U, V> Handler<io::Result<Conn<T>>> for HttpServer<T, A, H, U>
} }
} }
impl<T, A, H, U, V> Handler<Conn<T>> for HttpServer<T, A, H, U>
where T: IoStream,
H: HttpHandler + 'static,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
A: 'static,
{
type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::handle().spawn(
HttpChannel::new(
Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2));
}
}
impl<T, A, H, U, V> Handler<PauseServer> for HttpServer<T, A, H, U> impl<T, A, H, U, V> Handler<PauseServer> for HttpServer<T, A, H, U>
where T: IoStream, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,