From 95e2a0ef2e5617264176e67fe9b5769717702a56 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 21 Nov 2019 12:17:01 +0600 Subject: [PATCH] migrate actix-framed --- Cargo.toml | 2 +- actix-framed/Cargo.toml | 21 +-- actix-framed/src/app.rs | 63 ++++----- actix-framed/src/helpers.rs | 48 ++++--- actix-framed/src/route.rs | 68 +++++----- actix-framed/src/service.rs | 67 +++++----- actix-framed/src/test.rs | 7 +- actix-framed/tests/test_server.rs | 204 +++++++++++++++++------------- 8 files changed, 263 insertions(+), 217 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 32918ee43..3efc058df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ members = [ "actix-http", "actix-cors", "actix-files", - #"actix-framed", + "actix-framed", #"actix-session", "actix-identity", #"actix-multipart", diff --git a/actix-framed/Cargo.toml b/actix-framed/Cargo.toml index 232c6ae66..4783daefd 100644 --- a/actix-framed/Cargo.toml +++ b/actix-framed/Cargo.toml @@ -20,19 +20,20 @@ name = "actix_framed" path = "src/lib.rs" [dependencies] -actix-codec = "0.1.2" -actix-service = "0.4.2" +actix-codec = "0.2.0-alpha.1" +actix-service = "1.0.0-alpha.1" actix-router = "0.1.2" -actix-rt = "0.2.2" -actix-http = "0.2.11" -actix-server-config = "0.1.1" +actix-rt = "1.0.0-alpha.1" +actix-http = "0.3.0-alpha.1" +actix-server-config = "0.3.0-alpha.1" bytes = "0.4" -futures = "0.1.25" +futures = "0.3.1" +pin-project = "0.4.6" log = "0.4" [dev-dependencies] -actix-server = { version = "0.6.0", features=["openssl"] } -actix-connect = { version = "0.2.0", features=["openssl"] } -actix-http-test = { version = "0.1.0", features=["openssl"] } -actix-utils = "0.4.0" +actix-server = { version = "0.8.0-alpha.1", features=["openssl"] } +actix-connect = { version = "0.3.0-alpha.1", features=["openssl"] } +actix-http-test = { version = "0.3.0-alpha.1", features=["openssl"] } +actix-utils = "0.5.0-alpha.1" diff --git a/actix-framed/src/app.rs b/actix-framed/src/app.rs index ad5b1ec26..f3e746e9f 100644 --- a/actix-framed/src/app.rs +++ b/actix-framed/src/app.rs @@ -1,21 +1,24 @@ +use std::future::Future; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::h1::{Codec, SendResponse}; use actix_http::{Error, Request, Response}; use actix_router::{Path, Router, Url}; use actix_server_config::ServerConfig; -use actix_service::{IntoNewService, NewService, Service}; -use futures::{Async, Future, Poll}; +use actix_service::{IntoServiceFactory, Service, ServiceFactory}; +use futures::future::{ok, FutureExt, LocalBoxFuture}; use crate::helpers::{BoxedHttpNewService, BoxedHttpService, HttpNewService}; use crate::request::FramedRequest; use crate::state::State; -type BoxedResponse = Box>; +type BoxedResponse = LocalBoxFuture<'static, Result<(), Error>>; pub trait HttpServiceFactory { - type Factory: NewService; + type Factory: ServiceFactory; fn path(&self) -> &str; @@ -48,19 +51,19 @@ impl FramedApp { pub fn service(mut self, factory: U) -> Self where U: HttpServiceFactory, - U::Factory: NewService< + U::Factory: ServiceFactory< Config = (), Request = FramedRequest, Response = (), Error = Error, InitError = (), > + 'static, - ::Future: 'static, - ::Service: Service< + ::Future: 'static, + ::Service: Service< Request = FramedRequest, Response = (), Error = Error, - Future = Box>, + Future = LocalBoxFuture<'static, Result<(), Error>>, >, { let path = factory.path().to_string(); @@ -70,12 +73,12 @@ impl FramedApp { } } -impl IntoNewService> for FramedApp +impl IntoServiceFactory> for FramedApp where - T: AsyncRead + AsyncWrite + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: 'static, { - fn into_new_service(self) -> FramedAppFactory { + fn into_factory(self) -> FramedAppFactory { FramedAppFactory { state: self.state, services: Rc::new(self.services), @@ -89,9 +92,9 @@ pub struct FramedAppFactory { services: Rc>)>>, } -impl NewService for FramedAppFactory +impl ServiceFactory for FramedAppFactory where - T: AsyncRead + AsyncWrite + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: 'static, { type Config = ServerConfig; @@ -128,28 +131,30 @@ pub struct CreateService { enum CreateServiceItem { Future( Option, - Box>, Error = ()>>, + LocalBoxFuture<'static, Result>, ()>>, ), Service(String, BoxedHttpService>), } impl Future for CreateService where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { - type Item = FramedAppService; - type Error = (); + type Output = Result, ()>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let mut done = true; // poll http services for item in &mut self.fut { let res = match item { CreateServiceItem::Future(ref mut path, ref mut fut) => { - match fut.poll()? { - Async::Ready(service) => Some((path.take().unwrap(), service)), - Async::NotReady => { + match Pin::new(fut).poll(cx) { + Poll::Ready(Ok(service)) => { + Some((path.take().unwrap(), service)) + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => { done = false; None } @@ -176,12 +181,12 @@ where } router }); - Ok(Async::Ready(FramedAppService { + Poll::Ready(Ok(FramedAppService { router: router.finish(), state: self.state.clone(), })) } else { - Ok(Async::NotReady) + Poll::Pending } } } @@ -193,15 +198,15 @@ pub struct FramedAppService { impl Service for FramedAppService where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { type Request = (Request, Framed); type Response = (); type Error = Error; type Future = BoxedResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, (req, framed): (Request, Framed)) -> Self::Future { @@ -210,8 +215,8 @@ where if let Some((srv, _info)) = self.router.recognize_mut(&mut path) { return srv.call(FramedRequest::new(req, framed, path, self.state.clone())); } - Box::new( - SendResponse::new(framed, Response::NotFound().finish()).then(|_| Ok(())), - ) + SendResponse::new(framed, Response::NotFound().finish()) + .then(|_| ok(())) + .boxed_local() } } diff --git a/actix-framed/src/helpers.rs b/actix-framed/src/helpers.rs index b343301f3..b654f9cd7 100644 --- a/actix-framed/src/helpers.rs +++ b/actix-framed/src/helpers.rs @@ -1,36 +1,38 @@ +use std::task::{Context, Poll}; + use actix_http::Error; -use actix_service::{NewService, Service}; -use futures::{Future, Poll}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{FutureExt, LocalBoxFuture}; pub(crate) type BoxedHttpService = Box< dyn Service< Request = Req, Response = (), Error = Error, - Future = Box>, + Future = LocalBoxFuture<'static, Result<(), Error>>, >, >; pub(crate) type BoxedHttpNewService = Box< - dyn NewService< + dyn ServiceFactory< Config = (), Request = Req, Response = (), Error = Error, InitError = (), Service = BoxedHttpService, - Future = Box, Error = ()>>, + Future = LocalBoxFuture<'static, Result, ()>>, >, >; -pub(crate) struct HttpNewService(T); +pub(crate) struct HttpNewService(T); impl HttpNewService where - T: NewService, + T: ServiceFactory, T::Response: 'static, T::Future: 'static, - T::Service: Service>> + 'static, + T::Service: Service>> + 'static, ::Future: 'static, { pub fn new(service: T) -> Self { @@ -38,12 +40,12 @@ where } } -impl NewService for HttpNewService +impl ServiceFactory for HttpNewService where - T: NewService, + T: ServiceFactory, T::Request: 'static, T::Future: 'static, - T::Service: Service>> + 'static, + T::Service: Service>> + 'static, ::Future: 'static, { type Config = (); @@ -52,13 +54,19 @@ where type Error = Error; type InitError = (); type Service = BoxedHttpService; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: &()) -> Self::Future { - Box::new(self.0.new_service(&()).map_err(|_| ()).and_then(|service| { - let service: BoxedHttpService<_> = Box::new(HttpServiceWrapper { service }); - Ok(service) - })) + let fut = self.0.new_service(&()); + + async move { + fut.await.map_err(|_| ()).map(|service| { + let service: BoxedHttpService<_> = + Box::new(HttpServiceWrapper { service }); + service + }) + } + .boxed_local() } } @@ -70,7 +78,7 @@ impl Service for HttpServiceWrapper where T: Service< Response = (), - Future = Box>, + Future = LocalBoxFuture<'static, Result<(), Error>>, Error = Error, >, T::Request: 'static, @@ -78,10 +86,10 @@ where type Request = T::Request; type Response = (); type Error = Error; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result<(), Error>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready() + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + self.service.poll_ready(cx) } fn call(&mut self, req: Self::Request) -> Self::Future { diff --git a/actix-framed/src/route.rs b/actix-framed/src/route.rs index 5beb24165..783039684 100644 --- a/actix-framed/src/route.rs +++ b/actix-framed/src/route.rs @@ -1,11 +1,12 @@ use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::{http::Method, Error}; -use actix_service::{NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Future, IntoFuture, Poll}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, FutureExt, LocalBoxFuture, Ready}; use log::error; use crate::app::HttpServiceFactory; @@ -15,11 +16,11 @@ use crate::request::FramedRequest; /// /// Route uses builder-like pattern for configuration. /// If handler is not explicitly set, default *404 Not Found* handler is used. -pub struct FramedRoute { +pub struct FramedRoute { handler: F, pattern: String, methods: Vec, - state: PhantomData<(Io, S, R)>, + state: PhantomData<(Io, S, R, E)>, } impl FramedRoute { @@ -53,12 +54,12 @@ impl FramedRoute { self } - pub fn to(self, handler: F) -> FramedRoute + pub fn to(self, handler: F) -> FramedRoute where F: FnMut(FramedRequest) -> R, - R: IntoFuture, - R::Future: 'static, - R::Error: fmt::Debug, + R: Future> + 'static, + + E: fmt::Debug, { FramedRoute { handler, @@ -69,15 +70,14 @@ impl FramedRoute { } } -impl HttpServiceFactory for FramedRoute +impl HttpServiceFactory for FramedRoute where Io: AsyncRead + AsyncWrite + 'static, F: FnMut(FramedRequest) -> R + Clone, - R: IntoFuture, - R::Future: 'static, - R::Error: fmt::Display, + R: Future> + 'static, + E: fmt::Display, { - type Factory = FramedRouteFactory; + type Factory = FramedRouteFactory; fn path(&self) -> &str { &self.pattern @@ -92,27 +92,26 @@ where } } -pub struct FramedRouteFactory { +pub struct FramedRouteFactory { handler: F, methods: Vec, - _t: PhantomData<(Io, S, R)>, + _t: PhantomData<(Io, S, R, E)>, } -impl NewService for FramedRouteFactory +impl ServiceFactory for FramedRouteFactory where Io: AsyncRead + AsyncWrite + 'static, F: FnMut(FramedRequest) -> R + Clone, - R: IntoFuture, - R::Future: 'static, - R::Error: fmt::Display, + R: Future> + 'static, + E: fmt::Display, { type Config = (); type Request = FramedRequest; type Response = (); type Error = Error; type InitError = (); - type Service = FramedRouteService; - type Future = FutureResult; + type Service = FramedRouteService; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(FramedRouteService { @@ -123,35 +122,38 @@ where } } -pub struct FramedRouteService { +pub struct FramedRouteService { handler: F, methods: Vec, - _t: PhantomData<(Io, S, R)>, + _t: PhantomData<(Io, S, R, E)>, } -impl Service for FramedRouteService +impl Service for FramedRouteService where Io: AsyncRead + AsyncWrite + 'static, F: FnMut(FramedRequest) -> R + Clone, - R: IntoFuture, - R::Future: 'static, - R::Error: fmt::Display, + R: Future> + 'static, + E: fmt::Display, { type Request = FramedRequest; type Response = (); type Error = Error; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result<(), Error>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: FramedRequest) -> Self::Future { - Box::new((self.handler)(req).into_future().then(|res| { + let fut = (self.handler)(req); + + async move { + let res = fut.await; if let Err(e) = res { error!("Error in request handler: {}", e); } Ok(()) - })) + } + .boxed_local() } } diff --git a/actix-framed/src/service.rs b/actix-framed/src/service.rs index fbbc9fbef..ed3a75ff5 100644 --- a/actix-framed/src/service.rs +++ b/actix-framed/src/service.rs @@ -1,4 +1,6 @@ use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::BodySize; @@ -6,9 +8,9 @@ use actix_http::error::ResponseError; use actix_http::h1::{Codec, Message}; use actix_http::ws::{verify_handshake, HandshakeError}; use actix_http::{Request, Response}; -use actix_service::{NewService, Service}; -use futures::future::{ok, Either, FutureResult}; -use futures::{Async, Future, IntoFuture, Poll, Sink}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{err, ok, Either, Ready}; +use futures::Future; /// Service that verifies incoming request if it is valid websocket /// upgrade request. In case of error returns `HandshakeError` @@ -22,14 +24,14 @@ impl Default for VerifyWebSockets { } } -impl NewService for VerifyWebSockets { +impl ServiceFactory for VerifyWebSockets { type Config = C; type Request = (Request, Framed); type Response = (Request, Framed); type Error = (HandshakeError, Framed); type InitError = (); type Service = VerifyWebSockets; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &C) -> Self::Future { ok(VerifyWebSockets { _t: PhantomData }) @@ -40,16 +42,16 @@ impl Service for VerifyWebSockets { type Request = (Request, Framed); type Response = (Request, Framed); type Error = (HandshakeError, Framed); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, (req, framed): (Request, Framed)) -> Self::Future { match verify_handshake(req.head()) { - Err(e) => Err((e, framed)).into_future(), - Ok(_) => Ok((req, framed)).into_future(), + Err(e) => err((e, framed)), + Ok(_) => ok((req, framed)), } } } @@ -67,9 +69,9 @@ where } } -impl NewService for SendError +impl ServiceFactory for SendError where - T: AsyncRead + AsyncWrite + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, R: 'static, E: ResponseError + 'static, { @@ -79,7 +81,7 @@ where type Error = (E, Framed); type InitError = (); type Service = SendError; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &C) -> Self::Future { ok(SendError(PhantomData)) @@ -88,25 +90,25 @@ where impl Service for SendError where - T: AsyncRead + AsyncWrite + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, R: 'static, E: ResponseError + 'static, { type Request = Result)>; type Response = R; type Error = (E, Framed); - type Future = Either)>, SendErrorFut>; + type Future = Either)>>, SendErrorFut>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Result)>) -> Self::Future { match req { - Ok(r) => Either::A(ok(r)), + Ok(r) => Either::Left(ok(r)), Err((e, framed)) => { let res = e.error_response().drop_body(); - Either::B(SendErrorFut { + Either::Right(SendErrorFut { framed: Some(framed), res: Some((res, BodySize::Empty).into()), err: Some(e), @@ -117,6 +119,7 @@ where } } +#[pin_project::pin_project] pub struct SendErrorFut { res: Option, BodySize)>>, framed: Option>, @@ -127,23 +130,27 @@ pub struct SendErrorFut { impl Future for SendErrorFut where E: ResponseError, - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { - type Item = R; - type Error = (E, Framed); + type Output = Result)>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { if let Some(res) = self.res.take() { - if self.framed.as_mut().unwrap().force_send(res).is_err() { - return Err((self.err.take().unwrap(), self.framed.take().unwrap())); + if self.framed.as_mut().unwrap().write(res).is_err() { + return Poll::Ready(Err(( + self.err.take().unwrap(), + self.framed.take().unwrap(), + ))); } } - match self.framed.as_mut().unwrap().poll_complete() { - Ok(Async::Ready(_)) => { - Err((self.err.take().unwrap(), self.framed.take().unwrap())) + match self.framed.as_mut().unwrap().flush(cx) { + Poll::Ready(Ok(_)) => { + Poll::Ready(Err((self.err.take().unwrap(), self.framed.take().unwrap()))) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(_) => Err((self.err.take().unwrap(), self.framed.take().unwrap())), + Poll::Ready(Err(_)) => { + Poll::Ready(Err((self.err.take().unwrap(), self.framed.take().unwrap()))) + } + Poll::Pending => Poll::Pending, } } } diff --git a/actix-framed/src/test.rs b/actix-framed/src/test.rs index 3bc828df4..b90a493dc 100644 --- a/actix-framed/src/test.rs +++ b/actix-framed/src/test.rs @@ -1,4 +1,6 @@ //! Various helpers for Actix applications to use during testing. +use std::future::Future; + use actix_codec::Framed; use actix_http::h1::Codec; use actix_http::http::header::{Header, HeaderName, IntoHeaderValue}; @@ -6,7 +8,6 @@ use actix_http::http::{HttpTryFrom, Method, Uri, Version}; use actix_http::test::{TestBuffer, TestRequest as HttpTestRequest}; use actix_router::{Path, Url}; use actix_rt::Runtime; -use futures::IntoFuture; use crate::{FramedRequest, State}; @@ -121,10 +122,10 @@ impl TestRequest { pub fn run(self, f: F) -> Result where F: FnOnce(FramedRequest) -> R, - R: IntoFuture, + R: Future>, { let mut rt = Runtime::new().unwrap(); - rt.block_on(f(self.finish()).into_future()) + rt.block_on(f(self.finish())) } } diff --git a/actix-framed/tests/test_server.rs b/actix-framed/tests/test_server.rs index 00f6a97d8..6e4bb6ada 100644 --- a/actix-framed/tests/test_server.rs +++ b/actix-framed/tests/test_server.rs @@ -1,30 +1,31 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::{body, http::StatusCode, ws, Error, HttpService, Response}; -use actix_http_test::TestServer; -use actix_service::{IntoNewService, NewService}; +use actix_http_test::{block_on, TestServer}; +use actix_service::{pipeline_factory, IntoServiceFactory, ServiceFactory}; use actix_utils::framed::FramedTransport; use bytes::{Bytes, BytesMut}; -use futures::future::{self, ok}; -use futures::{Future, Sink, Stream}; +use futures::{future, SinkExt, StreamExt}; use actix_framed::{FramedApp, FramedRequest, FramedRoute, SendError, VerifyWebSockets}; -fn ws_service( +async fn ws_service( req: FramedRequest, -) -> impl Future { - let (req, framed, _) = req.into_parts(); +) -> Result<(), Error> { + let (req, mut framed, _) = req.into_parts(); let res = ws::handshake(req.head()).unwrap().message_body(()); framed .send((res, body::BodySize::None).into()) - .map_err(|_| panic!()) - .and_then(|framed| { - FramedTransport::new(framed.into_framed(ws::Codec::new()), service) - .map_err(|_| panic!()) - }) + .await + .unwrap(); + FramedTransport::new(framed.into_framed(ws::Codec::new()), service) + .await + .unwrap(); + + Ok(()) } -fn service(msg: ws::Frame) -> impl Future { +async fn service(msg: ws::Frame) -> Result { let msg = match msg { ws::Frame::Ping(msg) => ws::Message::Pong(msg), ws::Frame::Text(text) => { @@ -34,108 +35,129 @@ fn service(msg: ws::Frame) -> impl Future { ws::Frame::Close(reason) => ws::Message::Close(reason), _ => panic!(), }; - ok(msg) + Ok(msg) } #[test] fn test_simple() { - let mut srv = TestServer::new(|| { - HttpService::build() - .upgrade( - FramedApp::new().service(FramedRoute::get("/index.html").to(ws_service)), - ) - .finish(|_| future::ok::<_, Error>(Response::NotFound())) - }); + block_on(async { + let mut srv = TestServer::start(|| { + HttpService::build() + .upgrade( + FramedApp::new() + .service(FramedRoute::get("/index.html").to(ws_service)), + ) + .finish(|_| future::ok::<_, Error>(Response::NotFound())) + }); - assert!(srv.ws_at("/test").is_err()); + assert!(srv.ws_at("/test").await.is_err()); - // client service - let framed = srv.ws_at("/index.html").unwrap(); - let framed = srv - .block_on(framed.send(ws::Message::Text("text".to_string()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text"))))); + // client service + let mut framed = srv.ws_at("/index.html").await.unwrap(); + framed + .send(ws::Message::Text("text".to_string())) + .await + .unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Text(Some(BytesMut::from("text"))) + ); - let framed = srv - .block_on(framed.send(ws::Message::Binary("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))) - ); + framed + .send(ws::Message::Binary("text".into())) + .await + .unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Binary(Some(Bytes::from_static(b"text").into())) + ); - let framed = srv - .block_on(framed.send(ws::Message::Ping("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into()))); + framed.send(ws::Message::Ping("text".into())).await.unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Pong("text".to_string().into()) + ); - let framed = srv - .block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))) - .unwrap(); + framed + .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + .await + .unwrap(); - let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) - ); + let (item, _) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Close(Some(ws::CloseCode::Normal.into())) + ); + }) } #[test] fn test_service() { - let mut srv = TestServer::new(|| { - actix_http::h1::OneRequest::new().map_err(|_| ()).and_then( - VerifyWebSockets::default() - .then(SendError::default()) - .map_err(|_| ()) + block_on(async { + let mut srv = TestServer::start(|| { + pipeline_factory(actix_http::h1::OneRequest::new().map_err(|_| ())).and_then( + pipeline_factory( + pipeline_factory(VerifyWebSockets::default()) + .then(SendError::default()) + .map_err(|_| ()), + ) .and_then( FramedApp::new() .service(FramedRoute::get("/index.html").to(ws_service)) - .into_new_service() + .into_factory() .map_err(|_| ()), ), - ) - }); + ) + }); - // non ws request - let res = srv.block_on(srv.get("/index.html").send()).unwrap(); - assert_eq!(res.status(), StatusCode::BAD_REQUEST); + // non ws request + let res = srv.get("/index.html").send().await.unwrap(); + assert_eq!(res.status(), StatusCode::BAD_REQUEST); - // not found - assert!(srv.ws_at("/test").is_err()); + // not found + assert!(srv.ws_at("/test").await.is_err()); - // client service - let framed = srv.ws_at("/index.html").unwrap(); - let framed = srv - .block_on(framed.send(ws::Message::Text("text".to_string()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text"))))); + // client service + let mut framed = srv.ws_at("/index.html").await.unwrap(); + framed + .send(ws::Message::Text("text".to_string())) + .await + .unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Text(Some(BytesMut::from("text"))) + ); - let framed = srv - .block_on(framed.send(ws::Message::Binary("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))) - ); + framed + .send(ws::Message::Binary("text".into())) + .await + .unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Binary(Some(Bytes::from_static(b"text").into())) + ); - let framed = srv - .block_on(framed.send(ws::Message::Ping("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into()))); + framed.send(ws::Message::Ping("text".into())).await.unwrap(); + let (item, mut framed) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Pong("text".to_string().into()) + ); - let framed = srv - .block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))) - .unwrap(); + framed + .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + .await + .unwrap(); - let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) - ); + let (item, _) = framed.into_future().await; + assert_eq!( + item.unwrap().unwrap(), + ws::Frame::Close(Some(ws::CloseCode::Normal.into())) + ); + }) }