1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

migrate actix-multipart

This commit is contained in:
Nikolay Kim 2019-11-21 14:25:50 +06:00
parent 60ada97b3d
commit 471f82f0e0
4 changed files with 177 additions and 168 deletions

View File

@ -36,7 +36,7 @@ members = [
"actix-framed", "actix-framed",
"actix-session", "actix-session",
"actix-identity", "actix-identity",
#"actix-multipart", "actix-multipart",
"actix-web-actors", "actix-web-actors",
"actix-web-codegen", "actix-web-codegen",
"test-server", "test-server",
@ -125,9 +125,9 @@ actix-http = { path = "actix-http" }
actix-http-test = { path = "test-server" } actix-http-test = { path = "test-server" }
actix-web-codegen = { path = "actix-web-codegen" } actix-web-codegen = { path = "actix-web-codegen" }
# actix-web-actors = { path = "actix-web-actors" } # actix-web-actors = { path = "actix-web-actors" }
# actix-session = { path = "actix-session" } actix-session = { path = "actix-session" }
actix-files = { path = "actix-files" } actix-files = { path = "actix-files" }
# actix-multipart = { path = "actix-multipart" } actix-multipart = { path = "actix-multipart" }
awc = { path = "awc" } awc = { path = "awc" }
actix-codec = { git = "https://github.com/actix/actix-net.git" } actix-codec = { git = "https://github.com/actix/actix-net.git" }

View File

@ -18,17 +18,18 @@ name = "actix_multipart"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = { version = "1.0.9", default-features = false } actix-web = { version = "2.0.0-alpha.1", default-features = false }
actix-service = "0.4.2" actix-service = "1.0.0-alpha.1"
actix-utils = "0.5.0-alpha.1"
bytes = "0.4" bytes = "0.4"
derive_more = "0.15.0" derive_more = "0.15.0"
httparse = "1.3" httparse = "1.3"
futures = "0.1.24" futures = "0.3.1"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
time = "0.1" time = "0.1"
twoway = "0.2" twoway = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "0.2.2" actix-rt = "1.0.0-alpha.1"
actix-http = "0.2.11" actix-http = "0.3.0-alpha.1"

View File

@ -1,5 +1,6 @@
//! Multipart payload support //! Multipart payload support
use actix_web::{dev::Payload, Error, FromRequest, HttpRequest}; use actix_web::{dev::Payload, Error, FromRequest, HttpRequest};
use futures::future::{ok, Ready};
use crate::server::Multipart; use crate::server::Multipart;
@ -10,33 +11,31 @@ use crate::server::Multipart;
/// ## Server example /// ## Server example
/// ///
/// ```rust /// ```rust
/// # use futures::{Future, Stream}; /// use futures::{Stream, StreamExt};
/// # use futures::future::{ok, result, Either};
/// use actix_web::{web, HttpResponse, Error}; /// use actix_web::{web, HttpResponse, Error};
/// use actix_multipart as mp; /// use actix_multipart as mp;
/// ///
/// fn index(payload: mp::Multipart) -> impl Future<Item = HttpResponse, Error = Error> { /// async fn index(mut payload: mp::Multipart) -> Result<HttpResponse, Error> {
/// payload.from_err() // <- get multipart stream for current request /// // iterate over multipart stream
/// .and_then(|field| { // <- iterate over multipart items /// while let Some(item) = payload.next().await {
/// let mut field = item?;
///
/// // Field in turn is stream of *Bytes* object /// // Field in turn is stream of *Bytes* object
/// field.from_err() /// while let Some(chunk) = field.next().await {
/// .fold((), |_, chunk| { /// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?));
/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk)); /// }
/// Ok::<_, Error>(()) /// }
/// }) /// Ok(HttpResponse::Ok().into())
/// })
/// .fold((), |_, _| Ok::<_, Error>(()))
/// .map(|_| HttpResponse::Ok().into())
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
impl FromRequest for Multipart { impl FromRequest for Multipart {
type Error = Error; type Error = Error;
type Future = Result<Multipart, Error>; type Future = Ready<Result<Multipart, Error>>;
type Config = (); type Config = ();
#[inline] #[inline]
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
Ok(Multipart::new(req.headers(), payload.take())) ok(Multipart::new(req.headers(), payload.take()))
} }
} }

View File

