From 6c765739d08c4e414369cc220456a70a7f0c6fb7 Mon Sep 17 00:00:00 2001 From: Dursun Akkurt Date: Wed, 13 Jun 2018 20:43:03 +0300 Subject: [PATCH] add HttpMessage::readlines() --- src/httpmessage.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 1 deletion(-) diff --git a/src/httpmessage.rs b/src/httpmessage.rs index a9d68d3ab..a8ae50edb 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -3,7 +3,7 @@ use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; use encoding::types::{DecoderTrap, Encoding}; use encoding::EncodingRef; -use futures::{Future, Poll, Stream}; +use futures::{Future, Poll, Stream, Async}; use http::{header, HeaderMap}; use http_range::HttpRange; use mime::Mime; @@ -260,6 +260,101 @@ pub trait HttpMessage { let boundary = Multipart::boundary(self.headers()); Multipart::new(boundary, self) } + + /// Return stream of lines. + fn readlines(self) -> Readlines + where + Self: Stream + Sized, + { + Readlines::new(self) + } +} + +/// Stream to read request line by line. +pub struct Readlines +where + T: HttpMessage + Stream + 'static, +{ + req: T, + buff: Vec, + limit: usize, +} + +impl Readlines +where + T: HttpMessage + Stream + 'static, +{ + /// Create a new stream to read request line by line. + fn new(req: T) -> Self { + Readlines { + req, + buff: Vec::with_capacity(256), + limit: 262_144, + } + } + + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self + } +} + +impl Stream for Readlines +where + T: HttpMessage + Stream + 'static, +{ + type Item = String; + type Error = ReadlinesError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.req.poll() { + Ok(Async::Ready(Some(bytes))) => { + for b in bytes.iter() { + if *b == '\n' as u8 { + self.buff.push(*b); + let line = str::from_utf8(&*self.buff)?.to_owned(); + self.buff.clear(); + return Ok(Async::Ready(Some(line))); + } else { + self.buff.push(*b); + } + if self.limit < self.buff.len() { + return Err(ReadlinesError::LimitOverflow); + } + } + Ok(Async::NotReady) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.len() == 0 { + return Ok(Async::Ready(None)); + } + let line = str::from_utf8(&*self.buff)?.to_owned(); + self.buff.clear(); + return Ok(Async::Ready(Some(line))) + }, + Err(e) => Err(ReadlinesError::from(e)), + } + } +} + +pub enum ReadlinesError { + EncodingError, + PayloadError(PayloadError), + LimitOverflow, +} + +impl From for ReadlinesError { + fn from(err: PayloadError) -> Self { + ReadlinesError::PayloadError(err) + } +} + +impl From for ReadlinesError { + fn from(_: str::Utf8Error) -> Self { + ReadlinesError::EncodingError + } } /// Future that resolves to a complete http message body.