From ae084d1146acedd146cc80d80c29092df96ff7d1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 3 Jan 2018 09:23:58 -0800 Subject: [PATCH] added helper future for reading request body --- src/error.rs | 6 +++ src/httprequest.rs | 109 +++++++++++++++++++++++++++++++++++++++++++-- src/json.rs | 4 +- src/lib.rs | 2 +- 4 files changed, 115 insertions(+), 6 deletions(-) diff --git a/src/error.rs b/src/error.rs index 31ae69fb..29a94d4c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -196,6 +196,12 @@ pub enum PayloadError { /// Content encoding stream corruption #[fail(display="Can not decode content-encoding.")] EncodingCorrupted, + /// A payload reached size limit. + #[fail(display="A payload reached size limit.")] + Overflow, + /// A payload length is unknown. + #[fail(display="A payload length is unknown.")] + UnknownLength, /// Parse error #[fail(display="{}", _0)] ParseError(#[cause] IoError), diff --git a/src/httprequest.rs b/src/httprequest.rs index c39c533d..2c56534d 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -3,7 +3,7 @@ use std::{str, fmt, mem}; use std::rc::Rc; use std::net::SocketAddr; use std::collections::HashMap; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use cookie::Cookie; use futures::{Async, Future, Stream, Poll}; use http_range::HttpRange; @@ -14,11 +14,12 @@ use http::{header, Uri, Method, Version, HeaderMap, Extensions}; use info::ConnectionInfo; use param::Params; use router::Router; -use payload::Payload; +use payload::{Payload, ReadAny}; use json::JsonBody; use multipart::Multipart; use helpers::SharedHttpMessage; -use error::{ParseError, UrlGenerationError, CookieParseError, HttpRangeError, UrlencodedError}; +use error::{ParseError, UrlGenerationError, + CookieParseError, HttpRangeError, PayloadError, UrlencodedError}; pub struct HttpMessage { @@ -424,6 +425,36 @@ impl HttpRequest { msg.payload.as_mut().unwrap() } + /// Load request body. + /// + /// By default only 256Kb payload reads to a memory, then `BAD REQUEST` + /// http response get returns to a peer. Use `RequestBody::limit()` + /// method to change upper limit. + /// + /// ```rust + /// # extern crate bytes; + /// # extern crate actix_web; + /// # extern crate futures; + /// # #[macro_use] extern crate serde_derive; + /// use actix_web::*; + /// use bytes::Bytes; + /// use futures::future::Future; + /// + /// fn index(mut req: HttpRequest) -> Box> { + /// req.body() // <- get Body future + /// .limit(1024) // <- change max size of the body to a 1kb + /// .from_err() + /// .and_then(|bytes: Bytes| { // <- complete body + /// println!("==== BODY ==== {:?}", bytes); + /// Ok(httpcodes::HTTPOk.into()) + /// }).responder() + /// } + /// # fn main() {} + /// ``` + pub fn body(&mut self) -> RequestBody { + RequestBody::from_request(self) + } + /// Return stream to http payload processes as multipart. /// /// Content-type: multipart/form-data; @@ -642,6 +673,78 @@ impl Future for UrlEncoded { } } +/// Future that resolves to a complete request body. +pub struct RequestBody { + pl: ReadAny, + body: BytesMut, + limit: usize, + error: Option, +} + +impl RequestBody { + + /// Create `RequestBody` for request. + pub fn from_request(req: &mut HttpRequest) -> RequestBody { + let mut body = RequestBody { + pl: req.payload().readany(), + body: BytesMut::new(), + limit: 262_144, + error: None + }; + + if let Some(len) = req.headers().get(header::CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + if len > 262_144 { + body.error = Some(PayloadError::Overflow); + } + } else { + body.error = Some(PayloadError::UnknownLength); + } + } else { + body.error = Some(PayloadError::UnknownLength); + } + } + + body + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } +} + +impl Future for RequestBody { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll { + if let Some(err) = self.error.take() { + return Err(err) + } + + loop { + return match self.pl.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + Ok(Async::Ready(self.body.take().freeze())) + }, + Ok(Async::Ready(Some(chunk))) => { + if (self.body.len() + chunk.len()) > self.limit { + Err(PayloadError::Overflow) + } else { + self.body.extend_from_slice(&chunk); + continue + } + }, + Err(err) => Err(err), + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/json.rs b/src/json.rs index 263f6028..ce9f72ee 100644 --- a/src/json.rs +++ b/src/json.rs @@ -116,7 +116,7 @@ impl Future for JsonBody { type Error = JsonPayloadError; fn poll(&mut self) -> Poll { - if let Some(mut req) = self.req.take() { + if let Some(req) = self.req.take() { if let Some(len) = req.headers().get(CONTENT_LENGTH) { if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { @@ -134,7 +134,7 @@ impl Future for JsonBody { } let limit = self.limit; - let fut = req.payload_mut().readany() + let fut = req.payload().readany() .from_err() .fold(BytesMut::new(), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { diff --git a/src/lib.rs b/src/lib.rs index e453a201..5238a684 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,7 +172,7 @@ pub mod dev { pub use router::{Router, Pattern}; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use param::{FromParam, Params}; - pub use httprequest::UrlEncoded; + pub use httprequest::{UrlEncoded, RequestBody}; pub use httpresponse::HttpResponseBuilder; pub use server::{ServerSettings, PauseServer, ResumeServer, StopServer};