1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

move payload futures from actix-http

This commit is contained in:
Nikolay Kim 2019-03-17 00:48:40 -07:00
parent c80884904c
commit 9012c46fe1
8 changed files with 688 additions and 24 deletions

View File

@ -81,7 +81,7 @@ mime = "0.3"
net2 = "0.2.33"
parking_lot = "0.7"
regex = "1.0"
serde = "1.0"
serde = { version = "1.0", features=["derive"] }
serde_json = "1.0"
serde_urlencoded = "^0.5.3"
time = "0.1"

View File

@ -3,8 +3,12 @@ use std::fmt;
pub use actix_http::error::*;
use derive_more::{Display, From};
use serde_json::error::Error as JsonError;
use url::ParseError as UrlParseError;
use crate::http::StatusCode;
use crate::HttpResponse;
/// Errors which can occur when attempting to generate resource uri.
#[derive(Debug, PartialEq, Display, From)]
pub enum UrlGenerationError {
@ -41,3 +45,100 @@ impl<E: fmt::Debug> From<actix_rt::blocking::BlockingError<E>> for BlockingError
}
}
}
/// A set of errors that can occur during parsing urlencoded payloads
#[derive(Debug, Display, From)]
pub enum UrlencodedError {
/// Can not decode chunked transfer encoding
#[display(fmt = "Can not decode chunked transfer encoding")]
Chunked,
/// Payload size is bigger than allowed. (default: 256kB)
#[display(fmt = "Urlencoded payload size is bigger than allowed. (default: 256kB)")]
Overflow,
/// Payload size is now known
#[display(fmt = "Payload size is now known")]
UnknownLength,
/// Content type error
#[display(fmt = "Content type error")]
ContentType,
/// Parse error
#[display(fmt = "Parse error")]
Parse,
/// Payload error
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
}
/// Return `BadRequest` for `UrlencodedError`
impl ResponseError for UrlencodedError {
fn error_response(&self) -> HttpResponse {
match *self {
UrlencodedError::Overflow => {
HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE)
}
UrlencodedError::UnknownLength => {
HttpResponse::new(StatusCode::LENGTH_REQUIRED)
}
_ => HttpResponse::new(StatusCode::BAD_REQUEST),
}
}
}
/// A set of errors that can occur during parsing json payloads
#[derive(Debug, Display, From)]
pub enum JsonPayloadError {
/// Payload size is bigger than allowed. (default: 256kB)
#[display(fmt = "Json payload size is bigger than allowed. (default: 256kB)")]
Overflow,
/// Content type error
#[display(fmt = "Content type error")]
ContentType,
/// Deserialize error
#[display(fmt = "Json deserialize error: {}", _0)]
Deserialize(JsonError),
/// Payload error
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
}
/// Return `BadRequest` for `UrlencodedError`
impl ResponseError for JsonPayloadError {
fn error_response(&self) -> HttpResponse {
match *self {
JsonPayloadError::Overflow => {
HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE)
}
_ => HttpResponse::new(StatusCode::BAD_REQUEST),
}
}
}
/// Error type returned when reading body as lines.
#[derive(From, Display, Debug)]
pub enum ReadlinesError {
/// Error when decoding a line.
#[display(fmt = "Encoding error")]
/// Payload size is bigger than allowed. (default: 256kB)
EncodingError,
/// Payload error.
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
/// Line limit exceeded.
#[display(fmt = "Line limit exceeded")]
LimitOverflow,
/// ContentType error.
#[display(fmt = "Content-type error")]
ContentTypeError(ContentTypeError),
}
/// Return `BadRequest` for `ReadlinesError`
impl ResponseError for ReadlinesError {
fn error_response(&self) -> HttpResponse {
match *self {
ReadlinesError::LimitOverflow => {
HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE)
}
_ => HttpResponse::new(StatusCode::BAD_REQUEST),
}
}
}

View File

