From 24804250a8d1ae7e843132bbff1784861c0f736e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 19 Oct 2017 16:22:21 -0700 Subject: [PATCH] update multipart impl --- Cargo.toml | 6 +- examples/multipart/Cargo.toml | 3 +- examples/multipart/client.py | 18 +- examples/multipart/src/main.rs | 66 +++--- src/error.rs | 9 + src/httprequest.rs | 2 +- src/multipart.rs | 365 +++++++++++++++++++++++---------- src/payload.rs | 27 +++ 8 files changed, 353 insertions(+), 143 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a143b7d7f..bd5c80adf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,9 +49,9 @@ tokio-io = "0.1" tokio-proto = "0.1" [dependencies.actix] -#path = "../actix" -#git = "https://github.com/fafhrd91/actix.git" -version = "0.2" +# path = "../actix" +git = "https://github.com/fafhrd91/actix.git" +#version = "0.2" default-features = false features = [] diff --git a/examples/multipart/Cargo.toml b/examples/multipart/Cargo.toml index 596e42043..8dc69edf7 100644 --- a/examples/multipart/Cargo.toml +++ b/examples/multipart/Cargo.toml @@ -9,5 +9,6 @@ path = "src/main.rs" [dependencies] env_logger = "*" -actix = "0.2" +# actix = "0.2" +actix = { git = "https://github.com/fafhrd91/actix.git" } actix-web = { path = "../../" } diff --git a/examples/multipart/client.py b/examples/multipart/client.py index 698f291bd..35f97c1a6 100644 --- a/examples/multipart/client.py +++ b/examples/multipart/client.py @@ -2,7 +2,7 @@ import asyncio import aiohttp -def client(): +def req1(): with aiohttp.MultipartWriter() as writer: writer.append('test') writer.append_json({'passed': True}) @@ -14,5 +14,19 @@ def client(): assert 200 == resp.status +def req2(): + with aiohttp.MultipartWriter() as writer: + writer.append('test') + writer.append_json({'passed': True}) + writer.append(open('src/main.rs')) + + resp = yield from aiohttp.request( + "post", 'http://localhost:8080/multipart', + data=writer, headers=writer.headers) + print(resp) + assert 200 == resp.status + + loop = asyncio.get_event_loop() -loop.run_until_complete(client()) +loop.run_until_complete(req1()) +loop.run_until_complete(req2()) diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index c2041b1ac..ea983aac4 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -1,3 +1,4 @@ +#![allow(unused_variables)] extern crate actix; extern crate actix_web; extern crate env_logger; @@ -16,39 +17,44 @@ impl Route for MyRoute { fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext) -> Reply { println!("{:?}", req); - match req.multipart(payload) { - Ok(multipart) => { - ctx.add_stream(multipart); - Reply::async(MyRoute) - }, - // can not read multipart - Err(_) => { - Reply::reply(httpcodes::HTTPBadRequest) - } - } - } -} -impl ResponseType for MyRoute { - type Item = (); - type Error = (); -} + // get Multipart stream + WrapStream::::actstream(req.multipart(payload)?) + .and_then(|item, act, ctx| { + // Multipart stream is a string of Fields and nested Multiparts + match item { + multipart::MultipartItem::Field(field) => { + println!("==== FIELD ==== {:?}", field); -impl StreamHandler for MyRoute { - fn finished(&mut self, ctx: &mut Self::Context) { - println!("FINISHED"); - ctx.start(httpcodes::HTTPOk); - ctx.write_eof(); - } -} + // Read field's stream + fut::Either::A( + field.actstream() + .map(|chunk, act, ctx| { + println!( + "-- CHUNK: \n{}", + std::str::from_utf8(&chunk.0).unwrap()); + }) + .finish()) + }, + multipart::MultipartItem::Nested(mp) => { + // Do nothing for nested multipart stream + fut::Either::B(fut::ok(())) + } + } + }) + // wait until stream finish + .finish() + .map_err(|e, act, ctx| { + ctx.start(httpcodes::HTTPBadRequest); + ctx.write_eof(); + }) + .map(|_, act, ctx| { + ctx.start(httpcodes::HTTPOk); + ctx.write_eof(); + }) + .spawn(ctx); -impl Handler for MyRoute { - fn handle(&mut self, msg: multipart::MultipartItem, ctx: &mut HttpContext) - -> Response - { - println!("==== FIELD ==== {:?}", msg); - //if let Some(req) = self.req.take() { - Self::empty() + Reply::async(MyRoute) } } diff --git a/src/error.rs b/src/error.rs index e24832ff1..34497a8af 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,7 @@ use httparse; use http::{StatusCode, Error as HttpError}; use HttpRangeParseError; +use multipart::MultipartError; use httpresponse::{Body, HttpResponse}; @@ -131,6 +132,14 @@ impl From for HttpResponse { } } +/// Return `BadRequest` for `MultipartError` +impl From for HttpResponse { + fn from(err: MultipartError) -> Self { + HttpResponse::new(StatusCode::BAD_REQUEST, + Body::Binary(err.description().into())) + } +} + /// Return `BadRequest` for `HttpRangeParseError` impl From for HttpResponse { fn from(_: HttpRangeParseError) -> Self { diff --git a/src/httprequest.rs b/src/httprequest.rs index 95551b6b5..45bdd77bf 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -178,7 +178,7 @@ impl HttpRequest { /// /// Content-type: multipart/form-data; pub fn multipart(&self, payload: Payload) -> Result { - Multipart::new(self, payload) + Ok(Multipart::new(Multipart::boundary(self)?, payload)) } /// Parse `application/x-www-form-urlencoded` encoded body. diff --git a/src/multipart.rs b/src/multipart.rs index 82ed25716..970841273 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -2,6 +2,7 @@ use std::{cmp, fmt}; use std::rc::Rc; use std::cell::RefCell; +use std::error::Error; use std::marker::PhantomData; use mime; @@ -12,14 +13,68 @@ use http::header::{self, HeaderMap, HeaderName, HeaderValue}; use futures::{Async, Stream, Poll}; use futures::task::{Task, current as current_task}; +use error::ParseError; use payload::{Payload, PayloadError}; use httprequest::HttpRequest; const MAX_HEADERS: usize = 32; +/// A set of errors that can occur during parsing multipart streams. #[derive(Debug)] -pub struct MultipartError { - pub payload: Payload, +pub enum MultipartError { + /// Content-Type header is not found + NoContentType, + /// Can not parse Content-Type header + ParseContentType, + /// Multipart boundary is not found + Boundary, + /// Error during field parsing + Parse(ParseError), + /// Payload error + Payload(PayloadError), +} + +impl fmt::Display for MultipartError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + MultipartError::Parse(ref e) => fmt::Display::fmt(e, f), + MultipartError::Payload(ref e) => fmt::Display::fmt(e, f), + ref e => f.write_str(e.description()), + } + } +} + +impl Error for MultipartError { + fn description(&self) -> &str { + match *self { + MultipartError::NoContentType => "No Content-type header found", + MultipartError::ParseContentType => "Can not parse Content-Type header", + MultipartError::Boundary => "Multipart boundary is not found", + MultipartError::Parse(ref e) => e.description(), + MultipartError::Payload(ref e) => e.description(), + } + } + + fn cause(&self) -> Option<&Error> { + match *self { + MultipartError::Parse(ref error) => Some(error), + MultipartError::Payload(ref error) => Some(error), + _ => None, + } + } +} + + +impl From for MultipartError { + fn from(err: ParseError) -> MultipartError { + MultipartError::Parse(err) + } +} + +impl From for MultipartError { + fn from(err: PayloadError) -> MultipartError { + MultipartError::Payload(err) + } } /// The server-side implementation of `multipart/form-data` requests. @@ -31,51 +86,95 @@ pub struct MultipartError { #[derive(Debug)] pub struct Multipart { safety: Safety, - payload: PayloadRef, - boundary: String, - eof: bool, - bof: bool, - item: InnerMultipartItem, + inner: Rc>, } #[derive(Debug)] pub enum MultipartItem { - // Multipart field + /// Multipart field Field(Field), - // Nested multipart item - Multipart(Multipart), + /// Nested multipart stream + Nested(Multipart), } #[derive(Debug)] enum InnerMultipartItem { None, Field(Rc>), - // Nested multipart item - // Multipart(Multipart), + Multipart(Rc>), +} + +#[derive(PartialEq, Debug)] +enum InnerState { + /// Stream eof + Eof, + /// Skip data until first boundary + FirstBoundary, + /// Reading boundary + Boundary, + /// Reading Headers, + Headers, +} + +#[derive(Debug)] +struct InnerMultipart { + payload: PayloadRef, + boundary: String, + state: InnerState, + item: InnerMultipartItem, } impl Multipart { - pub fn new(req: &HttpRequest, payload: Payload) -> Result { + pub fn new(boundary: String, payload: Payload) -> Multipart { + Multipart { + safety: Safety::new(), + inner: Rc::new(RefCell::new( + InnerMultipart { + payload: PayloadRef::new(payload), + boundary: boundary, + state: InnerState::FirstBoundary, + item: InnerMultipartItem::None, + })) + } + } + + pub fn boundary(req: &HttpRequest) -> Result { if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) { if let Ok(content_type) = content_type.to_str() { if let Ok(ct) = content_type.parse::() { if let Some(boundary) = ct.get_param(mime::BOUNDARY) { - return Ok(Multipart { - safety: Safety::new(), - payload: PayloadRef::new(payload), - boundary: boundary.as_str().to_owned(), - eof: false, - bof: true, - item: InnerMultipartItem::None, - }) + Ok(boundary.as_str().to_owned()) + } else { + Err(MultipartError::Boundary) } + } else { + Err(MultipartError::ParseContentType) } + } else { + Err(MultipartError::ParseContentType) } + } else { + Err(MultipartError::NoContentType) } - Err(MultipartError{payload: payload}) } +} - fn read_headers(payload: &mut Payload) -> Poll +impl Stream for Multipart { + type Item = MultipartItem; + type Error = MultipartError; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.safety.current() { + self.inner.borrow_mut().poll(&self.safety) + } else { + Ok(Async::NotReady) + } + } +} + +impl InnerMultipart { + + fn read_headers(payload: &mut Payload) -> Poll { match payload.readuntil(b"\r\n\r\n")? { Async::NotReady => Ok(Async::NotReady), @@ -90,21 +189,22 @@ impl Multipart { if let Ok(value) = HeaderValue::try_from(h.value) { headers.append(name, value); } else { - return Err(PayloadError::Incomplete) + return Err(ParseError::Header.into()) } } else { - return Err(PayloadError::Incomplete) + return Err(ParseError::Header.into()) } } Ok(Async::Ready(headers)) } - Ok(httparse::Status::Partial) | Err(_) => Err(PayloadError::Incomplete), + Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), + Err(err) => Err(ParseError::from(err).into()), } } } } - fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll + fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll { // TODO: need to read epilogue match payload.readline()? { @@ -122,13 +222,13 @@ impl Multipart { { Ok(Async::Ready(true)) } else { - Err(PayloadError::Incomplete) + Err(MultipartError::Boundary) } } } } - fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll + fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll { let mut eof = false; loop { @@ -154,91 +254,147 @@ impl Multipart { } Ok(Async::Ready(eof)) } -} -impl Drop for Multipart { - fn drop(&mut self) { - // InnerMultipartItem::Field has to be dropped first because of Safety. - self.item = InnerMultipartItem::None; - } -} - -impl Stream for Multipart { - type Item = MultipartItem; - type Error = PayloadError; - - fn poll(&mut self) -> Poll, Self::Error> { - if self.eof { + fn poll(&mut self, safety: &Safety) -> Poll, MultipartError> { + if self.state == InnerState::Eof { Ok(Async::Ready(None)) } else { // release field loop { - let stop = match self.item { - InnerMultipartItem::Field(ref mut field) => { - match field.borrow_mut().poll(&self.safety)? { - Async::NotReady => - return Ok(Async::NotReady), - Async::Ready(Some(_)) => - continue, - Async::Ready(None) => - true, + // Nested multipart streams of fields has to be consumed + // before switching to next + if safety.current() { + let stop = match self.item { + InnerMultipartItem::Field(ref mut field) => { + match field.borrow_mut().poll(safety)? { + Async::NotReady => + return Ok(Async::NotReady), + Async::Ready(Some(_)) => + continue, + Async::Ready(None) => + true, + } } + InnerMultipartItem::Multipart(ref mut multipart) => { + match multipart.borrow_mut().poll(safety)? { + Async::NotReady => + return Ok(Async::NotReady), + Async::Ready(Some(_)) => + continue, + Async::Ready(None) => + true, + } + } + _ => false, + }; + if stop { + self.item = InnerMultipartItem::None; + } + if let InnerMultipartItem::None = self.item { + break; } - _ => false, - }; - if stop { - self.item = InnerMultipartItem::None; - } - if let InnerMultipartItem::None = self.item { - break; } } - let headers = if let Some(payload) = self.payload.get_mut(&self.safety) { - // read until first boundary - if self.bof { - if let Async::Ready(eof) = - Multipart::skip_until_boundary(payload, &self.boundary)? - { - self.eof = eof; + let headers = if let Some(payload) = self.payload.get_mut(safety) { + match self.state { + // read until first boundary + InnerState::FirstBoundary => { + if let Async::Ready(eof) = + InnerMultipart::skip_until_boundary(payload, &self.boundary)? + { + if eof { + self.state = InnerState::Eof; + return Ok(Async::Ready(None)); + } else { + self.state = InnerState::Headers; + } + } else { + return Ok(Async::NotReady) + } + } + // read boundary + InnerState::Boundary => { + match InnerMultipart::read_boundary(payload, &self.boundary)? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(eof) => { + if eof { + self.state = InnerState::Eof; + return Ok(Async::Ready(None)); + } else { + self.state = InnerState::Headers; + } + } + } + } + _ => (), + } + + // read field headers for next field + if self.state == InnerState::Headers { + if let Async::Ready(headers) = InnerMultipart::read_headers(payload)? { + self.state = InnerState::Boundary; + headers } else { return Ok(Async::NotReady) } - self.bof = false; } else { - // read boundary - match Multipart::read_boundary(payload, &self.boundary)? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(eof) => self.eof = eof, - } - } - - if self.eof { - return Ok(Async::Ready(None)) - } - - // read field headers - if let Async::Ready(headers) = Multipart::read_headers(payload)? { - headers - } else { - return Ok(Async::NotReady) + unreachable!() } } else { debug!("NotReady: field is in flight"); return Ok(Async::NotReady) }; - // - let field = Rc::new(RefCell::new(InnerField::new( - self.payload.clone(), self.boundary.clone(), &headers)?)); - self.item = InnerMultipartItem::Field(Rc::clone(&field)); + // content type + let mut mt = mime::APPLICATION_OCTET_STREAM; + if let Some(content_type) = headers.get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if let Ok(ct) = content_type.parse::() { + mt = ct; + } + } + } - Ok(Async::Ready(Some( - MultipartItem::Field(Field::new(self.safety.clone(), headers, field))))) + // nested multipart stream + if mt.type_() == mime::MULTIPART { + let inner = if let Some(boundary) = mt.get_param(mime::BOUNDARY) { + Rc::new(RefCell::new( + InnerMultipart { + payload: self.payload.clone(), + boundary: boundary.as_str().to_owned(), + state: InnerState::FirstBoundary, + item: InnerMultipartItem::None, + })) + } else { + return Err(MultipartError::Boundary) + }; + + self.item = InnerMultipartItem::Multipart(Rc::clone(&inner)); + + Ok(Async::Ready(Some( + MultipartItem::Nested( + Multipart{safety: safety.clone(), inner: inner})))) + } else { + let field = Rc::new(RefCell::new(InnerField::new( + self.payload.clone(), self.boundary.clone(), &headers)?)); + self.item = InnerMultipartItem::Field(Rc::clone(&field)); + + Ok(Async::Ready(Some( + MultipartItem::Field( + Field::new(safety.clone(), headers, mt, field))))) + } } } } +impl Drop for InnerMultipart { + fn drop(&mut self) { + // InnerMultipartItem::Field has to be dropped first because of Safety. + self.item = InnerMultipartItem::None; + } +} + /// A single field in a multipart stream pub struct Field { ct: mime::Mime, @@ -247,19 +403,16 @@ pub struct Field { safety: Safety, } +/// A field's chunk +#[derive(PartialEq, Debug)] +pub struct FieldChunk(pub Bytes); + impl Field { - fn new(safety: Safety, headers: HeaderMap, inner: Rc>) -> Self { - let mut mt = mime::APPLICATION_OCTET_STREAM; - if let Some(content_type) = headers.get(header::CONTENT_TYPE) { - if let Ok(content_type) = content_type.to_str() { - if let Ok(ct) = content_type.parse::() { - mt = ct; - } - } - } + fn new(safety: Safety, headers: HeaderMap, + ct: mime::Mime, inner: Rc>) -> Self { Field { - ct: mt, + ct: ct, headers: headers, inner: inner, safety: safety, @@ -276,8 +429,8 @@ impl Field { } impl Stream for Field { - type Item = Bytes; - type Error = PayloadError; + type Item = FieldChunk; + type Error = MultipartError; fn poll(&mut self) -> Poll, Self::Error> { if self.safety.current() { @@ -341,7 +494,7 @@ impl InnerField { /// Reads body part content chunk of the specified size. /// The body part must has `Content-Length` header with proper value. - fn read_len(payload: &mut Payload, size: &mut u64) -> Poll, PayloadError> + fn read_len(payload: &mut Payload, size: &mut u64) -> Poll, MultipartError> { if *size == 0 { Ok(Async::Ready(None)) @@ -358,14 +511,14 @@ impl InnerField { } Ok(Async::Ready(Some(ch))) }, - Async::Ready(Some(Err(err))) => Err(err) + Async::Ready(Some(Err(err))) => Err(err.into()) } } } /// Reads content chunk of body part with unknown length. /// The `Content-Length` header for body part is not necessary. - fn read_stream(payload: &mut Payload, boundary: &str) -> Poll, PayloadError> + fn read_stream(payload: &mut Payload, boundary: &str) -> Poll, MultipartError> { match payload.readuntil(b"\r")? { Async::NotReady => Ok(Async::NotReady), @@ -395,7 +548,7 @@ impl InnerField { } } - fn poll(&mut self, s: &Safety) -> Poll, PayloadError> { + fn poll(&mut self, s: &Safety) -> Poll, MultipartError> { if self.payload.is_none() { return Ok(Async::Ready(None)) } @@ -427,7 +580,7 @@ impl InnerField { match res { Async::NotReady => Async::NotReady, - Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), + Async::Ready(Some(bytes)) => Async::Ready(Some(FieldChunk(bytes))), Async::Ready(None) => { self.eof = true; match payload.readline()? { diff --git a/src/payload.rs b/src/payload.rs index 2ea9bddb2..f8f8ac1f4 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,7 +1,9 @@ +use std::fmt; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::convert::From; use std::collections::VecDeque; +use std::error::Error; use std::io::Error as IoError; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; @@ -21,6 +23,31 @@ pub enum PayloadError { ParseError(IoError), } +impl fmt::Display for PayloadError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + PayloadError::ParseError(ref e) => fmt::Display::fmt(e, f), + ref e => f.write_str(e.description()), + } + } +} + +impl Error for PayloadError { + fn description(&self) -> &str { + match *self { + PayloadError::Incomplete => "A payload reached EOF, but is not complete.", + PayloadError::ParseError(ref e) => e.description(), + } + } + + fn cause(&self) -> Option<&Error> { + match *self { + PayloadError::ParseError(ref error) => Some(error), + _ => None, + } + } +} + impl From for PayloadError { fn from(err: IoError) -> PayloadError { PayloadError::ParseError(err)