@ -1,15 +1,17 @@
//! Multipart payload support //! Multipart payload support
use std::cell::{Cell, RefCell, RefMut}; use std::cell::{Cell, RefCell, RefMut};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll};
use std::{cmp, fmt}; use std::{cmp, fmt};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::task::{current as current_task, Task}; use futures::stream::{LocalBoxStream, Stream, StreamExt};
use futures::{Async, Poll, Stream};
use httparse; use httparse;
use mime; use mime;
use actix_utils::task::LocalWaker;
use actix_web::error::{ParseError, PayloadError}; use actix_web::error::{ParseError, PayloadError};
use actix_web::http::header::{ use actix_web::http::header::{
self, ContentDisposition, HeaderMap, HeaderName, HeaderValue, self, ContentDisposition, HeaderMap, HeaderName, HeaderValue,
@ -60,7 +62,7 @@ impl Multipart {
/// Create multipart instance for boundary. /// Create multipart instance for boundary.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
where where
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
{ {
match Self::boundary(headers) { match Self::boundary(headers) {
Ok(boundary) => Multipart { Ok(boundary) => Multipart {
@ -104,22 +106,25 @@ impl Multipart {
} }
impl Stream for Multipart { impl Stream for Multipart {
type Item = Field; type Item = Result<Field, MultipartError>;
type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(err) = self.error.take() { if let Some(err) = self.error.take() {
Err(err) Poll::Ready(Some(Err(err)))
} else if self.safety.current() { } else if self.safety.current() {
let mut inner = self.inner.as_mut().unwrap().borrow_mut(); let this = self.get_mut();
if let Some(mut payload) = inner.payload.get_mut(&self.safety) { let mut inner = this.inner.as_mut().unwrap().borrow_mut();
payload.poll_stream()?; if let Some(mut payload) = inner.payload.get_mut(&this.safety) {
payload.poll_stream(cx)?;
} }
inner.poll(&self.safety) inner.poll(&this.safety, cx)
} else if !self.safety.is_clean() { } else if !self.safety.is_clean() {
Err(MultipartError::NotConsumed) Poll::Ready(Some(Err(MultipartError::NotConsumed)))
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
@ -238,9 +243,13 @@ impl InnerMultipart {
Ok(Some(eof)) Ok(Some(eof))
} }
fn poll(&mut self, safety: &Safety) -> Poll<Option<Field>, MultipartError> { fn poll(
&mut self,
safety: &Safety,
cx: &mut Context,
) -> Poll<Option<Result<Field, MultipartError>>> {
if self.state == InnerState::Eof { if self.state == InnerState::Eof {
Ok(Async::Ready(None)) Poll::Ready(None)
} else { } else {
// release field // release field
loop { loop {
@ -249,10 +258,13 @@ impl InnerMultipart {
if safety.current() { if safety.current() {
let stop = match self.item { let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => { InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(safety)? { match field.borrow_mut().poll(safety) {
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
Async::Ready(Some(_)) => continue, Poll::Ready(Some(Ok(_))) => continue,
Async::Ready(None) => true, Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)))
}
Poll::Ready(None) => true,
} }
} }
InnerMultipartItem::None => false, InnerMultipartItem::None => false,
@ -277,12 +289,12 @@ impl InnerMultipart {
Some(eof) => { Some(eof) => {
if eof { if eof {
self.state = InnerState::Eof; self.state = InnerState::Eof;
return Ok(Async::Ready(None)); return Poll::Ready(None);
} else { } else {
self.state = InnerState::Headers; self.state = InnerState::Headers;
} }
} }
None => return Ok(Async::NotReady), None => return Poll::Pending,
} }
} }
// read boundary // read boundary
@ -291,11 +303,11 @@ impl InnerMultipart {
&mut *payload, &mut *payload,
&self.boundary, &self.boundary,
)? { )? {
None => return Ok(Async::NotReady), None => return Poll::Pending,
Some(eof) => { Some(eof) => {
if eof { if eof {
self.state = InnerState::Eof; self.state = InnerState::Eof;
return Ok(Async::Ready(None)); return Poll::Ready(None);
} else { } else {
self.state = InnerState::Headers; self.state = InnerState::Headers;
} }
@ -311,14 +323,14 @@ impl InnerMultipart {
self.state = InnerState::Boundary; self.state = InnerState::Boundary;
headers headers
} else { } else {
return Ok(Async::NotReady); return Poll::Pending;
} }
} else { } else {
unreachable!() unreachable!()
} }
} else { } else {
log::debug!("NotReady: field is in flight"); log::debug!("NotReady: field is in flight");
return Ok(Async::NotReady); return Poll::Pending;
}; };
// content type // content type
@ -335,7 +347,7 @@ impl InnerMultipart {
// nested multipart stream // nested multipart stream
if mt.type_() == mime::MULTIPART { if mt.type_() == mime::MULTIPART {
Err(MultipartError::Nested) Poll::Ready(Some(Err(MultipartError::Nested)))
} else { } else {
let field = Rc::new(RefCell::new(InnerField::new( let field = Rc::new(RefCell::new(InnerField::new(
self.payload.clone(), self.payload.clone(),
@ -344,12 +356,7 @@ impl InnerMultipart {
)?)); )?));
self.item = InnerMultipartItem::Field(Rc::clone(&field)); self.item = InnerMultipartItem::Field(Rc::clone(&field));
Ok(Async::Ready(Some(Field::new( Poll::Ready(Some(Ok(Field::new(safety.clone(cx), headers, mt, field))))
safety.clone(),
headers,
mt,
field,
))))
} }
} }
} }
@ -409,23 +416,21 @@ impl Field {
} }
impl Stream for Field { impl Stream for Field {
type Item = Bytes; type Item = Result<Bytes, MultipartError>;
type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.safety.current() { if self.safety.current() {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
if let Some(mut payload) = if let Some(mut payload) =
inner.payload.as_ref().unwrap().get_mut(&self.safety) inner.payload.as_ref().unwrap().get_mut(&self.safety)
{ {
payload.poll_stream()?; payload.poll_stream(cx)?;
} }
inner.poll(&self.safety) inner.poll(&self.safety)
} else if !self.safety.is_clean() { } else if !self.safety.is_clean() {
Err(MultipartError::NotConsumed) Poll::Ready(Some(Err(MultipartError::NotConsumed)))
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
@ -482,9 +487,9 @@ impl InnerField {
fn read_len( fn read_len(
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
size: &mut u64, size: &mut u64,
) -> Poll<Option<Bytes>, MultipartError> { ) -> Poll<Option<Result<Bytes, MultipartError>>> {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Poll::Ready(None)
} else { } else {
match payload.read_max(*size)? { match payload.read_max(*size)? {
Some(mut chunk) => { Some(mut chunk) => {
@ -494,13 +499,13 @@ impl InnerField {
if !chunk.is_empty() { if !chunk.is_empty() {
payload.unprocessed(chunk); payload.unprocessed(chunk);
} }
Ok(Async::Ready(Some(ch))) Poll::Ready(Some(Ok(ch)))
} }
None => { None => {
if payload.eof && (*size != 0) { if payload.eof && (*size != 0) {
Err(MultipartError::Incomplete) Poll::Ready(Some(Err(MultipartError::Incomplete)))
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
@ -512,15 +517,15 @@ impl InnerField {
fn read_stream( fn read_stream(
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
boundary: &str, boundary: &str,
) -> Poll<Option<Bytes>, MultipartError> { ) -> Poll<Option<Result<Bytes, MultipartError>>> {
let mut pos = 0; let mut pos = 0;
let len = payload.buf.len(); let len = payload.buf.len();
if len == 0 { if len == 0 {
return if payload.eof { return if payload.eof {
Err(MultipartError::Incomplete) Poll::Ready(Some(Err(MultipartError::Incomplete)))
} else { } else {
Ok(Async::NotReady) Poll::Pending
}; };
} }
@ -537,10 +542,10 @@ impl InnerField {
if let Some(b_len) = b_len { if let Some(b_len) = b_len {
let b_size = boundary.len() + b_len; let b_size = boundary.len() + b_len;
if len < b_size { if len < b_size {
return Ok(Async::NotReady); return Poll::Pending;
} else if &payload.buf[b_len..b_size] == boundary.as_bytes() { } else if &payload.buf[b_len..b_size] == boundary.as_bytes() {
// found boundary // found boundary
return Ok(Async::Ready(None)); return Poll::Ready(None);
} }
} }
} }
@ -552,9 +557,9 @@ impl InnerField {
// check if we have enough data for boundary detection // check if we have enough data for boundary detection
if cur + 4 > len { if cur + 4 > len {
if cur > 0 { if cur > 0 {
Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} else { } else {
// check boundary // check boundary
@ -565,7 +570,7 @@ impl InnerField {
{ {
if cur != 0 { if cur != 0 {
// return buffer // return buffer
Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
} else { } else {
pos = cur + 1; pos = cur + 1;
continue; continue;
@ -577,49 +582,51 @@ impl InnerField {
} }
} }
} else { } else {
Ok(Async::Ready(Some(payload.buf.take().freeze()))) Poll::Ready(Some(Ok(payload.buf.take().freeze())))
}; };
} }
} }
fn poll(&mut self, s: &Safety) -> Poll<Option<Bytes>, MultipartError> { fn poll(&mut self, s: &Safety) -> Poll<Option<Result<Bytes, MultipartError>>> {
if self.payload.is_none() { if self.payload.is_none() {
return Ok(Async::Ready(None)); return Poll::Ready(None);
} }
let result = if let Some(mut payload) = self.payload.as_ref().unwrap().get_mut(s) let result = if let Some(mut payload) = self.payload.as_ref().unwrap().get_mut(s)
{ {
if !self.eof { if !self.eof {
let res = if let Some(ref mut len) = self.length { let res = if let Some(ref mut len) = self.length {
InnerField::read_len(&mut *payload, len)? InnerField::read_len(&mut *payload, len)
} else { } else {
InnerField::read_stream(&mut *payload, &self.boundary)? InnerField::read_stream(&mut *payload, &self.boundary)
}; };
match res { match res {
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
Async::Ready(Some(bytes)) => return Ok(Async::Ready(Some(bytes))), Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))),
Async::Ready(None) => self.eof = true, Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => self.eof = true,
} }
} }
match payload.readline()? { match payload.readline() {
None => Async::Ready(None), Ok(None) => Poll::Ready(None),
Some(line) => { Ok(Some(line)) => {
if line.as_ref() != b"\r\n" { if line.as_ref() != b"\r\n" {
log::warn!("multipart field did not read all the data or it is malformed"); log::warn!("multipart field did not read all the data or it is malformed");
} }
Async::Ready(None) Poll::Ready(None)
} }
Err(e) => Poll::Ready(Some(Err(e))),
} }
} else { } else {
Async::NotReady Poll::Pending
}; };
if Async::Ready(None) == result { if let Poll::Ready(None) = result {
self.payload.take(); self.payload.take();
} }
Ok(result) result
} }
} }
@ -659,7 +666,7 @@ impl Clone for PayloadRef {
/// most task. /// most task.
#[derive(Debug)] #[derive(Debug)]
struct Safety { struct Safety {
task: Option<Task>, task: LocalWaker,
level: usize, level: usize,
payload: Rc<PhantomData<bool>>, payload: Rc<PhantomData<bool>>,
clean: Rc<Cell<bool>>, clean: Rc<Cell<bool>>,
@ -669,7 +676,7 @@ impl Safety {
fn new() -> Safety { fn new() -> Safety {
let payload = Rc::new(PhantomData); let payload = Rc::new(PhantomData);
Safety { Safety {
task: None, task: LocalWaker::new(),
level: Rc::strong_count(&payload), level: Rc::strong_count(&payload),
clean: Rc::new(Cell::new(true)), clean: Rc::new(Cell::new(true)),
payload, payload,
@ -683,17 +690,17 @@ impl Safety {
fn is_clean(&self) -> bool { fn is_clean(&self) -> bool {
self.clean.get() self.clean.get()
} }
}
impl Clone for Safety { fn clone(&self, cx: &mut Context) -> Safety {
fn clone(&self) -> Safety {
let payload = Rc::clone(&self.payload); let payload = Rc::clone(&self.payload);
Safety { let s = Safety {
task: Some(current_task()), task: LocalWaker::new(),
level: Rc::strong_count(&payload), level: Rc::strong_count(&payload),
clean: self.clean.clone(), clean: self.clean.clone(),
payload, payload,
} };
s.task.register(cx.waker());
s
} }
} }
@ -704,7 +711,7 @@ impl Drop for Safety {
self.clean.set(true); self.clean.set(true);
} }
if let Some(task) = self.task.take() { if let Some(task) = self.task.take() {
task.notify() task.wake()
} }
} }
} }
@ -713,31 +720,32 @@ impl Drop for Safety {
struct PayloadBuffer { struct PayloadBuffer {
eof: bool, eof: bool,
buf: BytesMut, buf: BytesMut,
stream: Box<dyn Stream<Item = Bytes, Error = PayloadError>>, stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
} }
impl PayloadBuffer { impl PayloadBuffer {
/// Create new `PayloadBuffer` instance /// Create new `PayloadBuffer` instance
fn new<S>(stream: S) -> Self fn new<S>(stream: S) -> Self
where where
S: Stream<Item = Bytes, Error = PayloadError> + 'static, S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{ {
PayloadBuffer { PayloadBuffer {
eof: false, eof: false,
buf: BytesMut::new(), buf: BytesMut::new(),
stream: Box::new(stream), stream: stream.boxed_local(),
} }
} }
fn poll_stream(&mut self) -> Result<(), PayloadError> { fn poll_stream(&mut self, cx: &mut Context) -> Result<(), PayloadError> {
loop { loop {
match self.stream.poll()? { match Pin::new(&mut self.stream).poll_next(cx) {
Async::Ready(Some(data)) => self.buf.extend_from_slice(&data), Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data),
Async::Ready(None) => { Poll::Ready(Some(Err(e))) => return Err(e),
Poll::Ready(None) => {
self.eof = true; self.eof = true;
return Ok(()); return Ok(());
} }
Async::NotReady => return Ok(()), Poll::Pending => return Ok(()),
} }
} }
} }
@ -800,13 +808,14 @@ impl PayloadBuffer {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_http::h1::Payload;
use bytes::Bytes;
use futures::unsync::mpsc;
use super::*; use super::*;
use actix_http::h1::Payload;
use actix_utils::mpsc;
use actix_web::http::header::{DispositionParam, DispositionType}; use actix_web::http::header::{DispositionParam, DispositionType};
use actix_web::test::run_on; use actix_web::test::block_on;
use bytes::Bytes;
use futures::future::lazy;
#[test] #[test]
fn test_boundary() { fn test_boundary() {
@ -852,12 +861,12 @@ mod tests {
} }
fn create_stream() -> ( fn create_stream() -> (
mpsc::UnboundedSender<Result<Bytes, PayloadError>>, mpsc::Sender<Result<Bytes, PayloadError>>,
impl Stream<Item = Bytes, Error = PayloadError>, impl Stream<Item = Result<Bytes, PayloadError>>,
) { ) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::channel();
(tx, rx.map_err(|_| panic!()).and_then(|res| res)) (tx, rx.map(|res| res.map_err(|_| panic!())))
} }
fn create_simple_request_with_header() -> (Bytes, HeaderMap) { fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
@ -884,28 +893,28 @@ mod tests {
#[test] #[test]
fn test_multipart_no_end_crlf() { fn test_multipart_no_end_crlf() {
run_on(|| { block_on(async {
let (sender, payload) = create_stream(); let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header(); let (bytes, headers) = create_simple_request_with_header();
let bytes_stripped = bytes.slice_to(bytes.len()); // strip crlf let bytes_stripped = bytes.slice_to(bytes.len()); // strip crlf
sender.unbounded_send(Ok(bytes_stripped)).unwrap(); sender.send(Ok(bytes_stripped)).unwrap();
drop(sender); // eof drop(sender); // eof
let mut multipart = Multipart::new(&headers, payload); let mut multipart = Multipart::new(&headers, payload);
match multipart.poll().unwrap() { match multipart.next().await.unwrap() {
Async::Ready(Some(_)) => (), Ok(_) => (),
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await.unwrap() {
Async::Ready(Some(_)) => (), Ok(_) => (),
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await {
Async::Ready(None) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
}) })
@ -913,15 +922,15 @@ mod tests {
#[test] #[test]
fn test_multipart() { fn test_multipart() {
run_on(|| { block_on(async {
let (sender, payload) = create_stream(); let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header(); let (bytes, headers) = create_simple_request_with_header();
sender.unbounded_send(Ok(bytes)).unwrap(); sender.send(Ok(bytes)).unwrap();
let mut multipart = Multipart::new(&headers, payload); let mut multipart = Multipart::new(&headers, payload);
match multipart.poll().unwrap() { match multipart.next().await {
Async::Ready(Some(mut field)) => { Some(Ok(mut field)) => {
let cd = field.content_disposition().unwrap(); let cd = field.content_disposition().unwrap();
assert_eq!(cd.disposition, DispositionType::FormData); assert_eq!(cd.disposition, DispositionType::FormData);
assert_eq!(cd.parameters[0], DispositionParam::Name("file".into())); assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
@ -929,37 +938,37 @@ mod tests {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll().unwrap() { match field.next().await.unwrap() {
Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), Ok(chunk) => assert_eq!(chunk, "test"),
_ => unreachable!(), _ => unreachable!(),
} }
match field.poll().unwrap() { match field.next().await {
Async::Ready(None) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
} }
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await.unwrap() {
Async::Ready(Some(mut field)) => { Ok(mut field) => {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll() { match field.next().await {
Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), Some(Ok(chunk)) => assert_eq!(chunk, "data"),
_ => unreachable!(), _ => unreachable!(),
} }
match field.poll() { match field.next().await {
Ok(Async::Ready(None)) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
} }
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await {
Async::Ready(None) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
}); });
@ -967,15 +976,15 @@ mod tests {
#[test] #[test]
fn test_stream() { fn test_stream() {
run_on(|| { block_on(async {
let (sender, payload) = create_stream(); let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header(); let (bytes, headers) = create_simple_request_with_header();
sender.unbounded_send(Ok(bytes)).unwrap(); sender.send(Ok(bytes)).unwrap();
let mut multipart = Multipart::new(&headers, payload); let mut multipart = Multipart::new(&headers, payload);
match multipart.poll().unwrap() { match multipart.next().await.unwrap() {
Async::Ready(Some(mut field)) => { Ok(mut field) => {
let cd = field.content_disposition().unwrap(); let cd = field.content_disposition().unwrap();
assert_eq!(cd.disposition, DispositionType::FormData); assert_eq!(cd.disposition, DispositionType::FormData);
assert_eq!(cd.parameters[0], DispositionParam::Name("file".into())); assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
@ -983,37 +992,37 @@ mod tests {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll().unwrap() { match field.next().await.unwrap() {
Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), Ok(chunk) => assert_eq!(chunk, "test"),
_ => unreachable!(), _ => unreachable!(),
} }
match field.poll().unwrap() { match field.next().await {
Async::Ready(None) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
} }
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await {
Async::Ready(Some(mut field)) => { Some(Ok(mut field)) => {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll() { match field.next().await {
Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), Some(Ok(chunk)) => assert_eq!(chunk, "data"),
_ => unreachable!(), _ => unreachable!(),
} }
match field.poll() { match field.next().await {
Ok(Async::Ready(None)) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
} }
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll().unwrap() { match multipart.next().await {
Async::Ready(None) => (), None => (),
_ => unreachable!(), _ => unreachable!(),
} }
}); });
@ -1021,26 +1030,26 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
run_on(|| { block_on(async {
let (_, payload) = Payload::create(false); let (_, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
payload.poll_stream().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(None, payload.read_max(1).unwrap()); assert_eq!(None, payload.read_max(1).unwrap());
}) })
} }
#[test] #[test]
fn test_eof() { fn test_eof() {
run_on(|| { block_on(async {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(4).unwrap()); assert_eq!(None, payload.read_max(4).unwrap());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
sender.feed_eof(); sender.feed_eof();
payload.poll_stream().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap()); assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
assert_eq!(payload.buf.len(), 0); assert_eq!(payload.buf.len(), 0);
@ -1051,24 +1060,24 @@ mod tests {
#[test] #[test]
fn test_err() { fn test_err() {
run_on(|| { block_on(async {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(1).unwrap()); assert_eq!(None, payload.read_max(1).unwrap());
sender.set_error(PayloadError::Incomplete(None)); sender.set_error(PayloadError::Incomplete(None));
payload.poll_stream().err().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.err().unwrap();
}) })
} }
#[test] #[test]
fn test_readmax() { fn test_readmax() {
run_on(|| { block_on(async {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf.len(), 10); assert_eq!(payload.buf.len(), 10);
assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap()); assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
@ -1081,7 +1090,7 @@ mod tests {
#[test] #[test]
fn test_readexactly() { fn test_readexactly() {
run_on(|| { block_on(async {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
@ -1089,7 +1098,7 @@ mod tests {
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2)); assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
assert_eq!(payload.buf.len(), 8); assert_eq!(payload.buf.len(), 8);
@ -1101,7 +1110,7 @@ mod tests {
#[test] #[test]
fn test_readuntil() { fn test_readuntil() {
run_on(|| { block_on(async {
let (mut sender, payload) = Payload::create(false); let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload); let mut payload = PayloadBuffer::new(payload);
@ -1109,7 +1118,7 @@ mod tests {
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap(); lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!( assert_eq!(
Some(Bytes::from("line")), Some(Bytes::from("line")),