diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2949c40c..f0f0a025 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - 2021-xx-xx ### Added * `impl MessageBody for Pin>`. [#2152] +* Helper `body::to_bytes` for async collecting message body into Bytes. [#2158] ### Changes * The type parameter of `Response` no longer has a default. [#2152] @@ -22,6 +23,7 @@ [#2065]: https://github.com/actix/actix-web/pull/2065 [#2148]: https://github.com/actix/actix-web/pull/2148 [#2152]: https://github.com/actix/actix-web/pull/2152 +[#2158]: https://github.com/actix/actix-web/pull/2158 ## 3.0.0-beta.5 - 2021-04-02 diff --git a/actix-http/src/body/body_stream.rs b/actix-http/src/body/body_stream.rs index 1157bc53..b81aeb4c 100644 --- a/actix-http/src/body/body_stream.rs +++ b/actix-http/src/body/body_stream.rs @@ -61,3 +61,49 @@ where } } } + +#[cfg(test)] +mod tests { + use actix_rt::pin; + use actix_utils::future::poll_fn; + use futures_util::stream; + + use super::*; + use crate::body::to_bytes; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let body = BodyStream::new(stream::iter( + ["1", "", "2"] + .iter() + .map(|&v| Ok(Bytes::from(v)) as Result), + )); + pin!(body); + + assert_eq!( + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), + Some(Bytes::from("1")), + ); + assert_eq!( + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), + Some(Bytes::from("2")), + ); + } + + #[actix_rt::test] + async fn read_to_bytes() { + let body = BodyStream::new(stream::iter( + ["1", "", "2"] + .iter() + .map(|&v| Ok(Bytes::from(v)) as Result), + )); + + assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); + } +} diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index f5664e1d..c298dda1 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -1,5 +1,12 @@ //! Traits and structures to aid consuming and writing HTTP payloads. +use std::task::Poll; + +use actix_rt::pin; +use actix_utils::future::poll_fn; +use bytes::{Bytes, BytesMut}; +use futures_core::ready; + #[allow(clippy::module_inception)] mod body; mod body_stream; @@ -15,6 +22,50 @@ pub use self::response_body::ResponseBody; pub use self::size::BodySize; pub use self::sized_stream::SizedStream; +/// Collects the body produced by a `MessageBody` implementation into `Bytes`. +/// +/// Any errors produced by the body stream are returned immediately. +/// +/// # Examples +/// ``` +/// use actix_http::body::{Body, to_bytes}; +/// use bytes::Bytes; +/// +/// # async fn test_to_bytes() { +/// let body = Body::Empty; +/// let bytes = to_bytes(body).await.unwrap(); +/// assert!(bytes.is_empty()); +/// +/// let body = Body::Bytes(Bytes::from_static(b"123")); +/// let bytes = to_bytes(body).await.unwrap(); +/// assert_eq!(bytes, b"123"[..]); +/// # } +/// ``` +pub async fn to_bytes(body: impl MessageBody) -> Result { + let cap = match body.size() { + BodySize::None | BodySize::Empty | BodySize::Sized(0) => return Ok(Bytes::new()), + BodySize::Sized(size) => size as usize, + BodySize::Stream => 32_768, + }; + + let mut buf = BytesMut::with_capacity(cap); + + pin!(body); + + poll_fn(|cx| loop { + let body = body.as_mut(); + + match ready!(body.poll_next(cx)) { + Some(Ok(bytes)) => buf.extend(bytes), + None => return Poll::Ready(Ok(())), + Some(Err(err)) => return Poll::Ready(Err(err)), + } + }) + .await?; + + Ok(buf.freeze()) +} + #[cfg(test)] mod tests { use std::pin::Pin; @@ -22,7 +73,6 @@ mod tests { use actix_rt::pin; use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; - use futures_util::stream; use super::*; @@ -187,58 +237,6 @@ mod tests { ); } - #[actix_rt::test] - async fn body_stream_skips_empty_chunks() { - let body = BodyStream::new(stream::iter( - ["1", "", "2"] - .iter() - .map(|&v| Ok(Bytes::from(v)) as Result), - )); - pin!(body); - - assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)) - .await - .unwrap() - .ok(), - Some(Bytes::from("1")), - ); - assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)) - .await - .unwrap() - .ok(), - Some(Bytes::from("2")), - ); - } - - mod sized_stream { - use super::*; - - #[actix_rt::test] - async fn skips_empty_chunks() { - let body = SizedStream::new( - 2, - stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), - ); - pin!(body); - assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)) - .await - .unwrap() - .ok(), - Some(Bytes::from("1")), - ); - assert_eq!( - poll_fn(|cx| body.as_mut().poll_next(cx)) - .await - .unwrap() - .ok(), - Some(Bytes::from("2")), - ); - } - } - #[actix_rt::test] async fn test_body_casting() { let mut body = String::from("hello cast"); @@ -252,4 +250,15 @@ mod tests { let not_body = resp_body.downcast_ref::<()>(); assert!(not_body.is_none()); } + + #[actix_rt::test] + async fn test_to_bytes() { + let body = Body::Empty; + let bytes = to_bytes(body).await.unwrap(); + assert!(bytes.is_empty()); + + let body = Body::Bytes(Bytes::from_static(b"123")); + let bytes = to_bytes(body).await.unwrap(); + assert_eq!(bytes, b"123"[..]); + } } diff --git a/actix-http/src/body/response_body.rs b/actix-http/src/body/response_body.rs index 97141e11..b2711247 100644 --- a/actix-http/src/body/response_body.rs +++ b/actix-http/src/body/response_body.rs @@ -55,10 +55,7 @@ impl MessageBody for ResponseBody { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - match self.project() { - ResponseBodyProj::Body(body) => body.poll_next(cx), - ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), - } + Stream::poll_next(self, cx) } } diff --git a/actix-http/src/body/sized_stream.rs b/actix-http/src/body/sized_stream.rs index f648f6f0..f0332fc8 100644 --- a/actix-http/src/body/sized_stream.rs +++ b/actix-http/src/body/sized_stream.rs @@ -61,3 +61,49 @@ where } } } + +#[cfg(test)] +mod tests { + use actix_rt::pin; + use actix_utils::future::poll_fn; + use futures_util::stream; + + use super::*; + use crate::body::to_bytes; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let body = SizedStream::new( + 2, + stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), + ); + + pin!(body); + + assert_eq!( + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), + Some(Bytes::from("1")), + ); + + assert_eq!( + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), + Some(Bytes::from("2")), + ); + } + + #[actix_rt::test] + async fn read_to_bytes() { + let body = SizedStream::new( + 2, + stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), + ); + + assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); + } +}