From 342c711bad3b1ad45c4bc24d9a6430cf242e5041 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 01:04:00 +0900 Subject: [PATCH 1/5] Make crate 2018 edition --- Cargo.toml | 1 + src/lib.rs | 14 -------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3a931f218..f2e1bb7ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [package] name = "actix-protobuf" version = "0.4.1" +edition = "2018" authors = ["kingxsp "] description = "Protobuf support for actix-web framework." readme = "README.md" diff --git a/src/lib.rs b/src/lib.rs index f8bb3317a..ee6648242 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,3 @@ -extern crate actix; -extern crate actix_web; -extern crate bytes; -extern crate derive_more; -extern crate futures; - -#[cfg(test)] -extern crate http; - -extern crate prost; -#[cfg(test)] -#[macro_use] -extern crate prost_derive; - use derive_more::Display; use std::fmt; use std::ops::{Deref, DerefMut}; From 0eeda4c39878e0f7c2deb6b8376da4fe9014eabd Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 03:40:29 +0900 Subject: [PATCH 2/5] Migrate to actix-web v2 --- Cargo.toml | 12 ++-- examples/prost-example/Cargo.toml | 10 +-- examples/prost-example/src/main.rs | 24 +++---- src/lib.rs | 106 ++++++++++++++++------------- 4 files changed, 77 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2e1bb7ba..c12834838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "actix-protobuf" version = "0.4.1" edition = "2018" -authors = ["kingxsp "] +authors = ["kingxsp , Yuki Okushi "] description = "Protobuf support for actix-web framework." readme = "README.md" keywords = ["actix"] @@ -21,16 +21,16 @@ path = "src/lib.rs" [dependencies] bytes = "0.4" -futures = "0.1" -derive_more = "0.14" +futures = "0.3.1" +derive_more = "0.99" -actix = "0.8.1" -actix-web = "1.0.0-rc" +actix = "0.9" +actix-rt = "1" +actix-web = "2" prost = "0.5.0" [dev-dependencies] -http = "^0.1" prost-derive = "0.5.0" [workspace] diff --git a/examples/prost-example/Cargo.toml b/examples/prost-example/Cargo.toml index 977c17a3f..555b74913 100644 --- a/examples/prost-example/Cargo.toml +++ b/examples/prost-example/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "prost-example" -version = "0.3.0" -authors = ["kingxsp "] +version = "0.4.0" +edition = "2018" +authors = ["kingxsp , Yuki Okushi "] [dependencies] bytes = "0.4" @@ -10,6 +11,7 @@ env_logger = "*" prost = "0.5.0" prost-derive = "0.5.0" -actix = "0.8.1" -actix-web = "1.0.0-rc" +actix = "0.9" +actix-rt = "1" +actix-web = "2" actix-protobuf = { path="../../" } diff --git a/examples/prost-example/src/main.rs b/examples/prost-example/src/main.rs index 5f1029b7e..61dd729c9 100644 --- a/examples/prost-example/src/main.rs +++ b/examples/prost-example/src/main.rs @@ -1,9 +1,3 @@ -extern crate actix; -extern crate actix_protobuf; -extern crate actix_web; -extern crate bytes; -extern crate env_logger; -extern crate prost; #[macro_use] extern crate prost_derive; @@ -18,25 +12,23 @@ pub struct MyObj { pub name: String, } -fn index(msg: ProtoBuf) -> Result { +async fn index(msg: ProtoBuf) -> Result { println!("model: {:?}", msg); HttpResponse::Ok().protobuf(msg.0) // <- send response } -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info"); +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info"); env_logger::init(); - let sys = actix::System::new("prost-example"); HttpServer::new(|| { App::new() .wrap(middleware::Logger::default()) .service(web::resource("/").route(web::post().to(index))) - }).bind("127.0.0.1:8081") - .unwrap() + }) + .bind("127.0.0.1:8081")? .shutdown_timeout(1) - .start(); - - println!("Started http server: 127.0.0.1:8081"); - let _ = sys.run(); + .run() + .await } diff --git a/src/lib.rs b/src/lib.rs index ee6648242..3199bb54f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,10 @@ use derive_more::Display; use std::fmt; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::future::Future; +use std::task; +use std::task::Poll; use bytes::{BytesMut, IntoBuf}; use prost::DecodeError as ProtoBufDecodeError; @@ -11,7 +15,8 @@ use actix_web::dev::{HttpResponseBuilder, Payload}; use actix_web::error::{Error, PayloadError, ResponseError}; use actix_web::http::header::{CONTENT_LENGTH, CONTENT_TYPE}; use actix_web::{FromRequest, HttpMessage, HttpRequest, HttpResponse, Responder}; -use futures::{Future, Poll, Stream}; +use futures::future::{ready, LocalBoxFuture, FutureExt, Ready}; +use futures::StreamExt; #[derive(Debug, Display)] pub enum ProtoBufPayloadError { @@ -111,7 +116,7 @@ where { type Config = ProtoBufConfig; type Error = Error; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; #[inline] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { @@ -119,29 +124,30 @@ where .app_data::() .map(|c| c.limit) .unwrap_or(262_144); - Box::new( - ProtoBufMessage::new(req, payload) - .limit(limit) - .map_err(move |e| e.into()) - .map(ProtoBuf), - ) + ProtoBufMessage::new(req, payload) + .limit(limit) + .map(move |res| match res { + Err(e) => Err(e.into()), + Ok(item) => Ok(ProtoBuf(item)), + }) + .boxed_local() } } impl Responder for ProtoBuf { type Error = Error; - type Future = Result; + type Future = Ready>; fn respond_to(self, _: &HttpRequest) -> Self::Future { let mut buf = Vec::new(); - self.0 + ready(self.0 .encode(&mut buf) .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) .and_then(|()| { Ok(HttpResponse::Ok() .content_type("application/protobuf") .body(buf)) - }) + })) } } @@ -150,7 +156,7 @@ pub struct ProtoBufMessage { length: Option, stream: Option, err: Option, - fut: Option>>, + fut: Option>>, } impl ProtoBufMessage { @@ -192,40 +198,44 @@ impl ProtoBufMessage { } impl Future for ProtoBufMessage { - type Item = T; - type Error = ProtoBufPayloadError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, task: &mut task::Context<'_>) -> Poll { if let Some(ref mut fut) = self.fut { - return fut.poll(); + return Pin::new(fut).poll(task); } if let Some(err) = self.err.take() { - return Err(err); + return Poll::Ready(Err(err)); } let limit = self.limit; if let Some(len) = self.length.take() { if len > limit { - return Err(ProtoBufPayloadError::Overflow); + return Poll::Ready(Err(ProtoBufPayloadError::Overflow)); } } - let fut = self - .stream - .take() - .expect("ProtoBufMessage could not be used second time") - .from_err() - .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { - if (body.len() + chunk.len()) > limit { - Err(ProtoBufPayloadError::Overflow) - } else { - body.extend_from_slice(&chunk); - Ok(body) + let mut stream = self.stream.take().expect("ProtoBufMessage could not be used second time"); + + self.fut = Some( + async move { + let mut body = BytesMut::with_capacity(8192); + + while let Some(item) = stream.next().await { + let chunk = item?; + if (body.len() + chunk.len()) > limit { + return Err(ProtoBufPayloadError::Overflow); + } else { + body.extend_from_slice(&chunk); + } } - }).and_then(|body| Ok(::decode(&mut body.into_buf())?)); - self.fut = Some(Box::new(fut)); - self.poll() + + return Ok(::decode(&mut body.into_buf())?); + } + .boxed_local(), + ); + self.poll(task) } } @@ -248,8 +258,8 @@ impl ProtoBufResponseBuilder for HttpResponseBuilder { #[cfg(test)] mod tests { use super::*; - use actix_web::test::{block_on, TestRequest}; - use http::header; + use actix_web::test::TestRequest; + use actix_web::http::header; impl PartialEq for ProtoBufPayloadError { fn eq(&self, other: &ProtoBufPayloadError) -> bool { @@ -275,44 +285,42 @@ mod tests { pub name: String, } - #[test] - fn test_protobuf() { + #[actix_rt::test] + async fn test_protobuf() { let protobuf = ProtoBuf(MyObject { number: 9, name: "test".to_owned(), }); let req = TestRequest::default().to_http_request(); - let resp = protobuf.respond_to(&req).unwrap(); + let resp = protobuf.respond_to(&req).await.unwrap(); assert_eq!( resp.headers().get(header::CONTENT_TYPE).unwrap(), "application/protobuf" ); } - #[test] - fn test_protobuf_message() { + #[actix_rt::test] + async fn test_protobuf_message() { let (req, mut pl) = TestRequest::default().to_http_parts(); - let protobuf = block_on(ProtoBufMessage::::new(&req, &mut pl)); + let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::default() - .header( + let (req, mut pl) = TestRequest::with_header( header::CONTENT_TYPE, - header::HeaderValue::from_static("application/text"), + "application/text", ).to_http_parts(); - let protobuf = block_on(ProtoBufMessage::::new(&req, &mut pl)); + let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::default() - .header( + let (req, mut pl) = TestRequest::with_header( header::CONTENT_TYPE, - header::HeaderValue::from_static("application/protobuf"), + "application/protobuf", ).header( header::CONTENT_LENGTH, - header::HeaderValue::from_static("10000"), + "10000", ).to_http_parts(); let protobuf = - block_on(ProtoBufMessage::::new(&req, &mut pl).limit(100)); + ProtoBufMessage::::new(&req, &mut pl).limit(100).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::Overflow); } } From fe2553474e77c467a0cdf0c9349684bffd4a34da Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 04:22:52 +0900 Subject: [PATCH 3/5] Run rustfmt --- src/lib.rs | 57 +++++++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3199bb54f..b82795e61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ use derive_more::Display; use std::fmt; +use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::future::Future; use std::task; use std::task::Poll; @@ -15,7 +15,7 @@ use actix_web::dev::{HttpResponseBuilder, Payload}; use actix_web::error::{Error, PayloadError, ResponseError}; use actix_web::http::header::{CONTENT_LENGTH, CONTENT_TYPE}; use actix_web::{FromRequest, HttpMessage, HttpRequest, HttpResponse, Responder}; -use futures::future::{ready, LocalBoxFuture, FutureExt, Ready}; +use futures::future::{ready, FutureExt, LocalBoxFuture, Ready}; use futures::StreamExt; #[derive(Debug, Display)] @@ -140,14 +140,16 @@ impl Responder for ProtoBuf { fn respond_to(self, _: &HttpRequest) -> Self::Future { let mut buf = Vec::new(); - ready(self.0 - .encode(&mut buf) - .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) - .and_then(|()| { - Ok(HttpResponse::Ok() - .content_type("application/protobuf") - .body(buf)) - })) + ready( + self.0 + .encode(&mut buf) + .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) + .and_then(|()| { + Ok(HttpResponse::Ok() + .content_type("application/protobuf") + .body(buf)) + }), + ) } } @@ -200,7 +202,10 @@ impl ProtoBufMessage { impl Future for ProtoBufMessage { type Output = Result; - fn poll(mut self: Pin<&mut Self>, task: &mut task::Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + task: &mut task::Context<'_>, + ) -> Poll { if let Some(ref mut fut) = self.fut { return Pin::new(fut).poll(task); } @@ -216,7 +221,10 @@ impl Future for ProtoBufMessage { } } - let mut stream = self.stream.take().expect("ProtoBufMessage could not be used second time"); + let mut stream = self + .stream + .take() + .expect("ProtoBufMessage could not be used second time"); self.fut = Some( async move { @@ -258,8 +266,8 @@ impl ProtoBufResponseBuilder for HttpResponseBuilder { #[cfg(test)] mod tests { use super::*; - use actix_web::test::TestRequest; use actix_web::http::header; + use actix_web::test::TestRequest; impl PartialEq for ProtoBufPayloadError { fn eq(&self, other: &ProtoBufPayloadError) -> bool { @@ -305,22 +313,19 @@ mod tests { let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::with_header( - header::CONTENT_TYPE, - "application/text", - ).to_http_parts(); + let (req, mut pl) = + TestRequest::with_header(header::CONTENT_TYPE, "application/text") + .to_http_parts(); let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::with_header( - header::CONTENT_TYPE, - "application/protobuf", - ).header( - header::CONTENT_LENGTH, - "10000", - ).to_http_parts(); - let protobuf = - ProtoBufMessage::::new(&req, &mut pl).limit(100).await; + let (req, mut pl) = + TestRequest::with_header(header::CONTENT_TYPE, "application/protobuf") + .header(header::CONTENT_LENGTH, "10000") + .to_http_parts(); + let protobuf = ProtoBufMessage::::new(&req, &mut pl) + .limit(100) + .await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::Overflow); } } From 06a24990713b3beda8de522e17a0aaaeab30de81 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 04:26:17 +0900 Subject: [PATCH 4/5] Update CHANGES.md --- CHANGES.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 4702f335e..9a3aa9c0f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## 0.5.0 (in the future) + +* Migrate to actix-web 2.0.0 and std::future + ## 0.4.1 (2019-10-03) * Upgrade prost and prost-derive to 0.5.0 From d7387ca5b55ebc4e8daa396921661c0c3191ed4d Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 04:29:10 +0900 Subject: [PATCH 5/5] Tweak CI --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5ac82ea72..74c230f9e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,8 @@ rust: - beta - nightly -sudo: required -dist: trusty +os: linux +dist: xenial env: global: @@ -26,7 +26,7 @@ addons: before_script: - | if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then - ( ( cargo install clippy && export CLIPPY=true ) || export CLIPPY=false ); + ( ( rustup component add clippy && export CLIPPY=true ) || export CLIPPY=false ); fi - export PATH=$PATH:~/.cargo/bin