1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

move multipart support to separate crate

This commit is contained in:
Nikolay Kim 2019-04-03 12:28:58 -07:00
parent f56072954b
commit e738361e09
13 changed files with 454 additions and 134 deletions

View File

@ -1,5 +1,12 @@
# Changes # Changes
## [1.0.0-alpha.3] - 2019-04-xx
### Changed
* Move multipart support to actix-multipart crate
## [1.0.0-alpha.3] - 2019-04-02 ## [1.0.0-alpha.3] - 2019-04-02
### Changed ### Changed

View File

@ -30,6 +30,7 @@ members = [
"actix-http", "actix-http",
"actix-files", "actix-files",
"actix-session", "actix-session",
"actix-multipart",
"actix-web-actors", "actix-web-actors",
"actix-web-codegen", "actix-web-codegen",
"test-server", "test-server",
@ -83,7 +84,6 @@ derive_more = "0.14"
encoding = "0.2" encoding = "0.2"
futures = "0.1" futures = "0.1"
hashbrown = "0.1.8" hashbrown = "0.1.8"
httparse = "1.3"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
net2 = "0.2.33" net2 = "0.2.33"

View File

@ -1,5 +1,7 @@
# Changes # Changes
## [0.1.0-alpha.4] - 2019-04-xx
### Deleted ### Deleted
* Removed PayloadBuffer * Removed PayloadBuffer

View File

@ -12,7 +12,7 @@ mod service;
pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::client::{ClientCodec, ClientPayloadCodec};
pub use self::codec::Codec; pub use self::codec::Codec;
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::payload::Payload; pub use self::payload::{Payload, PayloadWriter};
pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest};
#[derive(Debug)] #[derive(Debug)]

View File

