From e738361e09b7533ab77f5269400b6429622e6a67 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 3 Apr 2019 12:28:58 -0700 Subject: [PATCH] move multipart support to separate crate --- CHANGES.md | 7 + Cargo.toml | 2 +- actix-http/CHANGES.md | 2 + actix-http/src/h1/mod.rs | 2 +- actix-http/src/h1/payload.rs | 4 +- actix-multipart/CHANGES.md | 5 + actix-multipart/Cargo.toml | 34 ++ actix-multipart/README.md | 1 + actix-multipart/src/error.rs | 46 ++ actix-multipart/src/extractor.rs | 57 +++ actix-multipart/src/lib.rs | 6 + .../src/server.rs | 420 ++++++++++++------ src/types/mod.rs | 2 - 13 files changed, 454 insertions(+), 134 deletions(-) create mode 100644 actix-multipart/CHANGES.md create mode 100644 actix-multipart/Cargo.toml create mode 100644 actix-multipart/README.md create mode 100644 actix-multipart/src/error.rs create mode 100644 actix-multipart/src/extractor.rs create mode 100644 actix-multipart/src/lib.rs rename src/types/multipart.rs => actix-multipart/src/server.rs (70%) diff --git a/CHANGES.md b/CHANGES.md index d6ff547d5..fc690ee50 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [1.0.0-alpha.3] - 2019-04-xx + +### Changed + +* Move multipart support to actix-multipart crate + + ## [1.0.0-alpha.3] - 2019-04-02 ### Changed diff --git a/Cargo.toml b/Cargo.toml index 0e2fb32a9..507be4bb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "actix-http", "actix-files", "actix-session", + "actix-multipart", "actix-web-actors", "actix-web-codegen", "test-server", @@ -83,7 +84,6 @@ derive_more = "0.14" encoding = "0.2" futures = "0.1" hashbrown = "0.1.8" -httparse = "1.3" log = "0.4" mime = "0.3" net2 = "0.2.33" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index eef0bdaf8..3ae481db4 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,5 +1,7 @@ # Changes +## [0.1.0-alpha.4] - 2019-04-xx + ### Deleted * Removed PayloadBuffer diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 3bf69b38e..a05f2800c 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -12,7 +12,7 @@ mod service; pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::codec::Codec; pub use self::dispatcher::Dispatcher; -pub use self::payload::Payload; +pub use self::payload::{Payload, PayloadWriter}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; #[derive(Debug)] diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 187962259..bef87f7dc 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -14,7 +14,7 @@ use crate::error::PayloadError; pub(crate) const MAX_BUFFER_SIZE: usize = 32_768; #[derive(Debug, PartialEq)] -pub(crate) enum PayloadStatus { +pub enum PayloadStatus { Read, Pause, Dropped, @@ -106,7 +106,7 @@ impl Clone for Payload { } /// Payload writer interface. -pub(crate) trait PayloadWriter { +pub trait PayloadWriter { /// Set stream error. fn set_error(&mut self, err: PayloadError); diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md new file mode 100644 index 000000000..6be07f2e2 --- /dev/null +++ b/actix-multipart/CHANGES.md @@ -0,0 +1,5 @@ +# Changes + +## [0.1.0-alpha.1] - 2019-04-xx + +* Split multipart support to separate crate diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml new file mode 100644 index 000000000..006f7066a --- /dev/null +++ b/actix-multipart/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "actix-multipart" +version = "0.1.0-alpha.1" +authors = ["Nikolay Kim "] +description = "Multipart support for actix web framework." +readme = "README.md" +keywords = ["http", "web", "framework", "async", "futures"] +homepage = "https://actix.rs" +repository = "https://github.com/actix/actix-web.git" +documentation = "https://docs.rs/actix-multipart/" +license = "MIT/Apache-2.0" +exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] +workspace = ".." +edition = "2018" + +[lib] +name = "actix_multipart" +path = "src/lib.rs" + +[dependencies] +actix-web = "1.0.0-alpha.3" +actix-service = "0.3.4" +bytes = "0.4" +derive_more = "0.14" +httparse = "1.3" +futures = "0.1.25" +log = "0.4" +mime = "0.3" +time = "0.1" +twoway = "0.2" + +[dev-dependencies] +actix-rt = "0.2.2" +actix-http = "0.1.0-alpha.3" \ No newline at end of file diff --git a/actix-multipart/README.md b/actix-multipart/README.md new file mode 100644 index 000000000..2a65840a1 --- /dev/null +++ b/actix-multipart/README.md @@ -0,0 +1 @@ +# Multipart support for actix web framework [![Build Status](https://travis-ci.org/actix/actix-web.svg?branch=master)](https://travis-ci.org/actix/actix-web) [![codecov](https://codecov.io/gh/actix/actix-web/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-web) [![crates.io](https://meritbadge.herokuapp.com/actix-session)](https://crates.io/crates/actix-session) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) diff --git a/actix-multipart/src/error.rs b/actix-multipart/src/error.rs new file mode 100644 index 000000000..1b872187d --- /dev/null +++ b/actix-multipart/src/error.rs @@ -0,0 +1,46 @@ +//! Error and Result module +use actix_web::error::{ParseError, PayloadError}; +use actix_web::http::StatusCode; +use actix_web::{HttpResponse, ResponseError}; +use derive_more::{Display, From}; + +/// A set of errors that can occur during parsing multipart streams +#[derive(Debug, Display, From)] +pub enum MultipartError { + /// Content-Type header is not found + #[display(fmt = "No Content-type header found")] + NoContentType, + /// Can not parse Content-Type header + #[display(fmt = "Can not parse Content-Type header")] + ParseContentType, + /// Multipart boundary is not found + #[display(fmt = "Multipart boundary is not found")] + Boundary, + /// Multipart stream is incomplete + #[display(fmt = "Multipart stream is incomplete")] + Incomplete, + /// Error during field parsing + #[display(fmt = "{}", _0)] + Parse(ParseError), + /// Payload error + #[display(fmt = "{}", _0)] + Payload(PayloadError), +} + +/// Return `BadRequest` for `MultipartError` +impl ResponseError for MultipartError { + fn error_response(&self) -> HttpResponse { + HttpResponse::new(StatusCode::BAD_REQUEST) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_multipart_error() { + let resp: HttpResponse = MultipartError::Boundary.error_response(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + } +} diff --git a/actix-multipart/src/extractor.rs b/actix-multipart/src/extractor.rs new file mode 100644 index 000000000..18c26c6fb --- /dev/null +++ b/actix-multipart/src/extractor.rs @@ -0,0 +1,57 @@ +//! Multipart payload support +use bytes::Bytes; +use futures::Stream; + +use actix_web::dev::ServiceFromRequest; +use actix_web::error::{Error, PayloadError}; +use actix_web::FromRequest; +use actix_web::HttpMessage; + +use crate::server::Multipart; + +/// Get request's payload as multipart stream +/// +/// Content-type: multipart/form-data; +/// +/// ## Server example +/// +/// ```rust +/// # use futures::{Future, Stream}; +/// # use futures::future::{ok, result, Either}; +/// use actix_web::{web, HttpResponse, Error}; +/// use actix_multipart as mp; +/// +/// fn index(payload: mp::Multipart) -> impl Future { +/// payload.from_err() // <- get multipart stream for current request +/// .and_then(|item| match item { // <- iterate over multipart items +/// mp::Item::Field(field) => { +/// // Field in turn is stream of *Bytes* object +/// Either::A(field.from_err() +/// .fold((), |_, chunk| { +/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk)); +/// Ok::<_, Error>(()) +/// })) +/// }, +/// mp::Item::Nested(mp) => { +/// // Or item could be nested Multipart stream +/// Either::B(ok(())) +/// } +/// }) +/// .fold((), |_, _| Ok::<_, Error>(())) +/// .map(|_| HttpResponse::Ok().into()) +/// } +/// # fn main() {} +/// ``` +impl

