1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 00:21:08 +01:00

Merge pull request #312 from eddomuke/master

Add HttpMessage::readlines()
This commit is contained in:
Özgür Akkurt 2018-06-14 00:40:29 +03:00 committed by GitHub
commit 4c646962a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 191 additions and 1 deletions

View File

@ -4,6 +4,8 @@
### Added ### Added
* Add `HttpMessage::readlines()` for reading line by line.
* Add `ClientRequestBuilder::form()` for sending `application/x-www-form-urlencoded` requests. * Add `ClientRequestBuilder::form()` for sending `application/x-www-form-urlencoded` requests.
* Add method to configure custom error handler to Form extractor. * Add method to configure custom error handler to Form extractor.

View File

@ -590,6 +590,30 @@ impl From<JsonError> for JsonPayloadError {
} }
} }
/// Error type returned when reading body as lines.
pub enum ReadlinesError {
/// Error when decoding a line.
EncodingError,
/// Payload error.
PayloadError(PayloadError),
/// Line limit exceeded.
LimitOverflow,
/// ContentType error.
ContentTypeError(ContentTypeError),
}
impl From<PayloadError> for ReadlinesError {
fn from(err: PayloadError) -> Self {
ReadlinesError::PayloadError(err)
}
}
impl From<ContentTypeError> for ReadlinesError {
fn from(err: ContentTypeError) -> Self {
ReadlinesError::ContentTypeError(err)
}
}
/// Errors which can occur when attempting to interpret a segment string as a /// Errors which can occur when attempting to interpret a segment string as a
/// valid path segment. /// valid path segment.
#[derive(Fail, Debug, PartialEq)] #[derive(Fail, Debug, PartialEq)]

View File

@ -3,7 +3,7 @@ use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label; use encoding::label::encoding_from_whatwg_label;
use encoding::types::{DecoderTrap, Encoding}; use encoding::types::{DecoderTrap, Encoding};
use encoding::EncodingRef; use encoding::EncodingRef;
use futures::{Future, Poll, Stream}; use futures::{Future, Poll, Stream, Async};
use http::{header, HeaderMap}; use http::{header, HeaderMap};
use http_range::HttpRange; use http_range::HttpRange;
use mime::Mime; use mime::Mime;
@ -13,6 +13,7 @@ use std::str;
use error::{ use error::{
ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError, ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError,
ReadlinesError
}; };
use header::Header; use header::Header;
use json::JsonBody; use json::JsonBody;
@ -260,6 +261,143 @@ pub trait HttpMessage {
let boundary = Multipart::boundary(self.headers()); let boundary = Multipart::boundary(self.headers());
Multipart::new(boundary, self) Multipart::new(boundary, self)
} }
/// Return stream of lines.
fn readlines(self) -> Readlines<Self>
where
Self: Stream<Item = Bytes, Error = PayloadError> + Sized,
{
Readlines::new(self)
}
}
/// Stream to read request line by line.
pub struct Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
req: T,
buff: BytesMut,
limit: usize,
checked_buff: bool,
}
impl<T> Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
/// Create a new stream to read request line by line.
fn new(req: T) -> Self {
Readlines {
req,
buff: BytesMut::with_capacity(262_144),
limit: 262_144,
checked_buff: true,
}
}
/// Change max line size. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl<T> Stream for Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Item = String;
type Error = ReadlinesError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let encoding = self.req.encoding()?;
// check if there is a newline in the buffer
if !self.checked_buff {
let mut found: Option<usize> = None;
for (ind, b) in self.buff.iter().enumerate() {
if *b == '\n' as u8 {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind+1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff.split_to(ind+1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
encoding
.decode(&self.buff.split_to(ind+1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
return Ok(Async::Ready(Some(line)));
}
self.checked_buff = true;
}
// poll req for more bytes
match self.req.poll() {
Ok(Async::Ready(Some(mut bytes))) => {
// check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == '\n' as u8 {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind+1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind+1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
encoding
.decode(&bytes.split_to(ind+1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.len() == 0 {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
self.buff.clear();
return Ok(Async::Ready(Some(line)))
},
Err(e) => Err(ReadlinesError::from(e)),
}
}
} }
/// Future that resolves to a complete http message body. /// Future that resolves to a complete http message body.
@ -667,4 +805,30 @@ mod tests {
_ => unreachable!("error"), _ => unreachable!("error"),
} }
} }
#[test]
fn test_readlines() {
let mut req = HttpRequest::default();
req.payload_mut().unread_data(Bytes::from_static(
b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
industry. Lorem Ipsum has been the industry's standard dummy\n\
Contrary to popular belief, Lorem Ipsum is not simply random text."
));
let mut r = Readlines::new(req);
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(s,
"Lorem Ipsum is simply dummy text of the printing and typesetting\n"),
_ => unreachable!("error"),
}
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(s,
"industry. Lorem Ipsum has been the industry's standard dummy\n"),
_ => unreachable!("error"),
}
match r.poll().ok().unwrap() {
Async::Ready(Some(s)) => assert_eq!(s,
"Contrary to popular belief, Lorem Ipsum is not simply random text."),
_ => unreachable!("error"),
}
}
} }