1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00
actix-extras/src/multipart.rs

806 lines
27 KiB
Rust
Raw Normal View History

2017-11-20 04:51:14 +01:00
//! Multipart requests support
2017-10-19 08:43:50 +02:00
use std::cell::RefCell;
use std::marker::PhantomData;
2018-04-14 01:02:01 +02:00
use std::rc::Rc;
use std::{cmp, fmt};
2017-10-19 08:43:50 +02:00
use bytes::Bytes;
2018-04-14 01:02:01 +02:00
use futures::task::{current as current_task, Task};
use futures::{Async, Poll, Stream};
use http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue};
2018-04-29 07:55:47 +02:00
use http::HttpTryFrom;
2018-04-14 01:02:01 +02:00
use httparse;
use mime;
2017-10-19 08:43:50 +02:00
2018-04-14 01:02:01 +02:00
use error::{MultipartError, ParseError, PayloadError};
2018-02-26 03:55:07 +01:00
use payload::PayloadHelper;
2017-10-19 08:43:50 +02:00
/// Size of the scratch header array handed to `httparse` when the headers of
/// a single multipart part are parsed.
const MAX_HEADERS: usize = 32;
/// The server-side implementation of `multipart/form-data` requests.
///
/// This will parse the incoming stream into `MultipartItem` instances via its
/// Stream implementation.
/// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart`
/// is used for nested multipart streams.
2018-02-26 03:55:07 +01:00
pub struct Multipart<S> {
2017-10-19 08:43:50 +02:00
safety: Safety,
2017-12-19 18:51:28 +01:00
error: Option<MultipartError>,
2018-02-26 03:55:07 +01:00
inner: Option<Rc<RefCell<InnerMultipart<S>>>>,
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:41:21 +02:00
///
2018-02-26 03:55:07 +01:00
pub enum MultipartItem<S> {
2017-10-20 01:22:21 +02:00
/// Multipart field
2018-02-26 03:55:07 +01:00
Field(Field<S>),
2017-10-20 01:22:21 +02:00
/// Nested multipart stream
2018-02-26 03:55:07 +01:00
Nested(Multipart<S>),
2017-10-19 08:43:50 +02:00
}
2018-02-26 03:55:07 +01:00
enum InnerMultipartItem<S> {
2017-10-19 08:43:50 +02:00
None,
2018-02-26 03:55:07 +01:00
Field(Rc<RefCell<InnerField<S>>>),
Multipart(Rc<RefCell<InnerMultipart<S>>>),
2017-10-20 01:22:21 +02:00
}
/// Position of an `InnerMultipart` parser within the multipart body.
#[derive(Debug, PartialEq)]
enum InnerState {
    /// The closing delimiter was seen; no more items will be produced.
    Eof,
    /// Still skipping data in front of the first delimiter line.
    FirstBoundary,
    /// A delimiter line is expected next.
    Boundary,
    /// The header block of a part is expected next.
    Headers,
}
2018-02-26 03:55:07 +01:00
struct InnerMultipart<S> {
payload: PayloadRef<S>,
2017-10-20 01:22:21 +02:00
boundary: String,
state: InnerState,
2018-02-26 03:55:07 +01:00
item: InnerMultipartItem<S>,
2017-10-19 08:43:50 +02:00
}
2018-02-26 03:55:07 +01:00
impl Multipart<()> {
2017-10-22 18:30:05 +02:00
/// Extract boundary info from headers.
pub fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
2017-10-19 08:43:50 +02:00
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
2017-10-20 01:22:21 +02:00
Ok(boundary.as_str().to_owned())
} else {
Err(MultipartError::Boundary)
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:22:21 +02:00
} else {
Err(MultipartError::ParseContentType)
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:22:21 +02:00
} else {
Err(MultipartError::ParseContentType)
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:22:21 +02:00
} else {
Err(MultipartError::NoContentType)
}
}
}
2018-04-14 01:02:01 +02:00
impl<S> Multipart<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
2018-02-26 03:55:07 +01:00
/// Create multipart instance for boundary.
pub fn new(boundary: Result<String, MultipartError>, stream: S) -> Multipart<S> {
match boundary {
Ok(boundary) => Multipart {
error: None,
safety: Safety::new(),
2018-04-14 01:02:01 +02:00
inner: Some(Rc::new(RefCell::new(InnerMultipart {
boundary,
payload: PayloadRef::new(PayloadHelper::new(stream)),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))),
},
Err(err) => Multipart {
error: Some(err),
safety: Safety::new(),
inner: None,
2018-02-26 03:55:07 +01:00
},
}
}
}
2018-04-14 01:02:01 +02:00
impl<S> Stream for Multipart<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
2018-02-26 03:55:07 +01:00
type Item = MultipartItem<S>;
2017-10-20 01:22:21 +02:00
type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
2017-12-19 18:51:28 +01:00
if let Some(err) = self.error.take() {
Err(err)
} else if self.safety.current() {
2018-05-17 21:20:20 +02:00
self.inner.as_mut().unwrap().borrow_mut().poll(&self.safety)
2017-10-20 01:22:21 +02:00
} else {
Ok(Async::NotReady)
2017-10-19 08:43:50 +02:00
}
}
2017-10-20 01:22:21 +02:00
}
2017-10-19 08:43:50 +02:00
2018-04-14 01:02:01 +02:00
impl<S> InnerMultipart<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn read_headers(payload: &mut PayloadHelper<S>) -> Poll<HeaderMap, MultipartError> {
2018-03-09 14:36:40 +01:00
match payload.read_until(b"\r\n\r\n")? {
2017-10-19 08:43:50 +02:00
Async::NotReady => Ok(Async::NotReady),
2018-02-26 03:55:07 +01:00
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(bytes)) => {
2017-10-19 08:43:50 +02:00
let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
match httparse::parse_headers(&bytes, &mut hdrs) {
Ok(httparse::Status::Complete((_, hdrs))) => {
// convert headers
let mut headers = HeaderMap::with_capacity(hdrs.len());
for h in hdrs {
if let Ok(name) = HeaderName::try_from(h.name) {
if let Ok(value) = HeaderValue::try_from(h.value) {
headers.append(name, value);
} else {
2018-04-14 01:02:01 +02:00
return Err(ParseError::Header.into());
2017-10-19 08:43:50 +02:00
}
} else {
2018-04-14 01:02:01 +02:00
return Err(ParseError::Header.into());
2017-10-19 08:43:50 +02:00
}
}
Ok(Async::Ready(headers))
}
2017-10-20 01:22:21 +02:00
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
Err(err) => Err(ParseError::from(err).into()),
2017-10-19 08:43:50 +02:00
}
}
}
}
2018-04-14 01:02:01 +02:00
fn read_boundary(
2018-04-29 07:55:47 +02:00
payload: &mut PayloadHelper<S>, boundary: &str,
2018-04-14 01:02:01 +02:00
) -> Poll<bool, MultipartError> {
2017-10-19 08:43:50 +02:00
// TODO: need to read epilogue
2018-02-26 03:55:07 +01:00
match payload.readline()? {
2017-10-19 08:43:50 +02:00
Async::NotReady => Ok(Async::NotReady),
2018-02-26 03:55:07 +01:00
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => {
2018-05-17 21:20:20 +02:00
if chunk.len() == boundary.len() + 4
&& &chunk[..2] == b"--"
2018-04-14 01:02:01 +02:00
&& &chunk[2..boundary.len() + 2] == boundary.as_bytes()
2017-10-19 08:43:50 +02:00
{
Ok(Async::Ready(false))
2018-05-17 21:20:20 +02:00
} else if chunk.len() == boundary.len() + 6
&& &chunk[..2] == b"--"
2018-04-14 01:02:01 +02:00
&& &chunk[2..boundary.len() + 2] == boundary.as_bytes()
&& &chunk[boundary.len() + 2..boundary.len() + 4] == b"--"
2017-10-19 08:43:50 +02:00
{
Ok(Async::Ready(true))
} else {
2017-10-20 01:22:21 +02:00
Err(MultipartError::Boundary)
2017-10-19 08:43:50 +02:00
}
}
}
}
2018-04-14 01:02:01 +02:00
fn skip_until_boundary(
2018-04-29 07:55:47 +02:00
payload: &mut PayloadHelper<S>, boundary: &str,
2018-04-14 01:02:01 +02:00
) -> Poll<bool, MultipartError> {
2017-10-19 08:43:50 +02:00
let mut eof = false;
loop {
2018-02-26 03:55:07 +01:00
match payload.readline()? {
Async::Ready(Some(chunk)) => {
if chunk.is_empty() {
//ValueError("Could not find starting boundary %r"
//% (self._boundary))
}
if chunk.len() < boundary.len() {
2018-04-14 01:02:01 +02:00
continue;
2017-10-23 07:23:38 +02:00
}
2018-04-14 01:02:01 +02:00
if &chunk[..2] == b"--"
&& &chunk[2..chunk.len() - 2] == boundary.as_bytes()
{
2018-02-26 03:55:07 +01:00
break;
} else {
2018-04-14 01:02:01 +02:00
if chunk.len() < boundary.len() + 2 {
continue;
2017-10-19 08:43:50 +02:00
}
2018-02-26 03:55:07 +01:00
let b: &[u8] = boundary.as_ref();
2018-04-14 01:02:01 +02:00
if &chunk[..boundary.len()] == b
&& &chunk[boundary.len()..boundary.len() + 2] == b"--"
{
eof = true;
break;
}
2018-02-26 03:55:07 +01:00
}
2018-04-14 01:02:01 +02:00
}
2018-02-26 03:55:07 +01:00
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Err(MultipartError::Incomplete),
2017-10-19 08:43:50 +02:00
}
}
Ok(Async::Ready(eof))
}
2018-04-14 01:02:01 +02:00
fn poll(
2018-04-29 07:55:47 +02:00
&mut self, safety: &Safety,
2018-04-14 01:02:01 +02:00
) -> Poll<Option<MultipartItem<S>>, MultipartError> {
2017-10-20 01:22:21 +02:00
if self.state == InnerState::Eof {
2017-10-19 08:43:50 +02:00
Ok(Async::Ready(None))
} else {
// release field
loop {
2017-10-20 01:22:21 +02:00
// Nested multipart streams of fields has to be consumed
// before switching to next
if safety.current() {
let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(safety)? {
2018-02-26 03:55:07 +01:00
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Some(_)) => continue,
Async::Ready(None) => true,
2017-10-20 01:22:21 +02:00
}
2018-04-14 01:02:01 +02:00
}
2017-10-20 01:22:21 +02:00
InnerMultipartItem::Multipart(ref mut multipart) => {
match multipart.borrow_mut().poll(safety)? {
2018-02-26 03:55:07 +01:00
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Some(_)) => continue,
Async::Ready(None) => true,
2017-10-20 01:22:21 +02:00
}
2018-04-14 01:02:01 +02:00
}
2017-10-20 01:22:21 +02:00
_ => false,
};
if stop {
self.item = InnerMultipartItem::None;
}
if let InnerMultipartItem::None = self.item {
break;
2017-10-19 08:43:50 +02:00
}
}
}
2017-10-20 01:22:21 +02:00
let headers = if let Some(payload) = self.payload.get_mut(safety) {
match self.state {
// read until first boundary
InnerState::FirstBoundary => {
2018-04-14 01:02:01 +02:00
match InnerMultipart::skip_until_boundary(
payload,
&self.boundary,
)? {
2018-02-26 03:55:07 +01:00
Async::Ready(eof) => {
if eof {
self.state = InnerState::Eof;
return Ok(Async::Ready(None));
} else {
self.state = InnerState::Headers;
}
2018-04-14 01:02:01 +02:00
}
2018-02-26 03:55:07 +01:00
Async::NotReady => return Ok(Async::NotReady),
2017-10-20 01:22:21 +02:00
}
2018-04-14 01:02:01 +02:00
}
2017-10-19 08:43:50 +02:00
// read boundary
2017-10-20 01:22:21 +02:00
InnerState::Boundary => {
match InnerMultipart::read_boundary(payload, &self.boundary)? {
2018-02-26 03:55:07 +01:00
Async::NotReady => return Ok(Async::NotReady),
2017-10-20 01:22:21 +02:00
Async::Ready(eof) => {
if eof {
self.state = InnerState::Eof;
return Ok(Async::Ready(None));
} else {
self.state = InnerState::Headers;
}
}
}
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:22:21 +02:00
_ => (),
2017-10-19 08:43:50 +02:00
}
2017-10-20 01:22:21 +02:00
// read field headers for next field
if self.state == InnerState::Headers {
2018-04-14 01:02:01 +02:00
if let Async::Ready(headers) = InnerMultipart::read_headers(payload)?
{
2017-10-20 01:22:21 +02:00
self.state = InnerState::Boundary;
headers
} else {
2018-04-14 01:02:01 +02:00
return Ok(Async::NotReady);
2017-10-20 01:22:21 +02:00
}
2017-10-19 08:43:50 +02:00
} else {
2017-10-20 01:22:21 +02:00
unreachable!()
2017-10-19 08:43:50 +02:00
}
} else {
debug!("NotReady: field is in flight");
2018-04-14 01:02:01 +02:00
return Ok(Async::NotReady);
2017-10-19 08:43:50 +02:00
};
2017-10-20 01:22:21 +02:00
// content type
let mut mt = mime::APPLICATION_OCTET_STREAM;
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
mt = ct;
}
}
}
2017-10-23 06:40:41 +02:00
self.state = InnerState::Boundary;
2017-10-20 01:22:21 +02:00
// nested multipart stream
if mt.type_() == mime::MULTIPART {
let inner = if let Some(boundary) = mt.get_param(mime::BOUNDARY) {
2018-04-14 01:02:01 +02:00
Rc::new(RefCell::new(InnerMultipart {
payload: self.payload.clone(),
boundary: boundary.as_str().to_owned(),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))
2017-10-20 01:22:21 +02:00
} else {
2018-04-14 01:02:01 +02:00
return Err(MultipartError::Boundary);
2017-10-20 01:22:21 +02:00
};
self.item = InnerMultipartItem::Multipart(Rc::clone(&inner));
2018-04-14 01:02:01 +02:00
Ok(Async::Ready(Some(MultipartItem::Nested(Multipart {
safety: safety.clone(),
error: None,
inner: Some(inner),
}))))
2017-10-20 01:22:21 +02:00
} else {
let field = Rc::new(RefCell::new(InnerField::new(
2018-04-14 01:02:01 +02:00
self.payload.clone(),
self.boundary.clone(),
&headers,
)?));
2017-10-20 01:22:21 +02:00
self.item = InnerMultipartItem::Field(Rc::clone(&field));
2017-10-19 08:43:50 +02:00
2018-04-14 01:02:01 +02:00
Ok(Async::Ready(Some(MultipartItem::Field(Field::new(
safety.clone(),
headers,
mt,
field,
)?))))
2017-10-20 01:22:21 +02:00
}
2017-10-19 08:43:50 +02:00
}
}
}
2018-02-26 03:55:07 +01:00
impl<S> Drop for InnerMultipart<S> {
2017-10-20 01:22:21 +02:00
fn drop(&mut self) {
// InnerMultipartItem::Field has to be dropped first because of Safety.
self.item = InnerMultipartItem::None;
}
}
2017-10-19 08:43:50 +02:00
/// A single field in a multipart stream
2018-02-26 03:55:07 +01:00
pub struct Field<S> {
2017-10-19 08:43:50 +02:00
ct: mime::Mime,
cd: ContentDisposition,
2017-10-19 08:43:50 +02:00
headers: HeaderMap,
2018-02-26 03:55:07 +01:00
inner: Rc<RefCell<InnerField<S>>>,
2017-10-19 08:43:50 +02:00
safety: Safety,
}
2018-04-14 01:02:01 +02:00
impl<S> Field<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn new(
safety: Safety, headers: HeaderMap, ct: mime::Mime,
inner: Rc<RefCell<InnerField<S>>>,
) -> Result<Self, MultipartError> {
// RFC 7578: 'Each part MUST contain a Content-Disposition header field
// where the disposition type is "form-data".'
let cd = ContentDisposition::from_raw(
headers.get(::http::header::CONTENT_DISPOSITION)
).map_err(|_| MultipartError::ParseContentDisposition)?;
Ok(Field {
2018-04-14 01:02:01 +02:00
ct,
cd,
2018-04-14 01:02:01 +02:00
headers,
inner,
safety,
})
2017-10-19 08:43:50 +02:00
}
/// Get a map of headers
2017-10-19 08:43:50 +02:00
pub fn headers(&self) -> &HeaderMap {
&self.headers
}
/// Get the content type of the field
2017-10-19 08:43:50 +02:00
pub fn content_type(&self) -> &mime::Mime {
&self.ct
}
/// Get the content disposition of the field
pub fn content_disposition(&self) -> &ContentDisposition {
&self.cd
}
2017-10-19 08:43:50 +02:00
}
2018-04-14 01:02:01 +02:00
impl<S> Stream for Field<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
2017-12-19 18:55:49 +01:00
type Item = Bytes;
2017-10-20 01:22:21 +02:00
type Error = MultipartError;
2017-10-19 08:43:50 +02:00
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.safety.current() {
self.inner.borrow_mut().poll(&self.safety)
} else {
Ok(Async::NotReady)
}
}
}
2018-02-26 03:55:07 +01:00
impl<S> fmt::Debug for Field<S> {
2017-10-19 08:43:50 +02:00
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2018-04-09 23:25:30 +02:00
let res = writeln!(f, "\nMultipartField: {}", self.ct);
let _ = writeln!(f, " boundary: {}", self.inner.borrow().boundary);
let _ = writeln!(f, " headers:");
2018-03-08 00:41:46 +01:00
for (key, val) in self.headers.iter() {
2018-04-09 23:25:30 +02:00
let _ = writeln!(f, " {:?}: {:?}", key, val);
2017-10-19 08:43:50 +02:00
}
res
}
}
2018-02-26 03:55:07 +01:00
struct InnerField<S> {
payload: Option<PayloadRef<S>>,
2017-10-19 08:43:50 +02:00
boundary: String,
eof: bool,
length: Option<u64>,
}
2018-04-14 01:02:01 +02:00
impl<S> InnerField<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn new(
2018-04-29 07:55:47 +02:00
payload: PayloadRef<S>, boundary: String, headers: &HeaderMap,
2018-04-14 01:02:01 +02:00
) -> Result<InnerField<S>, PayloadError> {
2017-10-19 08:43:50 +02:00
let len = if let Some(len) = headers.get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(len)
} else {
2018-04-14 01:02:01 +02:00
return Err(PayloadError::Incomplete);
2017-10-19 08:43:50 +02:00
}
} else {
2018-04-14 01:02:01 +02:00
return Err(PayloadError::Incomplete);
2017-10-19 08:43:50 +02:00
}
} else {
None
};
Ok(InnerField {
2018-02-26 23:33:56 +01:00
boundary,
2017-10-19 08:43:50 +02:00
payload: Some(payload),
eof: false,
2018-04-14 01:02:01 +02:00
length: len,
})
2017-10-19 08:43:50 +02:00
}
/// Reads body part content chunk of the specified size.
/// The body part must has `Content-Length` header with proper value.
2018-04-14 01:02:01 +02:00
fn read_len(
2018-04-29 07:55:47 +02:00
payload: &mut PayloadHelper<S>, size: &mut u64,
2018-04-14 01:02:01 +02:00
) -> Poll<Option<Bytes>, MultipartError> {
2017-10-19 08:43:50 +02:00
if *size == 0 {
Ok(Async::Ready(None))
} else {
2018-02-26 03:55:07 +01:00
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady) => Ok(Async::NotReady),
2018-02-26 03:55:07 +01:00
Ok(Async::Ready(None)) => Err(MultipartError::Incomplete),
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(Some(mut chunk))) => {
2017-12-19 09:18:57 +01:00
let len = cmp::min(chunk.len() as u64, *size);
2017-10-19 08:43:50 +02:00
*size -= len;
2017-12-19 09:18:57 +01:00
let ch = chunk.split_to(len as usize);
if !chunk.is_empty() {
payload.unread_data(chunk);
2017-10-19 08:43:50 +02:00
}
Ok(Async::Ready(Some(ch)))
2018-04-14 01:02:01 +02:00
}
Err(err) => Err(err.into()),
2017-10-19 08:43:50 +02:00
}
}
}
/// Reads content chunk of body part with unknown length.
/// The `Content-Length` header for body part is not necessary.
2018-04-14 01:02:01 +02:00
fn read_stream(
2018-04-29 07:55:47 +02:00
payload: &mut PayloadHelper<S>, boundary: &str,
2018-04-14 01:02:01 +02:00
) -> Poll<Option<Bytes>, MultipartError> {
2018-03-09 14:36:40 +01:00
match payload.read_until(b"\r")? {
2017-10-19 08:43:50 +02:00
Async::NotReady => Ok(Async::NotReady),
2018-02-26 03:55:07 +01:00
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => {
2017-10-19 08:43:50 +02:00
if chunk.len() == 1 {
payload.unread_data(chunk);
2018-03-09 14:36:40 +01:00
match payload.read_exact(boundary.len() + 4)? {
2017-10-19 08:43:50 +02:00
Async::NotReady => Ok(Async::NotReady),
2018-02-26 03:55:07 +01:00
Async::Ready(None) => Err(MultipartError::Incomplete),
2018-06-05 17:52:46 +02:00
Async::Ready(Some(mut chunk)) => {
2018-05-17 21:20:20 +02:00
if &chunk[..2] == b"\r\n"
&& &chunk[2..4] == b"--"
2018-04-14 01:02:01 +02:00
&& &chunk[4..] == boundary.as_bytes()
2017-10-19 08:43:50 +02:00
{
2018-03-09 02:19:50 +01:00
payload.unread_data(chunk);
2017-10-19 08:43:50 +02:00
Ok(Async::Ready(None))
} else {
2018-06-05 17:52:46 +02:00
// \r might be part of data stream
let ch = chunk.split_to(1);
payload.unread_data(chunk);
Ok(Async::Ready(Some(ch)))
2017-10-19 08:43:50 +02:00
}
}
}
} else {
let to = chunk.len() - 1;
let ch = chunk.split_to(to);
payload.unread_data(chunk);
Ok(Async::Ready(Some(ch)))
}
}
}
}
2017-12-19 18:55:49 +01:00
fn poll(&mut self, s: &Safety) -> Poll<Option<Bytes>, MultipartError> {
2017-10-19 08:43:50 +02:00
if self.payload.is_none() {
2018-04-14 01:02:01 +02:00
return Ok(Async::Ready(None));
2017-10-19 08:43:50 +02:00
}
let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) {
let res = if let Some(ref mut len) = self.length {
InnerField::read_len(payload, len)?
} else {
InnerField::read_stream(payload, &self.boundary)?
};
match res {
Async::NotReady => Async::NotReady,
2017-12-19 18:55:49 +01:00
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
2017-10-19 08:43:50 +02:00
Async::Ready(None) => {
self.eof = true;
2018-02-26 03:55:07 +01:00
match payload.readline()? {
2017-10-19 08:43:50 +02:00
Async::NotReady => Async::NotReady,
2018-02-26 03:55:07 +01:00
Async::Ready(None) => Async::Ready(None),
Async::Ready(Some(line)) => {
if line.as_ref() != b"\r\n" {
warn!("multipart field did not read all the data or it is malformed");
}
2017-10-19 08:43:50 +02:00
Async::Ready(None)
}
}
}
}
} else {
Async::NotReady
};
if Async::Ready(None) == result {
self.payload.take();
}
Ok(result)
}
}
2018-02-26 03:55:07 +01:00
struct PayloadRef<S> {
payload: Rc<PayloadHelper<S>>,
2017-10-19 08:43:50 +02:00
}
2018-04-14 01:02:01 +02:00
impl<S> PayloadRef<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
2018-02-26 03:55:07 +01:00
fn new(payload: PayloadHelper<S>) -> PayloadRef<S> {
2017-10-19 08:43:50 +02:00
PayloadRef {
payload: Rc::new(payload),
}
}
2018-02-26 03:55:07 +01:00
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadHelper<S>>
2018-04-14 01:02:01 +02:00
where
'a: 'b,
2017-10-19 08:43:50 +02:00
{
if s.current() {
2018-04-14 01:02:01 +02:00
let payload: &mut PayloadHelper<S> =
unsafe { &mut *(self.payload.as_ref() as *const _ as *mut _) };
2017-10-19 08:43:50 +02:00
Some(payload)
} else {
None
}
}
}
2018-02-26 03:55:07 +01:00
impl<S> Clone for PayloadRef<S> {
fn clone(&self) -> PayloadRef<S> {
2017-10-19 08:43:50 +02:00
PayloadRef {
payload: Rc::clone(&self.payload),
}
}
}
2018-04-14 01:02:01 +02:00
/// Counter. It tracks of number of clones of payloads and give access to
/// payload only to top most task panics if Safety get destroyed and it not top
/// most task.
2017-10-19 08:43:50 +02:00
#[derive(Debug)]
struct Safety {
task: Option<Task>,
level: usize,
payload: Rc<PhantomData<bool>>,
}
impl Safety {
    /// Creates the root guard with a fresh counter and no task attached.
    fn new() -> Safety {
        let payload = Rc::new(PhantomData);
        let level = Rc::strong_count(&payload);
        Safety {
            task: None,
            level,
            payload,
        }
    }

