From 9012c46fe1e2ced25480e3aa4fd200899368e81a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 17 Mar 2019 00:48:40 -0700 Subject: [PATCH] move payload futures from actix-http --- Cargo.toml | 2 +- src/error.rs | 101 +++++++++++++++++ src/lib.rs | 3 + src/types/form.rs | 238 ++++++++++++++++++++++++++++++++++++++--- src/types/json.rs | 8 +- src/types/mod.rs | 5 +- src/types/payload.rs | 145 ++++++++++++++++++++++++- src/types/readlines.rs | 210 ++++++++++++++++++++++++++++++++++++ 8 files changed, 688 insertions(+), 24 deletions(-) create mode 100644 src/types/readlines.rs diff --git a/Cargo.toml b/Cargo.toml index 87a54b6a..ba36dd2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ mime = "0.3" net2 = "0.2.33" parking_lot = "0.7" regex = "1.0" -serde = "1.0" +serde = { version = "1.0", features=["derive"] } serde_json = "1.0" serde_urlencoded = "^0.5.3" time = "0.1" diff --git a/src/error.rs b/src/error.rs index 06840708..bf224a22 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,8 +3,12 @@ use std::fmt; pub use actix_http::error::*; use derive_more::{Display, From}; +use serde_json::error::Error as JsonError; use url::ParseError as UrlParseError; +use crate::http::StatusCode; +use crate::HttpResponse; + /// Errors which can occur when attempting to generate resource uri. #[derive(Debug, PartialEq, Display, From)] pub enum UrlGenerationError { @@ -41,3 +45,100 @@ impl From> for BlockingError } } } + +/// A set of errors that can occur during parsing urlencoded payloads +#[derive(Debug, Display, From)] +pub enum UrlencodedError { + /// Can not decode chunked transfer encoding + #[display(fmt = "Can not decode chunked transfer encoding")] + Chunked, + /// Payload size is bigger than allowed. (default: 256kB) + #[display(fmt = "Urlencoded payload size is bigger than allowed. (default: 256kB)")] + Overflow, + /// Payload size is now known + #[display(fmt = "Payload size is now known")] + UnknownLength, + /// Content type error + #[display(fmt = "Content type error")] + ContentType, + /// Parse error + #[display(fmt = "Parse error")] + Parse, + /// Payload error + #[display(fmt = "Error that occur during reading payload: {}", _0)] + Payload(PayloadError), +} + +/// Return `BadRequest` for `UrlencodedError` +impl ResponseError for UrlencodedError { + fn error_response(&self) -> HttpResponse { + match *self { + UrlencodedError::Overflow => { + HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE) + } + UrlencodedError::UnknownLength => { + HttpResponse::new(StatusCode::LENGTH_REQUIRED) + } + _ => HttpResponse::new(StatusCode::BAD_REQUEST), + } + } +} + +/// A set of errors that can occur during parsing json payloads +#[derive(Debug, Display, From)] +pub enum JsonPayloadError { + /// Payload size is bigger than allowed. (default: 256kB) + #[display(fmt = "Json payload size is bigger than allowed. (default: 256kB)")] + Overflow, + /// Content type error + #[display(fmt = "Content type error")] + ContentType, + /// Deserialize error + #[display(fmt = "Json deserialize error: {}", _0)] + Deserialize(JsonError), + /// Payload error + #[display(fmt = "Error that occur during reading payload: {}", _0)] + Payload(PayloadError), +} + +/// Return `BadRequest` for `UrlencodedError` +impl ResponseError for JsonPayloadError { + fn error_response(&self) -> HttpResponse { + match *self { + JsonPayloadError::Overflow => { + HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE) + } + _ => HttpResponse::new(StatusCode::BAD_REQUEST), + } + } +} + +/// Error type returned when reading body as lines. +#[derive(From, Display, Debug)] +pub enum ReadlinesError { + /// Error when decoding a line. + #[display(fmt = "Encoding error")] + /// Payload size is bigger than allowed. (default: 256kB) + EncodingError, + /// Payload error. + #[display(fmt = "Error that occur during reading payload: {}", _0)] + Payload(PayloadError), + /// Line limit exceeded. + #[display(fmt = "Line limit exceeded")] + LimitOverflow, + /// ContentType error. + #[display(fmt = "Content-type error")] + ContentTypeError(ContentTypeError), +} + +/// Return `BadRequest` for `ReadlinesError` +impl ResponseError for ReadlinesError { + fn error_response(&self) -> HttpResponse { + match *self { + ReadlinesError::LimitOverflow => { + HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE) + } + _ => HttpResponse::new(StatusCode::BAD_REQUEST), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 18cf93f4..d6bcf4e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,10 @@ pub mod dev { pub use crate::service::{ HttpServiceFactory, ServiceFromRequest, ServiceRequest, ServiceResponse, }; + pub use crate::types::form::UrlEncoded; pub use crate::types::json::JsonBody; + pub use crate::types::payload::HttpMessageBody; + pub use crate::types::readlines::Readlines; pub use actix_http::body::{Body, BodyLength, MessageBody, ResponseBody}; pub use actix_http::dev::ResponseBuilder as HttpResponseBuilder; diff --git a/src/types/form.rs b/src/types/form.rs index 4a5e9729..58fa3761 100644 --- a/src/types/form.rs +++ b/src/types/form.rs @@ -3,13 +3,17 @@ use std::rc::Rc; use std::{fmt, ops}; -use actix_http::dev::UrlEncoded; -use actix_http::error::{Error, UrlencodedError}; -use bytes::Bytes; -use futures::{Future, Stream}; +use actix_http::error::{Error, PayloadError, UrlencodedError}; +use actix_http::{HttpMessage, Payload}; +use bytes::{Bytes, BytesMut}; +use encoding::all::UTF_8; +use encoding::types::{DecoderTrap, Encoding}; +use encoding::EncodingRef; +use futures::{Future, Poll, Stream}; use serde::de::DeserializeOwned; use crate::extract::FromRequest; +use crate::http::header::CONTENT_LENGTH; use crate::request::HttpRequest; use crate::service::ServiceFromRequest; @@ -167,13 +171,145 @@ impl Default for FormConfig { } } +/// Future that resolves to a parsed urlencoded values. +/// +/// Parse `application/x-www-form-urlencoded` encoded request's body. +/// Return `UrlEncoded` future. Form can be deserialized to any type that +/// implements `Deserialize` trait from *serde*. +/// +/// Returns error: +/// +/// * content type is not `application/x-www-form-urlencoded` +/// * content-length is greater than 32k +/// +pub struct UrlEncoded { + stream: Payload, + limit: usize, + length: Option, + encoding: EncodingRef, + err: Option, + fut: Option>>, +} + +impl UrlEncoded +where + T: HttpMessage, + T::Stream: Stream, +{ + /// Create a new future to URL encode a request + pub fn new(req: &mut T) -> UrlEncoded { + // check content type + if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" { + return Self::err(UrlencodedError::ContentType); + } + let encoding = match req.encoding() { + Ok(enc) => enc, + Err(_) => return Self::err(UrlencodedError::ContentType), + }; + + let mut len = None; + if let Some(l) = req.headers().get(CONTENT_LENGTH) { + if let Ok(s) = l.to_str() { + if let Ok(l) = s.parse::() { + len = Some(l) + } else { + return Self::err(UrlencodedError::UnknownLength); + } + } else { + return Self::err(UrlencodedError::UnknownLength); + } + }; + + UrlEncoded { + encoding, + stream: req.take_payload(), + limit: 32_768, + length: len, + fut: None, + err: None, + } + } + + fn err(e: UrlencodedError) -> Self { + UrlEncoded { + stream: Payload::None, + limit: 32_768, + fut: None, + err: Some(e), + length: None, + encoding: UTF_8, + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } +} + +impl Future for UrlEncoded +where + T: HttpMessage, + T::Stream: Stream + 'static, + U: DeserializeOwned + 'static, +{ + type Item = U; + type Error = UrlencodedError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut { + return fut.poll(); + } + + if let Some(err) = self.err.take() { + return Err(err); + } + + // payload size + let limit = self.limit; + if let Some(len) = self.length.take() { + if len > limit { + return Err(UrlencodedError::Overflow); + } + } + + // future + let encoding = self.encoding; + let fut = std::mem::replace(&mut self.stream, Payload::None) + .from_err() + .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(UrlencodedError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .and_then(move |body| { + if (encoding as *const Encoding) == UTF_8 { + serde_urlencoded::from_bytes::(&body) + .map_err(|_| UrlencodedError::Parse) + } else { + let body = encoding + .decode(&body, DecoderTrap::Strict) + .map_err(|_| UrlencodedError::Parse)?; + serde_urlencoded::from_str::(&body) + .map_err(|_| UrlencodedError::Parse) + } + }); + self.fut = Some(Box::new(fut)); + self.poll() + } +} + #[cfg(test)] mod tests { - use actix_http::http::header; use bytes::Bytes; - use serde_derive::Deserialize; + use serde::Deserialize; use super::*; + use crate::http::header::CONTENT_TYPE; use crate::test::{block_on, TestRequest}; #[derive(Deserialize, Debug, PartialEq)] @@ -183,15 +319,91 @@ mod tests { #[test] fn test_form() { - let mut req = TestRequest::with_header( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded", - ) - .header(header::CONTENT_LENGTH, "11") - .set_payload(Bytes::from_static(b"hello=world")) - .to_from(); + let mut req = + TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, "11") + .set_payload(Bytes::from_static(b"hello=world")) + .to_from(); let s = block_on(Form::::from_request(&mut req)).unwrap(); assert_eq!(s.hello, "world"); } + + fn eq(err: UrlencodedError, other: UrlencodedError) -> bool { + match err { + UrlencodedError::Chunked => match other { + UrlencodedError::Chunked => true, + _ => false, + }, + UrlencodedError::Overflow => match other { + UrlencodedError::Overflow => true, + _ => false, + }, + UrlencodedError::UnknownLength => match other { + UrlencodedError::UnknownLength => true, + _ => false, + }, + UrlencodedError::ContentType => match other { + UrlencodedError::ContentType => true, + _ => false, + }, + _ => false, + } + } + + #[test] + fn test_urlencoded_error() { + let mut req = + TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, "xxxx") + .to_request(); + let info = block_on(UrlEncoded::<_, Info>::new(&mut req)); + assert!(eq(info.err().unwrap(), UrlencodedError::UnknownLength)); + + let mut req = + TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, "1000000") + .to_request(); + let info = block_on(UrlEncoded::<_, Info>::new(&mut req)); + assert!(eq(info.err().unwrap(), UrlencodedError::Overflow)); + + let mut req = TestRequest::with_header(CONTENT_TYPE, "text/plain") + .header(CONTENT_LENGTH, "10") + .to_request(); + let info = block_on(UrlEncoded::<_, Info>::new(&mut req)); + assert!(eq(info.err().unwrap(), UrlencodedError::ContentType)); + } + + #[test] + fn test_urlencoded() { + let mut req = + TestRequest::with_header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, "11") + .set_payload(Bytes::from_static(b"hello=world")) + .to_request(); + + let info = block_on(UrlEncoded::<_, Info>::new(&mut req)).unwrap(); + assert_eq!( + info, + Info { + hello: "world".to_owned() + } + ); + + let mut req = TestRequest::with_header( + CONTENT_TYPE, + "application/x-www-form-urlencoded; charset=utf-8", + ) + .header(CONTENT_LENGTH, "11") + .set_payload(Bytes::from_static(b"hello=world")) + .to_request(); + + let info = block_on(UrlEncoded::<_, Info>::new(&mut req)).unwrap(); + assert_eq!( + info, + Info { + hello: "world".to_owned() + } + ); + } } diff --git a/src/types/json.rs b/src/types/json.rs index 74ee5eb2..18a6be90 100644 --- a/src/types/json.rs +++ b/src/types/json.rs @@ -393,7 +393,7 @@ mod tests { #[test] fn test_json_body() { let mut req = TestRequest::default().to_request(); - let json = block_on(req.json::()); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); let mut req = TestRequest::default() @@ -402,7 +402,7 @@ mod tests { header::HeaderValue::from_static("application/text"), ) .to_request(); - let json = block_on(req.json::()); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); let mut req = TestRequest::default() @@ -416,7 +416,7 @@ mod tests { ) .to_request(); - let json = block_on(req.json::().limit(100)); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req).limit(100)); assert!(json_eq(json.err().unwrap(), JsonPayloadError::Overflow)); let mut req = TestRequest::default() @@ -431,7 +431,7 @@ mod tests { .set_payload(Bytes::from_static(b"{\"name\": \"test\"}")) .to_request(); - let json = block_on(req.json::()); + let json = block_on(JsonBody::<_, MyObject>::new(&mut req)); assert_eq!( json.ok().unwrap(), MyObject { diff --git a/src/types/mod.rs b/src/types/mod.rs index 2fc3ca93..30ee7309 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,10 +1,11 @@ //! Helper types -mod form; +pub(crate) mod form; pub(crate) mod json; mod path; -mod payload; +pub(crate) mod payload; mod query; +pub(crate) mod readlines; pub use self::form::{Form, FormConfig}; pub use self::json::{Json, JsonConfig}; diff --git a/src/types/payload.rs b/src/types/payload.rs index 7164a544..402486b6 100644 --- a/src/types/payload.rs +++ b/src/types/payload.rs @@ -1,10 +1,9 @@ //! Payload/Bytes/String extractors use std::str; -use actix_http::dev::MessageBody; use actix_http::error::{Error, ErrorBadRequest, PayloadError}; use actix_http::HttpMessage; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use encoding::all::UTF_8; use encoding::types::{DecoderTrap, Encoding}; use futures::future::{err, Either, FutureResult}; @@ -12,6 +11,7 @@ use futures::{Future, Poll, Stream}; use mime::Mime; use crate::extract::FromRequest; +use crate::http::header; use crate::service::ServiceFromRequest; /// Payload extractor returns request 's payload stream. @@ -152,7 +152,7 @@ where } let limit = cfg.limit; - Either::A(Box::new(MessageBody::new(req).limit(limit).from_err())) + Either::A(Box::new(HttpMessageBody::new(req).limit(limit).from_err())) } } @@ -213,7 +213,7 @@ where let limit = cfg.limit; Either::A(Box::new( - MessageBody::new(req) + HttpMessageBody::new(req) .limit(limit) .from_err() .and_then(move |body| { @@ -287,6 +287,109 @@ impl Default for PayloadConfig { } } +/// Future that resolves to a complete http message body. +/// +/// Load http message body. +/// +/// By default only 256Kb payload reads to a memory, then +/// `PayloadError::Overflow` get returned. Use `MessageBody::limit()` +/// method to change upper limit. +pub struct HttpMessageBody { + limit: usize, + length: Option, + stream: actix_http::Payload, + err: Option, + fut: Option>>, +} + +impl HttpMessageBody +where + T: HttpMessage, + T::Stream: Stream, +{ + /// Create `MessageBody` for request. + pub fn new(req: &mut T) -> HttpMessageBody { + let mut len = None; + if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { + if let Ok(s) = l.to_str() { + if let Ok(l) = s.parse::() { + len = Some(l) + } else { + return Self::err(PayloadError::UnknownLength); + } + } else { + return Self::err(PayloadError::UnknownLength); + } + } + + HttpMessageBody { + stream: req.take_payload(), + limit: 262_144, + length: len, + fut: None, + err: None, + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + fn err(e: PayloadError) -> Self { + HttpMessageBody { + stream: actix_http::Payload::None, + limit: 262_144, + fut: None, + err: Some(e), + length: None, + } + } +} + +impl Future for HttpMessageBody +where + T: HttpMessage, + T::Stream: Stream + 'static, +{ + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut { + return fut.poll(); + } + + if let Some(err) = self.err.take() { + return Err(err); + } + + if let Some(len) = self.length.take() { + if len > self.limit { + return Err(PayloadError::Overflow); + } + } + + // future + let limit = self.limit; + self.fut = Some(Box::new( + std::mem::replace(&mut self.stream, actix_http::Payload::None) + .from_err() + .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(PayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .map(|body| body.freeze()), + )); + self.poll() + } +} + #[cfg(test)] mod tests { use bytes::Bytes; @@ -332,4 +435,38 @@ mod tests { let s = block_on(String::from_request(&mut req)).unwrap(); assert_eq!(s, "hello=world"); } + + #[test] + fn test_message_body() { + let mut req = + TestRequest::with_header(header::CONTENT_LENGTH, "xxxx").to_request(); + let res = block_on(HttpMessageBody::new(&mut req)); + match res.err().unwrap() { + PayloadError::UnknownLength => (), + _ => unreachable!("error"), + } + + let mut req = + TestRequest::with_header(header::CONTENT_LENGTH, "1000000").to_request(); + let res = block_on(HttpMessageBody::new(&mut req)); + match res.err().unwrap() { + PayloadError::Overflow => (), + _ => unreachable!("error"), + } + + let mut req = TestRequest::default() + .set_payload(Bytes::from_static(b"test")) + .to_request(); + let res = block_on(HttpMessageBody::new(&mut req)); + assert_eq!(res.ok().unwrap(), Bytes::from_static(b"test")); + + let mut req = TestRequest::default() + .set_payload(Bytes::from_static(b"11111111111111")) + .to_request(); + let res = block_on(HttpMessageBody::new(&mut req).limit(5)); + match res.err().unwrap() { + PayloadError::Overflow => (), + _ => unreachable!("error"), + } + } } diff --git a/src/types/readlines.rs b/src/types/readlines.rs new file mode 100644 index 00000000..2c7f699a --- /dev/null +++ b/src/types/readlines.rs @@ -0,0 +1,210 @@ +use std::str; + +use bytes::{Bytes, BytesMut}; +use encoding::all::UTF_8; +use encoding::types::{DecoderTrap, Encoding}; +use encoding::EncodingRef; +use futures::{Async, Poll, Stream}; + +use crate::dev::Payload; +use crate::error::{PayloadError, ReadlinesError}; +use crate::HttpMessage; + +/// Stream to read request line by line. +pub struct Readlines { + stream: Payload, + buff: BytesMut, + limit: usize, + checked_buff: bool, + encoding: EncodingRef, + err: Option, +} + +impl Readlines +where + T: HttpMessage, + T::Stream: Stream, +{ + /// Create a new stream to read request line by line. + pub fn new(req: &mut T) -> Self { + let encoding = match req.encoding() { + Ok(enc) => enc, + Err(err) => return Self::err(err.into()), + }; + + Readlines { + stream: req.take_payload(), + buff: BytesMut::with_capacity(262_144), + limit: 262_144, + checked_buff: true, + err: None, + encoding, + } + } + + /// Change max line size. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } + + fn err(err: ReadlinesError) -> Self { + Readlines { + stream: Payload::None, + buff: BytesMut::new(), + limit: 262_144, + checked_buff: true, + encoding: UTF_8, + err: Some(err), + } + } +} + +impl Stream for Readlines +where + T: HttpMessage, + T::Stream: Stream, +{ + type Item = String; + type Error = ReadlinesError; + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(err) = self.err.take() { + return Err(err); + } + + // check if there is a newline in the buffer + if !self.checked_buff { + let mut found: Option = None; + for (ind, b) in self.buff.iter().enumerate() { + if *b == b'\n' { + found = Some(ind); + break; + } + } + if let Some(ind) = found { + // check if line is longer than limit + if ind + 1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&self.buff.split_to(ind + 1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + return Ok(Async::Ready(Some(line))); + } + self.checked_buff = true; + } + // poll req for more bytes + match self.stream.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + // check if there is a newline in bytes + let mut found: Option = None; + for (ind, b) in bytes.iter().enumerate() { + if *b == b'\n' { + found = Some(ind); + break; + } + } + if let Some(ind) = found { + // check if line is longer than limit + if ind + 1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&bytes.split_to(ind + 1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + // extend buffer with rest of the bytes; + self.buff.extend_from_slice(&bytes); + self.checked_buff = false; + return Ok(Async::Ready(Some(line))); + } + self.buff.extend_from_slice(&bytes); + Ok(Async::NotReady) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.is_empty() { + return Ok(Async::Ready(None)); + } + if self.buff.len() > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&self.buff) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&self.buff, DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + self.buff.clear(); + Ok(Async::Ready(Some(line))) + } + Err(e) => Err(ReadlinesError::from(e)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::{block_on, TestRequest}; + + #[test] + fn test_readlines() { + let mut req = TestRequest::default() + .set_payload(Bytes::from_static( + b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ + industry. Lorem Ipsum has been the industry's standard dummy\n\ + Contrary to popular belief, Lorem Ipsum is not simply random text.", + )) + .to_request(); + let stream = match block_on(Readlines::new(&mut req).into_future()) { + Ok((Some(s), stream)) => { + assert_eq!( + s, + "Lorem Ipsum is simply dummy text of the printing and typesetting\n" + ); + stream + } + _ => unreachable!("error"), + }; + + let stream = match block_on(stream.into_future()) { + Ok((Some(s), stream)) => { + assert_eq!( + s, + "industry. Lorem Ipsum has been the industry's standard dummy\n" + ); + stream + } + _ => unreachable!("error"), + }; + + match block_on(stream.into_future()) { + Ok((Some(s), stream)) => { + assert_eq!( + s, + "Contrary to popular belief, Lorem Ipsum is not simply random text." + ); + } + _ => unreachable!("error"), + } + } +}