From fa66a07ec5070621292b411ab48b501cad18002e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 17 Mar 2019 01:02:51 -0700 Subject: [PATCH] move httpmessage futures to actix-web --- examples/echo.rs | 27 +- examples/echo2.rs | 25 +- src/error.rs | 74 ----- src/httpmessage.rs | 627 +------------------------------------------ src/lib.rs | 17 +- tests/test_client.rs | 22 +- tests/test_server.rs | 54 ++-- 7 files changed, 84 insertions(+), 762 deletions(-) diff --git a/examples/echo.rs b/examples/echo.rs index 8ec0e6a97..c36292c44 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -1,10 +1,9 @@ use std::{env, io}; -use actix_http::HttpMessage; -use actix_http::{HttpService, Request, Response}; +use actix_http::{error::PayloadError, HttpService, Request, Response}; use actix_server::Server; -use bytes::Bytes; -use futures::Future; +use bytes::BytesMut; +use futures::{Future, Stream}; use http::header::HeaderValue; use log::info; @@ -18,12 +17,20 @@ fn main() -> io::Result<()> { .client_timeout(1000) .client_disconnect(1000) .finish(|mut req: Request| { - req.body().limit(512).and_then(|bytes: Bytes| { - info!("request body: {:?}", bytes); - let mut res = Response::Ok(); - res.header("x-head", HeaderValue::from_static("dummy value!")); - Ok(res.body(bytes)) - }) + req.take_payload() + .fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, PayloadError>(body) + }) + .and_then(|bytes| { + info!("request body: {:?}", bytes); + let mut res = Response::Ok(); + res.header( + "x-head", + HeaderValue::from_static("dummy value!"), + ); + Ok(res.body(bytes)) + }) }) })? .run() diff --git a/examples/echo2.rs b/examples/echo2.rs index 101adc1cf..b239796b4 100644 --- a/examples/echo2.rs +++ b/examples/echo2.rs @@ -1,20 +1,25 @@ use std::{env, io}; use actix_http::http::HeaderValue; -use actix_http::HttpMessage; -use actix_http::{Error, HttpService, Request, Response}; +use actix_http::{error::PayloadError, Error, HttpService, Request, Response}; use actix_server::Server; -use bytes::Bytes; -use futures::Future; +use bytes::BytesMut; +use futures::{Future, Stream}; use log::info; fn handle_request(mut req: Request) -> impl Future { - req.body().limit(512).from_err().and_then(|bytes: Bytes| { - info!("request body: {:?}", bytes); - let mut res = Response::Ok(); - res.header("x-head", HeaderValue::from_static("dummy value!")); - Ok(res.body(bytes)) - }) + req.take_payload() + .fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, PayloadError>(body) + }) + .from_err() + .and_then(|bytes| { + info!("request body: {:?}", bytes); + let mut res = Response::Ok(); + res.header("x-head", HeaderValue::from_static("dummy value!")); + Ok(res.body(bytes)) + }) } fn main() -> io::Result<()> { diff --git a/src/error.rs b/src/error.rs index 696162f86..e0a416ef8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -390,80 +390,6 @@ impl ResponseError for ContentTypeError { } } -/// A set of errors that can occur during parsing urlencoded payloads -#[derive(Debug, Display, From)] -pub enum UrlencodedError { - /// Can not decode chunked transfer encoding - #[display(fmt = "Can not decode chunked transfer encoding")] - Chunked, - /// Payload size is bigger than allowed. (default: 256kB) - #[display(fmt = "Urlencoded payload size is bigger than allowed. (default: 256kB)")] - Overflow, - /// Payload size is now known - #[display(fmt = "Payload size is now known")] - UnknownLength, - /// Content type error - #[display(fmt = "Content type error")] - ContentType, - /// Parse error - #[display(fmt = "Parse error")] - Parse, - /// Payload error - #[display(fmt = "Error that occur during reading payload: {}", _0)] - Payload(PayloadError), -} - -/// Return `BadRequest` for `UrlencodedError` -impl ResponseError for UrlencodedError { - fn error_response(&self) -> Response { - match *self { - UrlencodedError::Overflow => Response::new(StatusCode::PAYLOAD_TOO_LARGE), - UrlencodedError::UnknownLength => Response::new(StatusCode::LENGTH_REQUIRED), - _ => Response::new(StatusCode::BAD_REQUEST), - } - } -} - -/// A set of errors that can occur during parsing json payloads -#[derive(Debug, Display, From)] -pub enum JsonPayloadError { - /// Payload size is bigger than allowed. (default: 256kB) - #[display(fmt = "Json payload size is bigger than allowed. (default: 256kB)")] - Overflow, - /// Content type error - #[display(fmt = "Content type error")] - ContentType, - /// Deserialize error - #[display(fmt = "Json deserialize error: {}", _0)] - Deserialize(JsonError), - /// Payload error - #[display(fmt = "Error that occur during reading payload: {}", _0)] - Payload(PayloadError), -} - -/// Return `BadRequest` for `UrlencodedError` -impl ResponseError for JsonPayloadError { - fn error_response(&self) -> Response { - match *self { - JsonPayloadError::Overflow => Response::new(StatusCode::PAYLOAD_TOO_LARGE), - _ => Response::new(StatusCode::BAD_REQUEST), - } - } -} - -/// Error type returned when reading body as lines. -#[derive(From)] -pub enum ReadlinesError { - /// Error when decoding a line. - EncodingError, - /// Payload error. - PayloadError(PayloadError), - /// Line limit exceeded. - LimitOverflow, - /// ContentType error. - ContentTypeError(ContentTypeError), -} - /// Helper type that can wrap any error and generate custom response. /// /// In following example any `io::Error` will be converted into "BAD REQUEST" diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 8573e917d..117e10a81 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -1,22 +1,14 @@ use std::cell::{Ref, RefMut}; use std::str; -use bytes::{Bytes, BytesMut}; use cookie::Cookie; use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; -use encoding::types::{DecoderTrap, Encoding}; use encoding::EncodingRef; -use futures::{Async, Future, Poll, Stream}; use http::{header, HeaderMap}; use mime::Mime; -use serde::de::DeserializeOwned; -use serde_urlencoded; -use crate::error::{ - ContentTypeError, CookieParseError, ParseError, PayloadError, ReadlinesError, - UrlencodedError, -}; +use crate::error::{ContentTypeError, CookieParseError, ParseError}; use crate::extensions::Extensions; use crate::header::Header; use crate::payload::Payload; @@ -143,88 +135,6 @@ pub trait HttpMessage: Sized { } None } - - /// Load http message body. - /// - /// By default only 256Kb payload reads to a memory, then - /// `PayloadError::Overflow` get returned. Use `MessageBody::limit()` - /// method to change upper limit. - /// - /// ## Server example - /// - /// ```rust,ignore - /// # extern crate bytes; - /// # extern crate actix_web; - /// # extern crate futures; - /// # #[macro_use] extern crate serde_derive; - /// use actix_web::{ - /// AsyncResponder, FutureResponse, HttpMessage, HttpRequest, Response, - /// }; - /// use bytes::Bytes; - /// use futures::future::Future; - /// - /// fn index(mut req: HttpRequest) -> FutureResponse { - /// req.body() // <- get Body future - /// .limit(1024) // <- change max size of the body to a 1kb - /// .from_err() - /// .and_then(|bytes: Bytes| { // <- complete body - /// println!("==== BODY ==== {:?}", bytes); - /// Ok(Response::Ok().into()) - /// }).responder() - /// } - /// # fn main() {} - /// ``` - fn body(&mut self) -> MessageBody - where - Self::Stream: Stream + Sized, - { - MessageBody::new(self) - } - - /// Parse `application/x-www-form-urlencoded` encoded request's body. - /// Return `UrlEncoded` future. Form can be deserialized to any type that - /// implements `Deserialize` trait from *serde*. - /// - /// Returns error: - /// - /// * content type is not `application/x-www-form-urlencoded` - /// * content-length is greater than 256k - /// - /// ## Server example - /// - /// ```rust,ignore - /// # extern crate actix_web; - /// # extern crate futures; - /// # use futures::Future; - /// # use std::collections::HashMap; - /// use actix_web::{FutureResponse, HttpMessage, HttpRequest, Response}; - /// - /// fn index(mut req: HttpRequest) -> FutureResponse { - /// Box::new( - /// req.urlencoded::>() // <- get UrlEncoded future - /// .from_err() - /// .and_then(|params| { // <- url encoded parameters - /// println!("==== BODY ==== {:?}", params); - /// Ok(Response::Ok().into()) - /// }), - /// ) - /// } - /// # fn main() {} - /// ``` - fn urlencoded(&mut self) -> UrlEncoded - where - Self::Stream: Stream, - { - UrlEncoded::new(self) - } - - /// Return stream of lines. - fn readlines(&mut self) -> Readlines - where - Self::Stream: Stream + 'static, - { - Readlines::new(self) - } } impl<'a, T> HttpMessage for &'a mut T @@ -253,383 +163,12 @@ where } } -/// Stream to read request line by line. -pub struct Readlines { - stream: Payload, - buff: BytesMut, - limit: usize, - checked_buff: bool, - encoding: EncodingRef, - err: Option, -} - -impl Readlines -where - T: HttpMessage, - T::Stream: Stream, -{ - /// Create a new stream to read request line by line. - fn new(req: &mut T) -> Self { - let encoding = match req.encoding() { - Ok(enc) => enc, - Err(err) => return Self::err(err.into()), - }; - - Readlines { - stream: req.take_payload(), - buff: BytesMut::with_capacity(262_144), - limit: 262_144, - checked_buff: true, - err: None, - encoding, - } - } - - /// Change max line size. By default max size is 256Kb - pub fn limit(mut self, limit: usize) -> Self { - self.limit = limit; - self - } - - fn err(err: ReadlinesError) -> Self { - Readlines { - stream: Payload::None, - buff: BytesMut::new(), - limit: 262_144, - checked_buff: true, - encoding: UTF_8, - err: Some(err), - } - } -} - -impl Stream for Readlines -where - T: HttpMessage, - T::Stream: Stream, -{ - type Item = String; - type Error = ReadlinesError; - - fn poll(&mut self) -> Poll, Self::Error> { - if let Some(err) = self.err.take() { - return Err(err); - } - - // check if there is a newline in the buffer - if !self.checked_buff { - let mut found: Option = None; - for (ind, b) in self.buff.iter().enumerate() { - if *b == b'\n' { - found = Some(ind); - break; - } - } - if let Some(ind) = found { - // check if line is longer than limit - if ind + 1 > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&self.buff.split_to(ind + 1)) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - return Ok(Async::Ready(Some(line))); - } - self.checked_buff = true; - } - // poll req for more bytes - match self.stream.poll() { - Ok(Async::Ready(Some(mut bytes))) => { - // check if there is a newline in bytes - let mut found: Option = None; - for (ind, b) in bytes.iter().enumerate() { - if *b == b'\n' { - found = Some(ind); - break; - } - } - if let Some(ind) = found { - // check if line is longer than limit - if ind + 1 > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&bytes.split_to(ind + 1)) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - // extend buffer with rest of the bytes; - self.buff.extend_from_slice(&bytes); - self.checked_buff = false; - return Ok(Async::Ready(Some(line))); - } - self.buff.extend_from_slice(&bytes); - Ok(Async::NotReady) - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { - if self.buff.is_empty() { - return Ok(Async::Ready(None)); - } - if self.buff.len() > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&self.buff) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&self.buff, DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - self.buff.clear(); - Ok(Async::Ready(Some(line))) - } - Err(e) => Err(ReadlinesError::from(e)), - } - } -} - -/// Future that resolves to a complete http message body. -pub struct MessageBody { - limit: usize, - length: Option, - stream: Payload, - err: Option, - fut: Option>>, -} - -impl MessageBody -where - T: HttpMessage, - T::Stream: Stream, -{ - /// Create `MessageBody` for request. - pub fn new(req: &mut T) -> MessageBody { - let mut len = None; - if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { - if let Ok(s) = l.to_str() { - if let Ok(l) = s.parse::() { - len = Some(l) - } else { - return Self::err(PayloadError::UnknownLength); - } - } else { - return Self::err(PayloadError::UnknownLength); - } - } - - MessageBody { - stream: req.take_payload(), - limit: 262_144, - length: len, - fut: None, - err: None, - } - } - - /// Change max size of payload. By default max size is 256Kb - pub fn limit(mut self, limit: usize) -> Self { - self.limit = limit; - self - } - - fn err(e: PayloadError) -> Self { - MessageBody { - stream: Payload::None, - limit: 262_144, - fut: None, - err: Some(e), - length: None, - } - } -} - -impl Future for MessageBody -where - T: HttpMessage, - T::Stream: Stream + 'static, -{ - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut { - return fut.poll(); - } - - if let Some(err) = self.err.take() { - return Err(err); - } - - if let Some(len) = self.length.take() { - if len > self.limit { - return Err(PayloadError::Overflow); - } - } - - // future - let limit = self.limit; - self.fut = Some(Box::new( - std::mem::replace(&mut self.stream, Payload::None) - .from_err() - .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { - if (body.len() + chunk.len()) > limit { - Err(PayloadError::Overflow) - } else { - body.extend_from_slice(&chunk); - Ok(body) - } - }) - .map(|body| body.freeze()), - )); - self.poll() - } -} - -/// Future that resolves to a parsed urlencoded values. -pub struct UrlEncoded { - stream: Payload, - limit: usize, - length: Option, - encoding: EncodingRef, - err: Option, - fut: Option>>, -} - -impl UrlEncoded -where - T: HttpMessage, - T::Stream: Stream, -{ - /// Create a new future to URL encode a request - pub fn new(req: &mut T) -> UrlEncoded { - // check content type - if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" { - return Self::err(UrlencodedError::ContentType); - } - let encoding = match req.encoding() { - Ok(enc) => enc, - Err(_) => return Self::err(UrlencodedError::ContentType), - }; - - let mut len = None; - if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { - if let Ok(s) = l.to_str() { - if let Ok(l) = s.parse::() { - len = Some(l) - } else { - return Self::err(UrlencodedError::UnknownLength); - } - } else { - return Self::err(UrlencodedError::UnknownLength); - } - }; - - UrlEncoded { - encoding, - stream: req.take_payload(), - limit: 262_144, - length: len, - fut: None, - err: None, - } - } - - fn err(e: UrlencodedError) -> Self { - UrlEncoded { - stream: Payload::None, - limit: 262_144, - fut: None, - err: Some(e), - length: None, - encoding: UTF_8, - } - } - - /// Change max size of payload. By default max size is 256Kb - pub fn limit(mut self, limit: usize) -> Self { - self.limit = limit; - self - } -} - -impl Future for UrlEncoded -where - T: HttpMessage, - T::Stream: Stream + 'static, - U: DeserializeOwned + 'static, -{ - type Item = U; - type Error = UrlencodedError; - - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut { - return fut.poll(); - } - - if let Some(err) = self.err.take() { - return Err(err); - } - - // payload size - let limit = self.limit; - if let Some(len) = self.length.take() { - if len > limit { - return Err(UrlencodedError::Overflow); - } - } - - // future - let encoding = self.encoding; - let fut = std::mem::replace(&mut self.stream, Payload::None) - .from_err() - .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { - if (body.len() + chunk.len()) > limit { - Err(UrlencodedError::Overflow) - } else { - body.extend_from_slice(&chunk); - Ok(body) - } - }) - .and_then(move |body| { - if (encoding as *const Encoding) == UTF_8 { - serde_urlencoded::from_bytes::(&body) - .map_err(|_| UrlencodedError::Parse) - } else { - let body = encoding - .decode(&body, DecoderTrap::Strict) - .map_err(|_| UrlencodedError::Parse)?; - serde_urlencoded::from_str::(&body) - .map_err(|_| UrlencodedError::Parse) - } - }); - self.fut = Some(Box::new(fut)); - self.poll() - } -} - #[cfg(test)] mod tests { + use bytes::Bytes; use encoding::all::ISO_8859_2; use encoding::Encoding; - use futures::Async; use mime; - use serde_derive::Deserialize; use super::*; use crate::test::TestRequest; @@ -720,166 +259,4 @@ mod tests { .finish(); assert!(req.chunked().is_err()); } - - impl PartialEq for UrlencodedError { - fn eq(&self, other: &UrlencodedError) -> bool { - match *self { - UrlencodedError::Chunked => match *other { - UrlencodedError::Chunked => true, - _ => false, - }, - UrlencodedError::Overflow => match *other { - UrlencodedError::Overflow => true, - _ => false, - }, - UrlencodedError::UnknownLength => match *other { - UrlencodedError::UnknownLength => true, - _ => false, - }, - UrlencodedError::ContentType => match *other { - UrlencodedError::ContentType => true, - _ => false, - }, - _ => false, - } - } - } - - #[derive(Deserialize, Debug, PartialEq)] - struct Info { - hello: String, - } - - #[test] - fn test_urlencoded_error() { - let mut req = TestRequest::with_header( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded", - ) - .header(header::CONTENT_LENGTH, "xxxx") - .finish(); - assert_eq!( - req.urlencoded::().poll().err().unwrap(), - UrlencodedError::UnknownLength - ); - - let mut req = TestRequest::with_header( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded", - ) - .header(header::CONTENT_LENGTH, "1000000") - .finish(); - assert_eq!( - req.urlencoded::().poll().err().unwrap(), - UrlencodedError::Overflow - ); - - let mut req = TestRequest::with_header(header::CONTENT_TYPE, "text/plain") - .header(header::CONTENT_LENGTH, "10") - .finish(); - assert_eq!( - req.urlencoded::().poll().err().unwrap(), - UrlencodedError::ContentType - ); - } - - #[test] - fn test_urlencoded() { - let mut req = TestRequest::with_header( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded", - ) - .header(header::CONTENT_LENGTH, "11") - .set_payload(Bytes::from_static(b"hello=world")) - .finish(); - - let result = req.urlencoded::().poll().ok().unwrap(); - assert_eq!( - result, - Async::Ready(Info { - hello: "world".to_owned() - }) - ); - - let mut req = TestRequest::with_header( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded; charset=utf-8", - ) - .header(header::CONTENT_LENGTH, "11") - .set_payload(Bytes::from_static(b"hello=world")) - .finish(); - - let result = req.urlencoded().poll().ok().unwrap(); - assert_eq!( - result, - Async::Ready(Info { - hello: "world".to_owned() - }) - ); - } - - #[test] - fn test_message_body() { - let mut req = TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").finish(); - match req.body().poll().err().unwrap() { - PayloadError::UnknownLength => (), - _ => unreachable!("error"), - } - - let mut req = - TestRequest::with_header(header::CONTENT_LENGTH, "1000000").finish(); - match req.body().poll().err().unwrap() { - PayloadError::Overflow => (), - _ => unreachable!("error"), - } - - let mut req = TestRequest::default() - .set_payload(Bytes::from_static(b"test")) - .finish(); - match req.body().poll().ok().unwrap() { - Async::Ready(bytes) => assert_eq!(bytes, Bytes::from_static(b"test")), - _ => unreachable!("error"), - } - - let mut req = TestRequest::default() - .set_payload(Bytes::from_static(b"11111111111111")) - .finish(); - match req.body().limit(5).poll().err().unwrap() { - PayloadError::Overflow => (), - _ => unreachable!("error"), - } - } - - #[test] - fn test_readlines() { - let mut req = TestRequest::default() - .set_payload(Bytes::from_static( - b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ - industry. Lorem Ipsum has been the industry's standard dummy\n\ - Contrary to popular belief, Lorem Ipsum is not simply random text.", - )) - .finish(); - let mut r = Readlines::new(&mut req); - match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!( - s, - "Lorem Ipsum is simply dummy text of the printing and typesetting\n" - ), - _ => unreachable!("error"), - } - match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!( - s, - "industry. Lorem Ipsum has been the industry's standard dummy\n" - ), - _ => unreachable!("error"), - } - match r.poll().ok().unwrap() { - Async::Ready(Some(s)) => assert_eq!( - s, - "Contrary to popular belief, Lorem Ipsum is not simply random text." - ), - _ => unreachable!("error"), - } - } } diff --git a/src/lib.rs b/src/lib.rs index 443266a9b..9a87b77f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,24 +97,9 @@ pub use self::httpmessage::HttpMessage; pub use self::message::{Head, Message, RequestHead, ResponseHead}; pub use self::payload::{Payload, PayloadStream}; pub use self::request::Request; -pub use self::response::Response; +pub use self::response::{Response, ResponseBuilder}; pub use self::service::{HttpService, SendError, SendResponse}; -pub mod dev { - //! The `actix-web` prelude for library developers - //! - //! The purpose of this module is to alleviate imports of many common actix - //! traits by adding a glob import to the top of actix heavy modules: - //! - //! ``` - //! # #![allow(unused_imports)] - //! use actix_http::dev::*; - //! ``` - - pub use crate::httpmessage::{MessageBody, Readlines, UrlEncoded}; - pub use crate::response::ResponseBuilder; -} - pub mod http { //! Various HTTP related types diff --git a/tests/test_client.rs b/tests/test_client.rs index 90e1a4f4a..2832b1b70 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -1,9 +1,11 @@ use actix_service::NewService; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::future::{self, ok}; +use futures::{Future, Stream}; -use actix_http::HttpMessage; -use actix_http::{client, HttpService, Request, Response}; +use actix_http::{ + client, error::PayloadError, HttpMessage, HttpService, Request, Response, +}; use actix_http_test::TestServer; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ @@ -28,6 +30,16 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; +fn load_body(stream: S) -> impl Future +where + S: Stream, +{ + stream.fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, PayloadError>(body) + }) +} + #[test] fn test_h1_v2() { env_logger::init(); @@ -51,7 +63,7 @@ fn test_h1_v2() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); let request = srv.post().finish().unwrap(); @@ -59,7 +71,7 @@ fn test_h1_v2() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } diff --git a/tests/test_server.rs b/tests/test_server.rs index 98f740941..8a7316cdf 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -6,16 +6,27 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_http_test::TestServer; use actix_server_config::ServerConfig; use actix_service::{fn_cfg_factory, NewService}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::future::{self, ok, Future}; -use futures::stream::once; +use futures::stream::{once, Stream}; use actix_http::body::Body; +use actix_http::error::PayloadError; use actix_http::{ body, client, error, http, http::header, Error, HttpMessage as HttpMessage2, HttpService, KeepAlive, Request, Response, }; +fn load_body(stream: S) -> impl Future +where + S: Stream, +{ + stream.fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, PayloadError>(body) + }) +} + #[test] fn test_h1() { let mut srv = TestServer::new(|| { @@ -131,8 +142,7 @@ fn test_h2_body() -> std::io::Result<()> { .and_then( HttpService::build() .h2(|mut req: Request<_>| { - req.body() - .limit(1024 * 1024) + load_body(req.take_payload()) .and_then(|body| Ok(Response::Ok().body(body))) }) .map_err(|_| ()), @@ -145,7 +155,7 @@ fn test_h2_body() -> std::io::Result<()> { let mut response = srv.send_request(req).unwrap(); assert!(response.status().is_success()); - let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap(); + let body = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(&body, data.as_bytes()); Ok(()) } @@ -440,7 +450,7 @@ fn test_h1_headers() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from(data2)); } @@ -486,7 +496,7 @@ fn test_h2_headers() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from(data2)); } @@ -523,7 +533,7 @@ fn test_h1_body() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } @@ -546,7 +556,7 @@ fn test_h2_body2() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } @@ -569,7 +579,7 @@ fn test_h1_head_empty() { } // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -601,7 +611,7 @@ fn test_h2_head_empty() { } // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -626,7 +636,7 @@ fn test_h1_head_binary() { } // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -661,7 +671,7 @@ fn test_h2_head_binary() { } // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -728,7 +738,7 @@ fn test_h1_body_length() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } @@ -756,7 +766,7 @@ fn test_h2_body_length() { assert!(response.status().is_success()); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } @@ -787,7 +797,7 @@ fn test_h1_body_chunked_explicit() { ); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); // decode assert_eq!(bytes, Bytes::from_static(STR.as_ref())); @@ -821,7 +831,7 @@ fn test_h2_body_chunked_explicit() { assert!(!response.headers().contains_key(header::TRANSFER_ENCODING)); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); // decode assert_eq!(bytes, Bytes::from_static(STR.as_ref())); @@ -850,7 +860,7 @@ fn test_h1_body_chunked_implicit() { ); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } @@ -874,7 +884,7 @@ fn test_h1_response_http_error_handling() { assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -907,7 +917,7 @@ fn test_h2_response_http_error_handling() { assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -923,7 +933,7 @@ fn test_h1_service_error() { assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); } @@ -947,6 +957,6 @@ fn test_h2_service_error() { assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR); // read response - let bytes = srv.block_on(response.body()).unwrap(); + let bytes = srv.block_on(load_body(response.take_payload())).unwrap(); assert!(bytes.is_empty()); }