2019-06-18 14:43:25 +08:00
|
|
|
use std::borrow::Cow;
|
2019-11-20 23:33:22 +06:00
|
|
|
use std::future::Future;
|
|
|
|
use std::pin::Pin;
|
2019-03-17 00:48:40 -07:00
|
|
|
use std::str;
|
2019-11-20 23:33:22 +06:00
|
|
|
use std::task::{Context, Poll};
|
2019-03-17 00:48:40 -07:00
|
|
|
|
|
|
|
use bytes::{Bytes, BytesMut};
|
2019-06-18 14:43:25 +08:00
|
|
|
use encoding_rs::{Encoding, UTF_8};
|
2019-11-20 23:33:22 +06:00
|
|
|
use futures::Stream;
|
|
|
|
use pin_project::pin_project;
|
2019-03-17 00:48:40 -07:00
|
|
|
|
|
|
|
use crate::dev::Payload;
|
|
|
|
use crate::error::{PayloadError, ReadlinesError};
|
|
|
|
use crate::HttpMessage;
|
|
|
|
|
|
|
|
/// Stream to read request line by line.
|
|
|
|
pub struct Readlines<T: HttpMessage> {
|
|
|
|
stream: Payload<T::Stream>,
|
|
|
|
buff: BytesMut,
|
|
|
|
limit: usize,
|
|
|
|
checked_buff: bool,
|
2019-06-18 14:43:25 +08:00
|
|
|
encoding: &'static Encoding,
|
2019-03-17 00:48:40 -07:00
|
|
|
err: Option<ReadlinesError>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Readlines<T>
|
|
|
|
where
|
|
|
|
T: HttpMessage,
|
2019-11-20 23:33:22 +06:00
|
|
|
T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
2019-03-17 00:48:40 -07:00
|
|
|
{
|
|
|
|
/// Create a new stream to read request line by line.
|
|
|
|
pub fn new(req: &mut T) -> Self {
|
|
|
|
let encoding = match req.encoding() {
|
|
|
|
Ok(enc) => enc,
|
|
|
|
Err(err) => return Self::err(err.into()),
|
|
|
|
};
|
|
|
|
|
|
|
|
Readlines {
|
|
|
|
stream: req.take_payload(),
|
|
|
|
buff: BytesMut::with_capacity(262_144),
|
|
|
|
limit: 262_144,
|
|
|
|
checked_buff: true,
|
|
|
|
err: None,
|
|
|
|
encoding,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Change max line size. By default max size is 256Kb
|
|
|
|
pub fn limit(mut self, limit: usize) -> Self {
|
|
|
|
self.limit = limit;
|
|
|
|
self
|
|
|
|
}
|
|
|
|
|
|
|
|
fn err(err: ReadlinesError) -> Self {
|
|
|
|
Readlines {
|
|
|
|
stream: Payload::None,
|
|
|
|
buff: BytesMut::new(),
|
|
|
|
limit: 262_144,
|
|
|
|
checked_buff: true,
|
|
|
|
encoding: UTF_8,
|
|
|
|
err: Some(err),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Stream for Readlines<T>
|
|
|
|
where
|
|
|
|
T: HttpMessage,
|
2019-11-20 23:33:22 +06:00
|
|
|
T::Stream: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
2019-03-17 00:48:40 -07:00
|
|
|
{
|
2019-11-20 23:33:22 +06:00
|
|
|
type Item = Result<String, ReadlinesError>;
|
2019-03-17 00:48:40 -07:00
|
|
|
|
2019-11-20 23:33:22 +06:00
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.get_mut();
|
|
|
|
|
|
|
|
if let Some(err) = this.err.take() {
|
|
|
|
return Poll::Ready(Some(Err(err)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// check if there is a newline in the buffer
|
2019-11-20 23:33:22 +06:00
|
|
|
if !this.checked_buff {
|
2019-03-17 00:48:40 -07:00
|
|
|
let mut found: Option<usize> = None;
|
2019-11-20 23:33:22 +06:00
|
|
|
for (ind, b) in this.buff.iter().enumerate() {
|
2019-03-17 00:48:40 -07:00
|
|
|
if *b == b'\n' {
|
|
|
|
found = Some(ind);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if let Some(ind) = found {
|
|
|
|
// check if line is longer than limit
|
2019-11-20 23:33:22 +06:00
|
|
|
if ind + 1 > this.limit {
|
|
|
|
return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
let line = if this.encoding == UTF_8 {
|
|
|
|
str::from_utf8(&this.buff.split_to(ind + 1))
|
2019-03-17 00:48:40 -07:00
|
|
|
.map_err(|_| ReadlinesError::EncodingError)?
|
|
|
|
.to_owned()
|
|
|
|
} else {
|
2019-11-20 23:33:22 +06:00
|
|
|
this.encoding
|
2019-06-18 14:43:25 +08:00
|
|
|
.decode_without_bom_handling_and_without_replacement(
|
2019-11-20 23:33:22 +06:00
|
|
|
&this.buff.split_to(ind + 1),
|
2019-06-18 14:43:25 +08:00
|
|
|
)
|
|
|
|
.map(Cow::into_owned)
|
|
|
|
.ok_or(ReadlinesError::EncodingError)?
|
2019-03-17 00:48:40 -07:00
|
|
|
};
|
2019-11-20 23:33:22 +06:00
|
|
|
return Poll::Ready(Some(Ok(line)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
this.checked_buff = true;
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
|
|
|
// poll req for more bytes
|
2019-11-20 23:33:22 +06:00
|
|
|
match Pin::new(&mut this.stream).poll_next(cx) {
|
|
|
|
Poll::Ready(Some(Ok(mut bytes))) => {
|
2019-03-17 00:48:40 -07:00
|
|
|
// check if there is a newline in bytes
|
|
|
|
let mut found: Option<usize> = None;
|
|
|
|
for (ind, b) in bytes.iter().enumerate() {
|
|
|
|
if *b == b'\n' {
|
|
|
|
found = Some(ind);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if let Some(ind) = found {
|
|
|
|
// check if line is longer than limit
|
2019-11-20 23:33:22 +06:00
|
|
|
if ind + 1 > this.limit {
|
|
|
|
return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
let line = if this.encoding == UTF_8 {
|
2019-03-17 00:48:40 -07:00
|
|
|
str::from_utf8(&bytes.split_to(ind + 1))
|
|
|
|
.map_err(|_| ReadlinesError::EncodingError)?
|
|
|
|
.to_owned()
|
|
|
|
} else {
|
2019-11-20 23:33:22 +06:00
|
|
|
this.encoding
|
2019-06-18 14:43:25 +08:00
|
|
|
.decode_without_bom_handling_and_without_replacement(
|
|
|
|
&bytes.split_to(ind + 1),
|
|
|
|
)
|
|
|
|
.map(Cow::into_owned)
|
|
|
|
.ok_or(ReadlinesError::EncodingError)?
|
2019-03-17 00:48:40 -07:00
|
|
|
};
|
|
|
|
// extend buffer with rest of the bytes;
|
2019-11-20 23:33:22 +06:00
|
|
|
this.buff.extend_from_slice(&bytes);
|
|
|
|
this.checked_buff = false;
|
|
|
|
return Poll::Ready(Some(Ok(line)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
this.buff.extend_from_slice(&bytes);
|
|
|
|
Poll::Pending
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
Poll::Ready(None) => {
|
|
|
|
if this.buff.is_empty() {
|
|
|
|
return Poll::Ready(None);
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
if this.buff.len() > this.limit {
|
|
|
|
return Poll::Ready(Some(Err(ReadlinesError::LimitOverflow)));
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
let line = if this.encoding == UTF_8 {
|
|
|
|
str::from_utf8(&this.buff)
|
2019-03-17 00:48:40 -07:00
|
|
|
.map_err(|_| ReadlinesError::EncodingError)?
|
|
|
|
.to_owned()
|
|
|
|
} else {
|
2019-11-20 23:33:22 +06:00
|
|
|
this.encoding
|
|
|
|
.decode_without_bom_handling_and_without_replacement(&this.buff)
|
2019-06-18 14:43:25 +08:00
|
|
|
.map(Cow::into_owned)
|
|
|
|
.ok_or(ReadlinesError::EncodingError)?
|
2019-03-17 00:48:40 -07:00
|
|
|
};
|
2019-11-20 23:33:22 +06:00
|
|
|
this.buff.clear();
|
|
|
|
Poll::Ready(Some(Ok(line)))
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
2019-11-20 23:33:22 +06:00
|
|
|
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(ReadlinesError::from(e)))),
|
2019-03-17 00:48:40 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;

    use super::*;
    use crate::test::{block_on, TestRequest};

    /// A three-line body whose last line has no trailing newline must come
    /// back as three lines, in order, each keeping its own `\n`.
    #[test]
    fn test_readlines() {
        block_on(async {
            let payload = Bytes::from_static(
                b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\
                  industry. Lorem Ipsum has been the industry's standard dummy\n\
                  Contrary to popular belief, Lorem Ipsum is not simply random text.",
            );
            let mut req = TestRequest::default().set_payload(payload).to_request();
            let mut lines = Readlines::new(&mut req);

            let expected = [
                "Lorem Ipsum is simply dummy text of the printing and typesetting\n",
                "industry. Lorem Ipsum has been the industry's standard dummy\n",
                "Contrary to popular belief, Lorem Ipsum is not simply random text.",
            ];
            for want in expected.iter() {
                let got = lines.next().await.unwrap().unwrap();
                assert_eq!(got, *want);
            }
        })
    }
}
|