1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-24 08:22:59 +01:00

refactor multipart stream

This commit is contained in:
Nikolay Kim 2018-02-26 05:55:07 +03:00
parent 6ef9c60361
commit 0a3b776aa7
6 changed files with 293 additions and 186 deletions

View File

@ -11,7 +11,7 @@ use futures::{Future, Stream};
use futures::future::{result, Either}; use futures::future::{result, Either};
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> fn index(req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>>
{ {
println!("{:?}", req); println!("{:?}", req);

View File

@ -294,6 +294,9 @@ pub enum MultipartError {
/// Multipart boundary is not found /// Multipart boundary is not found
#[fail(display="Multipart boundary is not found")] #[fail(display="Multipart boundary is not found")]
Boundary, Boundary,
/// Multipart stream is incomplete
#[fail(display="Multipart stream is incomplete")]
Incomplete,
/// Error during field parsing /// Error during field parsing
#[fail(display="{}", _0)] #[fail(display="{}", _0)]
Parse(#[cause] ParseError), Parse(#[cause] ParseError),

View File

@ -520,8 +520,9 @@ impl<S> HttpRequest<S> {
/// } /// }
/// # fn main() {} /// # fn main() {}
/// ``` /// ```
pub fn multipart(self) -> Multipart { pub fn multipart(self) -> Multipart<HttpRequest<S>> {
Multipart::from_request(self) let boundary = Multipart::boundary(self.headers());
Multipart::new(boundary, self)
} }
/// Parse `application/x-www-form-urlencoded` encoded body. /// Parse `application/x-www-form-urlencoded` encoded body.

View File

@ -188,27 +188,31 @@ mod tests {
#[test] #[test]
fn test_json_body() { fn test_json_body() {
let mut req = HttpRequest::default(); let req = HttpRequest::default();
let mut json = req.json::<MyObject>(); let mut json = req.json::<MyObject>();
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let mut json = req.json::<MyObject>().content_type("text/json"); let mut req = HttpRequest::default();
req.headers_mut().insert(header::CONTENT_TYPE, req.headers_mut().insert(header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json")); header::HeaderValue::from_static("application/json"));
let mut json = req.json::<MyObject>().content_type("text/json");
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::ContentType);
let mut json = req.json::<MyObject>().limit(100); let mut req = HttpRequest::default();
req.headers_mut().insert(header::CONTENT_TYPE, req.headers_mut().insert(header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json")); header::HeaderValue::from_static("application/json"));
req.headers_mut().insert(header::CONTENT_LENGTH, req.headers_mut().insert(header::CONTENT_LENGTH,
header::HeaderValue::from_static("10000")); header::HeaderValue::from_static("10000"));
let mut json = req.json::<MyObject>().limit(100);
assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow); assert_eq!(json.poll().err().unwrap(), JsonPayloadError::Overflow);
let mut req = HttpRequest::default();
req.headers_mut().insert(header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"));
req.headers_mut().insert(header::CONTENT_LENGTH, req.headers_mut().insert(header::CONTENT_LENGTH,
header::HeaderValue::from_static("16")); header::HeaderValue::from_static("16"));
req.payload_mut().unread_data(Bytes::from_static(b"{\"name\": \"test\"}")); req.payload_mut().unread_data(Bytes::from_static(b"{\"name\": \"test\"}"));
let mut json = req.json::<MyObject>(); let mut json = req.json::<MyObject>();
assert_eq!(json.poll().ok().unwrap(), Async::Ready(MyObject{name: "test".to_owned()})); assert_eq!(json.poll().ok().unwrap(), Async::Ready(MyObject{name: "test".to_owned()}));
} }
} }

View File

@ -9,12 +9,11 @@ use httparse;
use bytes::Bytes; use bytes::Bytes;
use http::HttpTryFrom; use http::HttpTryFrom;
use http::header::{self, HeaderMap, HeaderName, HeaderValue}; use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use futures::{Async, Future, Stream, Poll}; use futures::{Async, Stream, Poll};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use error::{ParseError, PayloadError, MultipartError}; use error::{ParseError, PayloadError, MultipartError};
use payload::Payload; use payload::PayloadHelper;
use httprequest::HttpRequest;
const MAX_HEADERS: usize = 32; const MAX_HEADERS: usize = 32;
@ -24,27 +23,24 @@ const MAX_HEADERS: usize = 32;
/// Stream implementation. /// Stream implementation.
/// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart` /// `MultipartItem::Field` contains multipart field. `MultipartItem::Multipart`
/// is used for nested multipart streams. /// is used for nested multipart streams.
#[derive(Debug)] pub struct Multipart<S> {
pub struct Multipart {
safety: Safety, safety: Safety,
error: Option<MultipartError>, error: Option<MultipartError>,
inner: Option<Rc<RefCell<InnerMultipart>>>, inner: Option<Rc<RefCell<InnerMultipart<S>>>>,
} }
/// ///
#[derive(Debug)] pub enum MultipartItem<S> {
pub enum MultipartItem {
/// Multipart field /// Multipart field
Field(Field), Field(Field<S>),
/// Nested multipart stream /// Nested multipart stream
Nested(Multipart), Nested(Multipart<S>),
} }
#[derive(Debug)] enum InnerMultipartItem<S> {
enum InnerMultipartItem {
None, None,
Field(Rc<RefCell<InnerField>>), Field(Rc<RefCell<InnerField<S>>>),
Multipart(Rc<RefCell<InnerMultipart>>), Multipart(Rc<RefCell<InnerMultipart<S>>>),
} }
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
@ -59,57 +55,14 @@ enum InnerState {
Headers, Headers,
} }
#[derive(Debug)] struct InnerMultipart<S> {
struct InnerMultipart { payload: PayloadRef<S>,
payload: PayloadRef,
boundary: String, boundary: String,
state: InnerState, state: InnerState,
item: InnerMultipartItem, item: InnerMultipartItem<S>,
} }
impl Multipart { impl Multipart<()> {
/// Create multipart instance for boundary.
pub fn new(boundary: String, payload: Payload) -> Multipart {
Multipart {
error: None,
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(
InnerMultipart {
payload: PayloadRef::new(payload),
boundary: boundary,
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
})))
}
}
/// Create multipart instance for request.
pub fn from_request<S>(req: HttpRequest<S>) -> Multipart {
match Multipart::boundary(req.headers()) {
Ok(boundary) => Multipart::new(boundary, req.payload().clone()),
Err(err) =>
Multipart {
error: Some(err),
safety: Safety::new(),
inner: None,
}
}
}
// /// Create multipart instance for client response.
// pub fn from_response(resp: &mut ClientResponse) -> Multipart {
// match Multipart::boundary(resp.headers()) {
// Ok(boundary) => Multipart::new(boundary, resp.payload().clone()),
// Err(err) =>
// Multipart {
// error: Some(err),
// safety: Safety::new(),
// inner: None,
// }
// }
// }
/// Extract boundary info from headers. /// Extract boundary info from headers.
pub fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> { pub fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
if let Some(content_type) = headers.get(header::CONTENT_TYPE) { if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
@ -132,8 +85,34 @@ impl Multipart {
} }
} }
impl Stream for Multipart { impl<S> Multipart<S> where S: Stream<Item=Bytes, Error=PayloadError> {
type Item = MultipartItem;
/// Create multipart instance for boundary.
pub fn new(boundary: Result<String, MultipartError>, stream: S) -> Multipart<S> {
match boundary {
Ok(boundary) => Multipart {
error: None,
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(
InnerMultipart {
payload: PayloadRef::new(PayloadHelper::new(stream)),
boundary: boundary,
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
})))
},
Err(err) =>
Multipart {
error: Some(err),
safety: Safety::new(),
inner: None,
}
}
}
}
impl<S> Stream for Multipart<S> where S: Stream<Item=Bytes, Error=PayloadError> {
type Item = MultipartItem<S>;
type Error = MultipartError; type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -147,13 +126,14 @@ impl Stream for Multipart {
} }
} }
impl InnerMultipart { impl<S> InnerMultipart<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn read_headers(payload: &mut Payload) -> Poll<HeaderMap, MultipartError> fn read_headers(payload: &mut PayloadHelper<S>) -> Poll<HeaderMap, MultipartError>
{ {
match payload.readuntil(b"\r\n\r\n").poll()? { match payload.readuntil(b"\r\n\r\n")? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(bytes) => { Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(bytes)) => {
let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
match httparse::parse_headers(&bytes, &mut hdrs) { match httparse::parse_headers(&bytes, &mut hdrs) {
Ok(httparse::Status::Complete((_, hdrs))) => { Ok(httparse::Status::Complete((_, hdrs))) => {
@ -179,12 +159,14 @@ impl InnerMultipart {
} }
} }
fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError> fn read_boundary(payload: &mut PayloadHelper<S>, boundary: &str)
-> Poll<bool, MultipartError>
{ {
// TODO: need to read epilogue // TODO: need to read epilogue
match payload.readline().poll()? { match payload.readline()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(chunk) => { Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => {
if chunk.len() == boundary.len() + 4 && if chunk.len() == boundary.len() + 4 &&
&chunk[..2] == b"--" && &chunk[..2] == b"--" &&
&chunk[2..boundary.len()+2] == boundary.as_bytes() &chunk[2..boundary.len()+2] == boundary.as_bytes()
@ -203,39 +185,42 @@ impl InnerMultipart {
} }
} }
fn skip_until_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError> fn skip_until_boundary(payload: &mut PayloadHelper<S>, boundary: &str)
-> Poll<bool, MultipartError>
{ {
let mut eof = false; let mut eof = false;
loop { loop {
if let Async::Ready(chunk) = payload.readline().poll()? { match payload.readline()? {
if chunk.is_empty() { Async::Ready(Some(chunk)) => {
//ValueError("Could not find starting boundary %r" if chunk.is_empty() {
//% (self._boundary)) //ValueError("Could not find starting boundary %r"
} //% (self._boundary))
if chunk.len() < boundary.len() { }
continue if chunk.len() < boundary.len() {
}
if &chunk[..2] == b"--" && &chunk[2..chunk.len()-2] == boundary.as_bytes() {
break;
} else {
if chunk.len() < boundary.len() + 2{
continue continue
} }
let b: &[u8] = boundary.as_ref(); if &chunk[..2] == b"--" && &chunk[2..chunk.len()-2] == boundary.as_bytes() {
if &chunk[..boundary.len()] == b && break;
&chunk[boundary.len()..boundary.len()+2] == b"--" { } else {
eof = true; if chunk.len() < boundary.len() + 2{
break; continue
} }
} let b: &[u8] = boundary.as_ref();
} else { if &chunk[..boundary.len()] == b &&
return Ok(Async::NotReady) &chunk[boundary.len()..boundary.len()+2] == b"--" {
eof = true;
break;
}
}
},
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Err(MultipartError::Incomplete),
} }
} }
Ok(Async::Ready(eof)) Ok(Async::Ready(eof))
} }
fn poll(&mut self, safety: &Safety) -> Poll<Option<MultipartItem>, MultipartError> { fn poll(&mut self, safety: &Safety) -> Poll<Option<MultipartItem<S>>, MultipartError> {
if self.state == InnerState::Eof { if self.state == InnerState::Eof {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
@ -247,25 +232,18 @@ impl InnerMultipart {
let stop = match self.item { let stop = match self.item {
InnerMultipartItem::Field(ref mut field) => { InnerMultipartItem::Field(ref mut field) => {
match field.borrow_mut().poll(safety)? { match field.borrow_mut().poll(safety)? {
Async::NotReady => { Async::NotReady => return Ok(Async::NotReady),
return Ok(Async::NotReady) Async::Ready(Some(_)) => continue,
} Async::Ready(None) => true,
Async::Ready(Some(_)) =>
continue,
Async::Ready(None) =>
true,
} }
} },
InnerMultipartItem::Multipart(ref mut multipart) => { InnerMultipartItem::Multipart(ref mut multipart) => {
match multipart.borrow_mut().poll(safety)? { match multipart.borrow_mut().poll(safety)? {
Async::NotReady => Async::NotReady => return Ok(Async::NotReady),
return Ok(Async::NotReady), Async::Ready(Some(_)) => continue,
Async::Ready(Some(_)) => Async::Ready(None) => true,
continue,
Async::Ready(None) =>
true,
} }
} },
_ => false, _ => false,
}; };
if stop { if stop {
@ -281,25 +259,22 @@ impl InnerMultipart {
match self.state { match self.state {
// read until first boundary // read until first boundary
InnerState::FirstBoundary => { InnerState::FirstBoundary => {
if let Async::Ready(eof) = match InnerMultipart::skip_until_boundary(payload, &self.boundary)? {
InnerMultipart::skip_until_boundary(payload, &self.boundary)? Async::Ready(eof) => {
{ if eof {
if eof { self.state = InnerState::Eof;
self.state = InnerState::Eof; return Ok(Async::Ready(None));
return Ok(Async::Ready(None)); } else {
} else { self.state = InnerState::Headers;
self.state = InnerState::Headers; }
} },
} else { Async::NotReady => return Ok(Async::NotReady),
return Ok(Async::NotReady)
} }
} },
// read boundary // read boundary
InnerState::Boundary => { InnerState::Boundary => {
match InnerMultipart::read_boundary(payload, &self.boundary)? { match InnerMultipart::read_boundary(payload, &self.boundary)? {
Async::NotReady => { Async::NotReady => return Ok(Async::NotReady),
return Ok(Async::NotReady)
}
Async::Ready(eof) => { Async::Ready(eof) => {
if eof { if eof {
self.state = InnerState::Eof; self.state = InnerState::Eof;
@ -375,7 +350,7 @@ impl InnerMultipart {
} }
} }
impl Drop for InnerMultipart { impl<S> Drop for InnerMultipart<S> {
fn drop(&mut self) { fn drop(&mut self) {
// InnerMultipartItem::Field has to be dropped first because of Safety. // InnerMultipartItem::Field has to be dropped first because of Safety.
self.item = InnerMultipartItem::None; self.item = InnerMultipartItem::None;
@ -383,17 +358,17 @@ impl Drop for InnerMultipart {
} }
/// A single field in a multipart stream /// A single field in a multipart stream
pub struct Field { pub struct Field<S> {
ct: mime::Mime, ct: mime::Mime,
headers: HeaderMap, headers: HeaderMap,
inner: Rc<RefCell<InnerField>>, inner: Rc<RefCell<InnerField<S>>>,
safety: Safety, safety: Safety,
} }
impl Field { impl<S> Field<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn new(safety: Safety, headers: HeaderMap, fn new(safety: Safety, headers: HeaderMap,
ct: mime::Mime, inner: Rc<RefCell<InnerField>>) -> Self { ct: mime::Mime, inner: Rc<RefCell<InnerField<S>>>) -> Self {
Field { Field {
ct: ct, ct: ct,
headers: headers, headers: headers,
@ -411,7 +386,7 @@ impl Field {
} }
} }
impl Stream for Field { impl<S> Stream for Field<S> where S: Stream<Item=Bytes, Error=PayloadError> {
type Item = Bytes; type Item = Bytes;
type Error = MultipartError; type Error = MultipartError;
@ -424,7 +399,7 @@ impl Stream for Field {
} }
} }
impl fmt::Debug for Field { impl<S> fmt::Debug for Field<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = write!(f, "\nMultipartField: {}\n", self.ct); let res = write!(f, "\nMultipartField: {}\n", self.ct);
let _ = write!(f, " boundary: {}\n", self.inner.borrow().boundary); let _ = write!(f, " boundary: {}\n", self.inner.borrow().boundary);
@ -441,18 +416,17 @@ impl fmt::Debug for Field {
} }
} }
#[derive(Debug)] struct InnerField<S> {
struct InnerField { payload: Option<PayloadRef<S>>,
payload: Option<PayloadRef>,
boundary: String, boundary: String,
eof: bool, eof: bool,
length: Option<u64>, length: Option<u64>,
} }
impl InnerField { impl<S> InnerField<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn new(payload: PayloadRef, boundary: String, headers: &HeaderMap) fn new(payload: PayloadRef<S>, boundary: String, headers: &HeaderMap)
-> Result<InnerField, PayloadError> -> Result<InnerField<S>, PayloadError>
{ {
let len = if let Some(len) = headers.get(header::CONTENT_LENGTH) { let len = if let Some(len) = headers.get(header::CONTENT_LENGTH) {
if let Ok(s) = len.to_str() { if let Ok(s) = len.to_str() {
@ -477,14 +451,15 @@ impl InnerField {
/// Reads body part content chunk of the specified size. /// Reads body part content chunk of the specified size.
/// The body part must has `Content-Length` header with proper value. /// The body part must has `Content-Length` header with proper value.
fn read_len(payload: &mut Payload, size: &mut u64) -> Poll<Option<Bytes>, MultipartError> fn read_len(payload: &mut PayloadHelper<S>, size: &mut u64)
-> Poll<Option<Bytes>, MultipartError>
{ {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.poll() { match payload.readany() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(None)) => Err(MultipartError::Incomplete),
Ok(Async::Ready(Some(mut chunk))) => { Ok(Async::Ready(Some(mut chunk))) => {
let len = cmp::min(chunk.len() as u64, *size); let len = cmp::min(chunk.len() as u64, *size);
*size -= len; *size -= len;
@ -501,16 +476,19 @@ impl InnerField {
/// Reads content chunk of body part with unknown length. /// Reads content chunk of body part with unknown length.
/// The `Content-Length` header for body part is not necessary. /// The `Content-Length` header for body part is not necessary.
fn read_stream(payload: &mut Payload, boundary: &str) -> Poll<Option<Bytes>, MultipartError> fn read_stream(payload: &mut PayloadHelper<S>, boundary: &str)
-> Poll<Option<Bytes>, MultipartError>
{ {
match payload.readuntil(b"\r").poll()? { match payload.readuntil(b"\r")? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(mut chunk) => { Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => {
if chunk.len() == 1 { if chunk.len() == 1 {
payload.unread_data(chunk); payload.unread_data(chunk);
match payload.readexactly(boundary.len() + 4).poll()? { match payload.readexactly(boundary.len() + 4)? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(chunk) => { Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(chunk)) => {
if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" &&
&chunk[4..] == boundary.as_bytes() &chunk[4..] == boundary.as_bytes()
{ {
@ -535,24 +513,6 @@ impl InnerField {
if self.payload.is_none() { if self.payload.is_none() {
return Ok(Async::Ready(None)) return Ok(Async::Ready(None))
} }
if self.eof {
if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) {
match payload.readline().poll()? {
Async::NotReady =>
return Ok(Async::NotReady),
Async::Ready(chunk) => {
assert_eq!(
chunk.as_ref(), b"\r\n",
"reader did not read all the data or it is malformed");
}
}
} else {
return Ok(Async::NotReady);
}
self.payload.take();
return Ok(Async::Ready(None))
}
let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) {
let res = if let Some(ref mut len) = self.length { let res = if let Some(ref mut len) = self.length {
@ -566,12 +526,13 @@ impl InnerField {
Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)),
Async::Ready(None) => { Async::Ready(None) => {
self.eof = true; self.eof = true;
match payload.readline().poll()? { match payload.readline()? {
Async::NotReady => Async::NotReady, Async::NotReady => Async::NotReady,
Async::Ready(chunk) => { Async::Ready(None) => Async::Ready(None),
assert_eq!( Async::Ready(Some(line)) => {
chunk.as_ref(), b"\r\n", if line.as_ref() != b"\r\n" {
"reader did not read all the data or it is malformed"); warn!("multipart field did not read all the data or it is malformed");
}
Async::Ready(None) Async::Ready(None)
} }
} }
@ -588,25 +549,22 @@ impl InnerField {
} }
} }
#[derive(Debug)] struct PayloadRef<S> {
struct PayloadRef { payload: Rc<PayloadHelper<S>>,
task: Option<Task>,
payload: Rc<Payload>,
} }
impl PayloadRef { impl<S> PayloadRef<S> where S: Stream<Item=Bytes, Error=PayloadError> {
fn new(payload: Payload) -> PayloadRef { fn new(payload: PayloadHelper<S>) -> PayloadRef<S> {
PayloadRef { PayloadRef {
task: None,
payload: Rc::new(payload), payload: Rc::new(payload),
} }
} }
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut Payload> fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadHelper<S>>
where 'a: 'b where 'a: 'b
{ {
if s.current() { if s.current() {
let payload: &mut Payload = unsafe { let payload: &mut PayloadHelper<S> = unsafe {
&mut *(self.payload.as_ref() as *const _ as *mut _)}; &mut *(self.payload.as_ref() as *const _ as *mut _)};
Some(payload) Some(payload)
} else { } else {
@ -615,10 +573,9 @@ impl PayloadRef {
} }
} }
impl Clone for PayloadRef { impl<S> Clone for PayloadRef<S> {
fn clone(&self) -> PayloadRef { fn clone(&self) -> PayloadRef<S> {
PayloadRef { PayloadRef {
task: Some(current_task()),
payload: Rc::clone(&self.payload), payload: Rc::clone(&self.payload),
} }
} }
@ -733,7 +690,7 @@ mod tests {
sender.feed_data(bytes); sender.feed_data(bytes);
let mut multipart = Multipart::new( let mut multipart = Multipart::new(
"abbc761f78ff4d7cb7573b5a23f96ef0".to_owned(), payload); Ok("abbc761f78ff4d7cb7573b5a23f96ef0".to_owned()), payload);
match multipart.poll() { match multipart.poll() {
Ok(Async::Ready(Some(item))) => { Ok(Async::Ready(Some(item))) => {
match item { match item {

View File

@ -411,6 +411,148 @@ impl Inner {
} }
} }
pub struct PayloadHelper<S> {
len: usize,
items: VecDeque<Bytes>,
stream: S,
}
impl<S> PayloadHelper<S> where S: Stream<Item=Bytes, Error=PayloadError> {
pub fn new(stream: S) -> Self {
PayloadHelper {
len: 0,
items: VecDeque::new(),
stream: stream,
}
}
fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| {
match res {
Async::Ready(Some(data)) => {
self.len += data.len();
self.items.push_back(data);
Async::Ready(true)
},
Async::Ready(None) => Async::Ready(false),
Async::NotReady => Async::NotReady,
}
})
}
pub fn len(&self) -> usize {
self.len
}
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Ok(Async::Ready(Some(data)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.readany(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
pub fn readexactly(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
return Ok(Async::Ready(Some(buf.freeze())))
}
}
}
match self.poll_stream()? {
Async::Ready(true) => self.readexactly(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
pub fn readuntil(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
let mut found = false;
let mut length = 0;
for no in 0..self.items.len() {
{
let chunk = &self.items[no];
for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] {
idx += 1;
if idx == line.len() {
num = no;
offset = pos+1;
length += pos+1;
found = true;
break;
}
} else {
idx = 0
}
}
if !found {
length += chunk.len()
}
}
if found {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend_from_slice(&self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
}
self.len -= length;
return Ok(Async::Ready(Some(buf.freeze())))
}
}
match self.poll_stream()? {
Async::Ready(true) => self.readuntil(line),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.readuntil(b"\n")
}
pub fn unread_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
pub fn remaining(&mut self) -> Bytes {
self.items.iter_mut()
.fold(BytesMut::new(), |mut b, c| {
b.extend_from_slice(c);
b
}).freeze()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;