From fdf061610d6afd01af82848dc9f0961b94d7997b Mon Sep 17 00:00:00 2001 From: Stefan Puhlmann Date: Tue, 21 May 2019 22:09:27 +0200 Subject: [PATCH] Adapting protobuf example to use actix_protobuf --- protobuf/Cargo.toml | 8 +-- protobuf/client.py | 2 +- protobuf/src/main.rs | 42 ++++++++------- protobuf/src/protobuf.rs | 109 --------------------------------------- 4 files changed, 29 insertions(+), 132 deletions(-) delete mode 100644 protobuf/src/protobuf.rs diff --git a/protobuf/Cargo.toml b/protobuf/Cargo.toml index ed6c0c6..1a401de 100644 --- a/protobuf/Cargo.toml +++ b/protobuf/Cargo.toml @@ -10,7 +10,9 @@ bytes = "0.4" futures = "0.1" env_logger = "0.6" derive_more = "0.14" -prost = "0.2.0" -prost-derive = "0.2.0" +prost = "0.4.0" +prost-derive = "0.4.0" -actix-web = "1.0.0-beta.4" +actix = "0.8.1" +actix-web = "1.0.0-rc" +actix-protobuf = "0.4.0" diff --git a/protobuf/client.py b/protobuf/client.py index ab91365..965f11b 100644 --- a/protobuf/client.py +++ b/protobuf/client.py @@ -46,7 +46,7 @@ async def fetch(session): obj = test_pb2.MyObj() obj.number = 9 obj.name = 'USB' - async with session.post('http://localhost:8080/', data=obj.SerializeToString(), + async with session.post('http://localhost:8081/', data=obj.SerializeToString(), headers={"content-type": "application/protobuf"}) as resp: print(resp.status) data = await resp.read() diff --git a/protobuf/src/main.rs b/protobuf/src/main.rs index 043fb59..56cbb35 100644 --- a/protobuf/src/main.rs +++ b/protobuf/src/main.rs @@ -1,12 +1,16 @@ +extern crate actix; +extern crate actix_protobuf; +extern crate actix_web; +extern crate bytes; +extern crate env_logger; +extern crate prost; #[macro_use] extern crate prost_derive; -use actix_web::{middleware, web, App, Error, HttpResponse, HttpServer}; -use futures::Future; -mod protobuf; -use protobuf::ProtoBufResponseBuilder; +use actix_protobuf::*; +use actix_web::*; -#[derive(Clone, Debug, PartialEq, Message)] +#[derive(Clone, PartialEq, Message)] pub struct MyObj { #[prost(int32, tag = "1")] pub number: i32, @@ -14,25 +18,25 @@ pub struct MyObj { pub name: String, } -/// This handler uses `ProtoBufMessage` for loading protobuf object. -fn index(pl: web::Payload) -> impl Future { - protobuf::ProtoBufMessage::new(pl) - .from_err() // convert all errors into `Error` - .and_then(|val: MyObj| { - println!("model: {:?}", val); - Ok(HttpResponse::Ok().protobuf(val)?) // <- send response - }) +fn index(msg: ProtoBuf) -> Result { + println!("model: {:?}", msg); + HttpResponse::Ok().protobuf(msg.0) // <- send response } -fn main() -> std::io::Result<()> { - std::env::set_var("RUST_LOG", "actix_web=info,actix_server=info"); +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info,actix_server=info"); env_logger::init(); + let sys = actix::System::new("protobuf-example"); HttpServer::new(|| { App::new() .wrap(middleware::Logger::default()) - .service(web::resource("/").route(web::post().to_async(index))) - }) - .bind("127.0.0.1:8080")? - .run() + .service(web::resource("/").route(web::post().to(index))) + }).bind("127.0.0.1:8081") + .unwrap() + .shutdown_timeout(1) + .start(); + + println!("Started http server: 127.0.0.1:8081"); + let _ = sys.run(); } diff --git a/protobuf/src/protobuf.rs b/protobuf/src/protobuf.rs deleted file mode 100644 index edde625..0000000 --- a/protobuf/src/protobuf.rs +++ /dev/null @@ -1,109 +0,0 @@ -use bytes::BytesMut; -use futures::{Future, Poll, Stream}; - -use bytes::{Bytes, IntoBuf}; -use derive_more::{Display, From}; -use prost::DecodeError as ProtoBufDecodeError; -use prost::EncodeError as ProtoBufEncodeError; -use prost::Message; - -use actix_web::dev::HttpResponseBuilder; -use actix_web::error::{Error, PayloadError, ResponseError}; -use actix_web::http::header::CONTENT_TYPE; -use actix_web::{HttpRequest, HttpResponse, Responder}; - -#[derive(Debug, Display, From)] -pub enum ProtoBufPayloadError { - /// Payload size is bigger than 256k - #[display(fmt = "Payload size is bigger than 256k")] - Overflow, - /// Content type error - #[display(fmt = "Content type error")] - ContentType, - /// Serialize error - #[display(fmt = "ProtoBud serialize error: {}", _0)] - Serialize(ProtoBufEncodeError), - /// Deserialize error - #[display(fmt = "ProtoBud deserialize error: {}", _0)] - Deserialize(ProtoBufDecodeError), - /// Payload error - #[display(fmt = "Error that occur during reading payload: {}", _0)] - Payload(PayloadError), -} - -impl ResponseError for ProtoBufPayloadError { - fn error_response(&self) -> HttpResponse { - match *self { - ProtoBufPayloadError::Overflow => HttpResponse::PayloadTooLarge().into(), - _ => HttpResponse::BadRequest().into(), - } - } -} - -#[derive(Debug)] -pub struct ProtoBuf(pub T); - -impl Responder for ProtoBuf { - type Error = Error; - type Future = Result; - - fn respond_to(self, _: &HttpRequest) -> Result { - let mut buf = Vec::new(); - self.0 - .encode(&mut buf) - .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) - .and_then(|()| { - Ok(HttpResponse::Ok() - .content_type("application/protobuf") - .body(buf) - .into()) - }) - } -} - -pub struct ProtoBufMessage { - fut: Box>, -} - -impl ProtoBufMessage { - /// Create `ProtoBufMessage` for request. - pub fn new(pl: S) -> Self - where - S: Stream + 'static, - { - let fut = pl - .map_err(|e| ProtoBufPayloadError::Payload(e)) - .fold(BytesMut::new(), move |mut body, chunk| { - body.extend_from_slice(&chunk); - Ok::<_, ProtoBufPayloadError>(body) - }) - .and_then(|body| Ok(::decode(&mut body.into_buf())?)); - - ProtoBufMessage { fut: Box::new(fut) } - } -} - -impl Future for ProtoBufMessage where { - type Item = U; - type Error = ProtoBufPayloadError; - - fn poll(&mut self) -> Poll { - self.fut.poll() - } -} - -pub trait ProtoBufResponseBuilder { - fn protobuf(&mut self, value: T) -> Result; -} - -impl ProtoBufResponseBuilder for HttpResponseBuilder { - fn protobuf(&mut self, value: T) -> Result { - self.header(CONTENT_TYPE, "application/protobuf"); - - let mut body = Vec::new(); - value - .encode(&mut body) - .map_err(|e| ProtoBufPayloadError::Serialize(e))?; - Ok(self.body(body)) - } -}