1
0
mirror of https://github.com/actix/examples synced 2024-12-12 14:13:11 +01:00

convert more examples

This commit is contained in:
Nikolay Kim 2019-12-16 13:09:54 +06:00
parent ca3f11b59e
commit 31b5b7aa49
10 changed files with 145 additions and 167 deletions

View File

@ -35,9 +35,9 @@ members = [
"todo", "todo",
# "udp-echo", # "udp-echo",
"unix-socket", "unix-socket",
# "web-cors/backend", "web-cors/backend",
"websocket", "websocket",
"websocket-chat", "websocket-chat",
# "websocket-chat-broker", # "websocket-chat-broker",
# "websocket-tcp-chat", "websocket-tcp-chat",
] ]

View File

@ -1,16 +1,16 @@
[package] [package]
name = "actix-web-cors" name = "actix-web-cors"
version = "0.1.0" version = "1.0.0"
authors = ["krircc <krircc@aliyun.com>"] authors = ["krircc <krircc@aliyun.com>"]
workspace = "../../"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
actix-web = "1.0.0" actix-rt = "1.0.0"
actix-cors = "0.1.0" actix-web = "2.0.0-alpha.6"
actix-cors = "0.2.0-alpha.3"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
dotenv = "0.10" dotenv = "0.10"
env_logger = "0.6" env_logger = "0.6"
futures = "0.1" futures = "0.3"

View File

@ -6,7 +6,8 @@ use actix_web::{http::header, middleware::Logger, web, App, HttpServer};
mod user; mod user;
fn main() -> std::io::Result<()> { #[actix_rt::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "actix_web=info"); std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init(); env_logger::init();
@ -18,11 +19,13 @@ fn main() -> std::io::Result<()> {
.allowed_methods(vec!["GET", "POST"]) .allowed_methods(vec!["GET", "POST"])
.allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT]) .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT])
.allowed_header(header::CONTENT_TYPE) .allowed_header(header::CONTENT_TYPE)
.max_age(3600), .max_age(3600)
.finish(),
) )
.wrap(Logger::default()) .wrap(Logger::default())
.service(web::resource("/user/info").route(web::post().to(user::info))) .service(web::resource("/user/info").route(web::post().to(user::info)))
}) })
.bind("127.0.0.1:8000")? .bind("127.0.0.1:8000")?
.run() .start()
.await
} }

View File

