From a153374b6149e8315bcb61ff846c107d4c70571c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 15 Dec 2019 22:45:38 +0600 Subject: [PATCH] migrate actix-web-actors --- Cargo.toml | 2 +- actix-web-actors/CHANGES.md | 4 + actix-web-actors/Cargo.toml | 19 ++-- actix-web-actors/src/context.rs | 61 +++++------ actix-web-actors/src/ws.rs | 164 ++++++++++++++++-------------- actix-web-actors/tests/test_ws.rs | 89 ++++++++-------- src/test.rs | 72 +++++++------ 7 files changed, 213 insertions(+), 198 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1659515e1..9438f3c7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ open-ssl = { version="0.10", package = "openssl", optional = true } rust-tls = { version = "0.16.0", package = "rustls", optional = true } [dev-dependencies] -# actix = "0.8.3" +actix = "0.9.0-alpha.1" rand = "0.7" env_logger = "0.6" serde_derive = "1.0" diff --git a/actix-web-actors/CHANGES.md b/actix-web-actors/CHANGES.md index 01e116baa..dff2dadf6 100644 --- a/actix-web-actors/CHANGES.md +++ b/actix-web-actors/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0-alpha.1] - 2019-12-15 + +* Migrate to actix-web 2.0.0 + ## [1.0.4] - 2019-12-07 * Allow comma-separated websocket subprotocols without spaces (#1172) diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index a74aef046..d4fe45363 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web-actors" -version = "1.0.4" +version = "2.0.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix actors support for actix web framework." readme = "README.md" @@ -9,8 +9,6 @@ homepage = "https://actix.rs" repository = "https://github.com/actix/actix-web.git" documentation = "https://docs.rs/actix-web-actors/" license = "MIT/Apache-2.0" -exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] -workspace = ".." edition = "2018" [lib] @@ -18,13 +16,14 @@ name = "actix_web_actors" path = "src/lib.rs" [dependencies] -actix = "0.8.3" -actix-web = "1.0.9" -actix-http = "0.2.11" -actix-codec = "0.1.2" -bytes = "0.4" -futures = "0.1.25" +actix = "0.9.0-alpha.1" +actix-web = "2.0.0-alpha.5" +actix-http = "1.0.0" +actix-codec = "0.2.0" +bytes = "0.5.2" +futures = "0.3.1" +pin-project = "0.4.6" [dev-dependencies] +actix-rt = "1.0.0" env_logger = "0.6" -actix-http-test = { version = "0.2.4", features=["ssl"] } diff --git a/actix-web-actors/src/context.rs b/actix-web-actors/src/context.rs index 31b29500a..6a403de12 100644 --- a/actix-web-actors/src/context.rs +++ b/actix-web-actors/src/context.rs @@ -1,4 +1,6 @@ use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix::dev::{ AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope, @@ -7,10 +9,10 @@ use actix::fut::ActorFuture; use actix::{ Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle, }; -use actix_web::error::{Error, ErrorInternalServerError}; +use actix_web::error::Error; use bytes::Bytes; -use futures::sync::oneshot::Sender; -use futures::{Async, Future, Poll, Stream}; +use futures::channel::oneshot::Sender; +use futures::{Future, Stream}; /// Execution context for http actors pub struct HttpContext @@ -43,7 +45,7 @@ where #[inline] fn spawn(&mut self, fut: F) -> SpawnHandle where - F: ActorFuture + 'static, + F: ActorFuture + 'static, { self.inner.spawn(fut) } @@ -51,7 +53,7 @@ where #[inline] fn wait(&mut self, fut: F) where - F: ActorFuture + 'static, + F: ActorFuture + 'static, { self.inner.wait(fut) } @@ -81,7 +83,7 @@ where { #[inline] /// Create a new HTTP Context from a request and an actor - pub fn create(actor: A) -> impl Stream { + pub fn create(actor: A) -> impl Stream> { let mb = Mailbox::default(); let ctx = HttpContext { inner: ContextParts::new(mb.sender_producer()), @@ -91,7 +93,7 @@ where } /// Create a new HTTP Context - pub fn with_factory(f: F) -> impl Stream + pub fn with_factory(f: F) -> impl Stream> where F: FnOnce(&mut Self) -> A + 'static, { @@ -160,24 +162,23 @@ impl Stream for HttpContextFut where A: Actor>, { - type Item = Bytes; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Error> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { if self.fut.alive() { - match self.fut.poll() { - Ok(Async::NotReady) | Ok(Async::Ready(())) => (), - Err(_) => return Err(ErrorInternalServerError("error")), - } + let _ = Pin::new(&mut self.fut).poll(cx); } // frames if let Some(data) = self.fut.ctx().stream.pop_front() { - Ok(Async::Ready(data)) + Poll::Ready(data.map(|b| Ok(b))) } else if self.fut.alive() { - Ok(Async::NotReady) + Poll::Pending } else { - Ok(Async::Ready(None)) + Poll::Ready(None) } } } @@ -199,9 +200,9 @@ mod tests { use actix::Actor; use actix_web::http::StatusCode; - use actix_web::test::{block_on, call_service, init_service, TestRequest}; + use actix_web::test::{call_service, init_service, read_body, TestRequest}; use actix_web::{web, App, HttpResponse}; - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; use super::*; @@ -223,31 +224,25 @@ mod tests { if self.count > 3 { ctx.write_eof() } else { - ctx.write(Bytes::from(format!("LINE-{}", self.count).as_bytes())); + ctx.write(Bytes::from(format!("LINE-{}", self.count))); ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx)); } } } - #[test] - fn test_default_resource() { + #[actix_rt::test] + async fn test_default_resource() { let mut srv = init_service(App::new().service(web::resource("/test").to(|| { HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 })) - }))); + }))) + .await; let req = TestRequest::with_uri("/test").to_request(); - let mut resp = call_service(&mut srv, req); + let resp = call_service(&mut srv, req).await; assert_eq!(resp.status(), StatusCode::OK); - let body = block_on(resp.take_body().fold( - BytesMut::new(), - move |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, Error>(body) - }, - )) - .unwrap(); - assert_eq!(body.freeze(), Bytes::from_static(b"LINE-1LINE-2LINE-3")); + let body = read_body(resp).await; + assert_eq!(body, Bytes::from_static(b"LINE-1LINE-2LINE-3")); } } diff --git a/actix-web-actors/src/ws.rs b/actix-web-actors/src/ws.rs index 0b026e35a..b28aeade4 100644 --- a/actix-web-actors/src/ws.rs +++ b/actix-web-actors/src/ws.rs @@ -1,6 +1,8 @@ //! Websocket integration use std::collections::VecDeque; use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix::dev::{ AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler, @@ -16,20 +18,20 @@ use actix_http::ws::{hash_key, Codec}; pub use actix_http::ws::{ CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError, }; - use actix_web::dev::HttpResponseBuilder; -use actix_web::error::{Error, ErrorInternalServerError, PayloadError}; +use actix_web::error::{Error, PayloadError}; use actix_web::http::{header, Method, StatusCode}; use actix_web::{HttpRequest, HttpResponse}; use bytes::{Bytes, BytesMut}; -use futures::sync::oneshot::Sender; -use futures::{Async, Future, Poll, Stream}; +use futures::channel::oneshot::Sender; +use futures::{Future, Stream}; /// Do websocket handshake and start ws actor. pub fn start(actor: A, req: &HttpRequest, stream: T) -> Result where - A: Actor> + StreamHandler, - T: Stream + 'static, + A: Actor> + + StreamHandler>, + T: Stream> + 'static, { let mut res = handshake(req)?; Ok(res.streaming(WebsocketContext::create(actor, stream))) @@ -52,8 +54,9 @@ pub fn start_with_addr( stream: T, ) -> Result<(Addr, HttpResponse), Error> where - A: Actor> + StreamHandler, - T: Stream + 'static, + A: Actor> + + StreamHandler>, + T: Stream> + 'static, { let mut res = handshake(req)?; let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream); @@ -70,8 +73,9 @@ pub fn start_with_protocols( stream: T, ) -> Result where - A: Actor> + StreamHandler, - T: Stream + 'static, + A: Actor> + + StreamHandler>, + T: Stream> + 'static, { let mut res = handshake_with_protocols(req, protocols)?; Ok(res.streaming(WebsocketContext::create(actor, stream))) @@ -202,14 +206,14 @@ where { fn spawn(&mut self, fut: F) -> SpawnHandle where - F: ActorFuture + 'static, + F: ActorFuture + 'static, { self.inner.spawn(fut) } fn wait(&mut self, fut: F) where - F: ActorFuture + 'static, + F: ActorFuture + 'static, { self.inner.wait(fut) } @@ -238,10 +242,10 @@ where { #[inline] /// Create a new Websocket context from a request and an actor - pub fn create(actor: A, stream: S) -> impl Stream + pub fn create(actor: A, stream: S) -> impl Stream> where - A: StreamHandler, - S: Stream + 'static, + A: StreamHandler>, + S: Stream> + 'static, { let (_, stream) = WebsocketContext::create_with_addr(actor, stream); stream @@ -256,10 +260,10 @@ where pub fn create_with_addr( actor: A, stream: S, - ) -> (Addr, impl Stream) + ) -> (Addr, impl Stream>) where - A: StreamHandler, - S: Stream + 'static, + A: StreamHandler>, + S: Stream> + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { @@ -279,10 +283,10 @@ where actor: A, stream: S, codec: Codec, - ) -> impl Stream + ) -> impl Stream> where - A: StreamHandler, - S: Stream + 'static, + A: StreamHandler>, + S: Stream> + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { @@ -298,11 +302,11 @@ where pub fn with_factory( stream: S, f: F, - ) -> impl Stream + ) -> impl Stream> where F: FnOnce(&mut Self) -> A + 'static, - A: StreamHandler, - S: Stream + 'static, + A: StreamHandler>, + S: Stream> + 'static, { let mb = Mailbox::default(); let mut ctx = WebsocketContext { @@ -346,14 +350,14 @@ where /// Send ping frame #[inline] - pub fn ping(&mut self, message: &str) { - self.write_raw(Message::Ping(message.to_string())); + pub fn ping(&mut self, message: &[u8]) { + self.write_raw(Message::Ping(Bytes::copy_from_slice(message))); } /// Send pong frame #[inline] - pub fn pong(&mut self, message: &str) { - self.write_raw(Message::Pong(message.to_string())); + pub fn pong(&mut self, message: &[u8]) { + self.write_raw(Message::Pong(Bytes::copy_from_slice(message))); } /// Send close frame @@ -415,30 +419,34 @@ impl Stream for WebsocketContextFut where A: Actor>, { - type Item = Bytes; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Error> { - if self.fut.alive() && self.fut.poll().is_err() { - return Err(ErrorInternalServerError("error")); + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + if this.fut.alive() { + let _ = Pin::new(&mut this.fut).poll(cx); } // encode messages - while let Some(item) = self.fut.ctx().messages.pop_front() { + while let Some(item) = this.fut.ctx().messages.pop_front() { if let Some(msg) = item { - self.encoder.encode(msg, &mut self.buf)?; + this.encoder.encode(msg, &mut this.buf)?; } else { - self.closed = true; + this.closed = true; break; } } - if !self.buf.is_empty() { - Ok(Async::Ready(Some(self.buf.take().freeze()))) - } else if self.fut.alive() && !self.closed { - Ok(Async::NotReady) + if !this.buf.is_empty() { + Poll::Ready(Some(Ok(this.buf.split().freeze()))) + } else if this.fut.alive() && !this.closed { + Poll::Pending } else { - Ok(Async::Ready(None)) + Poll::Ready(None) } } } @@ -454,7 +462,9 @@ where } } +#[pin_project::pin_project] struct WsStream { + #[pin] stream: S, decoder: Codec, buf: BytesMut, @@ -463,7 +473,7 @@ struct WsStream { impl WsStream where - S: Stream, + S: Stream>, { fn new(stream: S, codec: Codec) -> Self { Self { @@ -477,62 +487,64 @@ where impl Stream for WsStream where - S: Stream, + S: Stream>, { - type Item = Message; - type Error = ProtocolError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - if !self.closed { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.as_mut().project(); + + if !*this.closed { loop { - match self.stream.poll() { - Ok(Async::Ready(Some(chunk))) => { - self.buf.extend_from_slice(&chunk[..]); + this = self.as_mut().project(); + match Pin::new(&mut this.stream).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { + this.buf.extend_from_slice(&chunk[..]); } - Ok(Async::Ready(None)) => { - self.closed = true; + Poll::Ready(None) => { + *this.closed = true; break; } - Ok(Async::NotReady) => break, - Err(e) => { - return Err(ProtocolError::Io(io::Error::new( - io::ErrorKind::Other, - format!("{}", e), - ))); + Poll::Pending => break, + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(ProtocolError::Io( + io::Error::new(io::ErrorKind::Other, format!("{}", e)), + )))); } } } } - match self.decoder.decode(&mut self.buf)? { + match this.decoder.decode(this.buf)? { None => { - if self.closed { - Ok(Async::Ready(None)) + if *this.closed { + Poll::Ready(None) } else { - Ok(Async::NotReady) + Poll::Pending } } Some(frm) => { let msg = match frm { - Frame::Text(data) => { - if let Some(data) = data { - Message::Text( - std::str::from_utf8(&data) - .map_err(|_| ProtocolError::BadEncoding)? - .to_string(), - ) - } else { - Message::Text(String::new()) - } - } - Frame::Binary(data) => Message::Binary( - data.map(|b| b.freeze()).unwrap_or_else(Bytes::new), + Frame::Text(data) => Message::Text( + std::str::from_utf8(&data) + .map_err(|e| { + ProtocolError::Io(io::Error::new( + io::ErrorKind::Other, + format!("{}", e), + )) + })? + .to_string(), ), + Frame::Binary(data) => Message::Binary(data), Frame::Ping(s) => Message::Ping(s), Frame::Pong(s) => Message::Pong(s), Frame::Close(reason) => Message::Close(reason), + Frame::Continuation(item) => Message::Continuation(item), }; - Ok(Async::Ready(Some(msg))) + Poll::Ready(Some(Ok(msg))) } } } diff --git a/actix-web-actors/tests/test_ws.rs b/actix-web-actors/tests/test_ws.rs index 687cf4314..076e375d3 100644 --- a/actix-web-actors/tests/test_ws.rs +++ b/actix-web-actors/tests/test_ws.rs @@ -1,10 +1,8 @@ use actix::prelude::*; -use actix_http::HttpService; -use actix_http_test::TestServer; -use actix_web::{web, App, HttpRequest}; +use actix_web::{test, web, App, HttpRequest}; use actix_web_actors::*; -use bytes::{Bytes, BytesMut}; -use futures::{Sink, Stream}; +use bytes::Bytes; +use futures::{SinkExt, StreamExt}; struct Ws; @@ -12,9 +10,13 @@ impl Actor for Ws { type Context = ws::WebsocketContext; } -impl StreamHandler for Ws { - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - match msg { +impl StreamHandler> for Ws { + fn handle( + &mut self, + msg: Result, + ctx: &mut Self::Context, + ) { + match msg.unwrap() { ws::Message::Ping(msg) => ctx.pong(&msg), ws::Message::Text(text) => ctx.text(text), ws::Message::Binary(bin) => ctx.binary(bin), @@ -24,45 +26,42 @@ impl StreamHandler for Ws { } } -#[test] -fn test_simple() { - let mut srv = - TestServer::new(|| { - HttpService::new(App::new().service(web::resource("/").to( - |req: HttpRequest, stream: web::Payload| ws::start(Ws, &req, stream), - ))) - }); +#[actix_rt::test] +async fn test_simple() { + let mut srv = test::start(|| { + App::new().service(web::resource("/").to( + |req: HttpRequest, stream: web::Payload| { + async move { ws::start(Ws, &req, stream) } + }, + )) + }); // client service - let framed = srv.ws().unwrap(); - let framed = srv - .block_on(framed.send(ws::Message::Text("text".to_string()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text"))))); - - let framed = srv - .block_on(framed.send(ws::Message::Binary("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into()))) - ); - - let framed = srv - .block_on(framed.send(ws::Message::Ping("text".into()))) - .unwrap(); - let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into()))); - - let framed = srv - .block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))) + let mut framed = srv.ws().await.unwrap(); + framed + .send(ws::Message::Text("text".to_string())) + .await .unwrap(); - let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap(); - assert_eq!( - item, - Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into()))) - ); + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text"))); + + framed + .send(ws::Message::Binary("text".into())) + .await + .unwrap(); + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text").into())); + + framed.send(ws::Message::Ping("text".into())).await.unwrap(); + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Pong(Bytes::copy_from_slice(b"text"))); + + framed + .send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))) + .await + .unwrap(); + + let item = framed.next().await.unwrap().unwrap(); + assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into()))); } diff --git a/src/test.rs b/src/test.rs index 6d546702a..1e2c0eec7 100644 --- a/src/test.rs +++ b/src/test.rs @@ -910,6 +910,7 @@ impl Drop for TestServer { #[cfg(test)] mod tests { use actix_http::httpmessage::HttpMessage; + use futures::FutureExt; use serde::{Deserialize, Serialize}; use std::time::SystemTime; @@ -1095,41 +1096,46 @@ mod tests { assert!(res.status().is_success()); } - // #[actix_rt::test] - // fn test_actor() { - // use actix::Actor; + #[actix_rt::test] + async fn test_actor() { + use actix::Actor; - // struct MyActor; + struct MyActor; - // struct Num(usize); - // impl actix::Message for Num { - // type Result = usize; - // } - // impl actix::Actor for MyActor { - // type Context = actix::Context; - // } - // impl actix::Handler for MyActor { - // type Result = usize; - // fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result { - // msg.0 - // } - // } + struct Num(usize); + impl actix::Message for Num { + type Result = usize; + } + impl actix::Actor for MyActor { + type Context = actix::Context; + } + impl actix::Handler for MyActor { + type Result = usize; + fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result { + msg.0 + } + } - // let addr = run_on(|| MyActor.start()); - // let mut app = init_service(App::new().service( - // web::resource("/index.html").to(move || { - // addr.send(Num(1)).from_err().and_then(|res| { - // if res == 1 { - // HttpResponse::Ok() - // } else { - // HttpResponse::BadRequest() - // } - // }) - // }), - // )); + let addr = MyActor.start(); - // let req = TestRequest::post().uri("/index.html").to_request(); - // let res = block_fn(|| app.call(req)).unwrap(); - // assert!(res.status().is_success()); - // } + let mut app = init_service(App::new().service(web::resource("/index.html").to( + move || { + addr.send(Num(1)).map(|res| match res { + Ok(res) => { + if res == 1 { + Ok(HttpResponse::Ok()) + } else { + Ok(HttpResponse::BadRequest()) + } + } + Err(err) => Err(err), + }) + }, + ))) + .await; + + let req = TestRequest::post().uri("/index.html").to_request(); + let res = app.call(req).await.unwrap(); + assert!(res.status().is_success()); + } }