1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-27 17:22:57 +01:00

move httpmessage futures to actix-web

This commit is contained in:
Nikolay Kim 2019-03-17 01:02:51 -07:00
parent fd141ef9b1
commit fa66a07ec5
7 changed files with 84 additions and 762 deletions

View File

@ -1,10 +1,9 @@
use std::{env, io};
use actix_http::HttpMessage;
use actix_http::{HttpService, Request, Response};
use actix_http::{error::PayloadError, HttpService, Request, Response};
use actix_server::Server;
use bytes::Bytes;
use futures::Future;
use bytes::BytesMut;
use futures::{Future, Stream};
use http::header::HeaderValue;
use log::info;
@ -18,12 +17,20 @@ fn main() -> io::Result<()> {
.client_timeout(1000)
.client_disconnect(1000)
.finish(|mut req: Request| {
req.body().limit(512).and_then(|bytes: Bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));
Ok(res.body(bytes))
})
req.take_payload()
.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
.and_then(|bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header(
"x-head",
HeaderValue::from_static("dummy value!"),
);
Ok(res.body(bytes))
})
})
})?
.run()

View File

@ -1,20 +1,25 @@
use std::{env, io};
use actix_http::http::HeaderValue;
use actix_http::HttpMessage;
use actix_http::{Error, HttpService, Request, Response};
use actix_http::{error::PayloadError, Error, HttpService, Request, Response};
use actix_server::Server;
use bytes::Bytes;
use futures::Future;
use bytes::BytesMut;
use futures::{Future, Stream};
use log::info;
fn handle_request(mut req: Request) -> impl Future<Item = Response, Error = Error> {
req.body().limit(512).from_err().and_then(|bytes: Bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));
Ok(res.body(bytes))
})
req.take_payload()
.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
.from_err()
.and_then(|bytes| {
info!("request body: {:?}", bytes);
let mut res = Response::Ok();
res.header("x-head", HeaderValue::from_static("dummy value!"));
Ok(res.body(bytes))
})
}
fn main() -> io::Result<()> {

View File

@ -390,80 +390,6 @@ impl ResponseError for ContentTypeError {
}
}
/// A set of errors that can occur during parsing urlencoded payloads
#[derive(Debug, Display, From)]
pub enum UrlencodedError {
/// Can not decode chunked transfer encoding
#[display(fmt = "Can not decode chunked transfer encoding")]
Chunked,
/// Payload size is bigger than allowed. (default: 256kB)
#[display(fmt = "Urlencoded payload size is bigger than allowed. (default: 256kB)")]
Overflow,
/// Payload size is now known
#[display(fmt = "Payload size is now known")]
UnknownLength,
/// Content type error
#[display(fmt = "Content type error")]
ContentType,
/// Parse error
#[display(fmt = "Parse error")]
Parse,
/// Payload error
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
}
/// Return `BadRequest` for `UrlencodedError`
impl ResponseError for UrlencodedError {
fn error_response(&self) -> Response {
match *self {
UrlencodedError::Overflow => Response::new(StatusCode::PAYLOAD_TOO_LARGE),
UrlencodedError::UnknownLength => Response::new(StatusCode::LENGTH_REQUIRED),
_ => Response::new(StatusCode::BAD_REQUEST),
}
}
}
/// A set of errors that can occur during parsing json payloads
#[derive(Debug, Display, From)]
pub enum JsonPayloadError {
/// Payload size is bigger than allowed. (default: 256kB)
#[display(fmt = "Json payload size is bigger than allowed. (default: 256kB)")]
Overflow,
/// Content type error
#[display(fmt = "Content type error")]
ContentType,
/// Deserialize error
#[display(fmt = "Json deserialize error: {}", _0)]
Deserialize(JsonError),
/// Payload error
#[display(fmt = "Error that occur during reading payload: {}", _0)]
Payload(PayloadError),
}
/// Return `BadRequest` for `UrlencodedError`
impl ResponseError for JsonPayloadError {
fn error_response(&self) -> Response {
match *self {
JsonPayloadError::Overflow => Response::new(StatusCode::PAYLOAD_TOO_LARGE),
_ => Response::new(StatusCode::BAD_REQUEST),
}
}
}
/// Error type returned when reading body as lines.
#[derive(From)]
pub enum ReadlinesError {
/// Error when decoding a line.
EncodingError,
/// Payload error.
PayloadError(PayloadError),
/// Line limit exceeded.
LimitOverflow,
/// ContentType error.
ContentTypeError(ContentTypeError),
}
/// Helper type that can wrap any error and generate custom response.
///
/// In following example any `io::Error` will be converted into "BAD REQUEST"