@ -8,7 +8,7 @@ pub struct Info {
confirm_password: String, confirm_password: String,
} }
pub fn info(info: web::Json<Info>) -> web::Json<Info> { pub async fn info(info: web::Json<Info>) -> web::Json<Info> {
println!("=========={:?}=========", info); println!("=========={:?}=========", info);
web::Json(Info { web::Json(Info {
username: info.username.clone(), username: info.username.clone(),

View File

@ -1,8 +1,7 @@
[package] [package]
name = "websocket-tcp-example" name = "websocket-tcp-example"
version = "0.1.0" version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = ".."
edition = "2018" edition = "2018"
[[bin]] [[bin]]
@ -14,19 +13,20 @@ name = "websocket-tcp-client"
path = "src/client.rs" path = "src/client.rs"
[dependencies] [dependencies]
actix = "0.8.2" actix = "0.9.0-alpha.2"
actix-web = "1.0.0" actix-web = "2.0.0-alpha.6"
actix-web-actors = "1.0.0" actix-web-actors = "2.0.0-alpha.1"
actix-files = "0.1.1" actix-files = "0.2.0-alpha.3"
actix-rt = "1.0.0"
actix-codec = "0.2.0"
rand = "0.6" rand = "0.7"
bytes = "0.4" bytes = "0.5.3"
byteorder = "1.1" byteorder = "1.2"
futures = "0.1" futures = "0.3"
tokio-io = "0.1"
tokio-tcp = "0.1"
tokio-codec = "0.1"
env_logger = "0.6" env_logger = "0.6"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
tokio = "0.2.4"
tokio-util = "0.2.0"

View File

@ -1,48 +1,30 @@
#[macro_use] #[macro_use]
extern crate actix;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate serde;
extern crate serde_json;
extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_tcp;
#[macro_use]
extern crate serde_derive; extern crate serde_derive;
use actix::prelude::*; use actix::prelude::*;
use futures::Future;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use std::{io, net, process, thread}; use std::{io, net, thread};
use tokio_codec::FramedRead; use tokio::io::{split, WriteHalf};
use tokio_io::io::WriteHalf; use tokio::net::TcpStream;
use tokio_io::AsyncRead; use tokio_util::codec::FramedRead;
use tokio_tcp::TcpStream;
mod codec; mod codec;
fn main() { #[actix_rt::main]
let sys = actix::System::new("chat-client"); async fn main() {
// Connect to server // Connect to server
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
Arbiter::spawn(
TcpStream::connect(&addr) println!("Running chat client");
.and_then(|stream| {
let stream = TcpStream::connect(&addr).await.unwrap();
let addr = ChatClient::create(|ctx| { let addr = ChatClient::create(|ctx| {
let (r, w) = stream.split(); let (r, w) = split(stream);
ChatClient::add_stream( ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx);
FramedRead::new(r, codec::ClientChatCodec),
ctx,
);
ChatClient { ChatClient {
framed: actix::io::FramedWrite::new( framed: actix::io::FramedWrite::new(w, codec::ClientChatCodec, ctx),
w,
codec::ClientChatCodec,
ctx,
),
} }
}); });
@ -56,17 +38,6 @@ fn main() {
addr.do_send(ClientCommand(cmd)); addr.do_send(ClientCommand(cmd));
}); });
futures::future::ok(())
})
.map_err(|e| {
println!("Can not connect to server: {}", e);
process::exit(1)
}),
);
println!("Running chat client");
sys.run();
} }
struct ChatClient { struct ChatClient {
@ -74,6 +45,7 @@ struct ChatClient {
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
struct ClientCommand(String); struct ClientCommand(String);
impl Actor for ChatClient { impl Actor for ChatClient {
@ -140,23 +112,27 @@ impl Handler<ClientCommand> for ChatClient {
/// Server communication /// Server communication
impl StreamHandler<codec::ChatResponse, io::Error> for ChatClient { impl StreamHandler<Result<codec::ChatResponse, io::Error>> for ChatClient {
fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context<Self>) { fn handle(
&mut self,
msg: Result<codec::ChatResponse, io::Error>,
ctx: &mut Context<Self>,
) {
match msg { match msg {
codec::ChatResponse::Message(ref msg) => { Ok(codec::ChatResponse::Message(ref msg)) => {
println!("message: {}", msg); println!("message: {}", msg);
} }
codec::ChatResponse::Joined(ref msg) => { Ok(codec::ChatResponse::Joined(ref msg)) => {
println!("!!! joined: {}", msg); println!("!!! joined: {}", msg);
} }
codec::ChatResponse::Rooms(rooms) => { Ok(codec::ChatResponse::Rooms(rooms)) => {
println!("\n!!! Available rooms:"); println!("\n!!! Available rooms:");
for room in rooms { for room in rooms {
println!("{}", room); println!("{}", room);
} }
println!(); println!();
} }
_ => (), _ => ctx.stop(),
} }
} }
} }

View File

@ -1,12 +1,15 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::io;
use actix::prelude::*;
use actix_codec::{Decoder, Encoder};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use serde_json as json; use serde_json as json;
use std::io;
use tokio_io::codec::{Decoder, Encoder};
/// Client request /// Client request
#[derive(Serialize, Deserialize, Debug, Message)] #[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[serde(tag = "cmd", content = "data")] #[serde(tag = "cmd", content = "data")]
pub enum ChatRequest { pub enum ChatRequest {
/// List rooms /// List rooms
@ -21,6 +24,7 @@ pub enum ChatRequest {
/// Server response /// Server response
#[derive(Serialize, Deserialize, Debug, Message)] #[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "()")]
#[serde(tag = "cmd", content = "data")] #[serde(tag = "cmd", content = "data")]
pub enum ChatResponse { pub enum ChatResponse {
Ping, Ping,
@ -51,7 +55,7 @@ impl Decoder for ChatCodec {
}; };
if src.len() >= size + 2 { if src.len() >= size + 2 {
src.split_to(2); let _ = src.split_to(2);
let buf = src.split_to(size); let buf = src.split_to(size);
Ok(Some(json::from_slice::<ChatRequest>(&buf)?)) Ok(Some(json::from_slice::<ChatRequest>(&buf)?))
} else { } else {
@ -73,7 +77,7 @@ impl Encoder for ChatCodec {
let msg_ref: &[u8] = msg.as_ref(); let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + 2); dst.reserve(msg_ref.len() + 2);
dst.put_u16_be(msg_ref.len() as u16); dst.put_u16(msg_ref.len() as u16);
dst.put(msg_ref); dst.put(msg_ref);
Ok(()) Ok(())
@ -96,7 +100,7 @@ impl Decoder for ClientChatCodec {
}; };
if src.len() >= size + 2 { if src.len() >= size + 2 {
src.split_to(2); let _ = src.split_to(2);
let buf = src.split_to(size); let buf = src.split_to(size);
Ok(Some(json::from_slice::<ChatResponse>(&buf)?)) Ok(Some(json::from_slice::<ChatResponse>(&buf)?))
} else { } else {
@ -118,7 +122,7 @@ impl Encoder for ClientChatCodec {
let msg_ref: &[u8] = msg.as_ref(); let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + 2); dst.reserve(msg_ref.len() + 2);
dst.put_u16_be(msg_ref.len() as u16); dst.put_u16(msg_ref.len() as u16);
dst.put(msg_ref); dst.put(msg_ref);
Ok(()) Ok(())

View File

@ -1,7 +1,5 @@
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
#[macro_use]
extern crate actix;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -20,7 +18,7 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// Entry point for our route /// Entry point for our route
fn chat_route( async fn chat_route(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
srv: web::Data<Addr<server::ChatServer>>, srv: web::Data<Addr<server::ChatServer>>,
@ -78,7 +76,7 @@ impl Actor for WsChatSession {
// something is wrong with chat server // something is wrong with chat server
_ => ctx.stop(), _ => ctx.stop(),
} }
fut::ok(()) fut::ready(())
}) })
.wait(ctx); .wait(ctx);
} }
@ -100,8 +98,20 @@ impl Handler<session::Message> for WsChatSession {
} }
/// WebSocket message handler /// WebSocket message handler
impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
println!("WEBSOCKET MESSAGE: {:?}", msg); println!("WEBSOCKET MESSAGE: {:?}", msg);
match msg { match msg {
ws::Message::Ping(msg) => { ws::Message::Ping(msg) => {
@ -133,7 +143,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
} }
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
fut::ok(()) fut::ready(())
}) })
.wait(ctx) .wait(ctx)
// .wait(ctx) pauses all events in context, // .wait(ctx) pauses all events in context,
@ -180,7 +190,7 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
ws::Message::Close(_) => { ws::Message::Close(_) => {
ctx.stop(); ctx.stop();
} }
ws::Message::Nop => (), _ => (),
} }
} }
} }
@ -206,14 +216,14 @@ impl WsChatSession {
return; return;
} }
ctx.ping(""); ctx.ping(b"");
}); });
} }
} }
fn main() -> std::io::Result<()> { #[actix_rt::main]
async fn main() -> std::io::Result<()> {
env_logger::init(); env_logger::init();
let sys = actix::System::new("websocket-example");
// Start chat server actor // Start chat server actor
let server = server::ChatServer::default().start(); let server = server::ChatServer::default().start();
@ -221,10 +231,12 @@ fn main() -> std::io::Result<()> {
// Start tcp server in separate thread // Start tcp server in separate thread
let srv = server.clone(); let srv = server.clone();
Arbiter::new().exec(move || { Arbiter::new().exec(move || {
session::TcpServer::new("127.0.0.1:12345", srv); session::tcp_server("127.0.0.1:12345", srv);
Ok::<_, ()>(()) Ok::<_, ()>(())
}); });
println!("Started http server: 127.0.0.1:8080");
// Create Http server with websocket support // Create Http server with websocket support
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
@ -241,8 +253,6 @@ fn main() -> std::io::Result<()> {
.service(fs::Files::new("/static/", "static/")) .service(fs::Files::new("/static/", "static/"))
}) })
.bind("127.0.0.1:8080")? .bind("127.0.0.1:8080")?
.start(); .start()
.await
println!("Started http server: 127.0.0.1:8080");
sys.run()
} }