@ -14,7 +14,7 @@ use crate::error::PayloadError;
pub(crate) const MAX_BUFFER_SIZE: usize = 32_768; pub(crate) const MAX_BUFFER_SIZE: usize = 32_768;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub(crate) enum PayloadStatus { pub enum PayloadStatus {
Read, Read,
Pause, Pause,
Dropped, Dropped,
@ -106,7 +106,7 @@ impl Clone for Payload {
} }
/// Payload writer interface. /// Payload writer interface.
pub(crate) trait PayloadWriter { pub trait PayloadWriter {
/// Set stream error. /// Set stream error.
fn set_error(&mut self, err: PayloadError); fn set_error(&mut self, err: PayloadError);

View File

@ -0,0 +1,5 @@
# Changes
## [0.1.0-alpha.1] - 2019-04-xx
* Split multipart support to separate crate

View File

@ -0,0 +1,34 @@
[package]
name = "actix-multipart"
version = "0.1.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Multipart support for actix web framework."
readme = "README.md"
keywords = ["http", "web", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-web.git"
documentation = "https://docs.rs/actix-multipart/"
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
workspace = ".."
edition = "2018"
[lib]
name = "actix_multipart"
path = "src/lib.rs"
[dependencies]
actix-web = "1.0.0-alpha.3"
actix-service = "0.3.4"
bytes = "0.4"
derive_more = "0.14"
httparse = "1.3"
futures = "0.1.25"
log = "0.4"
mime = "0.3"
time = "0.1"
twoway = "0.2"
[dev-dependencies]
actix-rt = "0.2.2"
actix-http = "0.1.0-alpha.3"

View File

@ -0,0 +1 @@
# Multipart support for actix web framework [![Build Status](https://travis-ci.org/actix/actix-web.svg?branch=master)](https://travis-ci.org/actix/actix-web) [![codecov](https://codecov.io/gh/actix/actix-web/branch/master/graph/badge.svg)](https://codecov.io/gh/actix/actix-web) [![crates.io](https://meritbadge.herokuapp.com/actix-session)](https://crates.io/crates/actix-session) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

View File

@ -0,0 +1,46 @@
//! Error and Result module
use actix_web::error::{ParseError, PayloadError};
use actix_web::http::StatusCode;
use actix_web::{HttpResponse, ResponseError};
use derive_more::{Display, From};
/// A set of errors that can occur during parsing multipart streams
#[derive(Debug, Display, From)]
pub enum MultipartError {
/// Content-Type header is not found
#[display(fmt = "No Content-type header found")]
NoContentType,
/// Can not parse Content-Type header
#[display(fmt = "Can not parse Content-Type header")]
ParseContentType,
/// Multipart boundary is not found
#[display(fmt = "Multipart boundary is not found")]
Boundary,
/// Multipart stream is incomplete
#[display(fmt = "Multipart stream is incomplete")]
Incomplete,
/// Error during field parsing
#[display(fmt = "{}", _0)]
Parse(ParseError),
/// Payload error
#[display(fmt = "{}", _0)]
Payload(PayloadError),
}
/// Return `BadRequest` for `MultipartError`
impl ResponseError for MultipartError {
fn error_response(&self) -> HttpResponse {
HttpResponse::new(StatusCode::BAD_REQUEST)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_multipart_error() {
let resp: HttpResponse = MultipartError::Boundary.error_response();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
}
}

View File

@ -0,0 +1,57 @@
//! Multipart payload support
use bytes::Bytes;
use futures::Stream;
use actix_web::dev::ServiceFromRequest;
use actix_web::error::{Error, PayloadError};
use actix_web::FromRequest;
use actix_web::HttpMessage;
use crate::server::Multipart;
/// Get request's payload as multipart stream
///
/// Content-type: multipart/form-data;
///
/// ## Server example
///
/// ```rust
/// # use futures::{Future, Stream};
/// # use futures::future::{ok, result, Either};
/// use actix_web::{web, HttpResponse, Error};
/// use actix_multipart as mp;
///
/// fn index(payload: mp::Multipart) -> impl Future<Item = HttpResponse, Error = Error> {
/// payload.from_err() // <- get multipart stream for current request
/// .and_then(|item| match item { // <- iterate over multipart items
/// mp::Item::Field(field) => {
/// // Field in turn is stream of *Bytes* object
/// Either::A(field.from_err()
/// .fold((), |_, chunk| {
/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk));
/// Ok::<_, Error>(())
/// }))
/// },
/// mp::Item::Nested(mp) => {
/// // Or item could be nested Multipart stream
/// Either::B(ok(()))
/// }
/// })
/// .fold((), |_, _| Ok::<_, Error>(()))
/// .map(|_| HttpResponse::Ok().into())
/// }
/// # fn main() {}
/// ```
impl<P> FromRequest<P> for Multipart
where
P: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Error = Error;
type Future = Result<Multipart, Error>;
#[inline]
fn from_request(req: &mut ServiceFromRequest<P>) -> Self::Future {
let pl = req.take_payload();
Ok(Multipart::new(req.headers(), pl))
}
}

View File

@ -0,0 +1,6 @@
mod error;
mod extractor;
mod server;
pub use self::error::MultipartError;
pub use self::server::{Field, Item, Multipart};

View File

@ -4,26 +4,22 @@ use std::marker::PhantomData;
use std::rc::Rc; use std::rc::Rc;
use std::{cmp, fmt}; use std::{cmp, fmt};
use bytes::Bytes; use bytes::{Bytes, BytesMut};
use futures::task::{current as current_task, Task}; use futures::task::{current as current_task, Task};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use httparse; use httparse;
use mime; use mime;
use crate::error::{Error, MultipartError, ParseError, PayloadError}; use actix_web::error::{ParseError, PayloadError};
use crate::extract::FromRequest; use actix_web::http::header::{
use crate::http::header::{
self, ContentDisposition, HeaderMap, HeaderName, HeaderValue, self, ContentDisposition, HeaderMap, HeaderName, HeaderValue,
}; };
use crate::http::HttpTryFrom; use actix_web::http::HttpTryFrom;
use crate::service::ServiceFromRequest;
use crate::HttpMessage; use crate::error::MultipartError;
const MAX_HEADERS: usize = 32; const MAX_HEADERS: usize = 32;
type PayloadBuffer =
actix_http::h1::PayloadBuffer<Box<dyn Stream<Item = Bytes, Error = PayloadError>>>;
/// The server-side implementation of `multipart/form-data` requests. /// The server-side implementation of `multipart/form-data` requests.
/// ///
/// This will parse the incoming stream into `MultipartItem` instances via its /// This will parse the incoming stream into `MultipartItem` instances via its
@ -37,59 +33,13 @@ pub struct Multipart {
} }
/// Multipart item /// Multipart item
pub enum MultipartItem { pub enum Item {
/// Multipart field /// Multipart field
Field(MultipartField), Field(Field),
/// Nested multipart stream /// Nested multipart stream
Nested(Multipart), Nested(Multipart),
} }
/// Get request's payload as multipart stream
///
/// Content-type: multipart/form-data;
///
/// ## Server example
///
/// ```rust
/// # use futures::{Future, Stream};
/// # use futures::future::{ok, result, Either};
/// use actix_web::{web, HttpResponse, Error};
///
/// fn index(payload: web::Multipart) -> impl Future<Item = HttpResponse, Error = Error> {
/// payload.from_err() // <- get multipart stream for current request
/// .and_then(|item| match item { // <- iterate over multipart items
/// web::MultipartItem::Field(field) => {
/// // Field in turn is stream of *Bytes* object
/// Either::A(field.from_err()
/// .fold((), |_, chunk| {
/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk));
/// Ok::<_, Error>(())
/// }))
/// },
/// web::MultipartItem::Nested(mp) => {
/// // Or item could be nested Multipart stream
/// Either::B(ok(()))
/// }
/// })
/// .fold((), |_, _| Ok::<_, Error>(()))
/// .map(|_| HttpResponse::Ok().into())
/// }
/// # fn main() {}
/// ```
impl<P> FromRequest<P> for Multipart
where
P: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
type Error = Error;
type Future = Result<Multipart, Error>;
#[inline]
fn from_request(req: &mut ServiceFromRequest<P>) -> Self::Future {
let pl = req.take_payload();
Ok(Multipart::new(req.headers(), pl))
}
}
enum InnerMultipartItem { enum InnerMultipartItem {
None, None,
Field(Rc<RefCell<InnerField>>), Field(Rc<RefCell<InnerField>>),
@ -163,14 +113,18 @@ impl Multipart {
} }
impl Stream for Multipart { impl Stream for Multipart {
type Item = MultipartItem; type Item = Item;
type Error = MultipartError; type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(err) = self.error.take() { if let Some(err) = self.error.take() {
Err(err) Err(err)
} else if self.safety.current() { } else if self.safety.current() {
self.inner.as_mut().unwrap().borrow_mut().poll(&self.safety) let mut inner = self.inner.as_mut().unwrap().borrow_mut();
if let Some(payload) = inner.payload.get_mut(&self.safety) {
payload.poll_stream()?;
}
inner.poll(&self.safety)
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
@ -178,11 +132,18 @@ impl Stream for Multipart {
} }
impl InnerMultipart { impl InnerMultipart {
fn read_headers(payload: &mut PayloadBuffer) -> Poll<HeaderMap, MultipartError> { fn read_headers(
match payload.read_until(b"\r\n\r\n")? { payload: &mut PayloadBuffer,
Async::NotReady => Ok(Async::NotReady), ) -> Result<Option<HeaderMap>, MultipartError> {
Async::Ready(None) => Err(MultipartError::Incomplete), match payload.read_until(b"\r\n\r\n") {
Async::Ready(Some(bytes)) => { None => {
if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
Some(bytes) => {
let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
match httparse::parse_headers(&bytes, &mut hdrs) { match httparse::parse_headers(&bytes, &mut hdrs) {
Ok(httparse::Status::Complete((_, hdrs))) => { Ok(httparse::Status::Complete((_, hdrs))) => {
@ -199,7 +160,7 @@ impl InnerMultipart {
return Err(ParseError::Header.into()); return Err(ParseError::Header.into());
} }
} }
Ok(Async::Ready(headers)) Ok(Some(headers))
} }
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
Err(err) => Err(ParseError::from(err).into()), Err(err) => Err(ParseError::from(err).into()),
@ -211,23 +172,28 @@ impl InnerMultipart {
fn read_boundary( fn read_boundary(
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
boundary: &str, boundary: &str,
) -> Poll<bool, MultipartError> { ) -> Result<Option<bool>, MultipartError> {
// TODO: need to read epilogue // TODO: need to read epilogue
match payload.readline()? { match payload.readline() {
Async::NotReady => Ok(Async::NotReady), None => {
Async::Ready(None) => Err(MultipartError::Incomplete), if payload.eof {
Async::Ready(Some(chunk)) => { Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
Some(chunk) => {
if chunk.len() == boundary.len() + 4 if chunk.len() == boundary.len() + 4
&& &chunk[..2] == b"--" && &chunk[..2] == b"--"
&& &chunk[2..boundary.len() + 2] == boundary.as_bytes() && &chunk[2..boundary.len() + 2] == boundary.as_bytes()
{ {
Ok(Async::Ready(false)) Ok(Some(false))
} else if chunk.len() == boundary.len() + 6 } else if chunk.len() == boundary.len() + 6
&& &chunk[..2] == b"--" && &chunk[..2] == b"--"
&& &chunk[2..boundary.len() + 2] == boundary.as_bytes() && &chunk[2..boundary.len() + 2] == boundary.as_bytes()
&& &chunk[boundary.len() + 2..boundary.len() + 4] == b"--" && &chunk[boundary.len() + 2..boundary.len() + 4] == b"--"
{ {
Ok(Async::Ready(true)) Ok(Some(true))
} else { } else {
Err(MultipartError::Boundary) Err(MultipartError::Boundary)
} }
@ -238,11 +204,11 @@ impl InnerMultipart {
fn skip_until_boundary( fn skip_until_boundary(
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
boundary: &str, boundary: &str,
) -> Poll<bool, MultipartError> { ) -> Result<Option<bool>, MultipartError> {
let mut eof = false; let mut eof = false;
loop { loop {
match payload.readline()? { match payload.readline() {
Async::Ready(Some(chunk)) => { Some(chunk) => {
if chunk.is_empty() { if chunk.is_empty() {
//ValueError("Could not find starting boundary %r" //ValueError("Could not find starting boundary %r"
//% (self._boundary)) //% (self._boundary))
@ -267,14 +233,19 @@ impl InnerMultipart {
} }
} }
} }
Async::NotReady => return Ok(Async::NotReady), None => {
Async::Ready(None) => return Err(MultipartError::Incomplete), return if payload.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
};
}
} }
} }
Ok(Async::Ready(eof)) Ok(Some(eof))
} }
fn poll(&mut self, safety: &Safety) -> Poll<Option<MultipartItem>, MultipartError> { fn poll(&mut self, safety: &Safety) -> Poll<Option<Item>, MultipartError> {
if self.state == InnerState::Eof { if self.state == InnerState::Eof {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
@ -317,7 +288,7 @@ impl InnerMultipart {
payload, payload,
&self.boundary, &self.boundary,
)? { )? {
Async::Ready(eof) => { Some(eof) => {
if eof { if eof {
self.state = InnerState::Eof; self.state = InnerState::Eof;
return Ok(Async::Ready(None)); return Ok(Async::Ready(None));
@ -325,14 +296,14 @@ impl InnerMultipart {
self.state = InnerState::Headers; self.state = InnerState::Headers;
} }
} }
Async::NotReady => return Ok(Async::NotReady), None => return Ok(Async::NotReady),
} }
} }
// read boundary // read boundary
InnerState::Boundary => { InnerState::Boundary => {
match InnerMultipart::read_boundary(payload, &self.boundary)? { match InnerMultipart::read_boundary(payload, &self.boundary)? {
Async::NotReady => return Ok(Async::NotReady), None => return Ok(Async::NotReady),
Async::Ready(eof) => { Some(eof) => {
if eof { if eof {
self.state = InnerState::Eof; self.state = InnerState::Eof;
return Ok(Async::Ready(None)); return Ok(Async::Ready(None));
@ -347,8 +318,7 @@ impl InnerMultipart {
// read field headers for next field // read field headers for next field
if self.state == InnerState::Headers { if self.state == InnerState::Headers {
if let Async::Ready(headers) = InnerMultipart::read_headers(payload)? if let Some(headers) = InnerMultipart::read_headers(payload)? {
{
self.state = InnerState::Boundary; self.state = InnerState::Boundary;
headers headers
} else { } else {
@ -389,7 +359,7 @@ impl InnerMultipart {
self.item = InnerMultipartItem::Multipart(Rc::clone(&inner)); self.item = InnerMultipartItem::Multipart(Rc::clone(&inner));
Ok(Async::Ready(Some(MultipartItem::Nested(Multipart { Ok(Async::Ready(Some(Item::Nested(Multipart {
safety: safety.clone(), safety: safety.clone(),
error: None, error: None,
inner: Some(inner), inner: Some(inner),
@ -402,9 +372,12 @@ impl InnerMultipart {
)?)); )?));
self.item = InnerMultipartItem::Field(Rc::clone(&field)); self.item = InnerMultipartItem::Field(Rc::clone(&field));
Ok(Async::Ready(Some(MultipartItem::Field( Ok(Async::Ready(Some(Item::Field(Field::new(
MultipartField::new(safety.clone(), headers, mt, field), safety.clone(),
)))) headers,
mt,
field,
)))))
} }
} }
} }
@ -418,21 +391,21 @@ impl Drop for InnerMultipart {
} }
/// A single field in a multipart stream /// A single field in a multipart stream
pub struct MultipartField { pub struct Field {
ct: mime::Mime, ct: mime::Mime,
headers: HeaderMap, headers: HeaderMap,
inner: Rc<RefCell<InnerField>>, inner: Rc<RefCell<InnerField>>,
safety: Safety, safety: Safety,
} }
impl MultipartField { impl Field {
fn new( fn new(
safety: Safety, safety: Safety,
headers: HeaderMap, headers: HeaderMap,
ct: mime::Mime, ct: mime::Mime,
inner: Rc<RefCell<InnerField>>, inner: Rc<RefCell<InnerField>>,
) -> Self { ) -> Self {
MultipartField { Field {
ct, ct,
headers, headers,
inner, inner,
@ -463,22 +436,28 @@ impl MultipartField {
} }
} }
impl Stream for MultipartField { impl Stream for Field {
type Item = Bytes; type Item = Bytes;
type Error = MultipartError; type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.safety.current() { if self.safety.current() {
self.inner.borrow_mut().poll(&self.safety) let mut inner = self.inner.borrow_mut();
if let Some(payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety)
{
payload.poll_stream()?;
}
inner.poll(&self.safety)
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
} }
impl fmt::Debug for MultipartField { impl fmt::Debug for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "\nMultipartField: {}", self.ct)?; writeln!(f, "\nField: {}", self.ct)?;
writeln!(f, " boundary: {}", self.inner.borrow().boundary)?; writeln!(f, " boundary: {}", self.inner.borrow().boundary)?;
writeln!(f, " headers:")?; writeln!(f, " headers:")?;
for (key, val) in self.headers.iter() { for (key, val) in self.headers.iter() {
@ -532,10 +511,8 @@ impl InnerField {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.readany() { match payload.read_max(*size) {
Ok(Async::NotReady) => Ok(Async::NotReady), Some(mut chunk) => {
Ok(Async::Ready(None)) => Err(MultipartError::Incomplete),
Ok(Async::Ready(Some(mut chunk))) => {
let len = cmp::min(chunk.len() as u64, *size); let len = cmp::min(chunk.len() as u64, *size);
*size -= len; *size -= len;
let ch = chunk.split_to(len as usize); let ch = chunk.split_to(len as usize);
@ -544,7 +521,13 @@ impl InnerField {
} }
Ok(Async::Ready(Some(ch))) Ok(Async::Ready(Some(ch)))
} }
Err(err) => Err(err.into()), None => {
if payload.eof && (*size != 0) {
Err(MultipartError::Incomplete)
} else {
Ok(Async::NotReady)
}
}
} }
} }
} }
@ -555,16 +538,26 @@ impl InnerField {
payload: &mut PayloadBuffer, payload: &mut PayloadBuffer,
boundary: &str, boundary: &str,
) -> Poll<Option<Bytes>, MultipartError> { ) -> Poll<Option<Bytes>, MultipartError> {
match payload.read_until(b"\r")? { match payload.read_until(b"\r") {
Async::NotReady => Ok(Async::NotReady), None => {
Async::Ready(None) => Err(MultipartError::Incomplete), if payload.eof {
Async::Ready(Some(mut chunk)) => { Err(MultipartError::Incomplete)
} else {
Ok(Async::NotReady)
}
}
Some(mut chunk) => {
if chunk.len() == 1 { if chunk.len() == 1 {
payload.unprocessed(chunk); payload.unprocessed(chunk);
match payload.read_exact(boundary.len() + 4)? { match payload.read_exact(boundary.len() + 4) {
Async::NotReady => Ok(Async::NotReady), None => {
Async::Ready(None) => Err(MultipartError::Incomplete), if payload.eof {
Async::Ready(Some(mut chunk)) => { Err(MultipartError::Incomplete)
} else {
Ok(Async::NotReady)
}
}
Some(mut chunk) => {
if &chunk[..2] == b"\r\n" if &chunk[..2] == b"\r\n"
&& &chunk[2..4] == b"--" && &chunk[2..4] == b"--"
&& &chunk[4..] == boundary.as_bytes() && &chunk[4..] == boundary.as_bytes()
@ -606,10 +599,9 @@ impl InnerField {
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
Async::Ready(None) => { Async::Ready(None) => {
self.eof = true; self.eof = true;
match payload.readline()? { match payload.readline() {
Async::NotReady => Async::NotReady, None => Async::Ready(None),
Async::Ready(None) => Async::Ready(None), Some(line) => {
Async::Ready(Some(line)) => {
if line.as_ref() != b"\r\n" { if line.as_ref() != b"\r\n" {
log::warn!("multipart field did not read all the data or it is malformed"); log::warn!("multipart field did not read all the data or it is malformed");
} }
@ -711,14 +703,86 @@ impl Drop for Safety {
} }
} }
/// Payload buffer
struct PayloadBuffer {
eof: bool,
buf: BytesMut,
stream: Box<dyn Stream<Item = Bytes, Error = PayloadError>>,
}
impl PayloadBuffer {
/// Create new `PayloadBuffer` instance
fn new<S>(stream: S) -> Self
where
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
{
PayloadBuffer {
eof: false,
buf: BytesMut::new(),
stream: Box::new(stream),
}
}
fn poll_stream(&mut self) -> Result<(), PayloadError> {
loop {
match self.stream.poll()? {
Async::Ready(Some(data)) => self.buf.extend_from_slice(&data),
Async::Ready(None) => {
self.eof = true;
return Ok(());
}
Async::NotReady => return Ok(()),
}
}
}
/// Read exact number of bytes
#[inline]
fn read_exact(&mut self, size: usize) -> Option<Bytes> {
if size <= self.buf.len() {
Some(self.buf.split_to(size).freeze())
} else {
None
}
}
fn read_max(&mut self, size: u64) -> Option<Bytes> {
if !self.buf.is_empty() {
let size = std::cmp::min(self.buf.len() as u64, size) as usize;
Some(self.buf.split_to(size).freeze())
} else {
None
}
}
/// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Option<Bytes> {
twoway::find_bytes(&self.buf, line)
.map(|idx| self.buf.split_to(idx + line.len()).freeze())
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Option<Bytes> {
self.read_until(b"\n")
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
let buf = BytesMut::from(data);
let buf = std::mem::replace(&mut self.buf, buf);
self.buf.extend_from_slice(&buf);
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_http::h1::{Payload, PayloadWriter};
use bytes::Bytes; use bytes::Bytes;
use futures::unsync::mpsc; use futures::unsync::mpsc;
use super::*; use super::*;
use crate::http::header::{DispositionParam, DispositionType}; use actix_web::http::header::{DispositionParam, DispositionType};
use crate::test::run_on; use actix_web::test::run_on;
#[test] #[test]
fn test_boundary() { fn test_boundary() {
@ -799,9 +863,9 @@ mod tests {
); );
let mut multipart = Multipart::new(&headers, payload); let mut multipart = Multipart::new(&headers, payload);
match multipart.poll() { match multipart.poll().unwrap() {
Ok(Async::Ready(Some(item))) => match item { Async::Ready(Some(item)) => match item {
MultipartItem::Field(mut field) => { Item::Field(mut field) => {
{ {
let cd = field.content_disposition().unwrap(); let cd = field.content_disposition().unwrap();
assert_eq!(cd.disposition, DispositionType::FormData); assert_eq!(cd.disposition, DispositionType::FormData);
@ -813,12 +877,12 @@ mod tests {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
match field.poll() { match field.poll().unwrap() {
Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "test"), Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"),
_ => unreachable!(), _ => unreachable!(),
} }
match field.poll() { match field.poll().unwrap() {
Ok(Async::Ready(None)) => (), Async::Ready(None) => (),
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -827,9 +891,9 @@ mod tests {
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll() { match multipart.poll().unwrap() {
Ok(Async::Ready(Some(item))) => match item { Async::Ready(Some(item)) => match item {
MultipartItem::Field(mut field) => { Item::Field(mut field) => {
assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().type_(), mime::TEXT);
assert_eq!(field.content_type().subtype(), mime::PLAIN); assert_eq!(field.content_type().subtype(), mime::PLAIN);
@ -847,10 +911,110 @@ mod tests {
_ => unreachable!(), _ => unreachable!(),
} }
match multipart.poll() { match multipart.poll().unwrap() {
Ok(Async::Ready(None)) => (), Async::Ready(None) => (),
_ => unreachable!(), _ => unreachable!(),
} }
}); });
} }
#[test]
fn test_basic() {
run_on(|| {
let (_, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.buf.len(), 0);
payload.poll_stream().unwrap();
assert_eq!(None, payload.read_max(1));
})
}
#[test]
fn test_eof() {
run_on(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(4));
sender.feed_data(Bytes::from("data"));
sender.feed_eof();
payload.poll_stream().unwrap();
assert_eq!(Some(Bytes::from("data")), payload.read_max(4));
assert_eq!(payload.buf.len(), 0);
assert_eq!(None, payload.read_max(1));
assert!(payload.eof);
})
}
#[test]
fn test_err() {
run_on(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_max(1));
sender.set_error(PayloadError::Incomplete(None));
payload.poll_stream().err().unwrap();
})
}
#[test]
fn test_readmax() {
run_on(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap();
assert_eq!(payload.buf.len(), 10);
assert_eq!(Some(Bytes::from("line1")), payload.read_max(5));
assert_eq!(payload.buf.len(), 5);
assert_eq!(Some(Bytes::from("line2")), payload.read_max(5));
assert_eq!(payload.buf.len(), 0);
})
}
#[test]
fn test_readexactly() {
run_on(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_exact(2));
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap();
assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
assert_eq!(payload.buf.len(), 8);
assert_eq!(Some(Bytes::from_static(b"ne1l")), payload.read_exact(4));
assert_eq!(payload.buf.len(), 4);
})
}
#[test]
fn test_readuntil() {
run_on(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(None, payload.read_until(b"ne"));
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
payload.poll_stream().unwrap();
assert_eq!(Some(Bytes::from("line")), payload.read_until(b"ne"));
assert_eq!(payload.buf.len(), 6);
assert_eq!(Some(Bytes::from("1line2")), payload.read_until(b"2"));
assert_eq!(payload.buf.len(), 0);
})
}
} }

View File

@ -2,7 +2,6 @@
pub(crate) mod form; pub(crate) mod form;
pub(crate) mod json; pub(crate) mod json;
mod multipart;
mod path; mod path;
pub(crate) mod payload; pub(crate) mod payload;
mod query; mod query;
@ -10,7 +9,6 @@ pub(crate) mod readlines;
pub use self::form::{Form, FormConfig}; pub use self::form::{Form, FormConfig};
pub use self::json::{Json, JsonConfig}; pub use self::json::{Json, JsonConfig};
pub use self::multipart::{Multipart, MultipartField, MultipartItem};
pub use self::path::Path; pub use self::path::Path;
pub use self::payload::{Payload, PayloadConfig}; pub use self::payload::{Payload, PayloadConfig};
pub use self::query::Query; pub use self::query::Query;