1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-25 09:59:21 +02:00

update multipart impl

This commit is contained in:
Nikolay Kim
2017-10-19 16:22:21 -07:00
parent aaef550bc5
commit 24804250a8
8 changed files with 353 additions and 143 deletions

View File

@ -10,6 +10,7 @@ use httparse;
use http::{StatusCode, Error as HttpError};
use HttpRangeParseError;
use multipart::MultipartError;
use httpresponse::{Body, HttpResponse};
@ -131,6 +132,14 @@ impl From<cookie::ParseError> for HttpResponse {
}
}
/// Return `BadRequest` for `MultipartError`
impl From<MultipartError> for HttpResponse {
fn from(err: MultipartError) -> Self {
HttpResponse::new(StatusCode::BAD_REQUEST,
Body::Binary(err.description().into()))
}
}
/// Return `BadRequest` for `HttpRangeParseError`
impl From<HttpRangeParseError> for HttpResponse {
fn from(_: HttpRangeParseError) -> Self {

View File

@ -178,7 +178,7 @@ impl HttpRequest {
///
/// Content-type: multipart/form-data;
pub fn multipart(&self, payload: Payload) -> Result<Multipart, MultipartError> {
Multipart::new(self, payload)
Ok(Multipart::new(Multipart::boundary(self)?, payload))
}
/// Parse `application/x-www-form-urlencoded` encoded body.

View File

@ -2,6 +2,7 @@
use std::{cmp, fmt};
use std::rc::Rc;
use std::cell::RefCell;
use std::error::Error;
use std::marker::PhantomData;
use mime;
@ -12,14 +13,68 @@ use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use futures::{Async, Stream, Poll};
use futures::task::{Task, current as current_task};
use error::ParseError;
use payload::{Payload, PayloadError};
use httprequest::HttpRequest;
const MAX_HEADERS: usize = 32;
/// A set of errors that can occur during parsing multipart streams.
#[derive(Debug)]
pub struct MultipartError {
pub payload: Payload,
pub enum MultipartError {
/// Content-Type header is not found
NoContentType,
/// Can not parse Content-Type header
ParseContentType,
/// Multipart boundary is not found
Boundary,
/// Error during field parsing
Parse(ParseError),
/// Payload error
Payload(PayloadError),
}
impl fmt::Display for MultipartError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
MultipartError::Parse(ref e) => fmt::Display::fmt(e, f),
MultipartError::Payload(ref e) => fmt::Display::fmt(e, f),
ref e => f.write_str(e.description()),
}
}
}
impl Error for MultipartError {
fn description(&self) -> &str {
match *self {
MultipartError::NoContentType => "No Content-type header found",
MultipartError::ParseContentType => "Can not parse Content-Type header",
MultipartError::Boundary => "Multipart boundary is not found",
MultipartError::Parse(ref e) => e.description(),
MultipartError::Payload(ref e) => e.description(),
}
}
fn cause(&self) -> Option<&Error> {
match *self {
MultipartError::Parse(ref error) => Some(error),
MultipartError::Payload(ref error) => Some(error),
_ => None,
}
}
}
impl From<ParseError> for MultipartError {
fn from(err: ParseError) -> MultipartError {
MultipartError::Parse(err)
}
}
impl From<PayloadError> for MultipartError {
fn from(err: PayloadError) -> MultipartError {
MultipartError::Payload(err)
}
}
/// The server-side implementation of `multipart/form-data` requests.
@ -31,51 +86,95 @@ pub struct MultipartError {
#[derive(Debug)]
pub struct Multipart {
safety: Safety,
payload: PayloadRef,
boundary: String,
eof: bool,
bof: bool,
item: InnerMultipartItem,
inner: Rc<RefCell<InnerMultipart>>,
}
#[derive(Debug)]
pub enum MultipartItem {
// Multipart field
/// Multipart field
Field(Field),
// Nested multipart item
Multipart(Multipart),
/// Nested multipart stream
Nested(Multipart),
}
#[derive(Debug)]
enum InnerMultipartItem {
None,
Field(Rc<RefCell<InnerField>>),
// Nested multipart item
// Multipart(Multipart),
Multipart(Rc<RefCell<InnerMultipart>>),
}
#[derive(PartialEq, Debug)]
enum InnerState {
/// Stream eof
Eof,
/// Skip data until first boundary
FirstBoundary,
/// Reading boundary
Boundary,
/// Reading Headers,
Headers,
}
#[derive(Debug)]
struct InnerMultipart {
payload: PayloadRef,
boundary: String,
state: InnerState,
item: InnerMultipartItem,
}
impl Multipart {
pub fn new(req: &HttpRequest, payload: Payload) -> Result<Multipart, MultipartError> {
pub fn new(boundary: String, payload: Payload) -> Multipart {
Multipart {
safety: Safety::new(),
inner: Rc::new(RefCell::new(
InnerMultipart {
payload: PayloadRef::new(payload),
boundary: boundary,
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))
}
}
pub fn boundary(req: &HttpRequest) -> Result<String, MultipartError> {
if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
return Ok(Multipart {
safety: Safety::new(),
payload: PayloadRef::new(payload),
boundary: boundary.as_str().to_owned(),
eof: false,
bof: true,
item: InnerMultipartItem::None,
})
Ok(boundary.as_str().to_owned())
} else {
Err(MultipartError::Boundary)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::NoContentType)
}
Err(MultipartError{payload: payload})
}
}
fn read_headers(payload: &mut Payload) -> Poll<HeaderMap, PayloadError>
impl Stream for Multipart {
type Item = MultipartItem;
type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.safety.current() {
self.inner.borrow_mut().poll(&self.safety)
} else {
Ok(Async::NotReady)
}
}
}
impl InnerMultipart {
fn read_headers(payload: &mut Payload) -> Poll<HeaderMap, MultipartError>
{
match payload.readuntil(b"\r\n\r\n")? {
Async::NotReady => Ok(Async::NotReady),
@ -90,21 +189,22 @@ impl Multipart {
if let Ok(value) = HeaderValue::try_from(h.value) {
headers.append(name, value);
} else {
return Err(PayloadError::Incomplete)
return Err(ParseError::Header.into())
}
} else {
return Err(PayloadError::Incomplete)
return Err(ParseError::Header.into())
}
}
Ok(Async::Ready(headers))
}
Ok(httparse::Status::Partial) | Err(_) => Err(PayloadError::Incomplete),
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
Err(err) => Err(ParseError::from(err).into()),
}
}
}
}
fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, PayloadError>
fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError>
{
// TODO: need to read epilogue
match payload.readline()? {
@ -122,13 +222,13 @@ impl Multipart {
{
Ok(Async::Ready(true))
} else {
Err(PayloadError::Incomplete)
Err(MultipartError::Boundary)
}
}
}
}
fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, PayloadError>
fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError>
{
let mut eof = false;
loop {
@ -154,91 +254,147 @@ impl Multipart {
}
Ok(Async::Ready(eof))
}
}
impl Drop for Multipart {
fn drop(&mut self) {
// InnerMultipartItem::Field has to be dropped first because of Safety.
self.item = InnerMultipartItem::None;
}
}
impl Stream for Multipart {
type Item = MultipartItem;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.eof {
fn poll(&mut self, safety: &Safety) -> Poll<Option<MultipartItem>, MultipartError> {
if self.state == InnerState::Eof {
Ok(Async::Ready(None))
} else {
// release field
loop {
let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(&self.safety)? {
Async::NotReady =>
return Ok(Async::NotReady),
Async::Ready(Some(_)) =>
continue,
Async::Ready(None) =>
true,
// Nested multipart streams of fields has to be consumed
// before switching to next
if safety.current() {
let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(safety)? {
Async::NotReady =>
return Ok(Async::NotReady),
Async::Ready(Some(_)) =>
continue,
Async::Ready(None) =>
true,
}
}
InnerMultipartItem::Multipart(ref mut multipart) => {
match multipart.borrow_mut().poll(safety)? {
Async::NotReady =>
return Ok(Async::NotReady),
Async::Ready(Some(_)) =>
continue,
Async::Ready(None) =>
true,
}
}
_ => false,
};
if stop {
self.item = InnerMultipartItem::None;
}
if let InnerMultipartItem::None = self.item {
break;
}
_ => false,
};
if stop {
self.item = InnerMultipartItem::None;
}
if let InnerMultipartItem::None = self.item {
break;
}
}
let headers = if let Some(payload) = self.payload.get_mut(&self.safety) {
// read until first boundary
if self.bof {
if let Async::Ready(eof) =
Multipart::skip_until_boundary(payload, &self.boundary)?
{
self.eof = eof;
let headers = if let Some(payload) = self.payload.get_mut(safety) {
match self.state {
// read until first boundary
InnerState::FirstBoundary => {
if let Async::Ready(eof) =
InnerMultipart::skip_until_boundary(payload, &self.boundary)?
{
if eof {
self.state = InnerState::Eof;
return Ok(Async::Ready(None));
} else {
self.state = InnerState::Headers;
}
} else {
return Ok(Async::NotReady)
}
}
// read boundary
InnerState::Boundary => {
match InnerMultipart::read_boundary(payload, &self.boundary)? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(eof) => {
if eof {
self.state = InnerState::Eof;
return Ok(Async::Ready(None));
} else {
self.state = InnerState::Headers;
}
}
}
}
_ => (),
}
// read field headers for next field
if self.state == InnerState::Headers {
if let Async::Ready(headers) = InnerMultipart::read_headers(payload)? {
self.state = InnerState::Boundary;
headers
} else {
return Ok(Async::NotReady)
}
self.bof = false;
} else {
// read boundary
match Multipart::read_boundary(payload, &self.boundary)? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(eof) => self.eof = eof,
}
}
if self.eof {
return Ok(Async::Ready(None))
}
// read field headers
if let Async::Ready(headers) = Multipart::read_headers(payload)? {
headers
} else {
return Ok(Async::NotReady)
unreachable!()
}
} else {
debug!("NotReady: field is in flight");
return Ok(Async::NotReady)
};
//
let field = Rc::new(RefCell::new(InnerField::new(
self.payload.clone(), self.boundary.clone(), &headers)?));
self.item = InnerMultipartItem::Field(Rc::clone(&field));
// content type
let mut mt = mime::APPLICATION_OCTET_STREAM;
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
mt = ct;
}
}
}
Ok(Async::Ready(Some(
MultipartItem::Field(Field::new(self.safety.clone(), headers, field)))))
// nested multipart stream
if mt.type_() == mime::MULTIPART {
let inner = if let Some(boundary) = mt.get_param(mime::BOUNDARY) {
Rc::new(RefCell::new(
InnerMultipart {
payload: self.payload.clone(),
boundary: boundary.as_str().to_owned(),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))
} else {
return Err(MultipartError::Boundary)
};
self.item = InnerMultipartItem::Multipart(Rc::clone(&inner));
Ok(Async::Ready(Some(
MultipartItem::Nested(
Multipart{safety: safety.clone(), inner: inner}))))
} else {
let field = Rc::new(RefCell::new(InnerField::new(
self.payload.clone(), self.boundary.clone(), &headers)?));
self.item = InnerMultipartItem::Field(Rc::clone(&field));
Ok(Async::Ready(Some(
MultipartItem::Field(
Field::new(safety.clone(), headers, mt, field)))))
}
}
}
}
impl Drop for InnerMultipart {
fn drop(&mut self) {
// InnerMultipartItem::Field has to be dropped first because of Safety.
self.item = InnerMultipartItem::None;
}
}
/// A single field in a multipart stream
pub struct Field {
ct: mime::Mime,
@ -247,19 +403,16 @@ pub struct Field {
safety: Safety,
}
/// A field's chunk
#[derive(PartialEq, Debug)]
pub struct FieldChunk(pub Bytes);
impl Field {
fn new(safety: Safety, headers: HeaderMap, inner: Rc<RefCell<InnerField>>) -> Self {
let mut mt = mime::APPLICATION_OCTET_STREAM;
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
mt = ct;
}
}
}
fn new(safety: Safety, headers: HeaderMap,
ct: mime::Mime, inner: Rc<RefCell<InnerField>>) -> Self {
Field {
ct: mt,
ct: ct,
headers: headers,
inner: inner,
safety: safety,
@ -276,8 +429,8 @@ impl Field {
}
impl Stream for Field {
type Item = Bytes;
type Error = PayloadError;
type Item = FieldChunk;
type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.safety.current() {
@ -341,7 +494,7 @@ impl InnerField {
/// Reads body part content chunk of the specified size.
/// The body part must has `Content-Length` header with proper value.
fn read_len(payload: &mut Payload, size: &mut u64) -> Poll<Option<Bytes>, PayloadError>
fn read_len(payload: &mut Payload, size: &mut u64) -> Poll<Option<Bytes>, MultipartError>
{
if *size == 0 {
Ok(Async::Ready(None))
@ -358,14 +511,14 @@ impl InnerField {
}
Ok(Async::Ready(Some(ch)))
},
Async::Ready(Some(Err(err))) => Err(err)
Async::Ready(Some(Err(err))) => Err(err.into())
}
}
}
/// Reads content chunk of body part with unknown length.
/// The `Content-Length` header for body part is not necessary.
fn read_stream(payload: &mut Payload, boundary: &str) -> Poll<Option<Bytes>, PayloadError>
fn read_stream(payload: &mut Payload, boundary: &str) -> Poll<Option<Bytes>, MultipartError>
{
match payload.readuntil(b"\r")? {
Async::NotReady => Ok(Async::NotReady),
@ -395,7 +548,7 @@ impl InnerField {
}
}
fn poll(&mut self, s: &Safety) -> Poll<Option<Bytes>, PayloadError> {
fn poll(&mut self, s: &Safety) -> Poll<Option<FieldChunk>, MultipartError> {
if self.payload.is_none() {
return Ok(Async::Ready(None))
}
@ -427,7 +580,7 @@ impl InnerField {
match res {
Async::NotReady => Async::NotReady,
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
Async::Ready(Some(bytes)) => Async::Ready(Some(FieldChunk(bytes))),
Async::Ready(None) => {
self.eof = true;
match payload.readline()? {

View File

@ -1,7 +1,9 @@
use std::fmt;
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::convert::From;
use std::collections::VecDeque;
use std::error::Error;
use std::io::Error as IoError;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
@ -21,6 +23,31 @@ pub enum PayloadError {
ParseError(IoError),
}
impl fmt::Display for PayloadError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PayloadError::ParseError(ref e) => fmt::Display::fmt(e, f),
ref e => f.write_str(e.description()),
}
}
}
impl Error for PayloadError {
fn description(&self) -> &str {
match *self {
PayloadError::Incomplete => "A payload reached EOF, but is not complete.",
PayloadError::ParseError(ref e) => e.description(),
}
}
fn cause(&self) -> Option<&Error> {
match *self {
PayloadError::ParseError(ref error) => Some(error),
_ => None,
}
}
}
impl From<IoError> for PayloadError {
fn from(err: IoError) -> PayloadError {
PayloadError::ParseError(err)