FromRequest

for Multipart +where + P: Stream + 'static, +{ + type Error = Error; + type Future = Result; + + #[inline] + fn from_request(req: &mut ServiceFromRequest

) -> Self::Future { + let pl = req.take_payload(); + Ok(Multipart::new(req.headers(), pl)) + } +} diff --git a/actix-multipart/src/lib.rs b/actix-multipart/src/lib.rs new file mode 100644 index 000000000..602c27931 --- /dev/null +++ b/actix-multipart/src/lib.rs @@ -0,0 +1,6 @@ +mod error; +mod extractor; +mod server; + +pub use self::error::MultipartError; +pub use self::server::{Field, Item, Multipart}; diff --git a/src/types/multipart.rs b/actix-multipart/src/server.rs similarity index 70% rename from src/types/multipart.rs rename to actix-multipart/src/server.rs index 65a64d5e1..c1536af60 100644 --- a/src/types/multipart.rs +++ b/actix-multipart/src/server.rs @@ -4,26 +4,22 @@ use std::marker::PhantomData; use std::rc::Rc; use std::{cmp, fmt}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::task::{current as current_task, Task}; use futures::{Async, Poll, Stream}; use httparse; use mime; -use crate::error::{Error, MultipartError, ParseError, PayloadError}; -use crate::extract::FromRequest; -use crate::http::header::{ +use actix_web::error::{ParseError, PayloadError}; +use actix_web::http::header::{ self, ContentDisposition, HeaderMap, HeaderName, HeaderValue, }; -use crate::http::HttpTryFrom; -use crate::service::ServiceFromRequest; -use crate::HttpMessage; +use actix_web::http::HttpTryFrom; + +use crate::error::MultipartError; const MAX_HEADERS: usize = 32; -type PayloadBuffer = - actix_http::h1::PayloadBuffer>>; - /// The server-side implementation of `multipart/form-data` requests. /// /// This will parse the incoming stream into `MultipartItem` instances via its @@ -37,59 +33,13 @@ pub struct Multipart { } /// Multipart item -pub enum MultipartItem { +pub enum Item { /// Multipart field - Field(MultipartField), + Field(Field), /// Nested multipart stream Nested(Multipart), } -/// Get request's payload as multipart stream -/// -/// Content-type: multipart/form-data; -/// -/// ## Server example -/// -/// ```rust -/// # use futures::{Future, Stream}; -/// # use futures::future::{ok, result, Either}; -/// use actix_web::{web, HttpResponse, Error}; -/// -/// fn index(payload: web::Multipart) -> impl Future { -/// payload.from_err() // <- get multipart stream for current request -/// .and_then(|item| match item { // <- iterate over multipart items -/// web::MultipartItem::Field(field) => { -/// // Field in turn is stream of *Bytes* object -/// Either::A(field.from_err() -/// .fold((), |_, chunk| { -/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk)); -/// Ok::<_, Error>(()) -/// })) -/// }, -/// web::MultipartItem::Nested(mp) => { -/// // Or item could be nested Multipart stream -/// Either::B(ok(())) -/// } -/// }) -/// .fold((), |_, _| Ok::<_, Error>(())) -/// .map(|_| HttpResponse::Ok().into()) -/// } -/// # fn main() {} -/// ``` -impl

FromRequest

for Multipart -where - P: Stream + 'static, -{ - type Error = Error; - type Future = Result; - - #[inline] - fn from_request(req: &mut ServiceFromRequest

) -> Self::Future { - let pl = req.take_payload(); - Ok(Multipart::new(req.headers(), pl)) - } -} - enum InnerMultipartItem { None, Field(Rc>), @@ -163,14 +113,18 @@ impl Multipart { } impl Stream for Multipart { - type Item = MultipartItem; + type Item = Item; type Error = MultipartError; fn poll(&mut self) -> Poll, Self::Error> { if let Some(err) = self.error.take() { Err(err) } else if self.safety.current() { - self.inner.as_mut().unwrap().borrow_mut().poll(&self.safety) + let mut inner = self.inner.as_mut().unwrap().borrow_mut(); + if let Some(payload) = inner.payload.get_mut(&self.safety) { + payload.poll_stream()?; + } + inner.poll(&self.safety) } else { Ok(Async::NotReady) } @@ -178,11 +132,18 @@ impl Stream for Multipart { } impl InnerMultipart { - fn read_headers(payload: &mut PayloadBuffer) -> Poll { - match payload.read_until(b"\r\n\r\n")? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(None) => Err(MultipartError::Incomplete), - Async::Ready(Some(bytes)) => { + fn read_headers( + payload: &mut PayloadBuffer, + ) -> Result, MultipartError> { + match payload.read_until(b"\r\n\r\n") { + None => { + if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + } + } + Some(bytes) => { let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; match httparse::parse_headers(&bytes, &mut hdrs) { Ok(httparse::Status::Complete((_, hdrs))) => { @@ -199,7 +160,7 @@ impl InnerMultipart { return Err(ParseError::Header.into()); } } - Ok(Async::Ready(headers)) + Ok(Some(headers)) } Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), Err(err) => Err(ParseError::from(err).into()), @@ -211,23 +172,28 @@ impl InnerMultipart { fn read_boundary( payload: &mut PayloadBuffer, boundary: &str, - ) -> Poll { + ) -> Result, MultipartError> { // TODO: need to read epilogue - match payload.readline()? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(None) => Err(MultipartError::Incomplete), - Async::Ready(Some(chunk)) => { + match payload.readline() { + None => { + if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + } + } + Some(chunk) => { if chunk.len() == boundary.len() + 4 && &chunk[..2] == b"--" && &chunk[2..boundary.len() + 2] == boundary.as_bytes() { - Ok(Async::Ready(false)) + Ok(Some(false)) } else if chunk.len() == boundary.len() + 6 && &chunk[..2] == b"--" && &chunk[2..boundary.len() + 2] == boundary.as_bytes() && &chunk[boundary.len() + 2..boundary.len() + 4] == b"--" { - Ok(Async::Ready(true)) + Ok(Some(true)) } else { Err(MultipartError::Boundary) } @@ -238,11 +204,11 @@ impl InnerMultipart { fn skip_until_boundary( payload: &mut PayloadBuffer, boundary: &str, - ) -> Poll { + ) -> Result, MultipartError> { let mut eof = false; loop { - match payload.readline()? { - Async::Ready(Some(chunk)) => { + match payload.readline() { + Some(chunk) => { if chunk.is_empty() { //ValueError("Could not find starting boundary %r" //% (self._boundary)) @@ -267,14 +233,19 @@ impl InnerMultipart { } } } - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Err(MultipartError::Incomplete), + None => { + return if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + }; + } } } - Ok(Async::Ready(eof)) + Ok(Some(eof)) } - fn poll(&mut self, safety: &Safety) -> Poll, MultipartError> { + fn poll(&mut self, safety: &Safety) -> Poll, MultipartError> { if self.state == InnerState::Eof { Ok(Async::Ready(None)) } else { @@ -317,7 +288,7 @@ impl InnerMultipart { payload, &self.boundary, )? { - Async::Ready(eof) => { + Some(eof) => { if eof { self.state = InnerState::Eof; return Ok(Async::Ready(None)); @@ -325,14 +296,14 @@ impl InnerMultipart { self.state = InnerState::Headers; } } - Async::NotReady => return Ok(Async::NotReady), + None => return Ok(Async::NotReady), } } // read boundary InnerState::Boundary => { match InnerMultipart::read_boundary(payload, &self.boundary)? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(eof) => { + None => return Ok(Async::NotReady), + Some(eof) => { if eof { self.state = InnerState::Eof; return Ok(Async::Ready(None)); @@ -347,8 +318,7 @@ impl InnerMultipart { // read field headers for next field if self.state == InnerState::Headers { - if let Async::Ready(headers) = InnerMultipart::read_headers(payload)? - { + if let Some(headers) = InnerMultipart::read_headers(payload)? { self.state = InnerState::Boundary; headers } else { @@ -389,7 +359,7 @@ impl InnerMultipart { self.item = InnerMultipartItem::Multipart(Rc::clone(&inner)); - Ok(Async::Ready(Some(MultipartItem::Nested(Multipart { + Ok(Async::Ready(Some(Item::Nested(Multipart { safety: safety.clone(), error: None, inner: Some(inner), @@ -402,9 +372,12 @@ impl InnerMultipart { )?)); self.item = InnerMultipartItem::Field(Rc::clone(&field)); - Ok(Async::Ready(Some(MultipartItem::Field( - MultipartField::new(safety.clone(), headers, mt, field), - )))) + Ok(Async::Ready(Some(Item::Field(Field::new( + safety.clone(), + headers, + mt, + field, + ))))) } } } @@ -418,21 +391,21 @@ impl Drop for InnerMultipart { } /// A single field in a multipart stream -pub struct MultipartField { +pub struct Field { ct: mime::Mime, headers: HeaderMap, inner: Rc>, safety: Safety, } -impl MultipartField { +impl Field { fn new( safety: Safety, headers: HeaderMap, ct: mime::Mime, inner: Rc>, ) -> Self { - MultipartField { + Field { ct, headers, inner, @@ -463,22 +436,28 @@ impl MultipartField { } } -impl Stream for MultipartField { +impl Stream for Field { type Item = Bytes; type Error = MultipartError; fn poll(&mut self) -> Poll, Self::Error> { if self.safety.current() { - self.inner.borrow_mut().poll(&self.safety) + let mut inner = self.inner.borrow_mut(); + if let Some(payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) + { + payload.poll_stream()?; + } + + inner.poll(&self.safety) } else { Ok(Async::NotReady) } } } -impl fmt::Debug for MultipartField { +impl fmt::Debug for Field { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - writeln!(f, "\nMultipartField: {}", self.ct)?; + writeln!(f, "\nField: {}", self.ct)?; writeln!(f, " boundary: {}", self.inner.borrow().boundary)?; writeln!(f, " headers:")?; for (key, val) in self.headers.iter() { @@ -532,10 +511,8 @@ impl InnerField { if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.readany() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Err(MultipartError::Incomplete), - Ok(Async::Ready(Some(mut chunk))) => { + match payload.read_max(*size) { + Some(mut chunk) => { let len = cmp::min(chunk.len() as u64, *size); *size -= len; let ch = chunk.split_to(len as usize); @@ -544,7 +521,13 @@ impl InnerField { } Ok(Async::Ready(Some(ch))) } - Err(err) => Err(err.into()), + None => { + if payload.eof && (*size != 0) { + Err(MultipartError::Incomplete) + } else { + Ok(Async::NotReady) + } + } } } } @@ -555,16 +538,26 @@ impl InnerField { payload: &mut PayloadBuffer, boundary: &str, ) -> Poll, MultipartError> { - match payload.read_until(b"\r")? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(None) => Err(MultipartError::Incomplete), - Async::Ready(Some(mut chunk)) => { + match payload.read_until(b"\r") { + None => { + if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(Async::NotReady) + } + } + Some(mut chunk) => { if chunk.len() == 1 { payload.unprocessed(chunk); - match payload.read_exact(boundary.len() + 4)? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(None) => Err(MultipartError::Incomplete), - Async::Ready(Some(mut chunk)) => { + match payload.read_exact(boundary.len() + 4) { + None => { + if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(Async::NotReady) + } + } + Some(mut chunk) => { if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && &chunk[4..] == boundary.as_bytes() @@ -606,10 +599,9 @@ impl InnerField { Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(None) => { self.eof = true; - match payload.readline()? { - Async::NotReady => Async::NotReady, - Async::Ready(None) => Async::Ready(None), - Async::Ready(Some(line)) => { + match payload.readline() { + None => Async::Ready(None), + Some(line) => { if line.as_ref() != b"\r\n" { log::warn!("multipart field did not read all the data or it is malformed"); } @@ -711,14 +703,86 @@ impl Drop for Safety { } } +/// Payload buffer +struct PayloadBuffer { + eof: bool, + buf: BytesMut, + stream: Box>, +} + +impl PayloadBuffer { + /// Create new `PayloadBuffer` instance + fn new(stream: S) -> Self + where + S: Stream + 'static, + { + PayloadBuffer { + eof: false, + buf: BytesMut::new(), + stream: Box::new(stream), + } + } + + fn poll_stream(&mut self) -> Result<(), PayloadError> { + loop { + match self.stream.poll()? { + Async::Ready(Some(data)) => self.buf.extend_from_slice(&data), + Async::Ready(None) => { + self.eof = true; + return Ok(()); + } + Async::NotReady => return Ok(()), + } + } + } + + /// Read exact number of bytes + #[inline] + fn read_exact(&mut self, size: usize) -> Option { + if size <= self.buf.len() { + Some(self.buf.split_to(size).freeze()) + } else { + None + } + } + + fn read_max(&mut self, size: u64) -> Option { + if !self.buf.is_empty() { + let size = std::cmp::min(self.buf.len() as u64, size) as usize; + Some(self.buf.split_to(size).freeze()) + } else { + None + } + } + + /// Read until specified ending + pub fn read_until(&mut self, line: &[u8]) -> Option { + twoway::find_bytes(&self.buf, line) + .map(|idx| self.buf.split_to(idx + line.len()).freeze()) + } + + /// Read bytes until new line delimiter + pub fn readline(&mut self) -> Option { + self.read_until(b"\n") + } + + /// Put unprocessed data back to the buffer + pub fn unprocessed(&mut self, data: Bytes) { + let buf = BytesMut::from(data); + let buf = std::mem::replace(&mut self.buf, buf); + self.buf.extend_from_slice(&buf); + } +} + #[cfg(test)] mod tests { + use actix_http::h1::{Payload, PayloadWriter}; use bytes::Bytes; use futures::unsync::mpsc; use super::*; - use crate::http::header::{DispositionParam, DispositionType}; - use crate::test::run_on; + use actix_web::http::header::{DispositionParam, DispositionType}; + use actix_web::test::run_on; #[test] fn test_boundary() { @@ -799,9 +863,9 @@ mod tests { ); let mut multipart = Multipart::new(&headers, payload); - match multipart.poll() { - Ok(Async::Ready(Some(item))) => match item { - MultipartItem::Field(mut field) => { + match multipart.poll().unwrap() { + Async::Ready(Some(item)) => match item { + Item::Field(mut field) => { { let cd = field.content_disposition().unwrap(); assert_eq!(cd.disposition, DispositionType::FormData); @@ -813,12 +877,12 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll() { - Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "test"), + match field.poll().unwrap() { + Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), _ => unreachable!(), } - match field.poll() { - Ok(Async::Ready(None)) => (), + match field.poll().unwrap() { + Async::Ready(None) => (), _ => unreachable!(), } } @@ -827,9 +891,9 @@ mod tests { _ => unreachable!(), } - match multipart.poll() { - Ok(Async::Ready(Some(item))) => match item { - MultipartItem::Field(mut field) => { + match multipart.poll().unwrap() { + Async::Ready(Some(item)) => match item { + Item::Field(mut field) => { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); @@ -847,10 +911,110 @@ mod tests { _ => unreachable!(), } - match multipart.poll() { - Ok(Async::Ready(None)) => (), + match multipart.poll().unwrap() { + Async::Ready(None) => (), _ => unreachable!(), } }); } + + #[test] + fn test_basic() { + run_on(|| { + let (_, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + + assert_eq!(payload.buf.len(), 0); + payload.poll_stream().unwrap(); + assert_eq!(None, payload.read_max(1)); + }) + } + + #[test] + fn test_eof() { + run_on(|| { + let (mut sender, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + + assert_eq!(None, payload.read_max(4)); + sender.feed_data(Bytes::from("data")); + sender.feed_eof(); + payload.poll_stream().unwrap(); + + assert_eq!(Some(Bytes::from("data")), payload.read_max(4)); + assert_eq!(payload.buf.len(), 0); + assert_eq!(None, payload.read_max(1)); + assert!(payload.eof); + }) + } + + #[test] + fn test_err() { + run_on(|| { + let (mut sender, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + assert_eq!(None, payload.read_max(1)); + sender.set_error(PayloadError::Incomplete(None)); + payload.poll_stream().err().unwrap(); + }) + } + + #[test] + fn test_readmax() { + run_on(|| { + let (mut sender, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + + sender.feed_data(Bytes::from("line1")); + sender.feed_data(Bytes::from("line2")); + payload.poll_stream().unwrap(); + assert_eq!(payload.buf.len(), 10); + + assert_eq!(Some(Bytes::from("line1")), payload.read_max(5)); + assert_eq!(payload.buf.len(), 5); + + assert_eq!(Some(Bytes::from("line2")), payload.read_max(5)); + assert_eq!(payload.buf.len(), 0); + }) + } + + #[test] + fn test_readexactly() { + run_on(|| { + let (mut sender, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + + assert_eq!(None, payload.read_exact(2)); + + sender.feed_data(Bytes::from("line1")); + sender.feed_data(Bytes::from("line2")); + payload.poll_stream().unwrap(); + + assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2)); + assert_eq!(payload.buf.len(), 8); + + assert_eq!(Some(Bytes::from_static(b"ne1l")), payload.read_exact(4)); + assert_eq!(payload.buf.len(), 4); + }) + } + + #[test] + fn test_readuntil() { + run_on(|| { + let (mut sender, payload) = Payload::create(false); + let mut payload = PayloadBuffer::new(payload); + + assert_eq!(None, payload.read_until(b"ne")); + + sender.feed_data(Bytes::from("line1")); + sender.feed_data(Bytes::from("line2")); + payload.poll_stream().unwrap(); + + assert_eq!(Some(Bytes::from("line")), payload.read_until(b"ne")); + assert_eq!(payload.buf.len(), 6); + + assert_eq!(Some(Bytes::from("1line2")), payload.read_until(b"2")); + assert_eq!(payload.buf.len(), 0); + }) + } } diff --git a/src/types/mod.rs b/src/types/mod.rs index 9a0a08801..30ee73091 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -2,7 +2,6 @@ pub(crate) mod form; pub(crate) mod json; -mod multipart; mod path; pub(crate) mod payload; mod query; @@ -10,7 +9,6 @@ pub(crate) mod readlines; pub use self::form::{Form, FormConfig}; pub use self::json::{Json, JsonConfig}; -pub use self::multipart::{Multipart, MultipartField, MultipartItem}; pub use self::path::Path; pub use self::payload::{Payload, PayloadConfig}; pub use self::query::Query;