@ -58,7 +58,10 @@ pub mod dev {
pub use crate::service::{
HttpServiceFactory, ServiceFromRequest, ServiceRequest, ServiceResponse,
};
pub use crate::types::form::UrlEncoded;
pub use crate::types::json::JsonBody;
pub use crate::types::payload::HttpMessageBody;
pub use crate::types::readlines::Readlines;
pub use actix_http::body::{Body, BodyLength, MessageBody, ResponseBody};
pub use actix_http::dev::ResponseBuilder as HttpResponseBuilder;

View File

@ -3,13 +3,17 @@
use std::rc::Rc;
use std::{fmt, ops};
use actix_http::dev::UrlEncoded;
use actix_http::error::{Error, UrlencodedError};
use bytes::Bytes;
use futures::{Future, Stream};
use actix_http::error::{Error, PayloadError, UrlencodedError};
use actix_http::{HttpMessage, Payload};
use bytes::{Bytes, BytesMut};
use encoding::all::UTF_8;
use encoding::types::{DecoderTrap, Encoding};
use encoding::EncodingRef;
use futures::{Future, Poll, Stream};
use serde::de::DeserializeOwned;
use crate::extract::FromRequest;
use crate::http::header::CONTENT_LENGTH;
use crate::request::HttpRequest;
use crate::service::ServiceFromRequest;
@ -167,13 +171,145 @@ impl Default for FormConfig {
}
}
/// Future that resolves to a parsed urlencoded values.
///
/// Parse `application/x-www-form-urlencoded` encoded request's body.
/// Return `UrlEncoded` future. Form can be deserialized to any type that
/// implements `Deserialize` trait from *serde*.
///
/// Returns error:
///
/// * content type is not `application/x-www-form-urlencoded`
/// * content-length is greater than 32k
///
pub struct UrlEncoded<T: HttpMessage, U> {
stream: Payload<T::Stream>,
limit: usize,
length: Option<usize>,
encoding: EncodingRef,
err: Option<UrlencodedError>,
fut: Option<Box<Future<Item = U, Error = UrlencodedError>>>,
}
impl<T, U> UrlEncoded<T, U>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create a new future to URL encode a request
pub fn new(req: &mut T) -> UrlEncoded<T, U> {
// check content type
if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" {
return Self::err(UrlencodedError::ContentType);
}
let encoding = match req.encoding() {
Ok(enc) => enc,
Err(_) => return Self::err(UrlencodedError::ContentType),
};
let mut len = None;
if let Some(l) = req.headers().get(CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() {
len = Some(l)
} else {
return Self::err(UrlencodedError::UnknownLength);
}
} else {
return Self::err(UrlencodedError::UnknownLength);
}
};
UrlEncoded {
encoding,
stream: req.take_payload(),
limit: 32_768,
length: len,
fut: None,
err: None,
}
}
fn err(e: UrlencodedError) -> Self {
UrlEncoded {
stream: Payload::None,
limit: 32_768,
fut: None,
err: Some(e),
length: None,
encoding: UTF_8,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl<T, U> Future for UrlEncoded<T, U>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError> + 'static,
U: DeserializeOwned + 'static,
{
type Item = U;
type Error = UrlencodedError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut {
return fut.poll();
}
if let Some(err) = self.err.take() {
return Err(err);
}
// payload size
let limit = self.limit;
if let Some(len) = self.length.take() {
if len > limit {
return Err(UrlencodedError::Overflow);
}
}
// future
let encoding = self.encoding;
let fut = std::mem::replace(&mut self.stream, Payload::None)
.from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(UrlencodedError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(move |body| {
if (encoding as *const Encoding) == UTF_8 {
serde_urlencoded::from_bytes::<U>(&body)
.map_err(|_| UrlencodedError::Parse)
} else {
let body = encoding
.decode(&body, DecoderTrap::Strict)
.map_err(|_| UrlencodedError::Parse)?;
serde_urlencoded::from_str::<U>(&body)
.map_err(|_| UrlencodedError::Parse)
}
});
self.fut = Some(Box::new(fut));
self.poll()
}
}
#[cfg(test)]
mod tests {
use actix_http::http::header;
use bytes::Bytes;
use serde_derive::Deserialize;
use serde::Deserialize;
use super::*;
use crate::http::header::CONTENT_TYPE;
use crate::test::{block_on, TestRequest};
#[derive(Deserialize, Debug, PartialEq)]
@ -183,15 +319,91 @@ mod tests {
#[test]
fn test_form() {
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.header(header::CONTENT_LENGTH, "11")
let mut req =
TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.header(CONTENT_LENGTH, "11")
.set_payload(Bytes::from_static(b"hello=world"))
.to_from();
let s = block_on(Form::<Info>::from_request(&mut req)).unwrap();
assert_eq!(s.hello, "world");
}
fn eq(err: UrlencodedError, other: UrlencodedError) -> bool {
match err {
UrlencodedError::Chunked => match other {
UrlencodedError::Chunked => true,
_ => false,
},
UrlencodedError::Overflow => match other {
UrlencodedError::Overflow => true,
_ => false,
},
UrlencodedError::UnknownLength => match other {
UrlencodedError::UnknownLength => true,
_ => false,
},
UrlencodedError::ContentType => match other {
UrlencodedError::ContentType => true,
_ => false,
},
_ => false,
}
}
#[test]
fn test_urlencoded_error() {
let mut req =
TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.header(CONTENT_LENGTH, "xxxx")
.to_request();
let info = block_on(UrlEncoded::<_, Info>::new(&mut req));
assert!(eq(info.err().unwrap(), UrlencodedError::UnknownLength));
let mut req =
TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.header(CONTENT_LENGTH, "1000000")
.to_request();
let info = block_on(UrlEncoded::<_, Info>::new(&mut req));
assert!(eq(info.err().unwrap(), UrlencodedError::Overflow));
let mut req = TestRequest::with_header(CONTENT_TYPE, "text/plain")
.header(CONTENT_LENGTH, "10")
.to_request();
let info = block_on(UrlEncoded::<_, Info>::new(&mut req));
assert!(eq(info.err().unwrap(), UrlencodedError::ContentType));
}
#[test]
fn test_urlencoded() {
let mut req =
TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded")
.header(CONTENT_LENGTH, "11")
.set_payload(Bytes::from_static(b"hello=world"))
.to_request();
let info = block_on(UrlEncoded::<_, Info>::new(&mut req)).unwrap();
assert_eq!(
info,
Info {
hello: "world".to_owned()
}
);
let mut req = TestRequest::with_header(
CONTENT_TYPE,
"application/x-www-form-urlencoded; charset=utf-8",
)
.header(CONTENT_LENGTH, "11")
.set_payload(Bytes::from_static(b"hello=world"))
.to_request();
let info = block_on(UrlEncoded::<_, Info>::new(&mut req)).unwrap();
assert_eq!(
info,
Info {
hello: "world".to_owned()
}
);
}
}

View File

@ -393,7 +393,7 @@ mod tests {
#[test]
fn test_json_body() {
let mut req = TestRequest::default().to_request();
let json = block_on(req.json::<MyObject>());
let json = block_on(JsonBody::<_, MyObject>::new(&mut req));
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
let mut req = TestRequest::default()
@ -402,7 +402,7 @@ mod tests {
header::HeaderValue::from_static("application/text"),
)
.to_request();
let json = block_on(req.json::<MyObject>());
let json = block_on(JsonBody::<_, MyObject>::new(&mut req));
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));
let mut req = TestRequest::default()
@ -416,7 +416,7 @@ mod tests {
)
.to_request();
let json = block_on(req.json::<MyObject>().limit(100));
let json = block_on(JsonBody::<_, MyObject>::new(&mut req).limit(100));
assert!(json_eq(json.err().unwrap(), JsonPayloadError::Overflow));
let mut req = TestRequest::default()
@ -431,7 +431,7 @@ mod tests {
.set_payload(Bytes::from_static(b"{\"name\": \"test\"}"))
.to_request();
let json = block_on(req.json::<MyObject>());
let json = block_on(JsonBody::<_, MyObject>::new(&mut req));
assert_eq!(
json.ok().unwrap(),
MyObject {

View File

@ -1,10 +1,11 @@
//! Helper types
mod form;
pub(crate) mod form;
pub(crate) mod json;
mod path;
mod payload;
pub(crate) mod payload;
mod query;
pub(crate) mod readlines;
pub use self::form::{Form, FormConfig};
pub use self::json::{Json, JsonConfig};

View File

@ -1,10 +1,9 @@
//! Payload/Bytes/String extractors
use std::str;
use actix_http::dev::MessageBody;
use actix_http::error::{Error, ErrorBadRequest, PayloadError};
use actix_http::HttpMessage;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use encoding::all::UTF_8;
use encoding::types::{DecoderTrap, Encoding};
use futures::future::{err, Either, FutureResult};
@ -12,6 +11,7 @@ use futures::{Future, Poll, Stream};
use mime::Mime;
use crate::extract::FromRequest;
use crate::http::header;
use crate::service::ServiceFromRequest;
/// Payload extractor returns request 's payload stream.
@ -152,7 +152,7 @@ where
}
let limit = cfg.limit;
Either::A(Box::new(MessageBody::new(req).limit(limit).from_err()))
Either::A(Box::new(HttpMessageBody::new(req).limit(limit).from_err()))
}
}
@ -213,7 +213,7 @@ where
let limit = cfg.limit;
Either::A(Box::new(
MessageBody::new(req)
HttpMessageBody::new(req)
.limit(limit)
.from_err()
.and_then(move |body| {
@ -287,6 +287,109 @@ impl Default for PayloadConfig {
}
}
/// Future that resolves to a complete http message body.
///
/// Load http message body.
///
/// By default only 256Kb payload reads to a memory, then
/// `PayloadError::Overflow` get returned. Use `MessageBody::limit()`
/// method to change upper limit.
pub struct HttpMessageBody<T: HttpMessage> {
limit: usize,
length: Option<usize>,
stream: actix_http::Payload<T::Stream>,
err: Option<PayloadError>,
fut: Option<Box<Future<Item = Bytes, Error = PayloadError>>>,
}
impl<T> HttpMessageBody<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create `MessageBody` for request.
pub fn new(req: &mut T) -> HttpMessageBody<T> {
let mut len = None;
if let Some(l) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() {
len = Some(l)
} else {
return Self::err(PayloadError::UnknownLength);
}
} else {
return Self::err(PayloadError::UnknownLength);
}
}
HttpMessageBody {
stream: req.take_payload(),
limit: 262_144,
length: len,
fut: None,
err: None,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
fn err(e: PayloadError) -> Self {
HttpMessageBody {
stream: actix_http::Payload::None,
limit: 262_144,
fut: None,
err: Some(e),
length: None,
}
}
}
impl<T> Future for HttpMessageBody<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut {
return fut.poll();
}
if let Some(err) = self.err.take() {
return Err(err);
}
if let Some(len) = self.length.take() {
if len > self.limit {
return Err(PayloadError::Overflow);
}
}
// future
let limit = self.limit;
self.fut = Some(Box::new(
std::mem::replace(&mut self.stream, actix_http::Payload::None)
.from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(PayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.map(|body| body.freeze()),
));
self.poll()
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
@ -332,4 +435,38 @@ mod tests {
let s = block_on(String::from_request(&mut req)).unwrap();
assert_eq!(s, "hello=world");
}
#[test]
fn test_message_body() {
let mut req =
TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").to_request();
let res = block_on(HttpMessageBody::new(&mut req));
match res.err().unwrap() {
PayloadError::UnknownLength => (),
_ => unreachable!("error"),
}
let mut req =
TestRequest::with_header(header::CONTENT_LENGTH, "1000000").to_request();
let res = block_on(HttpMessageBody::new(&mut req));
match res.err().unwrap() {
PayloadError::Overflow => (),
_ => unreachable!("error"),
}
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"test"))
.to_request();
let res = block_on(HttpMessageBody::new(&mut req));
assert_eq!(res.ok().unwrap(), Bytes::from_static(b"test"));
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"11111111111111"))
.to_request();
let res = block_on(HttpMessageBody::new(&mut req).limit(5));
match res.err().unwrap() {
PayloadError::Overflow => (),
_ => unreachable!("error"),
}
}
}

210
src/types/readlines.rs Normal file
View File

@ -0,0 +1,210 @@
use std::str;
use bytes::{Bytes, BytesMut};
use encoding::all::UTF_8;
use encoding::types::{DecoderTrap, Encoding};
use encoding::EncodingRef;
use futures::{Async, Poll, Stream};
use crate::dev::Payload;
use crate::error::{PayloadError, ReadlinesError};
use crate::HttpMessage;
/// Stream to read request line by line.
pub struct Readlines<T: HttpMessage> {
stream: Payload<T::Stream>,
buff: BytesMut,
limit: usize,
checked_buff: bool,
encoding: EncodingRef,
err: Option<ReadlinesError>,
}
impl<T> Readlines<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create a new stream to read request line by line.
pub fn new(req: &mut T) -> Self {
let encoding = match req.encoding() {
Ok(enc) => enc,
Err(err) => return Self::err(err.into()),
};
Readlines {
stream: req.take_payload(),
buff: BytesMut::with_capacity(262_144),
limit: 262_144,
checked_buff: true,
err: None,
encoding,
}
}
/// Change max line size. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
fn err(err: ReadlinesError) -> Self {
Readlines {
stream: Payload::None,
buff: BytesMut::new(),
limit: 262_144,
checked_buff: true,
encoding: UTF_8,
err: Some(err),
}
}
}
impl<T> Stream for Readlines<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = String;
type Error = ReadlinesError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(err) = self.err.take() {
return Err(err);
}
// check if there is a newline in the buffer
if !self.checked_buff {
let mut found: Option<usize> = None;
for (ind, b) in self.buff.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
return Ok(Async::Ready(Some(line)));
}
self.checked_buff = true;
}
// poll req for more bytes
match self.stream.poll() {
Ok(Async::Ready(Some(mut bytes))) => {
// check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.is_empty() {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
self.buff.clear();
Ok(Async::Ready(Some(line)))
}
Err(e) => Err(ReadlinesError::from(e)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::{block_on, TestRequest};
#[test]
fn test_readlines() {
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(
b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
industry. Lorem Ipsum has been the industry's standard dummy\n\
Contrary to popular belief, Lorem Ipsum is not simply random text.",
))
.to_request();
let stream = match block_on(Readlines::new(&mut req).into_future()) {
Ok((Some(s), stream)) => {
assert_eq!(
s,
"Lorem Ipsum is simply dummy text of the printing and typesetting\n"
);
stream
}
_ => unreachable!("error"),
};
let stream = match block_on(stream.into_future()) {
Ok((Some(s), stream)) => {
assert_eq!(
s,
"industry. Lorem Ipsum has been the industry's standard dummy\n"
);
stream
}
_ => unreachable!("error"),
};
match block_on(stream.into_future()) {
Ok((Some(s), stream)) => {
assert_eq!(
s,
"Contrary to popular belief, Lorem Ipsum is not simply random text."
);
}
_ => unreachable!("error"),
}
}
}