View File

@ -1,22 +1,14 @@
use std::cell::{Ref, RefMut};
use std::str;
use bytes::{Bytes, BytesMut};
use cookie::Cookie;
use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label;
use encoding::types::{DecoderTrap, Encoding};
use encoding::EncodingRef;
use futures::{Async, Future, Poll, Stream};
use http::{header, HeaderMap};
use mime::Mime;
use serde::de::DeserializeOwned;
use serde_urlencoded;
use crate::error::{
ContentTypeError, CookieParseError, ParseError, PayloadError, ReadlinesError,
UrlencodedError,
};
use crate::error::{ContentTypeError, CookieParseError, ParseError};
use crate::extensions::Extensions;
use crate::header::Header;
use crate::payload::Payload;
@ -143,88 +135,6 @@ pub trait HttpMessage: Sized {
}
None
}
/// Load http message body.
///
/// By default only 256Kb payload reads to a memory, then
/// `PayloadError::Overflow` get returned. Use `MessageBody::limit()`
/// method to change upper limit.
///
/// ## Server example
///
/// ```rust,ignore
/// # extern crate bytes;
/// # extern crate actix_web;
/// # extern crate futures;
/// # #[macro_use] extern crate serde_derive;
/// use actix_web::{
/// AsyncResponder, FutureResponse, HttpMessage, HttpRequest, Response,
/// };
/// use bytes::Bytes;
/// use futures::future::Future;
///
/// fn index(mut req: HttpRequest) -> FutureResponse<Response> {
/// req.body() // <- get Body future
/// .limit(1024) // <- change max size of the body to a 1kb
/// .from_err()
/// .and_then(|bytes: Bytes| { // <- complete body
/// println!("==== BODY ==== {:?}", bytes);
/// Ok(Response::Ok().into())
/// }).responder()
/// }
/// # fn main() {}
/// ```
fn body(&mut self) -> MessageBody<Self>
where
Self::Stream: Stream<Item = Bytes, Error = PayloadError> + Sized,
{
MessageBody::new(self)
}
/// Parse `application/x-www-form-urlencoded` encoded request's body.
/// Return `UrlEncoded` future. Form can be deserialized to any type that
/// implements `Deserialize` trait from *serde*.
///
/// Returns error:
///
/// * content type is not `application/x-www-form-urlencoded`
/// * content-length is greater than 256k
///
/// ## Server example
///
/// ```rust,ignore
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::Future;
/// # use std::collections::HashMap;
/// use actix_web::{FutureResponse, HttpMessage, HttpRequest, Response};
///
/// fn index(mut req: HttpRequest) -> FutureResponse<Response> {
/// Box::new(
/// req.urlencoded::<HashMap<String, String>>() // <- get UrlEncoded future
/// .from_err()
/// .and_then(|params| { // <- url encoded parameters
/// println!("==== BODY ==== {:?}", params);
/// Ok(Response::Ok().into())
/// }),
/// )
/// }
/// # fn main() {}
/// ```
fn urlencoded<T: DeserializeOwned>(&mut self) -> UrlEncoded<Self, T>
where
Self::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
UrlEncoded::new(self)
}
/// Return stream of lines.
fn readlines(&mut self) -> Readlines<Self>
where
Self::Stream: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
Readlines::new(self)
}
}
impl<'a, T> HttpMessage for &'a mut T
@ -253,383 +163,12 @@ where
}
}
/// Stream to read request line by line.
pub struct Readlines<T: HttpMessage> {
stream: Payload<T::Stream>,
buff: BytesMut,
limit: usize,
checked_buff: bool,
encoding: EncodingRef,
err: Option<ReadlinesError>,
}
impl<T> Readlines<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create a new stream to read request line by line.
fn new(req: &mut T) -> Self {
let encoding = match req.encoding() {
Ok(enc) => enc,
Err(err) => return Self::err(err.into()),
};
Readlines {
stream: req.take_payload(),
buff: BytesMut::with_capacity(262_144),
limit: 262_144,
checked_buff: true,
err: None,
encoding,
}
}
/// Change max line size. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
fn err(err: ReadlinesError) -> Self {
Readlines {
stream: Payload::None,
buff: BytesMut::new(),
limit: 262_144,
checked_buff: true,
encoding: UTF_8,
err: Some(err),
}
}
}
impl<T> Stream for Readlines<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = String;
type Error = ReadlinesError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(err) = self.err.take() {
return Err(err);
}
// check if there is a newline in the buffer
if !self.checked_buff {
let mut found: Option<usize> = None;
for (ind, b) in self.buff.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
return Ok(Async::Ready(Some(line)));
}
self.checked_buff = true;
}
// poll req for more bytes
match self.stream.poll() {
Ok(Async::Ready(Some(mut bytes))) => {
// check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == b'\n' {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.is_empty() {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
self.buff.clear();
Ok(Async::Ready(Some(line)))
}
Err(e) => Err(ReadlinesError::from(e)),
}
}
}
/// Future that resolves to a complete http message body.
pub struct MessageBody<T: HttpMessage> {
limit: usize,
length: Option<usize>,
stream: Payload<T::Stream>,
err: Option<PayloadError>,
fut: Option<Box<Future<Item = Bytes, Error = PayloadError>>>,
}
impl<T> MessageBody<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create `MessageBody` for request.
pub fn new(req: &mut T) -> MessageBody<T> {
let mut len = None;
if let Some(l) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() {
len = Some(l)
} else {
return Self::err(PayloadError::UnknownLength);
}
} else {
return Self::err(PayloadError::UnknownLength);
}
}
MessageBody {
stream: req.take_payload(),
limit: 262_144,
length: len,
fut: None,
err: None,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
fn err(e: PayloadError) -> Self {
MessageBody {
stream: Payload::None,
limit: 262_144,
fut: None,
err: Some(e),
length: None,
}
}
}
impl<T> Future for MessageBody<T>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut {
return fut.poll();
}
if let Some(err) = self.err.take() {
return Err(err);
}
if let Some(len) = self.length.take() {
if len > self.limit {
return Err(PayloadError::Overflow);
}
}
// future
let limit = self.limit;
self.fut = Some(Box::new(
std::mem::replace(&mut self.stream, Payload::None)
.from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(PayloadError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.map(|body| body.freeze()),
));
self.poll()
}
}
/// Future that resolves to a parsed urlencoded values.
pub struct UrlEncoded<T: HttpMessage, U> {
stream: Payload<T::Stream>,
limit: usize,
length: Option<usize>,
encoding: EncodingRef,
err: Option<UrlencodedError>,
fut: Option<Box<Future<Item = U, Error = UrlencodedError>>>,
}
impl<T, U> UrlEncoded<T, U>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create a new future to URL encode a request
pub fn new(req: &mut T) -> UrlEncoded<T, U> {
// check content type
if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" {
return Self::err(UrlencodedError::ContentType);
}
let encoding = match req.encoding() {
Ok(enc) => enc,
Err(_) => return Self::err(UrlencodedError::ContentType),
};
let mut len = None;
if let Some(l) = req.headers().get(header::CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() {
len = Some(l)
} else {
return Self::err(UrlencodedError::UnknownLength);
}
} else {
return Self::err(UrlencodedError::UnknownLength);
}
};
UrlEncoded {
encoding,
stream: req.take_payload(),
limit: 262_144,
length: len,
fut: None,
err: None,
}
}
fn err(e: UrlencodedError) -> Self {
UrlEncoded {
stream: Payload::None,
limit: 262_144,
fut: None,
err: Some(e),
length: None,
encoding: UTF_8,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl<T, U> Future for UrlEncoded<T, U>
where
T: HttpMessage,
T::Stream: Stream<Item = Bytes, Error = PayloadError> + 'static,
U: DeserializeOwned + 'static,
{
type Item = U;
type Error = UrlencodedError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut {
return fut.poll();
}
if let Some(err) = self.err.take() {
return Err(err);
}
// payload size
let limit = self.limit;
if let Some(len) = self.length.take() {
if len > limit {
return Err(UrlencodedError::Overflow);
}
}
// future
let encoding = self.encoding;
let fut = std::mem::replace(&mut self.stream, Payload::None)
.from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit {
Err(UrlencodedError::Overflow)
} else {
body.extend_from_slice(&chunk);
Ok(body)
}
})
.and_then(move |body| {
if (encoding as *const Encoding) == UTF_8 {
serde_urlencoded::from_bytes::<U>(&body)
.map_err(|_| UrlencodedError::Parse)
} else {
let body = encoding
.decode(&body, DecoderTrap::Strict)
.map_err(|_| UrlencodedError::Parse)?;
serde_urlencoded::from_str::<U>(&body)
.map_err(|_| UrlencodedError::Parse)
}
});
self.fut = Some(Box::new(fut));
self.poll()
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use encoding::all::ISO_8859_2;
use encoding::Encoding;
use futures::Async;
use mime;
use serde_derive::Deserialize;
use super::*;
use crate::test::TestRequest;
@ -720,166 +259,4 @@ mod tests {
.finish();
assert!(req.chunked().is_err());
}
impl PartialEq for UrlencodedError {
fn eq(&self, other: &UrlencodedError) -> bool {
match *self {
UrlencodedError::Chunked => match *other {
UrlencodedError::Chunked => true,
_ => false,
},
UrlencodedError::Overflow => match *other {
UrlencodedError::Overflow => true,
_ => false,
},
UrlencodedError::UnknownLength => match *other {
UrlencodedError::UnknownLength => true,
_ => false,
},
UrlencodedError::ContentType => match *other {
UrlencodedError::ContentType => true,
_ => false,
},
_ => false,
}
}
}
#[derive(Deserialize, Debug, PartialEq)]
struct Info {
hello: String,
}
#[test]
fn test_urlencoded_error() {
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.header(header::CONTENT_LENGTH, "xxxx")
.finish();
assert_eq!(
req.urlencoded::<Info>().poll().err().unwrap(),
UrlencodedError::UnknownLength
);
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.header(header::CONTENT_LENGTH, "1000000")
.finish();
assert_eq!(
req.urlencoded::<Info>().poll().err().unwrap(),
UrlencodedError::Overflow
);
let mut req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain")
.header(header::CONTENT_LENGTH, "10")
.finish();
assert_eq!(
req.urlencoded::<Info>().poll().err().unwrap(),
UrlencodedError::ContentType
);
}
#[test]
fn test_urlencoded() {
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
)
.header(header::CONTENT_LENGTH, "11")
.set_payload(Bytes::from_static(b"hello=world"))
.finish();
let result = req.urlencoded::<Info>().poll().ok().unwrap();
assert_eq!(
result,
Async::Ready(Info {
hello: "world".to_owned()
})
);
let mut req = TestRequest::with_header(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded; charset=utf-8",
)
.header(header::CONTENT_LENGTH, "11")
.set_payload(Bytes::from_static(b"hello=world"))
.finish();
let result = req.urlencoded().poll().ok().unwrap();
assert_eq!(
result,
Async::Ready(Info {
hello: "world".to_owned()
})
);
}
#[test]
fn test_message_body() {
let mut req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish();
match req.body().poll().err().unwrap() {
PayloadError::UnknownLength => (),
_ => unreachable!("error"),
}
let mut req =
TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish();
match req.body().poll().err().unwrap() {
PayloadError::Overflow => (),
_ => unreachable!("error"),
}
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"test"))
.finish();
match req.body().poll().ok().unwrap() {
Async::Ready(bytes) => assert_eq!(bytes, Bytes::from_static(b"test")),
_ => unreachable!("error"),
}
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(b"11111111111111"))
.finish();
match req.body().limit(5).poll().err().unwrap() {
PayloadError::Overflow => (),
_ => unreachable!("error"),
}
}
#[test]
fn test_readlines() {
let mut req = TestRequest::default()
.set_payload(Bytes::from_static(
b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
industry. Lorem Ipsum has been the industry's standard dummy\n\
Contrary to popular belief, Lorem Ipsum is not simply random text.",
))
.finish();
let mut r = Readlines::new(&mut req);
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(
s,
"Lorem Ipsum is simply dummy text of the printing and typesetting\n"
),
_ => unreachable!("error"),
}
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(
s,
"industry. Lorem Ipsum has been the industry's standard dummy\n"
),
_ => unreachable!("error"),
}
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(
s,
"Contrary to popular belief, Lorem Ipsum is not simply random text."
),
_ => unreachable!("error"),
}
}
}

