use std::str; use bytes::{Bytes, BytesMut}; use encoding::all::UTF_8; use encoding::types::{DecoderTrap, Encoding}; use encoding::EncodingRef; use futures::{Async, Poll, Stream}; use crate::dev::Payload; use crate::error::{PayloadError, ReadlinesError}; use crate::HttpMessage; /// Stream to read request line by line. pub struct Readlines { stream: Payload, buff: BytesMut, limit: usize, checked_buff: bool, encoding: EncodingRef, err: Option, } impl Readlines where T: HttpMessage, T::Stream: Stream, { /// Create a new stream to read request line by line. pub fn new(req: &mut T) -> Self { let encoding = match req.encoding() { Ok(enc) => enc, Err(err) => return Self::err(err.into()), }; Readlines { stream: req.take_payload(), buff: BytesMut::with_capacity(262_144), limit: 262_144, checked_buff: true, err: None, encoding, } } /// Change max line size. By default max size is 256Kb pub fn limit(mut self, limit: usize) -> Self { self.limit = limit; self } fn err(err: ReadlinesError) -> Self { Readlines { stream: Payload::None, buff: BytesMut::new(), limit: 262_144, checked_buff: true, encoding: UTF_8, err: Some(err), } } } impl Stream for Readlines where T: HttpMessage, T::Stream: Stream, { type Item = String; type Error = ReadlinesError; fn poll(&mut self) -> Poll, Self::Error> { if let Some(err) = self.err.take() { return Err(err); } // check if there is a newline in the buffer if !self.checked_buff { let mut found: Option = None; for (ind, b) in self.buff.iter().enumerate() { if *b == b'\n' { found = Some(ind); break; } } if let Some(ind) = found { // check if line is longer than limit if ind + 1 > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { str::from_utf8(&self.buff.split_to(ind + 1)) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding .decode(&self.buff.split_to(ind + 1), DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; return Ok(Async::Ready(Some(line))); } self.checked_buff = true; } // poll req for more bytes match self.stream.poll() { Ok(Async::Ready(Some(mut bytes))) => { // check if there is a newline in bytes let mut found: Option = None; for (ind, b) in bytes.iter().enumerate() { if *b == b'\n' { found = Some(ind); break; } } if let Some(ind) = found { // check if line is longer than limit if ind + 1 > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { str::from_utf8(&bytes.split_to(ind + 1)) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; // extend buffer with rest of the bytes; self.buff.extend_from_slice(&bytes); self.checked_buff = false; return Ok(Async::Ready(Some(line))); } self.buff.extend_from_slice(&bytes); Ok(Async::NotReady) } Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => { if self.buff.is_empty() { return Ok(Async::Ready(None)); } if self.buff.len() > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { str::from_utf8(&self.buff) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding .decode(&self.buff, DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; self.buff.clear(); Ok(Async::Ready(Some(line))) } Err(e) => Err(ReadlinesError::from(e)), } } } #[cfg(test)] mod tests { use super::*; use crate::test::{block_on, TestRequest}; #[test] fn test_readlines() { let mut req = TestRequest::default() .set_payload(Bytes::from_static( b"Lorem Ipsum is simply dummy text of the printing and typesetting\n\ industry. Lorem Ipsum has been the industry's standard dummy\n\ Contrary to popular belief, Lorem Ipsum is not simply random text.", )) .to_request(); let stream = match block_on(Readlines::new(&mut req).into_future()) { Ok((Some(s), stream)) => { assert_eq!( s, "Lorem Ipsum is simply dummy text of the printing and typesetting\n" ); stream } _ => unreachable!("error"), }; let stream = match block_on(stream.into_future()) { Ok((Some(s), stream)) => { assert_eq!( s, "industry. Lorem Ipsum has been the industry's standard dummy\n" ); stream } _ => unreachable!("error"), }; match block_on(stream.into_future()) { Ok((Some(s), _)) => { assert_eq!( s, "Contrary to popular belief, Lorem Ipsum is not simply random text." ); } _ => unreachable!("error"), } } }