From 0a3b776aa7bd3789fb2f61c2a1d504d6cba52e2a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 26 Feb 2018 05:55:07 +0300 Subject: [PATCH] refactor multipart stream --- examples/multipart/src/main.rs | 2 +- src/error.rs | 3 + src/httprequest.rs | 5 +- src/json.rs | 12 +- src/multipart.rs | 315 ++++++++++++++------------------- src/payload.rs | 142 +++++++++++++++ 6 files changed, 293 insertions(+), 186 deletions(-) diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index 7da6145a9..343dde167 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -11,7 +11,7 @@ use futures::{Future, Stream}; use futures::future::{result, Either}; -fn index(mut req: HttpRequest) -> Box> +fn index(req: HttpRequest) -> Box> { println!("{:?}", req); diff --git a/src/error.rs b/src/error.rs index b63206ff1..458850a0f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -294,6 +294,9 @@ pub enum MultipartError { /// Multipart boundary is not found #[fail(display="Multipart boundary is not found")] Boundary, + /// Multipart stream is incomplete + #[fail(display="Multipart stream is incomplete")] + Incomplete, /// Error during field parsing #[fail(display="{}", _0)] Parse(#[cause] ParseError), diff --git a/src/httprequest.rs b/src/httprequest.rs index 0d721c87f..3ca4fdf96 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -520,8 +520,9 @@ impl HttpRequest { /// } /// # fn main() {} /// ``` - pub fn multipart(self) -> Multipart { - Multipart::from_request(self) + pub fn multipart(self) -> Multipart> { + let boundary = Multipart::boundary(self.headers()); + Multipart::new(boundary, self) } /// Parse `application/x-www-form-urlencoded` encoded body. diff --git a/src/json.rs b/src/json.rs index 7f13db848..341bc32dd 100644 --- a/src/json.rs +++ b/src/json.rs @@ -188,27 +188,31 @@ mod tests { #[test] fn test_json_body() { - let mut req = HttpRequest::default(); + let req = HttpRequest::default(); let mut json = req.json::(); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let mut json = req.json::().content_type("text/json"); + let mut req = HttpRequest::default(); req.headers_mut().insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json")); + let mut json = req.json::().content_type("text/json"); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); - let mut json = req.json::().limit(100); + let mut req = HttpRequest::default(); req.headers_mut().insert(header::CONTENT_TYPE, header::HeaderValue::from_static("application/json")); req.headers_mut().insert(header::CONTENT_LENGTH, header::HeaderValue::from_static("10000")); + let mut json = req.json::().limit(100); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow); + let mut req = HttpRequest::default(); + req.headers_mut().insert(header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json")); req.headers_mut().insert(header::CONTENT_LENGTH, header::HeaderValue::from_static("16")); req.payload_mut().unread_data(Bytes::from_static(b"{\"name\": \"test\"}")); let mut json = req.json::(); assert_eq!(json.poll().ok().unwrap(), Async::Ready(MyObject{name: "test".to_owned()})); } - } diff --git a/src/multipart.rs b/src/multipart.rs index f97ccf3db..d3790a9a4 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -9,12 +9,11 @@ use httparse; use bytes::Bytes; use http::HttpTryFrom; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; -use futures::{Async, Future, Stream, Poll}; +use futures::{Async, Stream, Poll}; use futures::task::{Task, current as current_task}; use error::{ParseError, PayloadError, MultipartError}; -use payload::Payload; -use httprequest::HttpRequest; +use payload::PayloadHelper; const MAX_HEADERS: usize = 32; @@ -24,27 +23,24 @@ const MAX_HEADERS: usize = 32; /// Stream implementation. /// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart` /// is used for nested multipart streams. -#[derive(Debug)] -pub struct Multipart { +pub struct Multipart { safety: Safety, error: Option, - inner: Option>>, + inner: Option>>>, } /// -#[derive(Debug)] -pub enum MultipartItem { +pub enum MultipartItem { /// Multipart field - Field(Field), + Field(Field), /// Nested multipart stream - Nested(Multipart), + Nested(Multipart), } -#[derive(Debug)] -enum InnerMultipartItem { +enum InnerMultipartItem { None, - Field(Rc>), - Multipart(Rc>), + Field(Rc>>), + Multipart(Rc>>), } #[derive(PartialEq, Debug)] @@ -59,57 +55,14 @@ enum InnerState { Headers, } -#[derive(Debug)] -struct InnerMultipart { - payload: PayloadRef, +struct InnerMultipart { + payload: PayloadRef, boundary: String, state: InnerState, - item: InnerMultipartItem, + item: InnerMultipartItem, } -impl Multipart { - - /// Create multipart instance for boundary. - pub fn new(boundary: String, payload: Payload) -> Multipart { - Multipart { - error: None, - safety: Safety::new(), - inner: Some(Rc::new(RefCell::new( - InnerMultipart { - payload: PayloadRef::new(payload), - boundary: boundary, - state: InnerState::FirstBoundary, - item: InnerMultipartItem::None, - }))) - } - } - - /// Create multipart instance for request. - pub fn from_request(req: HttpRequest) -> Multipart { - match Multipart::boundary(req.headers()) { - Ok(boundary) => Multipart::new(boundary, req.payload().clone()), - Err(err) => - Multipart { - error: Some(err), - safety: Safety::new(), - inner: None, - } - } - } - - // /// Create multipart instance for client response. - // pub fn from_response(resp: &mut ClientResponse) -> Multipart { - // match Multipart::boundary(resp.headers()) { - // Ok(boundary) => Multipart::new(boundary, resp.payload().clone()), - // Err(err) => - // Multipart { - // error: Some(err), - // safety: Safety::new(), - // inner: None, - // } - // } - // } - +impl Multipart<()> { /// Extract boundary info from headers. pub fn boundary(headers: &HeaderMap) -> Result { if let Some(content_type) = headers.get(header::CONTENT_TYPE) { @@ -132,8 +85,34 @@ impl Multipart { } } -impl Stream for Multipart { - type Item = MultipartItem; +impl Multipart where S: Stream { + + /// Create multipart instance for boundary. + pub fn new(boundary: Result, stream: S) -> Multipart { + match boundary { + Ok(boundary) => Multipart { + error: None, + safety: Safety::new(), + inner: Some(Rc::new(RefCell::new( + InnerMultipart { + payload: PayloadRef::new(PayloadHelper::new(stream)), + boundary: boundary, + state: InnerState::FirstBoundary, + item: InnerMultipartItem::None, + }))) + }, + Err(err) => + Multipart { + error: Some(err), + safety: Safety::new(), + inner: None, + } + } + } +} + +impl Stream for Multipart where S: Stream { + type Item = MultipartItem; type Error = MultipartError; fn poll(&mut self) -> Poll, Self::Error> { @@ -147,13 +126,14 @@ impl Stream for Multipart { } } -impl InnerMultipart { +impl InnerMultipart where S: Stream { - fn read_headers(payload: &mut Payload) -> Poll + fn read_headers(payload: &mut PayloadHelper) -> Poll { - match payload.readuntil(b"\r\n\r\n").poll()? { + match payload.readuntil(b"\r\n\r\n")? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(bytes) => { + Async::Ready(None) => Err(MultipartError::Incomplete), + Async::Ready(Some(bytes)) => { let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; match httparse::parse_headers(&bytes, &mut hdrs) { Ok(httparse::Status::Complete((_, hdrs))) => { @@ -179,12 +159,14 @@ impl InnerMultipart { } } - fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll + fn read_boundary(payload: &mut PayloadHelper, boundary: &str) + -> Poll { // TODO: need to read epilogue - match payload.readline().poll()? { + match payload.readline()? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(chunk) => { + Async::Ready(None) => Err(MultipartError::Incomplete), + Async::Ready(Some(chunk)) => { if chunk.len() == boundary.len() + 4 && &chunk[..2] == b"--" && &chunk[2..boundary.len()+2] == boundary.as_bytes() @@ -203,39 +185,42 @@ impl InnerMultipart { } } - fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll + fn skip_until_boundary(payload: &mut PayloadHelper, boundary: &str) + -> Poll { let mut eof = false; loop { - if let Async::Ready(chunk) = payload.readline().poll()? { - if chunk.is_empty() { - //ValueError("Could not find starting boundary %r" - //% (self._boundary)) - } - if chunk.len() < boundary.len() { - continue - } - if &chunk[..2] == b"--" && &chunk[2..chunk.len()-2] == boundary.as_bytes() { - break; - } else { - if chunk.len() < boundary.len() + 2{ + match payload.readline()? { + Async::Ready(Some(chunk)) => { + if chunk.is_empty() { + //ValueError("Could not find starting boundary %r" + //% (self._boundary)) + } + if chunk.len() < boundary.len() { continue } - let b: &[u8] = boundary.as_ref(); - if &chunk[..boundary.len()] == b && - &chunk[boundary.len()..boundary.len()+2] == b"--" { - eof = true; - break; + if &chunk[..2] == b"--" && &chunk[2..chunk.len()-2] == boundary.as_bytes() { + break; + } else { + if chunk.len() < boundary.len() + 2{ + continue } - } - } else { - return Ok(Async::NotReady) + let b: &[u8] = boundary.as_ref(); + if &chunk[..boundary.len()] == b && + &chunk[boundary.len()..boundary.len()+2] == b"--" { + eof = true; + break; + } + } + }, + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Err(MultipartError::Incomplete), } } Ok(Async::Ready(eof)) } - fn poll(&mut self, safety: &Safety) -> Poll, MultipartError> { + fn poll(&mut self, safety: &Safety) -> Poll>, MultipartError> { if self.state == InnerState::Eof { Ok(Async::Ready(None)) } else { @@ -247,25 +232,18 @@ impl InnerMultipart { let stop = match self.item { InnerMultipartItem::Field(ref mut field) => { match field.borrow_mut().poll(safety)? { - Async::NotReady => { - return Ok(Async::NotReady) - } - Async::Ready(Some(_)) => - continue, - Async::Ready(None) => - true, + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(Some(_)) => continue, + Async::Ready(None) => true, } - } + }, InnerMultipartItem::Multipart(ref mut multipart) => { match multipart.borrow_mut().poll(safety)? { - Async::NotReady => - return Ok(Async::NotReady), - Async::Ready(Some(_)) => - continue, - Async::Ready(None) => - true, + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(Some(_)) => continue, + Async::Ready(None) => true, } - } + }, _ => false, }; if stop { @@ -281,25 +259,22 @@ impl InnerMultipart { match self.state { // read until first boundary InnerState::FirstBoundary => { - if let Async::Ready(eof) = - InnerMultipart::skip_until_boundary(payload, &self.boundary)? - { - if eof { - self.state = InnerState::Eof; - return Ok(Async::Ready(None)); - } else { - self.state = InnerState::Headers; - } - } else { - return Ok(Async::NotReady) + match InnerMultipart::skip_until_boundary(payload, &self.boundary)? { + Async::Ready(eof) => { + if eof { + self.state = InnerState::Eof; + return Ok(Async::Ready(None)); + } else { + self.state = InnerState::Headers; + } + }, + Async::NotReady => return Ok(Async::NotReady), } - } + }, // read boundary InnerState::Boundary => { match InnerMultipart::read_boundary(payload, &self.boundary)? { - Async::NotReady => { - return Ok(Async::NotReady) - } + Async::NotReady => return Ok(Async::NotReady), Async::Ready(eof) => { if eof { self.state = InnerState::Eof; @@ -375,7 +350,7 @@ impl InnerMultipart { } } -impl Drop for InnerMultipart { +impl Drop for InnerMultipart { fn drop(&mut self) { // InnerMultipartItem::Field has to be dropped first because of Safety. self.item = InnerMultipartItem::None; @@ -383,17 +358,17 @@ impl Drop for InnerMultipart { } /// A single field in a multipart stream -pub struct Field { +pub struct Field { ct: mime::Mime, headers: HeaderMap, - inner: Rc>, + inner: Rc>>, safety: Safety, } -impl Field { +impl Field where S: Stream { fn new(safety: Safety, headers: HeaderMap, - ct: mime::Mime, inner: Rc>) -> Self { + ct: mime::Mime, inner: Rc>>) -> Self { Field { ct: ct, headers: headers, @@ -411,7 +386,7 @@ impl Field { } } -impl Stream for Field { +impl Stream for Field where S: Stream { type Item = Bytes; type Error = MultipartError; @@ -424,7 +399,7 @@ impl Stream for Field { } } -impl fmt::Debug for Field { +impl fmt::Debug for Field { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let res = write!(f, "\nMultipartField: {}\n", self.ct); let _ = write!(f, " boundary: {}\n", self.inner.borrow().boundary); @@ -441,18 +416,17 @@ impl fmt::Debug for Field { } } -#[derive(Debug)] -struct InnerField { - payload: Option, +struct InnerField { + payload: Option>, boundary: String, eof: bool, length: Option, } -impl InnerField { +impl InnerField where S: Stream { - fn new(payload: PayloadRef, boundary: String, headers: &HeaderMap) - -> Result + fn new(payload: PayloadRef, boundary: String, headers: &HeaderMap) + -> Result, PayloadError> { let len = if let Some(len) = headers.get(header::CONTENT_LENGTH) { if let Ok(s) = len.to_str() { @@ -477,14 +451,15 @@ impl InnerField { /// Reads body part content chunk of the specified size. /// The body part must has `Content-Length` header with proper value. - fn read_len(payload: &mut Payload, size: &mut u64) -> Poll, MultipartError> + fn read_len(payload: &mut PayloadHelper, size: &mut u64) + -> Poll, MultipartError> { if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.poll() { + match payload.readany() { Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::Ready(None)) => Err(MultipartError::Incomplete), Ok(Async::Ready(Some(mut chunk))) => { let len = cmp::min(chunk.len() as u64, *size); *size -= len; @@ -501,16 +476,19 @@ impl InnerField { /// Reads content chunk of body part with unknown length. /// The `Content-Length` header for body part is not necessary. - fn read_stream(payload: &mut Payload, boundary: &str) -> Poll, MultipartError> + fn read_stream(payload: &mut PayloadHelper, boundary: &str) + -> Poll, MultipartError> { - match payload.readuntil(b"\r").poll()? { + match payload.readuntil(b"\r")? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(mut chunk) => { + Async::Ready(None) => Err(MultipartError::Incomplete), + Async::Ready(Some(mut chunk)) => { if chunk.len() == 1 { payload.unread_data(chunk); - match payload.readexactly(boundary.len() + 4).poll()? { + match payload.readexactly(boundary.len() + 4)? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(chunk) => { + Async::Ready(None) => Err(MultipartError::Incomplete), + Async::Ready(Some(chunk)) => { if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && &chunk[4..] == boundary.as_bytes() { @@ -535,24 +513,6 @@ impl InnerField { if self.payload.is_none() { return Ok(Async::Ready(None)) } - if self.eof { - if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { - match payload.readline().poll()? { - Async::NotReady => - return Ok(Async::NotReady), - Async::Ready(chunk) => { - assert_eq!( - chunk.as_ref(), b"\r\n", - "reader did not read all the data or it is malformed"); - } - } - } else { - return Ok(Async::NotReady); - } - - self.payload.take(); - return Ok(Async::Ready(None)) - } let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { let res = if let Some(ref mut len) = self.length { @@ -566,12 +526,13 @@ impl InnerField { Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(None) => { self.eof = true; - match payload.readline().poll()? { + match payload.readline()? { Async::NotReady => Async::NotReady, - Async::Ready(chunk) => { - assert_eq!( - chunk.as_ref(), b"\r\n", - "reader did not read all the data or it is malformed"); + Async::Ready(None) => Async::Ready(None), + Async::Ready(Some(line)) => { + if line.as_ref() != b"\r\n" { + warn!("multipart field did not read all the data or it is malformed"); + } Async::Ready(None) } } @@ -588,25 +549,22 @@ impl InnerField { } } -#[derive(Debug)] -struct PayloadRef { - task: Option, - payload: Rc, +struct PayloadRef { + payload: Rc>, } -impl PayloadRef { - fn new(payload: Payload) -> PayloadRef { +impl PayloadRef where S: Stream { + fn new(payload: PayloadHelper) -> PayloadRef { PayloadRef { - task: None, payload: Rc::new(payload), } } - fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut Payload> + fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadHelper> where 'a: 'b { if s.current() { - let payload: &mut Payload = unsafe { + let payload: &mut PayloadHelper = unsafe { &mut *(self.payload.as_ref() as *const _ as *mut _)}; Some(payload) } else { @@ -615,10 +573,9 @@ impl PayloadRef { } } -impl Clone for PayloadRef { - fn clone(&self) -> PayloadRef { +impl Clone for PayloadRef { + fn clone(&self) -> PayloadRef { PayloadRef { - task: Some(current_task()), payload: Rc::clone(&self.payload), } } @@ -733,7 +690,7 @@ mod tests { sender.feed_data(bytes); let mut multipart = Multipart::new( - "abbc761f78ff4d7cb7573b5a23f96ef0".to_owned(), payload); + Ok("abbc761f78ff4d7cb7573b5a23f96ef0".to_owned()), payload); match multipart.poll() { Ok(Async::Ready(Some(item))) => { match item { diff --git a/src/payload.rs b/src/payload.rs index 8d5bd7206..4cc0eaf68 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -411,6 +411,148 @@ impl Inner { } } +pub struct PayloadHelper { + len: usize, + items: VecDeque, + stream: S, +} + +impl PayloadHelper where S: Stream { + + pub fn new(stream: S) -> Self { + PayloadHelper { + len: 0, + items: VecDeque::new(), + stream: stream, + } + } + + fn poll_stream(&mut self) -> Poll { + self.stream.poll().map(|res| { + match res { + Async::Ready(Some(data)) => { + self.len += data.len(); + self.items.push_back(data); + Async::Ready(true) + }, + Async::Ready(None) => Async::Ready(false), + Async::NotReady => Async::NotReady, + } + }) + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn readany(&mut self) -> Poll, PayloadError> { + if let Some(data) = self.items.pop_front() { + self.len -= data.len(); + Ok(Async::Ready(Some(data))) + } else { + match self.poll_stream()? { + Async::Ready(true) => self.readany(), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + } + + pub fn readexactly(&mut self, size: usize) -> Poll, PayloadError> { + if size <= self.len { + let mut buf = BytesMut::with_capacity(size); + while buf.len() < size { + let mut chunk = self.items.pop_front().unwrap(); + let rem = cmp::min(size - buf.len(), chunk.len()); + self.len -= rem; + buf.extend_from_slice(&chunk.split_to(rem)); + if !chunk.is_empty() { + self.items.push_front(chunk); + return Ok(Async::Ready(Some(buf.freeze()))) + } + } + } + + match self.poll_stream()? { + Async::Ready(true) => self.readexactly(size), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + + pub fn readuntil(&mut self, line: &[u8]) -> Poll, PayloadError> { + let mut idx = 0; + let mut num = 0; + let mut offset = 0; + let mut found = false; + let mut length = 0; + + for no in 0..self.items.len() { + { + let chunk = &self.items[no]; + for (pos, ch) in chunk.iter().enumerate() { + if *ch == line[idx] { + idx += 1; + if idx == line.len() { + num = no; + offset = pos+1; + length += pos+1; + found = true; + break; + } + } else { + idx = 0 + } + } + if !found { + length += chunk.len() + } + } + + if found { + let mut buf = BytesMut::with_capacity(length); + if num > 0 { + for _ in 0..num { + buf.extend_from_slice(&self.items.pop_front().unwrap()); + } + } + if offset > 0 { + let mut chunk = self.items.pop_front().unwrap(); + buf.extend_from_slice(&chunk.split_to(offset)); + if !chunk.is_empty() { + self.items.push_front(chunk) + } + } + self.len -= length; + return Ok(Async::Ready(Some(buf.freeze()))) + } + } + + match self.poll_stream()? { + Async::Ready(true) => self.readuntil(line), + Async::Ready(false) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + + pub fn readline(&mut self) -> Poll, PayloadError> { + self.readuntil(b"\n") + } + + pub fn unread_data(&mut self, data: Bytes) { + self.len += data.len(); + self.items.push_front(data); + } + + pub fn remaining(&mut self) -> Bytes { + self.items.iter_mut() + .fold(BytesMut::new(), |mut b, c| { + b.extend_from_slice(c); + b + }).freeze() + } +} + #[cfg(test)] mod tests { use super::*;