From ca3f11b59e9a0fb6740309127387897d8c99e9be Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 16 Dec 2019 11:23:36 +0600 Subject: [PATCH] migrate readis and websockets examples --- Cargo.toml | 6 +-- redis-session/Cargo.toml | 15 +++--- redis-session/src/main.rs | 84 +++++++++++++++--------------- {actix_redis => redis}/Cargo.toml | 14 ++--- {actix_redis => redis}/README.md | 0 {actix_redis => redis}/src/main.rs | 79 +++++++++++++++------------- websocket/Cargo.toml | 20 +++---- websocket/src/client.rs | 71 ++++++++++++------------- websocket/src/main.rs | 32 +++++++----- 9 files changed, 163 insertions(+), 158 deletions(-) rename {actix_redis => redis}/Cargo.toml (50%) rename {actix_redis => redis}/README.md (100%) rename {actix_redis => redis}/src/main.rs (52%) diff --git a/Cargo.toml b/Cargo.toml index fe86e4f5..37e970b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,5 @@ [workspace] members = [ -# "actix_redis", "async_db", "async_ex1", "async_ex2", @@ -22,7 +21,8 @@ members = [ "openssl", # "protobuf", "r2d2", -# "redis-session", + "redis", + "redis-session", "rustls", "server-sent-events", "simple-auth-server", @@ -36,7 +36,7 @@ members = [ # "udp-echo", "unix-socket", # "web-cors/backend", -# "websocket", + "websocket", "websocket-chat", # "websocket-chat-broker", # "websocket-tcp-chat", diff --git a/redis-session/Cargo.toml b/redis-session/Cargo.toml index edd40825..33df152d 100644 --- a/redis-session/Cargo.toml +++ b/redis-session/Cargo.toml @@ -1,18 +1,17 @@ [package] name = "redis_session" -version = "0.1.0" +version = "2.0.0" authors = ["Nikolay Kim "] -workspace = ".." edition = "2018" [dependencies] -actix-web = "1.0.3" -actix-session = "0.2.0" -actix-redis = { version = "0.6.0", features = ["web"] } +actix-rt = "1.0.0" +actix-web = "2.0.0-alpha.6" +actix-session = "0.3.0-alpha.3" +actix-redis = { version = "0.8.0-alpha.1", features = ["web"] } env_logger = "0.6" serde = { version = "^1.0", features = ["derive"] } -actix-service = "0.4.1" -actix-http-test = "0.2.2" -actix-http = "0.2.5" +actix-service = "1.0.0" +actix-http = "1.0.0" serde_json = "1.0.40" time = "0.1.42" diff --git a/redis-session/src/main.rs b/redis-session/src/main.rs index e85b59e0..25c314bd 100644 --- a/redis-session/src/main.rs +++ b/redis-session/src/main.rs @@ -19,7 +19,7 @@ pub struct IndexResponse { counter: i32, } -fn index(session: Session) -> Result { +async fn index(session: Session) -> Result { let user_id: Option = session.get::("user_id").unwrap(); let counter: i32 = session .get::("counter") @@ -29,7 +29,7 @@ fn index(session: Session) -> Result { Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter })) } -fn do_something(session: Session) -> Result { +async fn do_something(session: Session) -> Result { let user_id: Option = session.get::("user_id").unwrap(); let counter: i32 = session .get::("counter") @@ -44,7 +44,8 @@ fn do_something(session: Session) -> Result { struct Identity { user_id: String, } -fn login(user_id: web::Json, session: Session) -> Result { + +async fn login(user_id: web::Json, session: Session) -> Result { let id = user_id.into_inner().user_id; session.set("user_id", &id)?; session.renew(); @@ -60,7 +61,7 @@ fn login(user_id: web::Json, session: Session) -> Result })) } -fn logout(session: Session) -> Result { +async fn logout(session: Session) -> Result { let id: Option = session.get("user_id")?; if let Some(x) = id { session.purge(); @@ -70,7 +71,8 @@ fn logout(session: Session) -> Result { } } -fn main() -> std::io::Result<()> { +#[actix_rt::main] +async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); env_logger::init(); @@ -86,24 +88,24 @@ fn main() -> std::io::Result<()> { .service(resource("/logout").route(post().to(logout))) }) .bind("127.0.0.1:8080")? - .run() + .start() + .await } #[cfg(test)] mod test { use super::*; - use actix_http::{httpmessage::HttpMessage, HttpService}; - use actix_http_test::{block_on, TestServer}; + use actix_http::httpmessage::HttpMessage; use actix_web::{ - middleware, + middleware, test, web::{get, post, resource}, App, }; use serde_json::json; use time; - #[test] - fn test_workflow() { + #[actix_rt::test] + async fn test_workflow() { // Step 1: GET index // - set-cookie actix-session will be in response (session cookie #1) // - response should be: {"counter": 0, "user_id": None} @@ -134,26 +136,24 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} - let mut srv = TestServer::new(|| { - HttpService::new( - App::new() - .wrap( - RedisSession::new("127.0.0.1:6379", &[0; 32]) - .cookie_name("test-session"), - ) - .wrap(middleware::Logger::default()) - .service(resource("/").route(get().to(index))) - .service(resource("/do_something").route(post().to(do_something))) - .service(resource("/login").route(post().to(login))) - .service(resource("/logout").route(post().to(logout))), - ) + let srv = test::start(|| { + App::new() + .wrap( + RedisSession::new("127.0.0.1:6379", &[0; 32]) + .cookie_name("test-session"), + ) + .wrap(middleware::Logger::default()) + .service(resource("/").route(get().to(index))) + .service(resource("/do_something").route(post().to(do_something))) + .service(resource("/login").route(post().to(login))) + .service(resource("/logout").route(post().to(logout))) }); // Step 1: GET index // - set-cookie actix-session will be in response (session cookie #1) // - response should be: {"counter": 0, "user_id": None} let req_1a = srv.get("/").send(); - let mut resp_1 = srv.block_on(req_1a).unwrap(); + let mut resp_1 = req_1a.await.unwrap(); let cookie_1 = resp_1 .cookies() .unwrap() @@ -161,7 +161,7 @@ mod test { .into_iter() .find(|c| c.name() == "test-session") .unwrap(); - let result_1 = block_on(resp_1.json::()).unwrap(); + let result_1 = resp_1.json::().await.unwrap(); assert_eq!( result_1, IndexResponse { @@ -174,7 +174,7 @@ mod test { // - set-cookie will *not* be in response // - response should be: {"counter": 0, "user_id": None} let req_2 = srv.get("/").cookie(cookie_1.clone()).send(); - let resp_2 = srv.block_on(req_2).unwrap(); + let resp_2 = req_2.await.unwrap(); let cookie_2 = resp_2 .cookies() .unwrap() @@ -187,8 +187,8 @@ mod test { // - adds new session state in redis: {"counter": 1} // - response should be: {"counter": 1, "user_id": None} let req_3 = srv.post("/do_something").cookie(cookie_1.clone()).send(); - let mut resp_3 = srv.block_on(req_3).unwrap(); - let result_3 = block_on(resp_3.json::()).unwrap(); + let mut resp_3 = req_3.await.unwrap(); + let result_3 = resp_3.json::().await.unwrap(); assert_eq!( result_3, IndexResponse { @@ -201,8 +201,8 @@ mod test { // - updates session state in redis: {"counter": 2} // - response should be: {"counter": 2, "user_id": None} let req_4 = srv.post("/do_something").cookie(cookie_1.clone()).send(); - let mut resp_4 = srv.block_on(req_4).unwrap(); - let result_4 = block_on(resp_4.json::()).unwrap(); + let mut resp_4 = req_4.await.unwrap(); + let result_4 = resp_4.json::().await.unwrap(); assert_eq!( result_4, IndexResponse { @@ -218,7 +218,7 @@ mod test { .post("/login") .cookie(cookie_1.clone()) .send_json(&json!({"user_id": "ferris"})); - let mut resp_5 = srv.block_on(req_5).unwrap(); + let mut resp_5 = req_5.await.unwrap(); let cookie_2 = resp_5 .cookies() .unwrap() @@ -231,7 +231,7 @@ mod test { cookie_1.value().to_string() != cookie_2.value().to_string() ); - let result_5 = block_on(resp_5.json::()).unwrap(); + let result_5 = resp_5.json::().await.unwrap(); assert_eq!( result_5, IndexResponse { @@ -243,8 +243,8 @@ mod test { // Step 6: GET index, including session cookie #2 in request // - response should be: {"counter": 2, "user_id": "ferris"} let req_6 = srv.get("/").cookie(cookie_2.clone()).send(); - let mut resp_6 = srv.block_on(req_6).unwrap(); - let result_6 = block_on(resp_6.json::()).unwrap(); + let mut resp_6 = req_6.await.unwrap(); + let result_6 = resp_6.json::().await.unwrap(); assert_eq!( result_6, IndexResponse { @@ -257,8 +257,8 @@ mod test { // - updates session state in redis: {"counter": 3, "user_id": "ferris"} // - response should be: {"counter": 2, "user_id": None} let req_7 = srv.post("/do_something").cookie(cookie_2.clone()).send(); - let mut resp_7 = srv.block_on(req_7).unwrap(); - let result_7 = block_on(resp_7.json::()).unwrap(); + let mut resp_7 = req_7.await.unwrap(); + let result_7 = resp_7.json::().await.unwrap(); assert_eq!( result_7, IndexResponse { @@ -271,7 +271,7 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} let req_8 = srv.get("/").cookie(cookie_1.clone()).send(); - let mut resp_8 = srv.block_on(req_8).unwrap(); + let mut resp_8 = req_8.await.unwrap(); let cookie_3 = resp_8 .cookies() .unwrap() @@ -279,7 +279,7 @@ mod test { .into_iter() .find(|c| c.name() == "test-session") .unwrap(); - let result_8 = block_on(resp_8.json::()).unwrap(); + let result_8 = resp_8.json::().await.unwrap(); assert_eq!( result_8, IndexResponse { @@ -293,7 +293,7 @@ mod test { // - set-cookie actix-session will be in response with session cookie #2 // invalidation logic let req_9 = srv.post("/logout").cookie(cookie_2.clone()).send(); - let resp_9 = srv.block_on(req_9).unwrap(); + let resp_9 = req_9.await.unwrap(); let cookie_4 = resp_9 .cookies() .unwrap() @@ -307,8 +307,8 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} let req_10 = srv.get("/").cookie(cookie_2.clone()).send(); - let mut resp_10 = srv.block_on(req_10).unwrap(); - let result_10 = block_on(resp_10.json::()).unwrap(); + let mut resp_10 = req_10.await.unwrap(); + let result_10 = resp_10.json::().await.unwrap(); assert_eq!( result_10, IndexResponse { diff --git a/actix_redis/Cargo.toml b/redis/Cargo.toml similarity index 50% rename from actix_redis/Cargo.toml rename to redis/Cargo.toml index f1534fe3..fbbadf8b 100644 --- a/actix_redis/Cargo.toml +++ b/redis/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "actix_redis" -version = "0.1.0" +version = "1.0.0" authors = ["dowwie "] edition = "2018" -workspace = ".." [dependencies] -actix = "0.8.2" -actix-web = "1.0.3" -actix-redis = "0.6" -futures = "0.1.23" -redis-async = "0.4.0" +actix = "0.9.0-alpha.2" +actix-rt = "1.0.0" +actix-web = "2.0.0-alpha.6" +actix-redis = "0.8.0-alpha.1" +futures = "0.3.1" +redis-async = "0.6.1" serde = "1.0.71" serde_derive = "1.0.71" env_logger = "0.6" diff --git a/actix_redis/README.md b/redis/README.md similarity index 100% rename from actix_redis/README.md rename to redis/README.md diff --git a/actix_redis/src/main.rs b/redis/src/main.rs similarity index 52% rename from actix_redis/src/main.rs rename to redis/src/main.rs index a9ed3c4c..2824ea1f 100644 --- a/actix_redis/src/main.rs +++ b/redis/src/main.rs @@ -4,9 +4,9 @@ extern crate redis_async; extern crate serde_derive; use actix::prelude::*; -use actix_redis::{Command, Error as ARError, RedisActor}; +use actix_redis::{Command, RedisActor}; use actix_web::{middleware, web, App, Error as AWError, HttpResponse, HttpServer}; -use futures::future::{join_all, Future}; +use futures::future::join_all; use redis_async::resp::RespValue; #[derive(Deserialize)] @@ -16,10 +16,10 @@ pub struct CacheInfo { three: String, } -fn cache_stuff( +async fn cache_stuff( info: web::Json, redis: web::Data>, -) -> impl Future { +) -> Result { let info = info.into_inner(); let one = redis.send(Command(resp_array!["SET", "mydomain:one", info.one])); @@ -33,47 +33,51 @@ fn cache_stuff( // canceled and an error will be returned immediately. If all futures complete // successfully, however, then the returned future will succeed with a `Vec` of // all the successful results. - let info_set = join_all(vec![one, two, three].into_iter()); + let res: Vec> = + join_all(vec![one, two, three].into_iter()) + .await + .into_iter() + .map(|item| { + item.map_err(AWError::from) + .and_then(|res| res.map_err(AWError::from)) + }) + .collect(); - info_set - .map_err(AWError::from) - .and_then(|res: Vec>| - // successful operations return "OK", so confirm that all returned as so - if !res.iter().all(|res| match res { - Ok(RespValue::SimpleString(x)) if x=="OK" => true, - _ => false - }) { - Ok(HttpResponse::InternalServerError().finish()) - } else { - Ok(HttpResponse::Ok().body("successfully cached values")) - } - ) + // successful operations return "OK", so confirm that all returned as so + if !res.iter().all(|res| match res { + Ok(RespValue::SimpleString(x)) if x == "OK" => true, + _ => false, + }) { + Ok(HttpResponse::InternalServerError().finish()) + } else { + Ok(HttpResponse::Ok().body("successfully cached values")) + } } -fn del_stuff( - redis: web::Data>, -) -> impl Future { - redis +async fn del_stuff(redis: web::Data>) -> Result { + let res = redis .send(Command(resp_array![ "DEL", "mydomain:one", "mydomain:two", "mydomain:three" ])) - .map_err(AWError::from) - .and_then(|res: Result| match &res { - Ok(RespValue::Integer(x)) if x == &3 => { - Ok(HttpResponse::Ok().body("successfully deleted values")) - } - _ => { - println!("---->{:?}", res); - Ok(HttpResponse::InternalServerError().finish()) - } - }) + .await?; + + match res { + Ok(RespValue::Integer(x)) if x == 3 => { + Ok(HttpResponse::Ok().body("successfully deleted values")) + } + _ => { + println!("---->{:?}", res); + Ok(HttpResponse::InternalServerError().finish()) + } + } } -fn main() -> std::io::Result<()> { - std::env::set_var("RUST_LOG", "actix_web=info,actix_redis=info"); +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=trace,actix_redis=trace"); env_logger::init(); HttpServer::new(|| { @@ -84,10 +88,11 @@ fn main() -> std::io::Result<()> { .wrap(middleware::Logger::default()) .service( web::resource("/stuff") - .route(web::post().to_async(cache_stuff)) - .route(web::delete().to_async(del_stuff)), + .route(web::post().to(cache_stuff)) + .route(web::delete().to(del_stuff)), ) }) .bind("0.0.0.0:8080")? - .run() + .start() + .await } diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml index 891ef14e..24aac800 100644 --- a/websocket/Cargo.toml +++ b/websocket/Cargo.toml @@ -1,9 +1,8 @@ [package] name = "websocket" -version = "0.1.0" +version = "2.0.0" authors = ["Nikolay Kim "] edition = "2018" -workspace = ".." [[bin]] name = "websocket-server" @@ -14,12 +13,13 @@ name = "websocket-client" path = "src/client.rs" [dependencies] -actix = "0.8.2" -actix-codec = "0.1.2" -actix-web = "1.0.0" -actix-web-actors = "1.0.0" -actix-files = "0.1.1" -awc = "0.2.1" +actix = "0.9.0-alpha.2" +actix-codec = "0.2.0" +actix-web = "2.0.0-alpha.6" +actix-web-actors = "2.0.0-alpha.1" +actix-files = "0.2.0-alpha.3" +actix-rt = "1.0.0" +awc = "1.0.0" env_logger = "0.6" -futures = "0.1" -bytes = "0.4" \ No newline at end of file +futures = "0.3.1" +bytes = "0.5.3" diff --git a/websocket/src/client.rs b/websocket/src/client.rs index 0a9eee35..e8cdaa19 100644 --- a/websocket/src/client.rs +++ b/websocket/src/client.rs @@ -10,52 +10,47 @@ use awc::{ ws::{Codec, Frame, Message}, Client, }; -use futures::{ - lazy, - stream::{SplitSink, Stream}, - Future, -}; +use bytes::Bytes; +use futures::stream::{SplitSink, StreamExt}; -fn main() { +#[actix_rt::main] +async fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); env_logger::init(); - let sys = actix::System::new("ws-example"); - Arbiter::spawn(lazy(|| { - Client::new() - .ws("http://127.0.0.1:8080/ws/") - .connect() - .map_err(|e| { - println!("Error: {}", e); - }) - .map(|(response, framed)| { - println!("{:?}", response); - let (sink, stream) = framed.split(); - let addr = ChatClient::create(|ctx| { - ChatClient::add_stream(stream, ctx); - ChatClient(SinkWrite::new(sink, ctx)) - }); + let (response, framed) = Client::new() + .ws("http://127.0.0.1:8080/ws/") + .connect() + .await + .map_err(|e| { + println!("Error: {}", e); + }) + .unwrap(); - // start console loop - thread::spawn(move || loop { - let mut cmd = String::new(); - if io::stdin().read_line(&mut cmd).is_err() { - println!("error"); - return; - } - addr.do_send(ClientCommand(cmd)); - }); - }) - })); + println!("{:?}", response); + let (sink, stream) = framed.split(); + let addr = ChatClient::create(|ctx| { + ChatClient::add_stream(stream, ctx); + ChatClient(SinkWrite::new(sink, ctx)) + }); - let _ = sys.run(); + // start console loop + thread::spawn(move || loop { + let mut cmd = String::new(); + if io::stdin().read_line(&mut cmd).is_err() { + println!("error"); + return; + } + addr.do_send(ClientCommand(cmd)); + }); } -struct ChatClient(SinkWrite>>) +struct ChatClient(SinkWrite, Message>>) where T: AsyncRead + AsyncWrite; #[derive(Message)] +#[rtype(result = "()")] struct ClientCommand(String); impl Actor for ChatClient @@ -83,7 +78,7 @@ where { fn hb(&self, ctx: &mut Context) { ctx.run_later(Duration::new(1, 0), |act, ctx| { - act.0.write(Message::Ping(String::new())).unwrap(); + act.0.write(Message::Ping(Bytes::from_static(b""))).unwrap(); act.hb(ctx); // client should also check for a timeout here, similar to the @@ -105,12 +100,12 @@ where } /// Handle server websocket messages -impl StreamHandler for ChatClient +impl StreamHandler> for ChatClient where T: AsyncRead + AsyncWrite, { - fn handle(&mut self, msg: Frame, _ctx: &mut Context) { - if let Frame::Text(txt) = msg { + fn handle(&mut self, msg: Result, _: &mut Context) { + if let Ok(Frame::Text(txt)) = msg { println!("Server: {:?}", txt) } } diff --git a/websocket/src/main.rs b/websocket/src/main.rs index b16e3fa3..379c37ae 100644 --- a/websocket/src/main.rs +++ b/websocket/src/main.rs @@ -16,10 +16,10 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// do websocket handshake and start `MyWebSocket` actor -fn ws_index(r: HttpRequest, stream: web::Payload) -> Result { +async fn ws_index(r: HttpRequest, stream: web::Payload) -> Result { println!("{:?}", r); let res = ws::start(MyWebSocket::new(), &r, stream); - println!("{:?}", res.as_ref().unwrap()); + println!("{:?}", res); res } @@ -41,24 +41,28 @@ impl Actor for MyWebSocket { } /// Handler for `ws::Message` -impl StreamHandler for MyWebSocket { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { +impl StreamHandler> for MyWebSocket { + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { // process websocket messages println!("WS: {:?}", msg); match msg { - ws::Message::Ping(msg) => { + Ok(ws::Message::Ping(msg)) => { self.hb = Instant::now(); ctx.pong(&msg); } - ws::Message::Pong(_) => { + Ok(ws::Message::Pong(_)) => { self.hb = Instant::now(); } - ws::Message::Text(text) => ctx.text(text), - ws::Message::Binary(bin) => ctx.binary(bin), - ws::Message::Close(_) => { + Ok(ws::Message::Text(text)) => ctx.text(text), + Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + Ok(ws::Message::Close(_)) => { ctx.stop(); } - ws::Message::Nop => (), + _ => ctx.stop(), } } } @@ -85,12 +89,13 @@ impl MyWebSocket { return; } - ctx.ping(""); + ctx.ping(b""); }); } } -fn main() -> std::io::Result<()> { +#[actix_rt::main] +async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info"); env_logger::init(); @@ -105,5 +110,6 @@ fn main() -> std::io::Result<()> { }) // start http server on 127.0.0.1:8080 .bind("127.0.0.1:8080")? - .run() + .start() + .await }