diff --git a/Cargo.toml b/Cargo.toml index 98df9176b..dd22754d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ rand = "0.4" regex = "0.2" serde = "1.0" serde_json = "1.0" +prost = "^0.2" sha1 = "0.6" smallvec = "0.6" time = "0.1" @@ -109,6 +110,7 @@ members = [ "examples/diesel", "examples/r2d2", "examples/json", + "examples/protobuf", "examples/hello-world", "examples/http-proxy", "examples/multipart", diff --git a/examples/protobuf/Cargo.toml b/examples/protobuf/Cargo.toml new file mode 100644 index 000000000..75776ce5a --- /dev/null +++ b/examples/protobuf/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "protobuf-example" +version = "0.1.0" +authors = ["kingxsp "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +env_logger = "*" + +prost = "0.2.0" +prost-derive = "0.2.0" + +actix = "0.5" +actix-web = { path="../../" } \ No newline at end of file diff --git a/examples/protobuf/client.py b/examples/protobuf/client.py new file mode 100644 index 000000000..ab91365d8 --- /dev/null +++ b/examples/protobuf/client.py @@ -0,0 +1,66 @@ +# just start server and run client.py + +# wget https://github.com/google/protobuf/releases/download/v3.5.1/protobuf-python-3.5.1.zip +# unzip protobuf-python-3.5.1.zip.1 +# cd protobuf-3.5.1/python/ +# python3.6 setup.py install + +# pip3.6 install --upgrade pip +# pip3.6 install aiohttp + +#!/usr/bin/env python +import test_pb2 +import traceback +import sys + +import asyncio +import aiohttp + +def op(): + try: + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + + #Serialize + sendDataStr = obj.SerializeToString() + #print serialized string value + print('serialized string:', sendDataStr) + #------------------------# + # message transmission # + #------------------------# + receiveDataStr = sendDataStr + receiveData = test_pb2.MyObj() + + #Deserialize + receiveData.ParseFromString(receiveDataStr) + print('pares serialize string, return: devId = ', receiveData.number, ', name = ', receiveData.name) + except(Exception, e): + print(Exception, ':', e) + print(traceback.print_exc()) + errInfo = sys.exc_info() + print(errInfo[0], ':', errInfo[1]) + + +async def fetch(session): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with session.post('http://localhost:8080/', data=obj.SerializeToString(), + headers={"content-type": "application/protobuf"}) as resp: + print(resp.status) + data = await resp.read() + receiveObj = test_pb2.MyObj() + receiveObj.ParseFromString(data) + print(receiveObj) + +async def go(loop): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with aiohttp.ClientSession(loop=loop) as session: + await fetch(session) + +loop = asyncio.get_event_loop() +loop.run_until_complete(go(loop)) +loop.close() \ No newline at end of file diff --git a/examples/protobuf/src/main.rs b/examples/protobuf/src/main.rs new file mode 100644 index 000000000..ff411fff8 --- /dev/null +++ b/examples/protobuf/src/main.rs @@ -0,0 +1,50 @@ +extern crate actix; +extern crate actix_web; +extern crate bytes; +extern crate futures; +extern crate env_logger; +extern crate prost; +#[macro_use] +extern crate prost_derive; + +use actix_web::*; +use futures::Future; + + +#[derive(Clone, Debug, PartialEq, Message)] +pub struct MyObj { + #[prost(int32, tag="1")] + pub number: i32, + #[prost(string, tag="2")] + pub name: String, +} + + +/// This handler uses `HttpRequest::json()` for loading serde json object. +fn index(req: HttpRequest) -> Box> { + req.protobuf() + .from_err() // convert all errors into `Error` + .and_then(|val: MyObj| { + println!("model: {:?}", val); + Ok(httpcodes::HTTPOk.build().protobuf(val)?) // <- send response + }) + .responder() +} + + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + let _ = env_logger::init(); + let sys = actix::System::new("protobuf-example"); + + let addr = HttpServer::new(|| { + Application::new() + .middleware(middleware::Logger::default()) + .resource("/", |r| r.method(Method::POST).f(index))}) + .bind("127.0.0.1:8080").unwrap() + .shutdown_timeout(1) + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/examples/protobuf/test.proto b/examples/protobuf/test.proto new file mode 100644 index 000000000..8ec278ca4 --- /dev/null +++ b/examples/protobuf/test.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message MyObj { + int32 number = 1; + string name = 2; +} \ No newline at end of file diff --git a/examples/protobuf/test_pb2.py b/examples/protobuf/test_pb2.py new file mode 100644 index 000000000..05e71f3a6 --- /dev/null +++ b/examples/protobuf/test_pb2.py @@ -0,0 +1,76 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test.proto', + package='', + syntax='proto3', + serialized_pb=_b('\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_MYOBJ = _descriptor.Descriptor( + name='MyObj', + full_name='MyObj', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='number', full_name='MyObj.number', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='name', full_name='MyObj.name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=14, + serialized_end=51, +) + +DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ + +MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), dict( + DESCRIPTOR = _MYOBJ, + __module__ = 'test_pb2' + # @@protoc_insertion_point(class_scope:MyObj) + )) +_sym_db.RegisterMessage(MyObj) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/error.rs b/src/error.rs index 40ecf7045..3c3a088b7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,6 +15,8 @@ use http::{header, StatusCode, Error as HttpError}; use http::uri::InvalidUriBytes; use http_range::HttpRangeParseError; use serde_json::error::Error as JsonError; +use prost::EncodeError as ProtoBufEncodeError; +use prost::DecodeError as ProtoBufDecodeError; pub use url::ParseError as UrlParseError; // re-exports @@ -107,6 +109,10 @@ impl From for Error { /// `InternalServerError` for `JsonError` impl ResponseError for JsonError {} +/// `InternalServerError` for `ProtoBufEncodeError` `ProtoBufDecodeError` +impl ResponseError for ProtoBufEncodeError {} +impl ResponseError for ProtoBufDecodeError {} + /// `InternalServerError` for `UrlParseError` impl ResponseError for UrlParseError {} @@ -450,6 +456,44 @@ impl From for JsonPayloadError { } } +#[derive(Fail, Debug)] +pub enum ProtoBufPayloadError { + /// Payload size is bigger than 256k + #[fail(display="Payload size is bigger than 256k")] + Overflow, + /// Content type error + #[fail(display="Content type error")] + ContentType, + /// Deserialize error + #[fail(display="Json deserialize error: {}", _0)] + Deserialize(#[cause] ProtoBufDecodeError), + /// Payload error + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), +} + +impl ResponseError for ProtoBufPayloadError { + + fn error_response(&self) -> HttpResponse { + match *self { + ProtoBufPayloadError::Overflow => httpcodes::HttpPayloadTooLarge.into(), + _ => httpcodes::HttpBadRequest.into(), + } + } +} + +impl From for ProtoBufPayloadError { + fn from(err: PayloadError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Payload(err) + } +} + +impl From for ProtoBufPayloadError { + fn from(err: ProtoBufDecodeError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Deserialize(err) + } +} + /// Errors which can occur when attempting to interpret a segment string as a /// valid path segment. #[derive(Fail, Debug, PartialEq)] diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 69065c49c..61ab3c4da 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -4,6 +4,7 @@ use bytes::{Bytes, BytesMut}; use futures::{Future, Stream, Poll}; use http_range::HttpRange; use serde::de::DeserializeOwned; +use prost::Message; use mime::Mime; use url::form_urlencoded; use encoding::all::UTF_8; @@ -12,6 +13,7 @@ use encoding::label::encoding_from_whatwg_label; use http::{header, HeaderMap}; use json::JsonBody; +use protobuf::ProtoBufBody; use header::Header; use multipart::Multipart; use error::{ParseError, ContentTypeError, @@ -209,6 +211,12 @@ pub trait HttpMessage { JsonBody::new(self) } + fn protobuf(self) -> ProtoBufBody + where Self: Stream + Sized + { + ProtoBufBody::new(self) + } + /// Return stream to http payload processes as multipart. /// /// Content-type: multipart/form-data; diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 9c99d4d68..2147a42c1 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -11,6 +11,7 @@ use http::{StatusCode, Version, HeaderMap, HttpTryFrom, Error as HttpError}; use http::header::{self, HeaderName, HeaderValue}; use serde_json; use serde::Serialize; +use prost::Message; use body::Body; use error::Error; @@ -508,6 +509,22 @@ impl HttpResponseBuilder { Ok(self.body(body)?) } + pub fn protobuf(&mut self, value: T) -> Result { + let mut body = Vec::new(); + value.encode(&mut body)?; + + let contains = if let Some(parts) = parts(&mut self.response, &self.err) { + parts.headers.contains_key(header::CONTENT_TYPE) + } else { + true + }; + if !contains { + self.header(header::CONTENT_TYPE, "application/protobuf"); + } + + Ok(self.body(body)?) + } + /// Set an empty body and generate `HttpResponse` /// /// `HttpResponseBuilder` can not be used after this call. diff --git a/src/lib.rs b/src/lib.rs index f89549377..39e1e48a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ extern crate url; extern crate libc; extern crate serde; extern crate serde_json; +extern crate prost; extern crate flate2; extern crate brotli2; extern crate encoding; @@ -111,6 +112,7 @@ mod httprequest; mod httpresponse; mod info; mod json; +mod protobuf; mod route; mod router; mod resource; @@ -132,6 +134,7 @@ pub mod server; pub use error::{Error, Result, ResponseError}; pub use body::{Body, Binary}; pub use json::Json; +pub use protobuf::ProtoBuf; pub use application::Application; pub use httpmessage::HttpMessage; pub use httprequest::HttpRequest; diff --git a/src/protobuf.rs b/src/protobuf.rs new file mode 100644 index 000000000..439f26179 --- /dev/null +++ b/src/protobuf.rs @@ -0,0 +1,113 @@ +use bytes::{Bytes, BytesMut}; +use futures::{Poll, Future, Stream}; +use http::header::CONTENT_LENGTH; + +use bytes::IntoBuf; +use prost::Message; + +use error::{Error, ProtoBufPayloadError, PayloadError}; +use handler::Responder; +use httpmessage::HttpMessage; +use httprequest::HttpRequest; +use httpresponse::HttpResponse; + + +#[derive(Debug)] +pub struct ProtoBuf(pub T); + +impl Responder for ProtoBuf { + type Item = HttpResponse; + type Error = Error; + + fn respond_to(self, _: HttpRequest) -> Result { + let mut buf = Vec::new(); + self.0.encode(&mut buf) + .map_err(Error::from) + .and_then(|()| { + Ok(HttpResponse::Ok() + .content_type("application/protobuf") + .body(buf) + .into()) + }) + } +} + + + + +pub struct ProtoBufBody{ + limit: usize, + ct: &'static str, + req: Option, + fut: Option>>, +} + +impl ProtoBufBody { + + /// Create `ProtoBufBody` for request. + pub fn new(req: T) -> Self { + ProtoBufBody{ + limit: 262_144, + req: Some(req), + fut: None, + ct: "application/protobuf", + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + /// Set allowed content type. + /// + /// By default *application/protobuf* content type is used. Set content type + /// to empty string if you want to disable content type check. + pub fn content_type(mut self, ct: &'static str) -> Self { + self.ct = ct; + self + } +} + +impl Future for ProtoBufBody + where T: HttpMessage + Stream + 'static +{ + type Item = U; + type Error = ProtoBufPayloadError; + + fn poll(&mut self) -> Poll { + if let Some(req) = self.req.take() { + if let Some(len) = req.headers().get(CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + if len > self.limit { + return Err(ProtoBufPayloadError::Overflow); + } + } else { + return Err(ProtoBufPayloadError::Overflow); + } + } + } + // check content-type + if !self.ct.is_empty() && req.content_type() != self.ct { + return Err(ProtoBufPayloadError::ContentType) + } + + let limit = self.limit; + let fut = req.from_err() + .fold(BytesMut::new(), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(ProtoBufPayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(|body| Ok(::decode(&mut body.into_buf())?)); + self.fut = Some(Box::new(fut)); + } + + self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll() + } +} \ No newline at end of file