1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

add HttpMessage::readlines()

This commit is contained in:
Dursun Akkurt 2018-06-13 20:43:03 +03:00
parent c8528e8920
commit 6c765739d0

View File

@ -3,7 +3,7 @@ use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label;
use encoding::types::{DecoderTrap, Encoding};
use encoding::EncodingRef;
use futures::{Future, Poll, Stream};
use futures::{Future, Poll, Stream, Async};
use http::{header, HeaderMap};
use http_range::HttpRange;
use mime::Mime;
@ -260,6 +260,101 @@ pub trait HttpMessage {
let boundary = Multipart::boundary(self.headers());
Multipart::new(boundary, self)
}
/// Return stream of lines.
fn readlines(self) -> Readlines<Self>
where
Self: Stream<Item = Bytes, Error = PayloadError> + Sized,
{
Readlines::new(self)
}
}
/// Stream to read request line by line.
pub struct Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
req: T,
buff: Vec<u8>,
limit: usize,
}
impl<T> Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
/// Create a new stream to read request line by line.
fn new(req: T) -> Self {
Readlines {
req,
buff: Vec::with_capacity(256),
limit: 262_144,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}
}
impl<T> Stream for Readlines<T>
where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Item = String;
type Error = ReadlinesError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.req.poll() {
Ok(Async::Ready(Some(bytes))) => {
for b in bytes.iter() {
if *b == '\n' as u8 {
self.buff.push(*b);
let line = str::from_utf8(&*self.buff)?.to_owned();
self.buff.clear();
return Ok(Async::Ready(Some(line)));
} else {
self.buff.push(*b);
}
if self.limit < self.buff.len() {
return Err(ReadlinesError::LimitOverflow);
}
}
Ok(Async::NotReady)
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
if self.buff.len() == 0 {
return Ok(Async::Ready(None));
}
let line = str::from_utf8(&*self.buff)?.to_owned();
self.buff.clear();
return Ok(Async::Ready(Some(line)))
},
Err(e) => Err(ReadlinesError::from(e)),
}
}
}
pub enum ReadlinesError {
EncodingError,
PayloadError(PayloadError),
LimitOverflow,
}
impl From<PayloadError> for ReadlinesError {
fn from(err: PayloadError) -> Self {
ReadlinesError::PayloadError(err)
}
}
impl From<str::Utf8Error> for ReadlinesError {
fn from(_: str::Utf8Error) -> Self {
ReadlinesError::EncodingError
}
}
/// Future that resolves to a complete http message body.