1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

added helper future for reading request body

This commit is contained in:
Nikolay Kim 2018-01-03 09:23:58 -08:00
parent 88031b7fde
commit ae084d1146
4 changed files with 115 additions and 6 deletions

View File

@ -196,6 +196,12 @@ pub enum PayloadError {
/// Content encoding stream corruption /// Content encoding stream corruption
#[fail(display="Can not decode content-encoding.")] #[fail(display="Can not decode content-encoding.")]
EncodingCorrupted, EncodingCorrupted,
/// A payload reached size limit.
#[fail(display="A payload reached size limit.")]
Overflow,
/// A payload length is unknown.
#[fail(display="A payload length is unknown.")]
UnknownLength,
/// Parse error /// Parse error
#[fail(display="{}", _0)] #[fail(display="{}", _0)]
ParseError(#[cause] IoError), ParseError(#[cause] IoError),

View File

@ -3,7 +3,7 @@ use std::{str, fmt, mem};
use std::rc::Rc; use std::rc::Rc;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::collections::HashMap; use std::collections::HashMap;
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use cookie::Cookie; use cookie::Cookie;
use futures::{Async, Future, Stream, Poll}; use futures::{Async, Future, Stream, Poll};
use http_range::HttpRange; use http_range::HttpRange;
@ -14,11 +14,12 @@ use http::{header, Uri, Method, Version, HeaderMap, Extensions};
use info::ConnectionInfo; use info::ConnectionInfo;
use param::Params; use param::Params;
use router::Router; use router::Router;
use payload::Payload; use payload::{Payload, ReadAny};
use json::JsonBody; use json::JsonBody;
use multipart::Multipart; use multipart::Multipart;
use helpers::SharedHttpMessage; use helpers::SharedHttpMessage;
use error::{ParseError, UrlGenerationError, CookieParseError, HttpRangeError, UrlencodedError}; use error::{ParseError, UrlGenerationError,
CookieParseError, HttpRangeError, PayloadError, UrlencodedError};
pub struct HttpMessage { pub struct HttpMessage {
@ -424,6 +425,36 @@ impl<S> HttpRequest<S> {
msg.payload.as_mut().unwrap() msg.payload.as_mut().unwrap()
} }
/// Load request body.
///
/// By default only 256Kb payload reads to a memory, then `BAD REQUEST`
/// http response get returns to a peer. Use `RequestBody::limit()`
/// method to change upper limit.
///
/// ```rust
/// # extern crate bytes;
/// # extern crate actix_web;
/// # extern crate futures;
/// # #[macro_use] extern crate serde_derive;
/// use actix_web::*;
/// use bytes::Bytes;
/// use futures::future::Future;
///
/// fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
/// req.body() // <- get Body future
/// .limit(1024) // <- change max size of the body to a 1kb
/// .from_err()
/// .and_then(|bytes: Bytes| { // <- complete body
/// println!("==== BODY ==== {:?}", bytes);
/// Ok(httpcodes::HTTPOk.into())
/// }).responder()
/// }
/// # fn main() {}
/// ```
pub fn body(&mut self) -> RequestBody {
RequestBody::from_request(self)
}
/// Return stream to http payload processes as multipart. /// Return stream to http payload processes as multipart.
/// ///
/// Content-type: multipart/form-data; /// Content-type: multipart/form-data;
@ -642,6 +673,78 @@ impl Future for UrlEncoded {
} }
} }
/// Future that resolves to a complete request body.
pub struct RequestBody {
pl: ReadAny,
body: BytesMut,
limit: usize,
error: Option<PayloadError>,
}
impl RequestBody {
/// Create `RequestBody` for request.
pub fn from_request<S>(req: &mut HttpRequest<S>) -> RequestBody {
let mut body = RequestBody {
pl: req.payload().readany(),
body: BytesMut::new(),
limit: 262_144,
error: None
};
if let Some(len) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len > 262_144 {
body.error = Some(PayloadError::Overflow);
}
} else {
body.error = Some(PayloadError::UnknownLength);
}
} else {
body.error = Some(PayloadError::UnknownLength);
}
}
body
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl Future for RequestBody {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() {
return Err(err)
}
loop {
return match self.pl.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
Ok(Async::Ready(self.body.take().freeze()))
},
Ok(Async::Ready(Some(chunk))) => {
if (self.body.len() + chunk.len()) > self.limit {
Err(PayloadError::Overflow)
} else {
self.body.extend_from_slice(&chunk);
continue
}
},
Err(err) => Err(err),
}
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -116,7 +116,7 @@ impl<S, T: DeserializeOwned + 'static> Future for JsonBody<S, T> {
type Error = JsonPayloadError; type Error = JsonPayloadError;
fn poll(&mut self) -> Poll<T, JsonPayloadError> { fn poll(&mut self) -> Poll<T, JsonPayloadError> {
if let Some(mut req) = self.req.take() { if let Some(req) = self.req.take() {
if let Some(len) = req.headers().get(CONTENT_LENGTH) { if let Some(len) = req.headers().get(CONTENT_LENGTH) {
if let Ok(s) = len.to_str() { if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<usize>() { if let Ok(len) = s.parse::<usize>() {
@ -134,7 +134,7 @@ impl<S, T: DeserializeOwned + 'static> Future for JsonBody<S, T> {
} }
let limit = self.limit; let limit = self.limit;
let fut = req.payload_mut().readany() let fut = req.payload().readany()
.from_err() .from_err()
.fold(BytesMut::new(), move |mut body, chunk| { .fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {

View File

@ -172,7 +172,7 @@ pub mod dev {
pub use router::{Router, Pattern}; pub use router::{Router, Pattern};
pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
pub use param::{FromParam, Params}; pub use param::{FromParam, Params};
pub use httprequest::UrlEncoded; pub use httprequest::{UrlEncoded, RequestBody};
pub use httpresponse::HttpResponseBuilder; pub use httpresponse::HttpResponseBuilder;
pub use server::{ServerSettings, PauseServer, ResumeServer, StopServer}; pub use server::{ServerSettings, PauseServer, ResumeServer, StopServer};