    /// Returns `true` while no guard newer than this one is alive, i.e. the
    /// shared strong count still equals the level recorded at creation.
    fn current(&self) -> bool {
        self.level == Rc::strong_count(&self.payload)
    }
}
impl Clone for Safety {
    /// Creates a guard one level above the existing ones.
    ///
    /// The clone shares the counter, records the new strong count as its
    /// level and captures the current task so it can be notified when the
    /// clone is dropped.
    fn clone(&self) -> Safety {
        let payload = Rc::clone(&self.payload);
        let level = Rc::strong_count(&payload);
        Safety {
            task: Some(current_task()),
            level,
            payload,
        }
    }
}
impl Drop for Safety {
    /// Panics when a guard is dropped while a newer clone is still alive;
    /// otherwise notifies the task captured at clone time, if any.
    fn drop(&mut self) {
        // parent task is dead
        let holders = Rc::strong_count(&self.payload);
        if holders != self.level {
            panic!("Safety get dropped but it is not from top-most task");
        }
        match self.task.take() {
            Some(task) => task.notify(),
            None => (),
        }
    }
}
2017-10-22 18:30:05 +02:00
2017-10-23 06:40:41 +02:00
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use futures::future::{lazy, result};
2017-11-06 10:24:49 +01:00
use payload::{Payload, PayloadWriter};
2018-05-25 06:03:16 +02:00
use tokio::runtime::current_thread::Runtime;
2017-10-23 06:40:41 +02:00
#[test]
fn test_boundary() {
    // Builds a header map whose only entry is the given Content-Type value.
    fn with_content_type(value: &'static str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(value),
        );
        headers
    }

    // No Content-Type header at all.
    let headers = HeaderMap::new();
    match Multipart::boundary(&headers) {
        Err(MultipartError::NoContentType) => (),
        _ => unreachable!("should not happen"),
    }

    // Content-Type that is not a valid mime type.
    let headers = with_content_type("test");
    match Multipart::boundary(&headers) {
        Err(MultipartError::ParseContentType) => (),
        _ => unreachable!("should not happen"),
    }

    // Valid mime type without a boundary parameter.
    let headers = with_content_type("multipart/mixed");
    match Multipart::boundary(&headers) {
        Err(MultipartError::Boundary) => (),
        _ => unreachable!("should not happen"),
    }

    // Valid mime type with a quoted boundary parameter.
    let headers = with_content_type(
        "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
    );
    assert_eq!(
        Multipart::boundary(&headers).unwrap(),
        "5c02368e880e436dab70ed54e1c58209"
    );
}
#[test]
fn test_multipart() {
    // Checks that `polled` is a ready `text/plain` field whose body consists
    // of exactly one chunk equal to `expected`, followed by end of stream.
    fn assert_text_field(
        polled: Poll<Option<MultipartItem<Payload>>, MultipartError>,
        expected: &'static str,
    ) {
        let mut field = match polled {
            Ok(Async::Ready(Some(MultipartItem::Field(field)))) => field,
            _ => unreachable!(),
        };
        assert_eq!(field.content_type().type_(), mime::TEXT);
        assert_eq!(field.content_type().subtype(), mime::PLAIN);

        match field.poll() {
            Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, expected),
            _ => unreachable!(),
        }
        match field.poll() {
            Ok(Async::Ready(None)) => (),
            _ => unreachable!(),
        }
    }

    Runtime::new()
        .unwrap()
        .block_on(lazy(|| {
            let (mut sender, payload) = Payload::new(false);

            let bytes = Bytes::from(
                "testasdadsad\r\n\
                 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
                 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
                 test\r\n\
                 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
                 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
                 data\r\n\
                 --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
            );
            sender.feed_data(bytes);

            let mut multipart = Multipart::new(
                Ok("abbc761f78ff4d7cb7573b5a23f96ef0".to_owned()),
                payload,
            );

            // Two text fields, then the end of the stream.
            assert_text_field(multipart.poll(), "test");
            assert_text_field(multipart.poll(), "data");
            match multipart.poll() {
                Ok(Async::Ready(None)) => (),
                _ => unreachable!(),
            }

            let res: Result<(), ()> = Ok(());
            result(res)
        }))
        .unwrap();
}
2017-10-22 18:30:05 +02:00
}