From 31b5b7aa49752c43655a89e43ed5f4eda20d5949 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 16 Dec 2019 13:09:54 +0600 Subject: [PATCH] convert more examples --- Cargo.toml | 4 +- web-cors/backend/Cargo.toml | 10 +-- web-cors/backend/src/main.rs | 9 ++- web-cors/backend/src/user.rs | 2 +- websocket-tcp-chat/Cargo.toml | 26 ++++---- websocket-tcp-chat/src/client.rs | 100 ++++++++++++------------------ websocket-tcp-chat/src/codec.rs | 16 +++-- websocket-tcp-chat/src/main.rs | 42 ++++++++----- websocket-tcp-chat/src/server.rs | 3 + websocket-tcp-chat/src/session.rs | 100 ++++++++++++------------------ 10 files changed, 145 insertions(+), 167 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 37e970b8..cf660626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,9 @@ members = [ "todo", # "udp-echo", "unix-socket", -# "web-cors/backend", + "web-cors/backend", "websocket", "websocket-chat", # "websocket-chat-broker", -# "websocket-tcp-chat", + "websocket-tcp-chat", ] diff --git a/web-cors/backend/Cargo.toml b/web-cors/backend/Cargo.toml index 6800e5d6..d9bfb5d4 100644 --- a/web-cors/backend/Cargo.toml +++ b/web-cors/backend/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "actix-web-cors" -version = "0.1.0" +version = "1.0.0" authors = ["krircc "] -workspace = "../../" edition = "2018" [dependencies] -actix-web = "1.0.0" -actix-cors = "0.1.0" +actix-rt = "1.0.0" +actix-web = "2.0.0-alpha.6" +actix-cors = "0.2.0-alpha.3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" dotenv = "0.10" env_logger = "0.6" -futures = "0.1" +futures = "0.3" diff --git a/web-cors/backend/src/main.rs b/web-cors/backend/src/main.rs index ed7b71b6..77cb8353 100644 --- a/web-cors/backend/src/main.rs +++ b/web-cors/backend/src/main.rs @@ -6,7 +6,8 @@ use actix_web::{http::header, middleware::Logger, web, App, HttpServer}; mod user; -fn main() -> std::io::Result<()> { +#[actix_rt::main] +async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init(); @@ -18,11 +19,13 @@ fn main() -> std::io::Result<()> { .allowed_methods(vec!["GET", "POST"]) .allowed_headers(vec![header::AUTHORIZATION, header::ACCEPT]) .allowed_header(header::CONTENT_TYPE) - .max_age(3600), + .max_age(3600) + .finish(), ) .wrap(Logger::default()) .service(web::resource("/user/info").route(web::post().to(user::info))) }) .bind("127.0.0.1:8000")? - .run() + .start() + .await } diff --git a/web-cors/backend/src/user.rs b/web-cors/backend/src/user.rs index ed795589..ee7dd783 100644 --- a/web-cors/backend/src/user.rs +++ b/web-cors/backend/src/user.rs @@ -8,7 +8,7 @@ pub struct Info { confirm_password: String, } -pub fn info(info: web::Json) -> web::Json { +pub async fn info(info: web::Json) -> web::Json { println!("=========={:?}=========", info); web::Json(Info { username: info.username.clone(), diff --git a/websocket-tcp-chat/Cargo.toml b/websocket-tcp-chat/Cargo.toml index dd100b89..bdde1149 100644 --- a/websocket-tcp-chat/Cargo.toml +++ b/websocket-tcp-chat/Cargo.toml @@ -1,8 +1,7 @@ [package] name = "websocket-tcp-example" -version = "0.1.0" +version = "2.0.0" authors = ["Nikolay Kim "] -workspace = ".." edition = "2018" [[bin]] @@ -14,19 +13,20 @@ name = "websocket-tcp-client" path = "src/client.rs" [dependencies] -actix = "0.8.2" -actix-web = "1.0.0" -actix-web-actors = "1.0.0" -actix-files = "0.1.1" +actix = "0.9.0-alpha.2" +actix-web = "2.0.0-alpha.6" +actix-web-actors = "2.0.0-alpha.1" +actix-files = "0.2.0-alpha.3" +actix-rt = "1.0.0" +actix-codec = "0.2.0" -rand = "0.6" -bytes = "0.4" -byteorder = "1.1" -futures = "0.1" -tokio-io = "0.1" -tokio-tcp = "0.1" -tokio-codec = "0.1" +rand = "0.7" +bytes = "0.5.3" +byteorder = "1.2" +futures = "0.3" env_logger = "0.6" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" +tokio = "0.2.4" +tokio-util = "0.2.0" \ No newline at end of file diff --git a/websocket-tcp-chat/src/client.rs b/websocket-tcp-chat/src/client.rs index 1694c870..61edaca1 100644 --- a/websocket-tcp-chat/src/client.rs +++ b/websocket-tcp-chat/src/client.rs @@ -1,72 +1,43 @@ #[macro_use] -extern crate actix; -extern crate byteorder; -extern crate bytes; -extern crate futures; -extern crate serde; -extern crate serde_json; -extern crate tokio_codec; -extern crate tokio_io; -extern crate tokio_tcp; -#[macro_use] extern crate serde_derive; use actix::prelude::*; -use futures::Future; use std::str::FromStr; use std::time::Duration; -use std::{io, net, process, thread}; -use tokio_codec::FramedRead; -use tokio_io::io::WriteHalf; -use tokio_io::AsyncRead; -use tokio_tcp::TcpStream; +use std::{io, net, thread}; +use tokio::io::{split, WriteHalf}; +use tokio::net::TcpStream; +use tokio_util::codec::FramedRead; mod codec; -fn main() { - let sys = actix::System::new("chat-client"); - +#[actix_rt::main] +async fn main() { // Connect to server let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); - Arbiter::spawn( - TcpStream::connect(&addr) - .and_then(|stream| { - let addr = ChatClient::create(|ctx| { - let (r, w) = stream.split(); - ChatClient::add_stream( - FramedRead::new(r, codec::ClientChatCodec), - ctx, - ); - ChatClient { - framed: actix::io::FramedWrite::new( - w, - codec::ClientChatCodec, - ctx, - ), - } - }); - - // start console loop - thread::spawn(move || loop { - let mut cmd = String::new(); - if io::stdin().read_line(&mut cmd).is_err() { - println!("error"); - return; - } - - addr.do_send(ClientCommand(cmd)); - }); - - futures::future::ok(()) - }) - .map_err(|e| { - println!("Can not connect to server: {}", e); - process::exit(1) - }), - ); println!("Running chat client"); - sys.run(); + + let stream = TcpStream::connect(&addr).await.unwrap(); + + let addr = ChatClient::create(|ctx| { + let (r, w) = split(stream); + ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); + ChatClient { + framed: actix::io::FramedWrite::new(w, codec::ClientChatCodec, ctx), + } + }); + + // start console loop + thread::spawn(move || loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return; + } + + addr.do_send(ClientCommand(cmd)); + }); } struct ChatClient { @@ -74,6 +45,7 @@ struct ChatClient { } #[derive(Message)] +#[rtype(result = "()")] struct ClientCommand(String); impl Actor for ChatClient { @@ -140,23 +112,27 @@ impl Handler for ChatClient { /// Server communication -impl StreamHandler for ChatClient { - fn handle(&mut self, msg: codec::ChatResponse, _: &mut Context) { +impl StreamHandler> for ChatClient { + fn handle( + &mut self, + msg: Result, + ctx: &mut Context, + ) { match msg { - codec::ChatResponse::Message(ref msg) => { + Ok(codec::ChatResponse::Message(ref msg)) => { println!("message: {}", msg); } - codec::ChatResponse::Joined(ref msg) => { + Ok(codec::ChatResponse::Joined(ref msg)) => { println!("!!! joined: {}", msg); } - codec::ChatResponse::Rooms(rooms) => { + Ok(codec::ChatResponse::Rooms(rooms)) => { println!("\n!!! Available rooms:"); for room in rooms { println!("{}", room); } println!(); } - _ => (), + _ => ctx.stop(), } } } diff --git a/websocket-tcp-chat/src/codec.rs b/websocket-tcp-chat/src/codec.rs index 8d362630..505ea6f1 100644 --- a/websocket-tcp-chat/src/codec.rs +++ b/websocket-tcp-chat/src/codec.rs @@ -1,12 +1,15 @@ #![allow(dead_code)] +use std::io; + +use actix::prelude::*; +use actix_codec::{Decoder, Encoder}; use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; use serde_json as json; -use std::io; -use tokio_io::codec::{Decoder, Encoder}; /// Client request #[derive(Serialize, Deserialize, Debug, Message)] +#[rtype(result = "()")] #[serde(tag = "cmd", content = "data")] pub enum ChatRequest { /// List rooms @@ -21,6 +24,7 @@ pub enum ChatRequest { /// Server response #[derive(Serialize, Deserialize, Debug, Message)] +#[rtype(result = "()")] #[serde(tag = "cmd", content = "data")] pub enum ChatResponse { Ping, @@ -51,7 +55,7 @@ impl Decoder for ChatCodec { }; if src.len() >= size + 2 { - src.split_to(2); + let _ = src.split_to(2); let buf = src.split_to(size); Ok(Some(json::from_slice::(&buf)?)) } else { @@ -73,7 +77,7 @@ impl Encoder for ChatCodec { let msg_ref: &[u8] = msg.as_ref(); dst.reserve(msg_ref.len() + 2); - dst.put_u16_be(msg_ref.len() as u16); + dst.put_u16(msg_ref.len() as u16); dst.put(msg_ref); Ok(()) @@ -96,7 +100,7 @@ impl Decoder for ClientChatCodec { }; if src.len() >= size + 2 { - src.split_to(2); + let _ = src.split_to(2); let buf = src.split_to(size); Ok(Some(json::from_slice::(&buf)?)) } else { @@ -118,7 +122,7 @@ impl Encoder for ClientChatCodec { let msg_ref: &[u8] = msg.as_ref(); dst.reserve(msg_ref.len() + 2); - dst.put_u16_be(msg_ref.len() as u16); + dst.put_u16(msg_ref.len() as u16); dst.put(msg_ref); Ok(()) diff --git a/websocket-tcp-chat/src/main.rs b/websocket-tcp-chat/src/main.rs index 7aeaec8e..1944ec8f 100644 --- a/websocket-tcp-chat/src/main.rs +++ b/websocket-tcp-chat/src/main.rs @@ -1,7 +1,5 @@ #[macro_use] extern crate serde_derive; -#[macro_use] -extern crate actix; use std::time::{Duration, Instant}; @@ -20,7 +18,7 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// Entry point for our route -fn chat_route( +async fn chat_route( req: HttpRequest, stream: web::Payload, srv: web::Data>, @@ -78,7 +76,7 @@ impl Actor for WsChatSession { // something is wrong with chat server _ => ctx.stop(), } - fut::ok(()) + fut::ready(()) }) .wait(ctx); } @@ -100,8 +98,20 @@ impl Handler for WsChatSession { } /// WebSocket message handler -impl StreamHandler for WsChatSession { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { +impl StreamHandler> for WsChatSession { + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + println!("WEBSOCKET MESSAGE: {:?}", msg); match msg { ws::Message::Ping(msg) => { @@ -133,7 +143,7 @@ impl StreamHandler for WsChatSession { } _ => println!("Something is wrong"), } - fut::ok(()) + fut::ready(()) }) .wait(ctx) // .wait(ctx) pauses all events in context, @@ -180,7 +190,7 @@ impl StreamHandler for WsChatSession { ws::Message::Close(_) => { ctx.stop(); } - ws::Message::Nop => (), + _ => (), } } } @@ -206,14 +216,14 @@ impl WsChatSession { return; } - ctx.ping(""); + ctx.ping(b""); }); } } -fn main() -> std::io::Result<()> { +#[actix_rt::main] +async fn main() -> std::io::Result<()> { env_logger::init(); - let sys = actix::System::new("websocket-example"); // Start chat server actor let server = server::ChatServer::default().start(); @@ -221,10 +231,12 @@ fn main() -> std::io::Result<()> { // Start tcp server in separate thread let srv = server.clone(); Arbiter::new().exec(move || { - session::TcpServer::new("127.0.0.1:12345", srv); + session::tcp_server("127.0.0.1:12345", srv); Ok::<_, ()>(()) }); + println!("Started http server: 127.0.0.1:8080"); + // Create Http server with websocket support HttpServer::new(move || { App::new() @@ -241,8 +253,6 @@ fn main() -> std::io::Result<()> { .service(fs::Files::new("/static/", "static/")) }) .bind("127.0.0.1:8080")? - .start(); - - println!("Started http server: 127.0.0.1:8080"); - sys.run() + .start() + .await } diff --git a/websocket-tcp-chat/src/server.rs b/websocket-tcp-chat/src/server.rs index b7a1a3db..e2f66358 100644 --- a/websocket-tcp-chat/src/server.rs +++ b/websocket-tcp-chat/src/server.rs @@ -19,12 +19,14 @@ pub struct Connect { /// Session is disconnected #[derive(Message)] +#[rtype(result = "()")] pub struct Disconnect { pub id: usize, } /// Send message to specific room #[derive(Message)] +#[rtype(result = "()")] pub struct Message { /// Id of the client session pub id: usize, @@ -43,6 +45,7 @@ impl actix::Message for ListRooms { /// Join room, if room does not exists create new one. #[derive(Message)] +#[rtype(result = "()")] pub struct Join { /// Client id pub id: usize, diff --git a/websocket-tcp-chat/src/session.rs b/websocket-tcp-chat/src/session.rs index 711b3287..1481db9c 100644 --- a/websocket-tcp-chat/src/session.rs +++ b/websocket-tcp-chat/src/session.rs @@ -1,13 +1,13 @@ //! `ClientSession` is an actor, it manages peer tcp connection and //! proxies commands from peer to `ChatServer`. -use futures::Stream; use std::str::FromStr; use std::time::{Duration, Instant}; use std::{io, net}; -use tokio_codec::FramedRead; -use tokio_io::io::WriteHalf; -use tokio_io::AsyncRead; -use tokio_tcp::{TcpListener, TcpStream}; + +use futures::StreamExt; +use tokio::io::{split, WriteHalf}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_util::codec::FramedRead; use actix::prelude::*; @@ -16,6 +16,7 @@ use crate::server::{self, ChatServer}; /// Chat server sends this messages to session #[derive(Message)] +#[rtype(result = "()")] pub struct Message(pub String); /// `ChatSession` actor is responsible for tcp peer communications. @@ -57,7 +58,7 @@ impl Actor for ChatSession { // something is wrong with chat server _ => ctx.stop(), } - actix::fut::ok(()) + actix::fut::ready(()) }) .wait(ctx); } @@ -72,11 +73,11 @@ impl Actor for ChatSession { impl actix::io::WriteHandler for ChatSession {} /// To use `Framed` we have to define Io type and Codec -impl StreamHandler for ChatSession { +impl StreamHandler> for ChatSession { /// This is main event loop for client requests - fn handle(&mut self, msg: ChatRequest, ctx: &mut Context) { + fn handle(&mut self, msg: Result, ctx: &mut Context) { match msg { - ChatRequest::List => { + Ok(ChatRequest::List) => { // Send ListRooms message to chat server and wait for response println!("List rooms"); self.addr @@ -89,13 +90,13 @@ impl StreamHandler for ChatSession { } _ => println!("Something is wrong"), } - actix::fut::ok(()) + actix::fut::ready(()) }) .wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list of rooms back } - ChatRequest::Join(name) => { + Ok(ChatRequest::Join(name)) => { println!("Join to room: {}", name); self.room = name.clone(); self.addr.do_send(server::Join { @@ -104,7 +105,7 @@ impl StreamHandler for ChatSession { }); self.framed.write(ChatResponse::Joined(name)); } - ChatRequest::Message(message) => { + Ok(ChatRequest::Message(message)) => { // send message to chat server println!("Peer message: {}", message); self.addr.do_send(server::Message { @@ -114,7 +115,8 @@ impl StreamHandler for ChatSession { }) } // we update heartbeat time on ping from peer - ChatRequest::Ping => self.hb = Instant::now(), + Ok(ChatRequest::Ping) => self.hb = Instant::now(), + _ => ctx.stop(), } } } @@ -170,50 +172,30 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. -pub struct TcpServer { - chat: Addr, -} - -impl TcpServer { - pub fn new(_s: &str, chat: Addr) { - // Create server listener - let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); - let listener = TcpListener::bind(&addr).unwrap(); - - // Our chat server `Server` is an actor, first we need to start it - // and then add stream on incoming tcp connections to it. - // TcpListener::incoming() returns stream of the (TcpStream, net::SocketAddr) - // items So to be able to handle this events `Server` actor has to - // implement stream handler `StreamHandler<(TcpStream, - // net::SocketAddr), io::Error>` - TcpServer::create(|ctx| { - ctx.add_message_stream(listener.incoming().map_err(|_| ()).map(TcpConnect)); - TcpServer { chat } - }); - } -} - -/// Make actor from `Server` -impl Actor for TcpServer { - /// Every actor has to provide execution `Context` in which it can run. - type Context = Context; -} - -#[derive(Message)] -struct TcpConnect(TcpStream); - -/// Handle stream of TcpStream's -impl Handler for TcpServer { - type Result = (); - - fn handle(&mut self, msg: TcpConnect, _: &mut Context) { - // For each incoming connection we create `ChatSession` actor - // with out chat server address. - let server = self.chat.clone(); - ChatSession::create(|ctx| { - let (r, w) = msg.0.split(); - ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); - ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx)) - }); - } +pub fn tcp_server(_s: &str, server: Addr) { + // Create server listener + let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); + + actix_rt::spawn(async move { + let server = server.clone(); + let mut listener = TcpListener::bind(&addr).await.unwrap(); + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + match stream { + Ok(stream) => { + let server = server.clone(); + ChatSession::create(|ctx| { + let (r, w) = split(stream); + ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx); + ChatSession::new( + server, + actix::io::FramedWrite::new(w, ChatCodec, ctx), + ) + }); + } + Err(_) => return, + } + } + }); }