1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

change poll method of Readlines

This commit is contained in:
Dursun Akkurt 2018-06-13 22:41:35 +03:00
parent 6d95e34552
commit ad9aacf521

View File

@ -13,6 +13,7 @@ use std::str;
use error::{ use error::{
ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError, ContentTypeError, HttpRangeError, ParseError, PayloadError, UrlencodedError,
Error, ErrorBadRequest
}; };
use header::Header; use header::Header;
use json::JsonBody; use json::JsonBody;
@ -276,7 +277,7 @@ where
T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static, T: HttpMessage + Stream<Item = Bytes, Error = PayloadError> + 'static,
{ {
req: T, req: T,
buff: Vec<u8>, buff: BytesMut,
limit: usize, limit: usize,
} }
@ -288,12 +289,12 @@ where
fn new(req: T) -> Self { fn new(req: T) -> Self {
Readlines { Readlines {
req, req,
buff: Vec::with_capacity(262_144), buff: BytesMut::with_capacity(262_144),
limit: 262_144, limit: 262_144,
} }
} }
/// Change max size of payload. By default max size is 256Kb /// Change max line size. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self { pub fn limit(mut self, limit: usize) -> Self {
self.limit = limit; self.limit = limit;
self self
@ -308,21 +309,63 @@ where
type Error = ReadlinesError; type Error = ReadlinesError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let encoding = self.req.encoding()?;
// check if there is a newline in the buffer
let mut found: Option<usize> = None;
for (ind, b) in self.buff.iter().enumerate() {
if *b == '\n' as u8 {
found = Some(ind);
break;
}
}
if let Some(ind) = found {
// check if line is longer than limit
if ind+1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff.split_to(ind+1))
.map_err(|_| ErrorBadRequest("Can not decode body"))?
.to_owned()
} else {
encoding
.decode(&self.buff.split_to(ind+1), DecoderTrap::Strict)
.map_err(|_| ErrorBadRequest("Can not decode body"))?
};
return Ok(Async::Ready(Some(line)));
}
// poll req for more bytes
match self.req.poll() { match self.req.poll() {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(Some(mut bytes))) => {
for b in bytes.iter() { // check if there is a newline in bytes
let mut found: Option<usize> = None;
for (ind, b) in bytes.iter().enumerate() {
if *b == '\n' as u8 { if *b == '\n' as u8 {
self.buff.push(*b); found = Some(ind);
let line = str::from_utf8(&*self.buff)?.to_owned(); break;
self.buff.clear();
return Ok(Async::Ready(Some(line)));
} else {
self.buff.push(*b);
}
if self.limit < self.buff.len() {
return Err(ReadlinesError::LimitOverflow);
} }
} }
if let Some(ind) = found {
// check if line is longer than limit
if ind+1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind+1))
.map_err(|_| ErrorBadRequest("Can not decode body"))?
.to_owned()
} else {
encoding
.decode(&bytes.split_to(ind+1), DecoderTrap::Strict)
.map_err(|_| ErrorBadRequest("Can not decode body"))?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady) Ok(Async::NotReady)
}, },
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
@ -330,7 +373,19 @@ where
if self.buff.len() == 0 { if self.buff.len() == 0 {
return Ok(Async::Ready(None)); return Ok(Async::Ready(None));
} }
let line = str::from_utf8(&*self.buff)?.to_owned(); if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ErrorBadRequest("Can not decode body"))?
.to_owned()
} else {
encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ErrorBadRequest("Can not decode body"))?
};
self.buff.clear(); self.buff.clear();
return Ok(Async::Ready(Some(line))) return Ok(Async::Ready(Some(line)))
}, },
@ -343,6 +398,7 @@ pub enum ReadlinesError {
EncodingError, EncodingError,
PayloadError(PayloadError), PayloadError(PayloadError),
LimitOverflow, LimitOverflow,
ContentTypeError(ContentTypeError),
} }
impl From<PayloadError> for ReadlinesError { impl From<PayloadError> for ReadlinesError {
@ -351,12 +407,18 @@ impl From<PayloadError> for ReadlinesError {
} }
} }
impl From<str::Utf8Error> for ReadlinesError { impl From<Error> for ReadlinesError {
fn from(_: str::Utf8Error) -> Self { fn from(_: Error) -> Self {
ReadlinesError::EncodingError ReadlinesError::EncodingError
} }
} }
impl From<ContentTypeError> for ReadlinesError {
fn from(err: ContentTypeError) -> Self {
ReadlinesError::ContentTypeError(err)
}
}
/// Future that resolves to a complete http message body. /// Future that resolves to a complete http message body.
pub struct MessageBody<T> { pub struct MessageBody<T> {
limit: usize, limit: usize,