From 0eeda4c39878e0f7c2deb6b8376da4fe9014eabd Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 2 Jan 2020 03:40:29 +0900 Subject: [PATCH] Migrate to actix-web v2 --- Cargo.toml | 12 ++-- examples/prost-example/Cargo.toml | 10 +-- examples/prost-example/src/main.rs | 24 +++---- src/lib.rs | 106 ++++++++++++++++------------- 4 files changed, 77 insertions(+), 75 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2e1bb7ba..c12834838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "actix-protobuf" version = "0.4.1" edition = "2018" -authors = ["kingxsp "] +authors = ["kingxsp , Yuki Okushi "] description = "Protobuf support for actix-web framework." readme = "README.md" keywords = ["actix"] @@ -21,16 +21,16 @@ path = "src/lib.rs" [dependencies] bytes = "0.4" -futures = "0.1" -derive_more = "0.14" +futures = "0.3.1" +derive_more = "0.99" -actix = "0.8.1" -actix-web = "1.0.0-rc" +actix = "0.9" +actix-rt = "1" +actix-web = "2" prost = "0.5.0" [dev-dependencies] -http = "^0.1" prost-derive = "0.5.0" [workspace] diff --git a/examples/prost-example/Cargo.toml b/examples/prost-example/Cargo.toml index 977c17a3f..555b74913 100644 --- a/examples/prost-example/Cargo.toml +++ b/examples/prost-example/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "prost-example" -version = "0.3.0" -authors = ["kingxsp "] +version = "0.4.0" +edition = "2018" +authors = ["kingxsp , Yuki Okushi "] [dependencies] bytes = "0.4" @@ -10,6 +11,7 @@ env_logger = "*" prost = "0.5.0" prost-derive = "0.5.0" -actix = "0.8.1" -actix-web = "1.0.0-rc" +actix = "0.9" +actix-rt = "1" +actix-web = "2" actix-protobuf = { path="../../" } diff --git a/examples/prost-example/src/main.rs b/examples/prost-example/src/main.rs index 5f1029b7e..61dd729c9 100644 --- a/examples/prost-example/src/main.rs +++ b/examples/prost-example/src/main.rs @@ -1,9 +1,3 @@ -extern crate actix; -extern crate actix_protobuf; -extern crate actix_web; -extern crate bytes; -extern crate env_logger; -extern crate prost; #[macro_use] extern crate prost_derive; @@ -18,25 +12,23 @@ pub struct MyObj { pub name: String, } -fn index(msg: ProtoBuf) -> Result { +async fn index(msg: ProtoBuf) -> Result { println!("model: {:?}", msg); HttpResponse::Ok().protobuf(msg.0) // <- send response } -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info"); +#[actix_rt::main] +async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "actix_web=debug,actix_server=info"); env_logger::init(); - let sys = actix::System::new("prost-example"); HttpServer::new(|| { App::new() .wrap(middleware::Logger::default()) .service(web::resource("/").route(web::post().to(index))) - }).bind("127.0.0.1:8081") - .unwrap() + }) + .bind("127.0.0.1:8081")? .shutdown_timeout(1) - .start(); - - println!("Started http server: 127.0.0.1:8081"); - let _ = sys.run(); + .run() + .await } diff --git a/src/lib.rs b/src/lib.rs index ee6648242..3199bb54f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,10 @@ use derive_more::Display; use std::fmt; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::future::Future; +use std::task; +use std::task::Poll; use bytes::{BytesMut, IntoBuf}; use prost::DecodeError as ProtoBufDecodeError; @@ -11,7 +15,8 @@ use actix_web::dev::{HttpResponseBuilder, Payload}; use actix_web::error::{Error, PayloadError, ResponseError}; use actix_web::http::header::{CONTENT_LENGTH, CONTENT_TYPE}; use actix_web::{FromRequest, HttpMessage, HttpRequest, HttpResponse, Responder}; -use futures::{Future, Poll, Stream}; +use futures::future::{ready, LocalBoxFuture, FutureExt, Ready}; +use futures::StreamExt; #[derive(Debug, Display)] pub enum ProtoBufPayloadError { @@ -111,7 +116,7 @@ where { type Config = ProtoBufConfig; type Error = Error; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; #[inline] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { @@ -119,29 +124,30 @@ where .app_data::() .map(|c| c.limit) .unwrap_or(262_144); - Box::new( - ProtoBufMessage::new(req, payload) - .limit(limit) - .map_err(move |e| e.into()) - .map(ProtoBuf), - ) + ProtoBufMessage::new(req, payload) + .limit(limit) + .map(move |res| match res { + Err(e) => Err(e.into()), + Ok(item) => Ok(ProtoBuf(item)), + }) + .boxed_local() } } impl Responder for ProtoBuf { type Error = Error; - type Future = Result; + type Future = Ready>; fn respond_to(self, _: &HttpRequest) -> Self::Future { let mut buf = Vec::new(); - self.0 + ready(self.0 .encode(&mut buf) .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) .and_then(|()| { Ok(HttpResponse::Ok() .content_type("application/protobuf") .body(buf)) - }) + })) } } @@ -150,7 +156,7 @@ pub struct ProtoBufMessage { length: Option, stream: Option, err: Option, - fut: Option>>, + fut: Option>>, } impl ProtoBufMessage { @@ -192,40 +198,44 @@ impl ProtoBufMessage { } impl Future for ProtoBufMessage { - type Item = T; - type Error = ProtoBufPayloadError; + type Output = Result; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, task: &mut task::Context<'_>) -> Poll { if let Some(ref mut fut) = self.fut { - return fut.poll(); + return Pin::new(fut).poll(task); } if let Some(err) = self.err.take() { - return Err(err); + return Poll::Ready(Err(err)); } let limit = self.limit; if let Some(len) = self.length.take() { if len > limit { - return Err(ProtoBufPayloadError::Overflow); + return Poll::Ready(Err(ProtoBufPayloadError::Overflow)); } } - let fut = self - .stream - .take() - .expect("ProtoBufMessage could not be used second time") - .from_err() - .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { - if (body.len() + chunk.len()) > limit { - Err(ProtoBufPayloadError::Overflow) - } else { - body.extend_from_slice(&chunk); - Ok(body) + let mut stream = self.stream.take().expect("ProtoBufMessage could not be used second time"); + + self.fut = Some( + async move { + let mut body = BytesMut::with_capacity(8192); + + while let Some(item) = stream.next().await { + let chunk = item?; + if (body.len() + chunk.len()) > limit { + return Err(ProtoBufPayloadError::Overflow); + } else { + body.extend_from_slice(&chunk); + } } - }).and_then(|body| Ok(::decode(&mut body.into_buf())?)); - self.fut = Some(Box::new(fut)); - self.poll() + + return Ok(::decode(&mut body.into_buf())?); + } + .boxed_local(), + ); + self.poll(task) } } @@ -248,8 +258,8 @@ impl ProtoBufResponseBuilder for HttpResponseBuilder { #[cfg(test)] mod tests { use super::*; - use actix_web::test::{block_on, TestRequest}; - use http::header; + use actix_web::test::TestRequest; + use actix_web::http::header; impl PartialEq for ProtoBufPayloadError { fn eq(&self, other: &ProtoBufPayloadError) -> bool { @@ -275,44 +285,42 @@ mod tests { pub name: String, } - #[test] - fn test_protobuf() { + #[actix_rt::test] + async fn test_protobuf() { let protobuf = ProtoBuf(MyObject { number: 9, name: "test".to_owned(), }); let req = TestRequest::default().to_http_request(); - let resp = protobuf.respond_to(&req).unwrap(); + let resp = protobuf.respond_to(&req).await.unwrap(); assert_eq!( resp.headers().get(header::CONTENT_TYPE).unwrap(), "application/protobuf" ); } - #[test] - fn test_protobuf_message() { + #[actix_rt::test] + async fn test_protobuf_message() { let (req, mut pl) = TestRequest::default().to_http_parts(); - let protobuf = block_on(ProtoBufMessage::::new(&req, &mut pl)); + let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::default() - .header( + let (req, mut pl) = TestRequest::with_header( header::CONTENT_TYPE, - header::HeaderValue::from_static("application/text"), + "application/text", ).to_http_parts(); - let protobuf = block_on(ProtoBufMessage::::new(&req, &mut pl)); + let protobuf = ProtoBufMessage::::new(&req, &mut pl).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::ContentType); - let (req, mut pl) = TestRequest::default() - .header( + let (req, mut pl) = TestRequest::with_header( header::CONTENT_TYPE, - header::HeaderValue::from_static("application/protobuf"), + "application/protobuf", ).header( header::CONTENT_LENGTH, - header::HeaderValue::from_static("10000"), + "10000", ).to_http_parts(); let protobuf = - block_on(ProtoBufMessage::::new(&req, &mut pl).limit(100)); + ProtoBufMessage::::new(&req, &mut pl).limit(100).await; assert_eq!(protobuf.err().unwrap(), ProtoBufPayloadError::Overflow); } }