View File

@ -97,24 +97,9 @@ pub use self::httpmessage::HttpMessage;
pub use self::message::{Head, Message, RequestHead, ResponseHead};
pub use self::payload::{Payload, PayloadStream};
pub use self::request::Request;
pub use self::response::Response;
pub use self::response::{Response, ResponseBuilder};
pub use self::service::{HttpService, SendError, SendResponse};
pub mod dev {
//! The `actix-web` prelude for library developers
//!
//! The purpose of this module is to alleviate imports of many common actix
//! traits by adding a glob import to the top of actix heavy modules:
//!
//! ```
//! # #![allow(unused_imports)]
//! use actix_http::dev::*;
//! ```
pub use crate::httpmessage::{MessageBody, Readlines, UrlEncoded};
pub use crate::response::ResponseBuilder;
}
pub mod http {
//! Various HTTP related types

View File

@ -1,9 +1,11 @@
use actix_service::NewService;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::future::{self, ok};
use futures::{Future, Stream};
use actix_http::HttpMessage;
use actix_http::{client, HttpService, Request, Response};
use actix_http::{
client, error::PayloadError, HttpMessage, HttpService, Request, Response,
};
use actix_http_test::TestServer;
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
@ -28,6 +30,16 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World";
fn load_body<S>(stream: S) -> impl Future<Item = BytesMut, Error = PayloadError>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
stream.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
}
#[test]
fn test_h1_v2() {
env_logger::init();
@ -51,7 +63,7 @@ fn test_h1_v2() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
let request = srv.post().finish().unwrap();
@ -59,7 +71,7 @@ fn test_h1_v2() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}

