use bytes::{Bytes, BytesMut}; use futures::{Poll, Future, Stream}; use http::header::CONTENT_LENGTH; use bytes::IntoBuf; use prost::Message; use error::{Error, ProtoBufPayloadError, PayloadError}; use handler::Responder; use httpmessage::HttpMessage; use httprequest::HttpRequest; use httpresponse::HttpResponse; #[derive(Debug)] pub struct ProtoBuf(pub T); impl Responder for ProtoBuf { type Item = HttpResponse; type Error = Error; fn respond_to(self, _: HttpRequest) -> Result { let mut buf = Vec::new(); self.0.encode(&mut buf) .map_err(Error::from) .and_then(|()| { Ok(HttpResponse::Ok() .content_type("application/protobuf") .body(buf) .into()) }) } } pub struct ProtoBufBody{ limit: usize, ct: &'static str, req: Option, fut: Option>>, } impl ProtoBufBody { /// Create `ProtoBufBody` for request. pub fn new(req: T) -> Self { ProtoBufBody{ limit: 262_144, req: Some(req), fut: None, ct: "application/protobuf", } } /// Change max size of payload. By default max size is 256Kb pub fn limit(mut self, limit: usize) -> Self { self.limit = limit; self } /// Set allowed content type. /// /// By default *application/protobuf* content type is used. Set content type /// to empty string if you want to disable content type check. pub fn content_type(mut self, ct: &'static str) -> Self { self.ct = ct; self } } impl Future for ProtoBufBody where T: HttpMessage + Stream + 'static { type Item = U; type Error = ProtoBufPayloadError; fn poll(&mut self) -> Poll { if let Some(req) = self.req.take() { if let Some(len) = req.headers().get(CONTENT_LENGTH) { if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { if len > self.limit { return Err(ProtoBufPayloadError::Overflow); } } else { return Err(ProtoBufPayloadError::Overflow); } } } // check content-type if !self.ct.is_empty() && req.content_type() != self.ct { return Err(ProtoBufPayloadError::ContentType) } let limit = self.limit; let fut = req.from_err() .fold(BytesMut::new(), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { Err(ProtoBufPayloadError::Overflow) } else { body.extend_from_slice(&chunk); Ok(body) } }) .and_then(|body| Ok(::decode(&mut body.into_buf())?)); self.fut = Some(Box::new(fut)); } self.fut.as_mut().expect("ProtoBufBody could not be used second time").poll() } }