diff --git a/CHANGES.md b/CHANGES.md index 59898cbab..2ab9fe9af 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## 0.8.0-alpha.1 (2019-12-xx) + +* Migrate to actix 0.9 + ## 0.7 (2019-09-25) * added cache_keygen functionality to RedisSession builder, enabling support for diff --git a/Cargo.toml b/Cargo.toml index 28b0e688c..9bb5dce3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-redis" -version = "0.7.0" +version = "0.8.0-alpha.1" authors = ["Nikolay Kim "] description = "Redis integration for actix framework" license = "MIT/Apache-2.0" @@ -25,32 +25,31 @@ codecov = { repository = "actix/actix-redis", branch = "master", service = "gith default = ["web"] # actix-web integration -web = ["actix/http", "actix-service", "actix-utils", "actix-web", "actix-session/cookie-session", "rand", "serde", "serde_json"] +web = ["actix/http", "actix-service", "actix-web", "actix-session/cookie-session", "rand", "serde", "serde_json"] [dependencies] -actix = "0.8.3" +actix = "0.9.0-alpha.1" +actix-utils = "1.0.3" log = "0.4.6" backoff = "0.1.5" -derive_more = "0.15.0" -futures = "0.1.28" -tokio-io = "0.1.12" -tokio-codec = "0.1.1" -tokio-tcp = "0.1.3" -redis-async = "0.4.5" +derive_more = "0.99.2" +futures = "0.3.1" +redis-async = "0.6.1" +actix-rt = "1.0.0" time = "0.1.42" +tokio = "0.2.4" +tokio-util = "0.2.0" # actix web session -actix-web = { version = "1.0.7", optional = true } -actix-utils = { version = "0.4.5", optional = true } -actix-service = { version = "0.4.2", optional = true } -actix-session = { version = "0.2.0", optional = true } +actix-web = { version = "2.0.0-alpha.6", optional = true } +actix-service = { version = "1.0.0", optional = true } +actix-session = { version = "0.3.0-alpha", optional = true } rand = { version = "0.7.0", optional = true } serde = { version = "1.0.101", optional = true, features = ["derive"] } serde_json = { version = "1.0.40", optional = true } env_logger = "0.6.2" -actix-http-test = "0.2.5" -actix-http = "0.2.10" [dev-dependencies] env_logger = "0.6" +actix-http = "1.0.0" diff --git a/src/redis.rs b/src/redis.rs index 296273f4c..c9aa643e7 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -3,16 +3,15 @@ use std::io; use actix::actors::resolver::{Connect, Resolver}; use actix::prelude::*; +use actix_utils::oneshot; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; -use futures::unsync::oneshot; -use futures::Future; +use futures::FutureExt; use redis_async::error::Error as RespError; use redis_async::resp::{RespCodec, RespValue}; -use tokio_codec::FramedRead; -use tokio_io::io::WriteHalf; -use tokio_io::AsyncRead; -use tokio_tcp::TcpStream; +use tokio::io::{split, WriteHalf}; +use tokio::net::TcpStream; +use tokio_util::codec::FramedRead; use crate::Error; @@ -57,20 +56,30 @@ impl Actor for RedisActor { .send(Connect::host(self.addr.as_str())) .into_actor(self) .map(|res, act, ctx| match res { - Ok(stream) => { - info!("Connected to redis server: {}", act.addr); + Ok(res) => match res { + Ok(stream) => { + info!("Connected to redis server: {}", act.addr); - let (r, w) = stream.split(); + let (r, w) = split(stream); - // configure write side of the connection - let framed = actix::io::FramedWrite::new(w, RespCodec, ctx); - act.cell = Some(framed); + // configure write side of the connection + let framed = actix::io::FramedWrite::new(w, RespCodec, ctx); + act.cell = Some(framed); - // read side of the connection - ctx.add_stream(FramedRead::new(r, RespCodec)); + // read side of the connection + ctx.add_stream(FramedRead::new(r, RespCodec)); - act.backoff.reset(); - } + act.backoff.reset(); + } + Err(err) => { + error!("Can not connect to redis server: {}", err); + // re-connect with backoff time. + // we stop current context, supervisor will restart it. + if let Some(timeout) = act.backoff.next_backoff() { + ctx.run_later(timeout, |_, ctx| ctx.stop()); + } + } + }, Err(err) => { error!("Can not connect to redis server: {}", err); // re-connect with backoff time. @@ -80,14 +89,6 @@ impl Actor for RedisActor { } } }) - .map_err(|err, act, ctx| { - error!("Can not connect to redis server: {}", err); - // re-connect with backoff time. - // we stop current context, supervisor will restart it. - if let Some(timeout) = act.backoff.next_backoff() { - ctx.run_later(timeout, |_, ctx| ctx.stop()); - } - }) .wait(ctx); } } @@ -108,23 +109,26 @@ impl actix::io::WriteHandler for RedisActor { } } -impl StreamHandler for RedisActor { - fn error(&mut self, err: RespError, _: &mut Self::Context) -> Running { - if let Some(tx) = self.queue.pop_front() { - let _ = tx.send(Err(err.into())); - } - Running::Stop - } - - fn handle(&mut self, msg: RespValue, _: &mut Self::Context) { - if let Some(tx) = self.queue.pop_front() { - let _ = tx.send(Ok(msg)); +impl StreamHandler> for RedisActor { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Err(e) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Err(e.into())); + } + ctx.stop(); + } + Ok(val) => { + if let Some(tx) = self.queue.pop_front() { + let _ = tx.send(Ok(val)); + } + } } } } impl Handler for RedisActor { - type Result = ResponseFuture; + type Result = ResponseFuture>; fn handle(&mut self, msg: Command, _: &mut Self::Context) -> Self::Result { let (tx, rx) = oneshot::channel(); @@ -135,6 +139,9 @@ impl Handler for RedisActor { let _ = tx.send(Err(Error::NotConnected)); } - Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res)) + Box::new(rx.map(|res| match res { + Ok(res) => res, + Err(_) => Err(Error::Disconnected), + })) } } diff --git a/src/session.rs b/src/session.rs index a8e0da7ff..5728809bf 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,4 +1,6 @@ use std::cell::RefCell; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{collections::HashMap, iter, rc::Rc}; use actix::prelude::*; @@ -8,8 +10,7 @@ use actix_web::cookie::{Cookie, CookieJar, Key, SameSite}; use actix_web::dev::{ServiceRequest, ServiceResponse}; use actix_web::http::header::{self, HeaderValue}; use actix_web::{error, Error, HttpMessage}; -use futures::future::{err, ok, Either, Future, FutureResult}; -use futures::Poll; +use futures::future::{err, ok, Either, Future, FutureExt, Ready}; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use redis_async::resp::RespValue; use time::{self, Duration}; @@ -107,7 +108,7 @@ where type Error = S::Error; type InitError = (); type Transform = RedisSessionMiddleware; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(RedisSessionMiddleware { @@ -133,17 +134,18 @@ where type Request = ServiceRequest; type Response = ServiceResponse; type Error = Error; - type Future = Box>; + type Future = Pin>>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.borrow_mut().poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.borrow_mut().poll_ready(cx) } fn call(&mut self, mut req: ServiceRequest) -> Self::Future { let mut srv = self.service.clone(); let inner = self.inner.clone(); - Box::new(self.inner.load(&req).and_then(move |state| { + Box::pin(async move { + let state = inner.load(&req).await?; let value = if let Some((state, value)) = state { Session::set_session(state.into_iter(), &mut req); Some(value) @@ -151,55 +153,43 @@ where None }; - srv.call(req).and_then(move |mut res| { - match Session::get_changes(&mut res) { - (SessionStatus::Unchanged, None) => { - Either::A(Either::A(Either::A(ok(res)))) + let mut res = srv.call(req).await?; + + match Session::get_changes(&mut res) { + (SessionStatus::Unchanged, None) => Ok(res), + (SessionStatus::Unchanged, Some(state)) => { + if value.is_none() { + // implies the session is new + inner.update(res, state, value).await + } else { + Ok(res) } - (SessionStatus::Unchanged, Some(state)) => { - Either::A(Either::A(Either::B(if value.is_none() { - // implies the session is new - Either::A(inner.update(res, state, value)) - } else { - Either::B(ok(res)) - }))) - } - (SessionStatus::Changed, Some(state)) => { - Either::A(Either::B(Either::A(inner.update(res, state, value)))) - } - (SessionStatus::Purged, Some(_)) => { - if let Some(val) = value { - Either::A(Either::B(Either::B(Either::A( - inner.clear_cache(val).and_then(move |_| { - match inner.remove_cookie(&mut res) { - Ok(_) => Either::A(ok(res)), - Err(_err) => Either::B(err( - error::ErrorInternalServerError(_err), - )), - } - }), - )))) - } else { - Either::A(Either::B(Either::B(Either::B(err( - error::ErrorInternalServerError("unexpected"), - ))))) - } - } - (SessionStatus::Renewed, Some(state)) => { - if let Some(val) = value { - Either::B(Either::A( - inner - .clear_cache(val) - .and_then(move |_| inner.update(res, state, None)), - )) - } else { - Either::B(Either::B(inner.update(res, state, None))) - } - } - (_, None) => unreachable!(), } - }) - })) + (SessionStatus::Changed, Some(state)) => { + inner.update(res, state, value).await + } + (SessionStatus::Purged, Some(_)) => { + if let Some(val) = value { + inner.clear_cache(val).await?; + match inner.remove_cookie(&mut res) { + Ok(_) => Ok(res), + Err(_err) => Err(error::ErrorInternalServerError(_err)), + } + } else { + Err(error::ErrorInternalServerError("unexpected")) + } + } + (SessionStatus::Renewed, Some(state)) => { + if let Some(val) = value { + inner.clear_cache(val).await?; + inner.update(res, state, None).await + } else { + inner.update(res, state, None).await + } + } + (_, None) => unreachable!(), + } + }) } } @@ -220,7 +210,7 @@ impl Inner { fn load( &self, req: &ServiceRequest, - ) -> impl Future, String)>, Error = Error> + ) -> impl Future, String)>, Error>> { if let Ok(cookies) = req.cookies() { for cookie in cookies.iter() { @@ -230,47 +220,52 @@ impl Inner { if let Some(cookie) = jar.signed(&self.key).get(&self.name) { let value = cookie.value().to_owned(); let cachekey = (self.cache_keygen)(&cookie.value()); - return Either::A( - self.addr - .send(Command(resp_array!["GET", cachekey])) - .map_err(Error::from) - .and_then(move |res| match res { - Ok(val) => { - match val { - RespValue::Error(err) => { - return Err( - error::ErrorInternalServerError(err), - ); - } - RespValue::SimpleString(s) => { - if let Ok(val) = serde_json::from_str(&s) - { - return Ok(Some((val, value))); + return Either::Left( + self.addr.send(Command(resp_array!["GET", cachekey])).map( + |result| match result { + Err(e) => Err(Error::from(e)), + Ok(res) => match res { + Ok(val) => { + match val { + RespValue::Error(err) => { + return Err( + error::ErrorInternalServerError( + err, + ), + ); } - } - RespValue::BulkString(s) => { - if let Ok(val) = - serde_json::from_slice(&s) - { - return Ok(Some((val, value))); + RespValue::SimpleString(s) => { + if let Ok(val) = + serde_json::from_str(&s) + { + return Ok(Some((val, value))); + } } + RespValue::BulkString(s) => { + if let Ok(val) = + serde_json::from_slice(&s) + { + return Ok(Some((val, value))); + } + } + _ => (), } - _ => (), + Ok(None) } - Ok(None) - } - Err(err) => { - Err(error::ErrorInternalServerError(err)) - } - }), + Err(err) => { + Err(error::ErrorInternalServerError(err)) + } + }, + }, + ), ); } else { - return Either::B(ok(None)); + return Either::Right(ok(None)); } } } } - Either::B(ok(None)) + Either::Right(ok(None)) } fn update( @@ -278,7 +273,7 @@ impl Inner { mut res: ServiceResponse, state: impl Iterator, value: Option, - ) -> impl Future, Error = Error> { + ) -> impl Future, Error>> { let (value, jar) = if let Some(value) = value { (value.clone(), None) } else { @@ -316,42 +311,47 @@ impl Inner { let state: HashMap<_, _> = state.collect(); match serde_json::to_string(&state) { - Err(e) => Either::A(err(e.into())), - Ok(body) => Either::B( + Err(e) => Either::Left(err(e.into())), + Ok(body) => Either::Right( self.addr .send(Command(resp_array!["SET", cachekey, body, "EX", &self.ttl])) - .map_err(Error::from) - .and_then(move |redis_result| match redis_result { - Ok(_) => { - if let Some(jar) = jar { - for cookie in jar.delta() { - let val = - HeaderValue::from_str(&cookie.to_string())?; - res.headers_mut().append(header::SET_COOKIE, val); + .map(|result| match result { + Err(e) => Err(Error::from(e)), + Ok(redis_result) => match redis_result { + Ok(_) => { + if let Some(jar) = jar { + for cookie in jar.delta() { + let val = + HeaderValue::from_str(&cookie.to_string())?; + res.headers_mut() + .append(header::SET_COOKIE, val); + } } + Ok(res) } - Ok(res) - } - Err(err) => Err(error::ErrorInternalServerError(err)), + Err(err) => Err(error::ErrorInternalServerError(err)), + }, }), ), } } /// removes cache entry - fn clear_cache(&self, key: String) -> impl Future { + fn clear_cache(&self, key: String) -> impl Future> { let cachekey = (self.cache_keygen)(&key); self.addr .send(Command(resp_array!["DEL", cachekey])) - .map_err(Error::from) - .and_then(|res| { - match res { - // redis responds with number of deleted records - Ok(RespValue::Integer(x)) if x > 0 => Ok(()), - _ => Err(error::ErrorInternalServerError( - "failed to remove session from cache", - )), + .map(|res| match res { + Err(e) => Err(Error::from(e)), + Ok(res) => { + match res { + // redis responds with number of deleted records + Ok(RespValue::Integer(x)) if x > 0 => Ok(()), + _ => Err(error::ErrorInternalServerError( + "failed to remove session from cache", + )), + } } }) } @@ -374,13 +374,12 @@ impl Inner { #[cfg(test)] mod test { use super::*; - use actix_http::{httpmessage::HttpMessage, HttpService}; - use actix_http_test::{block_on, TestServer}; + use actix_http::httpmessage::HttpMessage; use actix_session::Session; use actix_web::{ - middleware, web, + middleware, test, web, web::{get, post, resource}, - App, HttpResponse, HttpServer, Result, + App, HttpResponse, Result, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -392,7 +391,7 @@ mod test { counter: i32, } - fn index(session: Session) -> Result { + async fn index(session: Session) -> Result { let user_id: Option = session.get::("user_id").unwrap(); let counter: i32 = session .get::("counter") @@ -402,7 +401,7 @@ mod test { Ok(HttpResponse::Ok().json(IndexResponse { user_id, counter })) } - fn do_something(session: Session) -> Result { + async fn do_something(session: Session) -> Result { let user_id: Option = session.get::("user_id").unwrap(); let counter: i32 = session .get::("counter") @@ -417,7 +416,11 @@ mod test { struct Identity { user_id: String, } - fn login(user_id: web::Json, session: Session) -> Result { + + async fn login( + user_id: web::Json, + session: Session, + ) -> Result { let id = user_id.into_inner().user_id; session.set("user_id", &id)?; session.renew(); @@ -433,7 +436,7 @@ mod test { })) } - fn logout(session: Session) -> Result { + async fn logout(session: Session) -> Result { let id: Option = session.get("user_id")?; if let Some(x) = id { session.purge(); @@ -443,8 +446,8 @@ mod test { } } - #[test] - fn test_workflow() { + #[actix_rt::test] + async fn test_workflow() { // Step 1: GET index // - set-cookie actix-session will be in response (session cookie #1) // - response should be: {"counter": 0, "user_id": None} @@ -475,26 +478,24 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} - let mut srv = TestServer::new(|| { - HttpService::new( - App::new() - .wrap( - RedisSession::new("127.0.0.1:6379", &[0; 32]) - .cookie_name("test-session"), - ) - .wrap(middleware::Logger::default()) - .service(resource("/").route(get().to(index))) - .service(resource("/do_something").route(post().to(do_something))) - .service(resource("/login").route(post().to(login))) - .service(resource("/logout").route(post().to(logout))), - ) + let srv = test::start(|| { + App::new() + .wrap( + RedisSession::new("127.0.0.1:6379", &[0; 32]) + .cookie_name("test-session"), + ) + .wrap(middleware::Logger::default()) + .service(resource("/").route(get().to(index))) + .service(resource("/do_something").route(post().to(do_something))) + .service(resource("/login").route(post().to(login))) + .service(resource("/logout").route(post().to(logout))) }); // Step 1: GET index // - set-cookie actix-session will be in response (session cookie #1) // - response should be: {"counter": 0, "user_id": None} let req_1a = srv.get("/").send(); - let mut resp_1 = srv.block_on(req_1a).unwrap(); + let mut resp_1 = req_1a.await.unwrap(); let cookie_1 = resp_1 .cookies() .unwrap() @@ -502,7 +503,7 @@ mod test { .into_iter() .find(|c| c.name() == "test-session") .unwrap(); - let result_1 = block_on(resp_1.json::()).unwrap(); + let result_1 = resp_1.json::().await.unwrap(); assert_eq!( result_1, IndexResponse { @@ -515,7 +516,7 @@ mod test { // - set-cookie will *not* be in response // - response should be: {"counter": 0, "user_id": None} let req_2 = srv.get("/").cookie(cookie_1.clone()).send(); - let resp_2 = srv.block_on(req_2).unwrap(); + let resp_2 = req_2.await.unwrap(); let cookie_2 = resp_2 .cookies() .unwrap() @@ -528,8 +529,8 @@ mod test { // - adds new session state in redis: {"counter": 1} // - response should be: {"counter": 1, "user_id": None} let req_3 = srv.post("/do_something").cookie(cookie_1.clone()).send(); - let mut resp_3 = srv.block_on(req_3).unwrap(); - let result_3 = block_on(resp_3.json::()).unwrap(); + let mut resp_3 = req_3.await.unwrap(); + let result_3 = resp_3.json::().await.unwrap(); assert_eq!( result_3, IndexResponse { @@ -542,8 +543,8 @@ mod test { // - updates session state in redis: {"counter": 2} // - response should be: {"counter": 2, "user_id": None} let req_4 = srv.post("/do_something").cookie(cookie_1.clone()).send(); - let mut resp_4 = srv.block_on(req_4).unwrap(); - let result_4 = block_on(resp_4.json::()).unwrap(); + let mut resp_4 = req_4.await.unwrap(); + let result_4 = resp_4.json::().await.unwrap(); assert_eq!( result_4, IndexResponse { @@ -559,7 +560,7 @@ mod test { .post("/login") .cookie(cookie_1.clone()) .send_json(&json!({"user_id": "ferris"})); - let mut resp_5 = srv.block_on(req_5).unwrap(); + let mut resp_5 = req_5.await.unwrap(); let cookie_2 = resp_5 .cookies() .unwrap() @@ -572,7 +573,7 @@ mod test { cookie_1.value().to_string() != cookie_2.value().to_string() ); - let result_5 = block_on(resp_5.json::()).unwrap(); + let result_5 = resp_5.json::().await.unwrap(); assert_eq!( result_5, IndexResponse { @@ -584,8 +585,8 @@ mod test { // Step 6: GET index, including session cookie #2 in request // - response should be: {"counter": 2, "user_id": "ferris"} let req_6 = srv.get("/").cookie(cookie_2.clone()).send(); - let mut resp_6 = srv.block_on(req_6).unwrap(); - let result_6 = block_on(resp_6.json::()).unwrap(); + let mut resp_6 = req_6.await.unwrap(); + let result_6 = resp_6.json::().await.unwrap(); assert_eq!( result_6, IndexResponse { @@ -598,8 +599,8 @@ mod test { // - updates session state in redis: {"counter": 3, "user_id": "ferris"} // - response should be: {"counter": 2, "user_id": None} let req_7 = srv.post("/do_something").cookie(cookie_2.clone()).send(); - let mut resp_7 = srv.block_on(req_7).unwrap(); - let result_7 = block_on(resp_7.json::()).unwrap(); + let mut resp_7 = req_7.await.unwrap(); + let result_7 = resp_7.json::().await.unwrap(); assert_eq!( result_7, IndexResponse { @@ -612,7 +613,7 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} let req_8 = srv.get("/").cookie(cookie_1.clone()).send(); - let mut resp_8 = srv.block_on(req_8).unwrap(); + let mut resp_8 = req_8.await.unwrap(); let cookie_3 = resp_8 .cookies() .unwrap() @@ -620,7 +621,7 @@ mod test { .into_iter() .find(|c| c.name() == "test-session") .unwrap(); - let result_8 = block_on(resp_8.json::()).unwrap(); + let result_8 = resp_8.json::().await.unwrap(); assert_eq!( result_8, IndexResponse { @@ -634,7 +635,7 @@ mod test { // - set-cookie actix-session will be in response with session cookie #2 // invalidation logic let req_9 = srv.post("/logout").cookie(cookie_2.clone()).send(); - let resp_9 = srv.block_on(req_9).unwrap(); + let resp_9 = req_9.await.unwrap(); let cookie_4 = resp_9 .cookies() .unwrap() @@ -648,8 +649,8 @@ mod test { // - set-cookie actix-session will be in response (session cookie #3) // - response should be: {"counter": 0, "user_id": None} let req_10 = srv.get("/").cookie(cookie_2.clone()).send(); - let mut resp_10 = srv.block_on(req_10).unwrap(); - let result_10 = block_on(resp_10.json::()).unwrap(); + let mut resp_10 = req_10.await.unwrap(); + let result_10 = resp_10.json::().await.unwrap(); assert_eq!( result_10, IndexResponse { diff --git a/tests/test_redis.rs b/tests/test_redis.rs index 979c0230a..b9bb9c390 100644 --- a/tests/test_redis.rs +++ b/tests/test_redis.rs @@ -1,63 +1,42 @@ #[macro_use] extern crate redis_async; -use actix::prelude::*; use actix_redis::{Command, Error, RedisActor, RespValue}; -use futures::Future; - -#[test] -fn test_error_connect() -> std::io::Result<()> { - let sys = System::new("test"); +#[actix_rt::test] +async fn test_error_connect() { let addr = RedisActor::start("localhost:54000"); let _addr2 = addr.clone(); - Arbiter::spawn_fn(move || { - addr.send(Command(resp_array!["GET", "test"])).then(|res| { - match res { - Ok(Err(Error::NotConnected)) => (), - _ => panic!("Should not happen {:?}", res), - } - System::current().stop(); - Ok(()) - }) - }); - - sys.run() + let res = addr.send(Command(resp_array!["GET", "test"])).await; + match res { + Ok(Err(Error::NotConnected)) => (), + _ => panic!("Should not happen {:?}", res), + } } -#[test] -fn test_redis() -> std::io::Result<()> { +#[actix_rt::test] +async fn test_redis() { env_logger::init(); - let sys = System::new("test"); let addr = RedisActor::start("127.0.0.1:6379"); - let _addr2 = addr.clone(); + let res = addr + .send(Command(resp_array!["SET", "test", "value"])) + .await; - Arbiter::spawn_fn(move || { - let addr2 = addr.clone(); - addr.send(Command(resp_array!["SET", "test", "value"])) - .then(move |res| match res { + match res { + Ok(Ok(resp)) => { + assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); + + let res = addr.send(Command(resp_array!["GET", "test"])).await; + match res { Ok(Ok(resp)) => { - assert_eq!(resp, RespValue::SimpleString("OK".to_owned())); - addr2.send(Command(resp_array!["GET", "test"])).then(|res| { - match res { - Ok(Ok(resp)) => { - println!("RESP: {:?}", resp); - assert_eq!( - resp, - RespValue::BulkString((&b"value"[..]).into()) - ); - } - _ => panic!("Should not happen {:?}", res), - } - System::current().stop(); - Ok(()) - }) + println!("RESP: {:?}", resp); + assert_eq!(resp, RespValue::BulkString((&b"value"[..]).into())); } _ => panic!("Should not happen {:?}", res), - }) - }); - - sys.run() + } + } + _ => panic!("Should not happen {:?}", res), + } }