View File

@ -6,16 +6,27 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_http_test::TestServer;
use actix_server_config::ServerConfig;
use actix_service::{fn_cfg_factory, NewService};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::future::{self, ok, Future};
use futures::stream::once;
use futures::stream::{once, Stream};
use actix_http::body::Body;
use actix_http::error::PayloadError;
use actix_http::{
body, client, error, http, http::header, Error, HttpMessage as HttpMessage2,
HttpService, KeepAlive, Request, Response,
};
fn load_body<S>(stream: S) -> impl Future<Item = BytesMut, Error = PayloadError>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
stream.fold(BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, PayloadError>(body)
})
}
#[test]
fn test_h1() {
let mut srv = TestServer::new(|| {
@ -131,8 +142,7 @@ fn test_h2_body() -> std::io::Result<()> {
.and_then(
HttpService::build()
.h2(|mut req: Request<_>| {
req.body()
.limit(1024 * 1024)
load_body(req.take_payload())
.and_then(|body| Ok(Response::Ok().body(body)))
})
.map_err(|_| ()),
@ -145,7 +155,7 @@ fn test_h2_body() -> std::io::Result<()> {
let mut response = srv.send_request(req).unwrap();
assert!(response.status().is_success());
let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap();
let body = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(&body, data.as_bytes());
Ok(())
}
@ -440,7 +450,7 @@ fn test_h1_headers() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from(data2));
}
@ -486,7 +496,7 @@ fn test_h2_headers() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from(data2));
}
@ -523,7 +533,7 @@ fn test_h1_body() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
@ -546,7 +556,7 @@ fn test_h2_body2() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
@ -569,7 +579,7 @@ fn test_h1_head_empty() {
}
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -601,7 +611,7 @@ fn test_h2_head_empty() {
}
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -626,7 +636,7 @@ fn test_h1_head_binary() {
}
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -661,7 +671,7 @@ fn test_h2_head_binary() {
}
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -728,7 +738,7 @@ fn test_h1_body_length() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
@ -756,7 +766,7 @@ fn test_h2_body_length() {
assert!(response.status().is_success());
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
@ -787,7 +797,7 @@ fn test_h1_body_chunked_explicit() {
);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
// decode
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
@ -821,7 +831,7 @@ fn test_h2_body_chunked_explicit() {
assert!(!response.headers().contains_key(header::TRANSFER_ENCODING));
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
// decode
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
@ -850,7 +860,7 @@ fn test_h1_body_chunked_implicit() {
);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
}
@ -874,7 +884,7 @@ fn test_h1_response_http_error_handling() {
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -907,7 +917,7 @@ fn test_h2_response_http_error_handling() {
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -923,7 +933,7 @@ fn test_h1_service_error() {
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}
@ -947,6 +957,6 @@ fn test_h2_service_error() {
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response
let bytes = srv.block_on(response.body()).unwrap();
let bytes = srv.block_on(load_body(response.take_payload())).unwrap();
assert!(bytes.is_empty());
}