From e84c95968fe89a47475309e3fad5e7f9c7b1a1cd Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 28 Mar 2019 05:34:33 -0700 Subject: [PATCH] reuse PayloadBuffer from actix-http --- actix-http/src/error.rs | 12 +- actix-http/src/h1/payload.rs | 9 -- src/types/multipart.rs | 288 +---------------------------------- 3 files changed, 12 insertions(+), 297 deletions(-) diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index a026fe9d..4329970d 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -254,7 +254,10 @@ impl From for ParseError { /// A set of errors that can occur during payload parsing pub enum PayloadError { /// A payload reached EOF, but is not complete. - #[display(fmt = "A payload reached EOF, but is not complete.")] + #[display( + fmt = "A payload reached EOF, but is not complete. With error: {:?}", + _0 + )] Incomplete(Option), /// Content encoding stream corruption #[display(fmt = "Can not decode content-encoding.")] @@ -909,13 +912,12 @@ mod tests { fn test_payload_error() { let err: PayloadError = io::Error::new(io::ErrorKind::Other, "ParseError").into(); - assert_eq!(format!("{}", err), "ParseError"); - assert_eq!(format!("{}", err.cause().unwrap()), "ParseError"); + assert!(format!("{}", err).contains("ParseError")); - let err = PayloadError::Incomplete; + let err = PayloadError::Incomplete(None); assert_eq!( format!("{}", err), - "A payload reached EOF, but is not complete." + "A payload reached EOF, but is not complete. With error: None" ); } diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 979dd015..73d05c4b 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -502,15 +502,6 @@ mod tests { use actix_rt::Runtime; use futures::future::{lazy, result}; - #[test] - fn test_error() { - let err = PayloadError::Incomplete(None); - assert_eq!( - format!("{}", err), - "A payload reached EOF, but is not complete." - ); - } - #[test] fn test_basic() { Runtime::new() diff --git a/src/types/multipart.rs b/src/types/multipart.rs index d66053ff..50ef3813 100644 --- a/src/types/multipart.rs +++ b/src/types/multipart.rs @@ -1,11 +1,10 @@ //! Multipart payload support use std::cell::{RefCell, UnsafeCell}; -use std::collections::VecDeque; use std::marker::PhantomData; use std::rc::Rc; use std::{cmp, fmt}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::task::{current as current_task, Task}; use futures::{Async, Poll, Stream}; use httparse; @@ -22,6 +21,9 @@ use crate::HttpMessage; const MAX_HEADERS: usize = 32; +type PayloadBuffer = + actix_http::h1::PayloadBuffer>>; + /// The server-side implementation of `multipart/form-data` requests. /// /// This will parse the incoming stream into `MultipartItem` instances via its @@ -125,7 +127,7 @@ impl Multipart { safety: Safety::new(), inner: Some(Rc::new(RefCell::new(InnerMultipart { boundary, - payload: PayloadRef::new(PayloadBuffer::new(stream)), + payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))), state: InnerState::FirstBoundary, item: InnerMultipartItem::None, }))), @@ -712,157 +714,6 @@ impl Drop for Safety { } } -/// Payload buffer -pub struct PayloadBuffer { - len: usize, - items: VecDeque, - stream: Box>, -} - -impl PayloadBuffer { - /// Create new `PayloadBuffer` instance - pub fn new(stream: S) -> Self - where - S: Stream + 'static, - { - PayloadBuffer { - len: 0, - items: VecDeque::new(), - stream: Box::new(stream), - } - } - - #[inline] - fn poll_stream(&mut self) -> Poll { - self.stream.poll().map(|res| match res { - Async::Ready(Some(data)) => { - self.len += data.len(); - self.items.push_back(data); - Async::Ready(true) - } - Async::Ready(None) => Async::Ready(false), - Async::NotReady => Async::NotReady, - }) - } - - /// Read first available chunk of bytes - #[inline] - pub fn readany(&mut self) -> Poll, PayloadError> { - if let Some(data) = self.items.pop_front() { - self.len -= data.len(); - Ok(Async::Ready(Some(data))) - } else { - match self.poll_stream()? { - Async::Ready(true) => self.readany(), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - /// Read exact number of bytes - #[inline] - pub fn read_exact(&mut self, size: usize) -> Poll, PayloadError> { - if size <= self.len { - self.len -= size; - let mut chunk = self.items.pop_front().unwrap(); - if size < chunk.len() { - let buf = chunk.split_to(size); - self.items.push_front(chunk); - Ok(Async::Ready(Some(buf))) - } else if size == chunk.len() { - Ok(Async::Ready(Some(chunk))) - } else { - let mut buf = BytesMut::with_capacity(size); - buf.extend_from_slice(&chunk); - - while buf.len() < size { - let mut chunk = self.items.pop_front().unwrap(); - let rem = cmp::min(size - buf.len(), chunk.len()); - buf.extend_from_slice(&chunk.split_to(rem)); - if !chunk.is_empty() { - self.items.push_front(chunk); - } - } - Ok(Async::Ready(Some(buf.freeze()))) - } - } else { - match self.poll_stream()? { - Async::Ready(true) => self.read_exact(size), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } - } - - /// Read until specified ending - pub fn read_until(&mut self, line: &[u8]) -> Poll, PayloadError> { - let mut idx = 0; - let mut num = 0; - let mut offset = 0; - let mut found = false; - let mut length = 0; - - for no in 0..self.items.len() { - { - let chunk = &self.items[no]; - for (pos, ch) in chunk.iter().enumerate() { - if *ch == line[idx] { - idx += 1; - if idx == line.len() { - num = no; - offset = pos + 1; - length += pos + 1; - found = true; - break; - } - } else { - idx = 0 - } - } - if !found { - length += chunk.len() - } - } - - if found { - let mut buf = BytesMut::with_capacity(length); - if num > 0 { - for _ in 0..num { - buf.extend_from_slice(&self.items.pop_front().unwrap()); - } - } - if offset > 0 { - let mut chunk = self.items.pop_front().unwrap(); - buf.extend_from_slice(&chunk.split_to(offset)); - if !chunk.is_empty() { - self.items.push_front(chunk) - } - } - self.len -= length; - return Ok(Async::Ready(Some(buf.freeze()))); - } - } - - match self.poll_stream()? { - Async::Ready(true) => self.read_until(line), - Async::Ready(false) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } - - /// Read bytes until new line delimiter - pub fn readline(&mut self) -> Poll, PayloadError> { - self.read_until(b"\n") - } - - /// Put unprocessed data back to the buffer - pub fn unprocessed(&mut self, data: Bytes) { - self.len += data.len(); - self.items.push_front(data); - } -} - #[cfg(test)] mod tests { use bytes::Bytes; @@ -1005,133 +856,4 @@ mod tests { } }); } - - #[test] - fn test_basic() { - run_on(|| { - let (_sender, payload) = create_stream(); - { - let mut payload = PayloadBuffer::new(payload); - assert_eq!(payload.len, 0); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); - } - }); - } - - #[test] - fn test_eof() { - run_on(|| { - let (sender, payload) = create_stream(); - let mut payload = PayloadBuffer::new(payload); - - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); - sender.unbounded_send(Ok(Bytes::from("data"))).unwrap(); - drop(sender); - - assert_eq!( - Async::Ready(Some(Bytes::from("data"))), - payload.readany().ok().unwrap() - ); - assert_eq!(payload.len, 0); - assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); - }); - } - - #[test] - fn test_err() { - run_on(|| { - let (sender, payload) = create_stream(); - let mut payload = PayloadBuffer::new(payload); - - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); - - sender - .unbounded_send(Err(PayloadError::Incomplete(None))) - .unwrap(); - payload.readany().err().unwrap(); - }); - } - - #[test] - fn test_readany() { - run_on(|| { - let (sender, payload) = create_stream(); - let mut payload = PayloadBuffer::new(payload); - - sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap(); - sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap(); - - assert_eq!( - Async::Ready(Some(Bytes::from("line1"))), - payload.readany().ok().unwrap() - ); - assert_eq!(payload.len, 0); - - assert_eq!( - Async::Ready(Some(Bytes::from("line2"))), - payload.readany().ok().unwrap() - ); - assert_eq!(payload.len, 0); - }); - } - - #[test] - fn test_readexactly() { - run_on(|| { - let (sender, payload) = create_stream(); - let mut payload = PayloadBuffer::new(payload); - - assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap()); - - sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap(); - sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap(); - - assert_eq!( - Async::Ready(Some(Bytes::from_static(b"li"))), - payload.read_exact(2).ok().unwrap() - ); - assert_eq!(payload.len, 3); - - assert_eq!( - Async::Ready(Some(Bytes::from_static(b"ne1l"))), - payload.read_exact(4).ok().unwrap() - ); - assert_eq!(payload.len, 4); - - sender - .unbounded_send(Err(PayloadError::Incomplete(None))) - .unwrap(); - payload.read_exact(10).err().unwrap(); - }); - } - - #[test] - fn test_readuntil() { - run_on(|| { - let (sender, payload) = create_stream(); - let mut payload = PayloadBuffer::new(payload); - - assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap()); - - sender.unbounded_send(Ok(Bytes::from("line1"))).unwrap(); - sender.unbounded_send(Ok(Bytes::from("line2"))).unwrap(); - - assert_eq!( - Async::Ready(Some(Bytes::from("line"))), - payload.read_until(b"ne").ok().unwrap() - ); - assert_eq!(payload.len, 1); - - assert_eq!( - Async::Ready(Some(Bytes::from("1line2"))), - payload.read_until(b"2").ok().unwrap() - ); - assert_eq!(payload.len, 0); - - sender - .unbounded_send(Err(PayloadError::Incomplete(None))) - .unwrap(); - payload.read_until(b"b").err().unwrap(); - }); - } }