View File

@ -19,12 +19,14 @@ pub struct Connect {
/// Session is disconnected /// Session is disconnected
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Disconnect { pub struct Disconnect {
pub id: usize, pub id: usize,
} }
/// Send message to specific room /// Send message to specific room
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Message { pub struct Message {
/// Id of the client session /// Id of the client session
pub id: usize, pub id: usize,
@ -43,6 +45,7 @@ impl actix::Message for ListRooms {
/// Join room, if room does not exists create new one. /// Join room, if room does not exists create new one.
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Join { pub struct Join {
/// Client id /// Client id
pub id: usize, pub id: usize,

View File

@ -1,13 +1,13 @@
//! `ClientSession` is an actor, it manages peer tcp connection and //! `ClientSession` is an actor, it manages peer tcp connection and
//! proxies commands from peer to `ChatServer`. //! proxies commands from peer to `ChatServer`.
use futures::Stream;
use std::str::FromStr; use std::str::FromStr;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{io, net}; use std::{io, net};
use tokio_codec::FramedRead;
use tokio_io::io::WriteHalf; use futures::StreamExt;
use tokio_io::AsyncRead; use tokio::io::{split, WriteHalf};
use tokio_tcp::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::FramedRead;
use actix::prelude::*; use actix::prelude::*;
@ -16,6 +16,7 @@ use crate::server::{self, ChatServer};
/// Chat server sends this messages to session /// Chat server sends this messages to session
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String); pub struct Message(pub String);
/// `ChatSession` actor is responsible for tcp peer communications. /// `ChatSession` actor is responsible for tcp peer communications.
@ -57,7 +58,7 @@ impl Actor for ChatSession {
// something is wrong with chat server // something is wrong with chat server
_ => ctx.stop(), _ => ctx.stop(),
} }
actix::fut::ok(()) actix::fut::ready(())
}) })
.wait(ctx); .wait(ctx);
} }
@ -72,11 +73,11 @@ impl Actor for ChatSession {
impl actix::io::WriteHandler<io::Error> for ChatSession {} impl actix::io::WriteHandler<io::Error> for ChatSession {}
/// To use `Framed` we have to define Io type and Codec /// To use `Framed` we have to define Io type and Codec
impl StreamHandler<ChatRequest, io::Error> for ChatSession { impl StreamHandler<Result<ChatRequest, io::Error>> for ChatSession {
/// This is main event loop for client requests /// This is main event loop for client requests
fn handle(&mut self, msg: ChatRequest, ctx: &mut Context<Self>) { fn handle(&mut self, msg: Result<ChatRequest, io::Error>, ctx: &mut Context<Self>) {
match msg { match msg {
ChatRequest::List => { Ok(ChatRequest::List) => {
// Send ListRooms message to chat server and wait for response // Send ListRooms message to chat server and wait for response
println!("List rooms"); println!("List rooms");
self.addr self.addr
@ -89,13 +90,13 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
} }
_ => println!("Something is wrong"), _ => println!("Something is wrong"),
} }
actix::fut::ok(()) actix::fut::ready(())
}) })
.wait(ctx) .wait(ctx)
// .wait(ctx) pauses all events in context, // .wait(ctx) pauses all events in context,
// so actor wont receive any new messages until it get list of rooms back // so actor wont receive any new messages until it get list of rooms back
} }
ChatRequest::Join(name) => { Ok(ChatRequest::Join(name)) => {
println!("Join to room: {}", name); println!("Join to room: {}", name);
self.room = name.clone(); self.room = name.clone();
self.addr.do_send(server::Join { self.addr.do_send(server::Join {
@ -104,7 +105,7 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
}); });
self.framed.write(ChatResponse::Joined(name)); self.framed.write(ChatResponse::Joined(name));
} }
ChatRequest::Message(message) => { Ok(ChatRequest::Message(message)) => {
// send message to chat server // send message to chat server
println!("Peer message: {}", message); println!("Peer message: {}", message);
self.addr.do_send(server::Message { self.addr.do_send(server::Message {
@ -114,7 +115,8 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
}) })
} }
// we update heartbeat time on ping from peer // we update heartbeat time on ping from peer
ChatRequest::Ping => self.hb = Instant::now(), Ok(ChatRequest::Ping) => self.hb = Instant::now(),
_ => ctx.stop(),
} }
} }
} }
@ -170,50 +172,30 @@ impl ChatSession {
/// Define tcp server that will accept incoming tcp connection and create /// Define tcp server that will accept incoming tcp connection and create
/// chat actors. /// chat actors.
pub struct TcpServer { pub fn tcp_server(_s: &str, server: Addr<ChatServer>) {
chat: Addr<ChatServer>,
}
impl TcpServer {
pub fn new(_s: &str, chat: Addr<ChatServer>) {
// Create server listener // Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
let listener = TcpListener::bind(&addr).unwrap();
// Our chat server `Server` is an actor, first we need to start it actix_rt::spawn(async move {
// and then add stream on incoming tcp connections to it. let server = server.clone();
// TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) let mut listener = TcpListener::bind(&addr).await.unwrap();
// items So to be able to handle this events `Server` actor has to let mut incoming = listener.incoming();
// implement stream handler `StreamHandler<(TcpStream,
// net::SocketAddr), io::Error>`
TcpServer::create(|ctx| {
ctx.add_message_stream(listener.incoming().map_err(|_| ()).map(TcpConnect));
TcpServer { chat }
});
}
}
/// Make actor from `Server` while let Some(stream) = incoming.next().await {
impl Actor for TcpServer { match stream {
/// Every actor has to provide execution `Context` in which it can run. Ok(stream) => {
type Context = Context<Self>; let server = server.clone();
}
#[derive(Message)]
struct TcpConnect(TcpStream);
/// Handle stream of TcpStream's
impl Handler<TcpConnect> for TcpServer {
type Result = ();
fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) {
// For each incoming connection we create `ChatSession` actor
// with out chat server address.
let server = self.chat.clone();
ChatSession::create(|ctx| { ChatSession::create(|ctx| {
let (r, w) = msg.0.split(); let (r, w) = split(stream);
ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx);
ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) ChatSession::new(
server,
actix::io::FramedWrite::new(w, ChatCodec, ctx),
)
}); });
} }
Err(_) => return,
}
}
});
} }