diff --git a/CHANGES.md b/CHANGES.md index 5265d9d79..ee109fe07 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ ### Added +* Add `HttpMessage::readlines()` for reading line by line. + * Add `ClientRequestBuilder::form()` for sending `application/x-www-form-urlencoded` requests. * Add method to configure custom error handler to Form extractor. diff --git a/src/error.rs b/src/error.rs index bbafb1c41..c272a2dca 100644 --- a/src/error.rs +++ b/src/error.rs @@ -590,6 +590,30 @@ impl From for JsonPayloadError { } } +/// Error type returned when reading body as lines. +pub enum ReadlinesError { + /// Error when decoding a line. + EncodingError, + /// Payload error. + PayloadError(PayloadError), + /// Line limit exceeded. + LimitOverflow, + /// ContentType error. + ContentTypeError(ContentTypeError), +} + +impl From for ReadlinesError { + fn from(err: PayloadError) -> Self { + ReadlinesError::PayloadError(err) + } +} + +impl From for ReadlinesError { + fn from(err: ContentTypeError) -> Self { + ReadlinesError::ContentTypeError(err) + } +} + /// Errors which can occur when attempting to interpret a segment string as a /// valid path segment. #[derive(Fail, Debug, PartialEq)] diff --git a/src/httpmessage.rs b/src/httpmessage.rs index a9d68d3ab..82c50d775 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -3,7 +3,7 @@ use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; use encoding::types::{DecoderTrap, Encoding}; use encoding::EncodingRef; -use futures::{Future, Poll, Stream}; +use futures::{Future, Poll, Stream, Async}; use http::{header, HeaderMap}; use http_range::HttpRange; use mime::Mime; @@ -13,6 +13,7 @@ use std::str; use error::{ ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError, + ReadlinesError }; use header::Header; use json::JsonBody; @@ -260,6 +261,143 @@ pub trait HttpMessage { let boundary = Multipart::boundary(self.headers()); Multipart::new(boundary, self) } + + /// Return stream of lines. + fn readlines(self) -> Readlines + where + Self: Stream + Sized, + { + Readlines::new(self) + } +} + +/// Stream to read request line by line. +pub struct Readlines +where + T: HttpMessage + Stream + 'static, +{ + req: T, + buff: BytesMut, + limit: usize, + checked_buff: bool, +} + +impl Readlines +where + T: HttpMessage + Stream + 'static, +{ + /// Create a new stream to read request line by line. + fn new(req: T) -> Self { + Readlines { + req, + buff: BytesMut::with_capacity(262_144), + limit: 262_144, + checked_buff: true, + } + } + + /// Change max line size. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } +} + +impl Stream for Readlines +where + T: HttpMessage + Stream + 'static, +{ + type Item = String; + type Error = ReadlinesError; + + fn poll(&mut self) -> Poll, Self::Error> { + let encoding = self.req.encoding()?; + // check if there is a newline in the buffer + if !self.checked_buff { + let mut found: Option = None; + for (ind, b) in self.buff.iter().enumerate() { + if *b == '\n' as u8 { + found = Some(ind); + break; + } + } + if let Some(ind) = found { + // check if line is longer than limit + if ind+1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&self.buff.split_to(ind+1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + encoding + .decode(&self.buff.split_to(ind+1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + return Ok(Async::Ready(Some(line))); + } + self.checked_buff = true; + } + // poll req for more bytes + match self.req.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + // check if there is a newline in bytes + let mut found: Option = None; + for (ind, b) in bytes.iter().enumerate() { + if *b == '\n' as u8 { + found = Some(ind); + break; + } + } + if let Some(ind) = found { + // check if line is longer than limit + if ind+1 > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&bytes.split_to(ind+1)) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + encoding + .decode(&bytes.split_to(ind+1), DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + // extend buffer with rest of the bytes; + self.buff.extend_from_slice(&bytes); + self.checked_buff = false; + return Ok(Async::Ready(Some(line))); + } + self.buff.extend_from_slice(&bytes); + Ok(Async::NotReady) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.len() == 0 { + return Ok(Async::Ready(None)); + } + if self.buff.len() > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&self.buff) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + encoding + .decode(&self.buff, DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + self.buff.clear(); + return Ok(Async::Ready(Some(line))) + }, + Err(e) => Err(ReadlinesError::from(e)), + } + } } /// Future that resolves to a complete http message body. @@ -667,4 +805,30 @@ mod tests { _ => unreachable!("error"), } } + + #[test] + fn test_readlines() { + let mut req = HttpRequest::default(); + req.payload_mut().unread_data(Bytes::from_static( + b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ + industry. Lorem Ipsum has been the industry's standard dummy\n\ + Contrary to popular belief, Lorem Ipsum is not simply random text." + )); + let mut r = Readlines::new(req); + match r.poll().ok().unwrap() { + Async::Ready(Some(s)) => assert_eq!(s, + "Lorem Ipsum is simply dummy text of the printing and typesetting\n"), + _ => unreachable!("error"), + } + match r.poll().ok().unwrap() { + Async::Ready(Some(s)) => assert_eq!(s, + "industry. Lorem Ipsum has been the industry's standard dummy\n"), + _ => unreachable!("error"), + } + match r.poll().ok().unwrap() { + Async::Ready(Some(s)) => assert_eq!(s, + "Contrary to popular belief, Lorem Ipsum is not simply random text."), + _ => unreachable!("error"), + } + } }