diff --git a/CHANGES.md b/CHANGES.md index e69de29bb..f34c74de5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## 0.1.0 (2017-10-23) + +* First release \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 000000000..a574d806e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "actix-protobuf" +version = "0.1.0" +authors = ["kingxsp "] + +[lib] +name = "actix_protobuf" +path = "src/lib.rs" + +[features] +default = ["use_prost"] + +use_prost = ["prost"] + + + +[dependencies] +bytes = "0.4" +futures = "0.1" +failure = "0.1" +env_logger = "*" + +actix = "^0.5" +actix-web = "^0.4" + +prost = { version="^0.2", optional = true } + +[dev-dependencies] +http = "^0.1.5" +prost-derive = "^0.2" + + +[workspace] +members = [ + "./", + "examples/prost-example", +] \ No newline at end of file diff --git a/examples/prost-example/Cargo.toml b/examples/prost-example/Cargo.toml new file mode 100644 index 000000000..6cd47d969 --- /dev/null +++ b/examples/prost-example/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "prost-example" +version = "0.1.0" +authors = ["kingxsp "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +env_logger = "*" + +prost = "0.2" +prost-derive = "0.2" + +actix = "0.5" +actix-web = "0.4" +actix-protobuf = { path="../../" } \ No newline at end of file diff --git a/examples/prost-example/client.py b/examples/prost-example/client.py new file mode 100644 index 000000000..c55f78885 --- /dev/null +++ b/examples/prost-example/client.py @@ -0,0 +1,68 @@ +# just start server and run client.py + +# wget https://github.com/google/protobuf/releases/download/v3.5.1/protobuf-python-3.5.1.zip +# unzip protobuf-python-3.5.1.zip.1 +# cd protobuf-3.5.1/python/ +# python3.6 setup.py install + +# pip3.6 install --upgrade pip +# pip3.6 install aiohttp + +# python3.6 client.py + +#!/usr/bin/env python +import test_pb2 +import traceback +import sys + +import asyncio +import aiohttp + +def op(): + try: + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + + #Serialize + sendDataStr = obj.SerializeToString() + #print serialized string value + print('serialized string:', sendDataStr) + #------------------------# + # message transmission # + #------------------------# + receiveDataStr = sendDataStr + receiveData = test_pb2.MyObj() + + #Deserialize + receiveData.ParseFromString(receiveDataStr) + print('pares serialize string, return: devId = ', receiveData.number, ', name = ', receiveData.name) + except(Exception, e): + print(Exception, ':', e) + print(traceback.print_exc()) + errInfo = sys.exc_info() + print(errInfo[0], ':', errInfo[1]) + + +async def fetch(session): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with session.post('http://localhost:8080/', data=obj.SerializeToString(), + headers={"content-type": "application/protobuf"}) as resp: + print(resp.status) + data = await resp.read() + receiveObj = test_pb2.MyObj() + receiveObj.ParseFromString(data) + print(receiveObj) + +async def go(loop): + obj = test_pb2.MyObj() + obj.number = 9 + obj.name = 'USB' + async with aiohttp.ClientSession(loop=loop) as session: + await fetch(session) + +loop = asyncio.get_event_loop() +loop.run_until_complete(go(loop)) +loop.close() \ No newline at end of file diff --git a/examples/prost-example/src/main.rs b/examples/prost-example/src/main.rs new file mode 100644 index 000000000..0b2309859 --- /dev/null +++ b/examples/prost-example/src/main.rs @@ -0,0 +1,50 @@ +extern crate actix; +extern crate actix_web; +extern crate actix_protobuf; +extern crate bytes; +extern crate futures; +extern crate env_logger; +extern crate prost; +#[macro_use] +extern crate prost_derive; + +use actix_web::*; +use actix_protobuf::*; +use futures::Future; + +#[derive(Clone, Debug, PartialEq, Message)] +pub struct MyObj { + #[prost(int32, tag="1")] + pub number: i32, + #[prost(string, tag="2")] + pub name: String, +} + + +fn index(req: HttpRequest) -> Box> { + req.protobuf() + .from_err() // convert all errors into `Error` + .and_then(|val: MyObj| { + println!("model: {:?}", val); + Ok(httpcodes::HTTPOk.build().protobuf(val)?) // <- send response + }) + .responder() +} + + +fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); + let _ = env_logger::init(); + let sys = actix::System::new("prost-example"); + + let _addr = HttpServer::new(|| { + Application::new() + .middleware(middleware::Logger::default()) + .resource("/", |r| r.method(Method::POST).f(index))}) + .bind("127.0.0.1:8080").unwrap() + .shutdown_timeout(1) + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/examples/prost-example/test.proto b/examples/prost-example/test.proto new file mode 100644 index 000000000..8ec278ca4 --- /dev/null +++ b/examples/prost-example/test.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message MyObj { + int32 number = 1; + string name = 2; +} \ No newline at end of file diff --git a/examples/prost-example/test_pb2.py b/examples/prost-example/test_pb2.py new file mode 100644 index 000000000..05e71f3a6 --- /dev/null +++ b/examples/prost-example/test_pb2.py @@ -0,0 +1,76 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test.proto', + package='', + syntax='proto3', + serialized_pb=_b('\n\ntest.proto\"%\n\x05MyObj\x12\x0e\n\x06number\x18\x01 \x01(\x05\x12\x0c\n\x04name\x18\x02 \x01(\tb\x06proto3') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_MYOBJ = _descriptor.Descriptor( + name='MyObj', + full_name='MyObj', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='number', full_name='MyObj.number', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='name', full_name='MyObj.name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=14, + serialized_end=51, +) + +DESCRIPTOR.message_types_by_name['MyObj'] = _MYOBJ + +MyObj = _reflection.GeneratedProtocolMessageType('MyObj', (_message.Message,), dict( + DESCRIPTOR = _MYOBJ, + __module__ = 'test_pb2' + # @@protoc_insertion_point(class_scope:MyObj) + )) +_sym_db.RegisterMessage(MyObj) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 000000000..31b7493fa --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,21 @@ +extern crate actix; +extern crate actix_web; +extern crate bytes; +extern crate futures; +#[macro_use] +extern crate failure; +extern crate env_logger; + +#[cfg(test)] +extern crate http; + +#[cfg(feature="use_prost")] +extern crate prost; +#[cfg(test)] +#[cfg(feature="use_prost")] +#[macro_use] extern crate prost_derive; + +#[cfg(feature="use_prost")] +mod use_prost; +#[cfg(feature="use_prost")] +pub use use_prost::{ ProtoBuf, ProtoBufResponseBuilder, ProtoBufHttpMessage }; \ No newline at end of file diff --git a/src/use_prost.rs b/src/use_prost.rs new file mode 100644 index 000000000..ba3a26867 --- /dev/null +++ b/src/use_prost.rs @@ -0,0 +1,247 @@ +use bytes::{Bytes, BytesMut}; +use futures::{Poll, Future, Stream}; + +use bytes::IntoBuf; +use prost::Message; +use prost::DecodeError as ProtoBufDecodeError; +use prost::EncodeError as ProtoBufEncodeError; + +use actix_web::header::http::{CONTENT_TYPE, CONTENT_LENGTH}; +use actix_web::{Responder, HttpMessage, HttpRequest, HttpResponse}; +use actix_web::dev::HttpResponseBuilder; +use actix_web::error::{Error, PayloadError, ResponseError}; +use actix_web::httpcodes::{HttpBadRequest, HttpPayloadTooLarge}; + + +#[derive(Fail, Debug)] +pub enum ProtoBufPayloadError { + /// Payload size is bigger than 256k + #[fail(display="Payload size is bigger than 256k")] + Overflow, + /// Content type error + #[fail(display="Content type error")] + ContentType, + /// Serialize error + #[fail(display="ProtoBud serialize error: {}", _0)] + Serialize(#[cause] ProtoBufEncodeError), + /// Deserialize error + #[fail(display="ProtoBud deserialize error: {}", _0)] + Deserialize(#[cause] ProtoBufDecodeError), + /// Payload error + #[fail(display="Error that occur during reading payload: {}", _0)] + Payload(#[cause] PayloadError), +} + +impl ResponseError for ProtoBufPayloadError { + + fn error_response(&self) -> HttpResponse { + match *self { + ProtoBufPayloadError::Overflow => HttpPayloadTooLarge.into(), + _ => HttpBadRequest.into(), + } + } +} + +impl From for ProtoBufPayloadError { + fn from(err: PayloadError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Payload(err) + } +} + +impl From for ProtoBufPayloadError { + fn from(err: ProtoBufDecodeError) -> ProtoBufPayloadError { + ProtoBufPayloadError::Deserialize(err) + } +} + +#[derive(Debug)] +pub struct ProtoBuf(pub T); + +impl Responder for ProtoBuf { + type Item = HttpResponse; + type Error = Error; + + fn respond_to(self, _: HttpRequest) -> Result { + let mut buf = Vec::new(); + self.0.encode(&mut buf) + .map_err(|e| Error::from(ProtoBufPayloadError::Serialize(e))) + .and_then(|()| { + Ok(HttpResponse::Ok() + .content_type("application/protobuf") + .body(buf) + .into()) + }) + } +} + +pub struct ProtoBufMessage{ + limit: usize, + ct: &'static str, + req: Option, + fut: Option>>, +} + +impl ProtoBufMessage { + + /// Create `ProtoBufMessage` for request. + pub fn new(req: T) -> Self { + ProtoBufMessage{ + limit: 262_144, + req: Some(req), + fut: None, + ct: "application/protobuf", + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + /// Set allowed content type. + /// + /// By default *application/protobuf* content type is used. Set content type + /// to empty string if you want to disable content type check. + pub fn content_type(mut self, ct: &'static str) -> Self { + self.ct = ct; + self + } +} + +impl Future for ProtoBufMessage + where T: HttpMessage + Stream + 'static +{ + type Item = U; + type Error = ProtoBufPayloadError; + + fn poll(&mut self) -> Poll { + if let Some(req) = self.req.take() { + if let Some(len) = req.headers().get(CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + if len > self.limit { + return Err(ProtoBufPayloadError::Overflow); + } + } else { + return Err(ProtoBufPayloadError::Overflow); + } + } + } + // check content-type + if !self.ct.is_empty() && req.content_type() != self.ct { + return Err(ProtoBufPayloadError::ContentType) + } + + let limit = self.limit; + let fut = req.from_err() + .fold(BytesMut::new(), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(ProtoBufPayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(|body| Ok(::decode(&mut body.into_buf())?)); + self.fut = Some(Box::new(fut)); + } + + self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll() + } +} + + +pub trait ProtoBufResponseBuilder { + + fn protobuf(&mut self, value: T) -> Result; +} + +impl ProtoBufResponseBuilder for HttpResponseBuilder { + + fn protobuf(&mut self, value: T) -> Result { + self.header(CONTENT_TYPE, "application/protobuf"); + + let mut body = Vec::new(); + value.encode(&mut body).map_err(|e| ProtoBufPayloadError::Serialize(e))?; + Ok(self.body(body)?) + } +} + + + +pub trait ProtoBufHttpMessage { + fn protobuf(self) -> ProtoBufMessage + where Self: Stream + Sized; +} + +impl ProtoBufHttpMessage for HttpRequest { + + #[inline] + fn protobuf(self) -> ProtoBufMessage + where Self: Stream + Sized + { + ProtoBufMessage::new(self) + } +} + + + +#[cfg(test)] +mod tests { + use super::*; + use http::header; + + impl PartialEq for ProtoBufPayloadError { + fn eq(&self, other: &ProtoBufPayloadError) -> bool { + match *self { + ProtoBufPayloadError::Overflow => match *other { + ProtoBufPayloadError::Overflow => true, + _ => false, + }, + ProtoBufPayloadError::ContentType => match *other { + ProtoBufPayloadError::ContentType => true, + _ => false, + }, + _ => false, + } + } + } + + + #[derive(Clone, Debug, PartialEq, Message)] + pub struct MyObject { + #[prost(int32, tag="1")] + pub number: i32, + #[prost(string, tag="2")] + pub name: String, + } + + #[test] + fn test_protobuf() { + let protobuf = ProtoBuf(MyObject{number: 9 , name: "test".to_owned()}); + let resp = protobuf.respond_to(HttpRequest::default()).unwrap(); + assert_eq!(resp.headers().get(header::CONTENT_TYPE).unwrap(), "application/protobuf"); + } + + #[test] + fn test_protobuf_message() { + let req = HttpRequest::default(); + let mut protobuf = req.protobuf::(); + assert_eq!(protobuf.poll().err().unwrap(), ProtoBufPayloadError::ContentType); + + let mut req = HttpRequest::default(); + req.headers_mut().insert(header::CONTENT_TYPE, + header::HeaderValue::from_static("application/protobuf")); + let mut protobuf = req.protobuf::().content_type("text/protobuf"); + assert_eq!(protobuf.poll().err().unwrap(), ProtoBufPayloadError::ContentType); + + let mut req = HttpRequest::default(); + req.headers_mut().insert(header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json")); + req.headers_mut().insert(header::CONTENT_LENGTH, + header::HeaderValue::from_static("10000")); + let mut protobuf = req.protobuf::().limit(100); + assert_eq!(protobuf.poll().err().unwrap(), ProtoBufPayloadError::Overflow); + } +} \ No newline at end of file