From a2b98b31e8951a15ed94acc207d04c2bb17f1da3 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 25 Feb 2018 20:34:26 +0300 Subject: [PATCH] refactor payload related futures for HttpRequest --- examples/basics/src/main.rs | 2 +- examples/json/src/main.rs | 10 +- guide/src/qs_7.md | 20 ++-- src/httprequest.rs | 178 ++++++++++++++++++------------------ src/json.rs | 4 +- src/lib.rs | 4 +- src/multipart.rs | 2 +- 7 files changed, 109 insertions(+), 111 deletions(-) diff --git a/examples/basics/src/main.rs b/examples/basics/src/main.rs index f52b0954..b93f5f20 100644 --- a/examples/basics/src/main.rs +++ b/examples/basics/src/main.rs @@ -22,7 +22,7 @@ fn index(mut req: HttpRequest) -> Result { println!("{:?}", req); // example of ... - if let Ok(ch) = req.payload_mut().readany().poll() { + if let Ok(ch) = req.poll() { if let futures::Async::Ready(Some(d)) = ch { println!("{}", String::from_utf8_lossy(d.as_ref())); } diff --git a/examples/json/src/main.rs b/examples/json/src/main.rs index 719d7485..3247e5d6 100644 --- a/examples/json/src/main.rs +++ b/examples/json/src/main.rs @@ -34,9 +34,9 @@ fn index(req: HttpRequest) -> Box> { const MAX_SIZE: usize = 262_144; // max payload size is 256k /// This handler manually load request payload and parse serde json -fn index_manual(mut req: HttpRequest) -> Box> { - // readany() returns asynchronous stream of Bytes objects - req.payload_mut().readany() +fn index_manual(req: HttpRequest) -> Box> { + // HttpRequest is stream of Bytes objects + req // `Future::from_err` acts like `?` in that it coerces the error type from // the future into the final error type .from_err() @@ -63,8 +63,8 @@ fn index_manual(mut req: HttpRequest) -> Box Box> { - req.payload_mut().readany().concat2() +fn index_mjsonrust(req: HttpRequest) -> Box> { + req.concat2() .from_err() .and_then(|body| { // body is loaded, now we can deserialize json-rust diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index 3a96529a..81990361 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -109,7 +109,7 @@ struct MyObj {name: String, number: i32} fn index(mut req: HttpRequest) -> Box> { // `concat2` will asynchronously read each chunk of the request body and // return a single, concatenated, chunk - req.payload_mut().readany().concat2() + req.payload_mut().concat2() // `Future::from_err` acts like `?` in that it coerces the error type from // the future into the final error type .from_err() @@ -256,13 +256,13 @@ fn index(mut req: HttpRequest) -> Box> { ## Streaming request -Actix uses [*Payload*](../actix_web/payload/struct.Payload.html) object as request payload stream. -*HttpRequest* provides several methods, which can be used for payload access. -At the same time *Payload* implements *Stream* trait, so it could be used with various -stream combinators. Also *Payload* provides several convenience methods that return -future object that resolve to Bytes object. - -* *readany()* method returns *Stream* of *Bytes* objects. +*HttpRequest* is a stream of `Bytes` objects. It could be used to read request +body payload. At the same time actix uses +[*Payload*](../actix_web/payload/struct.Payload.html) object. +*HttpRequest* provides several methods, which can be used for +payload access.At the same time *Payload* implements *Stream* trait, so it +could be used with various stream combinators. Also *Payload* provides +several convenience methods that return future object that resolve to Bytes object. * *readexactly()* method returns *Future* that resolves when specified number of bytes get received. @@ -283,9 +283,7 @@ use futures::{Future, Stream}; fn index(mut req: HttpRequest) -> Box> { - req.payload() - .readany() - .from_err() + req.from_err() .fold((), |_, chunk| { println!("Chunk: {:?}", chunk); result::<_, error::PayloadError>(Ok(())) diff --git a/src/httprequest.rs b/src/httprequest.rs index e46a7c3e..279b7d97 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use std::collections::HashMap; use bytes::{Bytes, BytesMut}; use cookie::Cookie; -use futures::{Async, Future, Stream, Poll}; +use futures::{Future, Stream, Poll}; use http_range::HttpRange; use serde::de::DeserializeOwned; use mime::Mime; @@ -155,8 +155,8 @@ impl HttpRequest { HttpRequest(self.0.clone(), None, None) } - // get mutable reference for inner message - // mutable reference should not be returned as result for request's method + /// get mutable reference for inner message + /// mutable reference should not be returned as result for request's method #[inline(always)] #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] pub(crate) fn as_mut(&self) -> &mut HttpMessage { @@ -480,8 +480,8 @@ impl HttpRequest { /// } /// # fn main() {} /// ``` - pub fn body(&self) -> RequestBody { - RequestBody::from_request(self) + pub fn body(self) -> RequestBody { + RequestBody::from(self) } /// Return stream to http payload processes as multipart. @@ -518,7 +518,7 @@ impl HttpRequest { /// } /// # fn main() {} /// ``` - pub fn multipart(&mut self) -> Multipart { + pub fn multipart(self) -> Multipart { Multipart::from_request(self) } @@ -549,10 +549,8 @@ impl HttpRequest { /// } /// # fn main() {} /// ``` - pub fn urlencoded(&self) -> UrlEncoded { - UrlEncoded::from(self.payload().clone(), - self.headers(), - self.chunked().unwrap_or(false)) + pub fn urlencoded(self) -> UrlEncoded { + UrlEncoded::from(self) } /// Parse `application/json` encoded body. @@ -585,7 +583,7 @@ impl HttpRequest { /// } /// # fn main() {} /// ``` - pub fn json(&self) -> JsonBody { + pub fn json(self) -> JsonBody { JsonBody::from_request(self) } } @@ -638,49 +636,24 @@ impl fmt::Debug for HttpRequest { /// Future that resolves to a parsed urlencoded values. pub struct UrlEncoded { - pl: Payload, - body: BytesMut, - error: Option, + req: Option>, + limit: usize, + fut: Option, Error=UrlencodedError>>>, } impl UrlEncoded { - pub fn from(pl: Payload, headers: &HeaderMap, chunked: bool) -> UrlEncoded { - let mut encoded = UrlEncoded { - pl: pl, - body: BytesMut::new(), - error: None - }; - - if chunked { - encoded.error = Some(UrlencodedError::Chunked); - } else if let Some(len) = headers.get(header::CONTENT_LENGTH) { - if let Ok(s) = len.to_str() { - if let Ok(len) = s.parse::() { - if len > 262_144 { - encoded.error = Some(UrlencodedError::Overflow); - } - } else { - encoded.error = Some(UrlencodedError::UnknownLength); - } - } else { - encoded.error = Some(UrlencodedError::UnknownLength); - } + pub fn from(req: HttpRequest) -> UrlEncoded { + UrlEncoded { + req: Some(req.clone_without_state()), + limit: 262_144, + fut: None, } + } - // check content type - if encoded.error.is_none() { - if let Some(content_type) = headers.get(header::CONTENT_TYPE) { - if let Ok(content_type) = content_type.to_str() { - if content_type.to_lowercase() == "application/x-www-form-urlencoded" { - return encoded - } - } - } - encoded.error = Some(UrlencodedError::ContentType); - return encoded - } - - encoded + /// Change max size of payload. By default max size is 256Kb + pub fn limit(mut self, limit: usize) -> Self { + self.limit = limit; + self } } @@ -689,48 +662,76 @@ impl Future for UrlEncoded { type Error = UrlencodedError; fn poll(&mut self) -> Poll { - if let Some(err) = self.error.take() { - return Err(err) - } + if let Some(req) = self.req.take() { + if req.chunked().unwrap_or(false) { + return Err(UrlencodedError::Chunked) + } else if let Some(len) = req.headers().get(header::CONTENT_LENGTH) { + if let Ok(s) = len.to_str() { + if let Ok(len) = s.parse::() { + if len > 262_144 { + return Err(UrlencodedError::Overflow); + } + } else { + return Err(UrlencodedError::UnknownLength) + } + } else { + return Err(UrlencodedError::UnknownLength) + } + } - loop { - return match self.pl.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { + // check content type + let mut err = true; + if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) { + if let Ok(content_type) = content_type.to_str() { + if content_type.to_lowercase() == "application/x-www-form-urlencoded" { + err = false; + } + } + } + if err { + return Err(UrlencodedError::ContentType); + } + + // future + let limit = self.limit; + let fut = req.from_err() + .fold(BytesMut::new(), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(UrlencodedError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .map(|body| { let mut m = HashMap::new(); - for (k, v) in form_urlencoded::parse(&self.body) { + for (k, v) in form_urlencoded::parse(&body) { m.insert(k.into(), v.into()); } - Ok(Async::Ready(m)) - }, - Ok(Async::Ready(Some(item))) => { - self.body.extend_from_slice(&item); - continue - }, - Err(err) => Err(err.into()), - } + m + }); + self.fut = Some(Box::new(fut)); } + + self.fut.as_mut().expect("UrlEncoded could not be used second time").poll() } } /// Future that resolves to a complete request body. pub struct RequestBody { - pl: Payload, - body: BytesMut, limit: usize, req: Option>, + fut: Option>>, } impl RequestBody { /// Create `RequestBody` for request. - pub fn from_request(req: &HttpRequest) -> RequestBody { - let pl = req.payload().clone(); + pub fn from(req: HttpRequest) -> RequestBody { RequestBody { - pl: pl, - body: BytesMut::new(), limit: 262_144, - req: Some(req.clone_without_state()) + req: Some(req.clone_without_state()), + fut: None, } } @@ -760,25 +761,24 @@ impl Future for RequestBody { return Err(PayloadError::UnknownLength); } } + + // future + let limit = self.limit; + self.fut = Some(Box::new( + req.from_err() + .fold(BytesMut::new(), move |mut body, chunk| { + if (body.len() + chunk.len()) > limit { + Err(PayloadError::Overflow) + } else { + body.extend_from_slice(&chunk); + Ok(body) + } + }) + .map(|body| body.freeze()) + )); } - loop { - return match self.pl.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { - Ok(Async::Ready(self.body.take().freeze())) - }, - Ok(Async::Ready(Some(chunk))) => { - if (self.body.len() + chunk.len()) > self.limit { - Err(PayloadError::Overflow) - } else { - self.body.extend_from_slice(&chunk); - continue - } - }, - Err(err) => Err(err), - } - } + self.fut.as_mut().expect("UrlEncoded could not be used second time").poll() } } diff --git a/src/json.rs b/src/json.rs index 86e61204..7f13db84 100644 --- a/src/json.rs +++ b/src/json.rs @@ -86,10 +86,10 @@ pub struct JsonBody{ impl JsonBody { /// Create `JsonBody` for request. - pub fn from_request(req: &HttpRequest) -> Self { + pub fn from_request(req: HttpRequest) -> Self { JsonBody{ limit: 262_144, - req: Some(req.clone()), + req: Some(req), fut: None, ct: "application/json", } diff --git a/src/lib.rs b/src/lib.rs index ab683b6d..23c3f498 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,11 +32,11 @@ //! * Supported *HTTP/1.x* and *HTTP/2.0* protocols //! * Streaming and pipelining //! * Keep-alive and slow requests handling -//! * `WebSockets` +//! * WebSockets server/client //! * Transparent content compression/decompression (br, gzip, deflate) //! * Configurable request routing //! * Multipart streams -//! * Middlewares (`Logger`, `Session`, `DefaultHeaders`) +//! * Middlewares (`Logger`, `Session`, `CORS`, `DefaultHeaders`) //! * Graceful server shutdown //! * Built on top of [Actix](https://github.com/actix/actix). diff --git a/src/multipart.rs b/src/multipart.rs index 0fbc906d..f97ccf3d 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -85,7 +85,7 @@ impl Multipart { } /// Create multipart instance for request. - pub fn from_request(req: &mut HttpRequest) -> Multipart { + pub fn from_request(req: HttpRequest) -> Multipart { match Multipart::boundary(req.headers()) { Ok(boundary) => Multipart::new(boundary, req.payload().clone()), Err(err) =>