mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-30 18:44:35 +01:00
add body to_bytes helper (#2158)
This commit is contained in:
parent
4442535a45
commit
02ced426fd
@ -3,6 +3,7 @@
|
|||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
### Added
|
### Added
|
||||||
* `impl<T: MessageBody> MessageBody for Pin<Box<T>>`. [#2152]
|
* `impl<T: MessageBody> MessageBody for Pin<Box<T>>`. [#2152]
|
||||||
|
* Helper `body::to_bytes` for async collecting message body into Bytes. [#2158]
|
||||||
|
|
||||||
### Changes
|
### Changes
|
||||||
* The type parameter of `Response` no longer has a default. [#2152]
|
* The type parameter of `Response` no longer has a default. [#2152]
|
||||||
@ -22,6 +23,7 @@
|
|||||||
[#2065]: https://github.com/actix/actix-web/pull/2065
|
[#2065]: https://github.com/actix/actix-web/pull/2065
|
||||||
[#2148]: https://github.com/actix/actix-web/pull/2148
|
[#2148]: https://github.com/actix/actix-web/pull/2148
|
||||||
[#2152]: https://github.com/actix/actix-web/pull/2152
|
[#2152]: https://github.com/actix/actix-web/pull/2152
|
||||||
|
[#2158]: https://github.com/actix/actix-web/pull/2158
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.5 - 2021-04-02
|
## 3.0.0-beta.5 - 2021-04-02
|
||||||
|
@ -61,3 +61,49 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use actix_rt::pin;
|
||||||
|
use actix_utils::future::poll_fn;
|
||||||
|
use futures_util::stream;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::body::to_bytes;
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn skips_empty_chunks() {
|
||||||
|
let body = BodyStream::new(stream::iter(
|
||||||
|
["1", "", "2"]
|
||||||
|
.iter()
|
||||||
|
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
|
||||||
|
));
|
||||||
|
pin!(body);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
poll_fn(|cx| body.as_mut().poll_next(cx))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.ok(),
|
||||||
|
Some(Bytes::from("1")),
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
poll_fn(|cx| body.as_mut().poll_next(cx))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.ok(),
|
||||||
|
Some(Bytes::from("2")),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn read_to_bytes() {
|
||||||
|
let body = BodyStream::new(stream::iter(
|
||||||
|
["1", "", "2"]
|
||||||
|
.iter()
|
||||||
|
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
|
||||||
|
));
|
||||||
|
|
||||||
|
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,5 +1,12 @@
|
|||||||
//! Traits and structures to aid consuming and writing HTTP payloads.
|
//! Traits and structures to aid consuming and writing HTTP payloads.
|
||||||
|
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
|
use actix_rt::pin;
|
||||||
|
use actix_utils::future::poll_fn;
|
||||||
|
use bytes::{Bytes, BytesMut};
|
||||||
|
use futures_core::ready;
|
||||||
|
|
||||||
#[allow(clippy::module_inception)]
|
#[allow(clippy::module_inception)]
|
||||||
mod body;
|
mod body;
|
||||||
mod body_stream;
|
mod body_stream;
|
||||||
@ -15,6 +22,50 @@ pub use self::response_body::ResponseBody;
|
|||||||
pub use self::size::BodySize;
|
pub use self::size::BodySize;
|
||||||
pub use self::sized_stream::SizedStream;
|
pub use self::sized_stream::SizedStream;
|
||||||
|
|
||||||
|
/// Collects the body produced by a `MessageBody` implementation into `Bytes`.
|
||||||
|
///
|
||||||
|
/// Any errors produced by the body stream are returned immediately.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
/// ```
|
||||||
|
/// use actix_http::body::{Body, to_bytes};
|
||||||
|
/// use bytes::Bytes;
|
||||||
|
///
|
||||||
|
/// # async fn test_to_bytes() {
|
||||||
|
/// let body = Body::Empty;
|
||||||
|
/// let bytes = to_bytes(body).await.unwrap();
|
||||||
|
/// assert!(bytes.is_empty());
|
||||||
|
///
|
||||||
|
/// let body = Body::Bytes(Bytes::from_static(b"123"));
|
||||||
|
/// let bytes = to_bytes(body).await.unwrap();
|
||||||
|
/// assert_eq!(bytes, b"123"[..]);
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
pub async fn to_bytes(body: impl MessageBody) -> Result<Bytes, crate::Error> {
|
||||||
|
let cap = match body.size() {
|
||||||
|
BodySize::None | BodySize::Empty | BodySize::Sized(0) => return Ok(Bytes::new()),
|
||||||
|
BodySize::Sized(size) => size as usize,
|
||||||
|
BodySize::Stream => 32_768,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut buf = BytesMut::with_capacity(cap);
|
||||||
|
|
||||||
|
pin!(body);
|
||||||
|
|
||||||
|
poll_fn(|cx| loop {
|
||||||
|
let body = body.as_mut();
|
||||||
|
|
||||||
|
match ready!(body.poll_next(cx)) {
|
||||||
|
Some(Ok(bytes)) => buf.extend(bytes),
|
||||||
|
None => return Poll::Ready(Ok(())),
|
||||||
|
Some(Err(err)) => return Poll::Ready(Err(err)),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(buf.freeze())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
@ -22,7 +73,6 @@ mod tests {
|
|||||||
use actix_rt::pin;
|
use actix_rt::pin;
|
||||||
use actix_utils::future::poll_fn;
|
use actix_utils::future::poll_fn;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures_util::stream;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
@ -187,58 +237,6 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
|
||||||
async fn body_stream_skips_empty_chunks() {
|
|
||||||
let body = BodyStream::new(stream::iter(
|
|
||||||
["1", "", "2"]
|
|
||||||
.iter()
|
|
||||||
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>),
|
|
||||||
));
|
|
||||||
pin!(body);
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
poll_fn(|cx| body.as_mut().poll_next(cx))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.ok(),
|
|
||||||
Some(Bytes::from("1")),
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
poll_fn(|cx| body.as_mut().poll_next(cx))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.ok(),
|
|
||||||
Some(Bytes::from("2")),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
mod sized_stream {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[actix_rt::test]
|
|
||||||
async fn skips_empty_chunks() {
|
|
||||||
let body = SizedStream::new(
|
|
||||||
2,
|
|
||||||
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
|
|
||||||
);
|
|
||||||
pin!(body);
|
|
||||||
assert_eq!(
|
|
||||||
poll_fn(|cx| body.as_mut().poll_next(cx))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.ok(),
|
|
||||||
Some(Bytes::from("1")),
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
poll_fn(|cx| body.as_mut().poll_next(cx))
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.ok(),
|
|
||||||
Some(Bytes::from("2")),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_body_casting() {
|
async fn test_body_casting() {
|
||||||
let mut body = String::from("hello cast");
|
let mut body = String::from("hello cast");
|
||||||
@ -252,4 +250,15 @@ mod tests {
|
|||||||
let not_body = resp_body.downcast_ref::<()>();
|
let not_body = resp_body.downcast_ref::<()>();
|
||||||
assert!(not_body.is_none());
|
assert!(not_body.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_to_bytes() {
|
||||||
|
let body = Body::Empty;
|
||||||
|
let bytes = to_bytes(body).await.unwrap();
|
||||||
|
assert!(bytes.is_empty());
|
||||||
|
|
||||||
|
let body = Body::Bytes(Bytes::from_static(b"123"));
|
||||||
|
let bytes = to_bytes(body).await.unwrap();
|
||||||
|
assert_eq!(bytes, b"123"[..]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,10 +55,7 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
|
|||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
) -> Poll<Option<Result<Bytes, Error>>> {
|
||||||
match self.project() {
|
Stream::poll_next(self, cx)
|
||||||
ResponseBodyProj::Body(body) => body.poll_next(cx),
|
|
||||||
ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,3 +61,49 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use actix_rt::pin;
|
||||||
|
use actix_utils::future::poll_fn;
|
||||||
|
use futures_util::stream;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
use crate::body::to_bytes;
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn skips_empty_chunks() {
|
||||||
|
let body = SizedStream::new(
|
||||||
|
2,
|
||||||
|
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
|
||||||
|
);
|
||||||
|
|
||||||
|
pin!(body);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
poll_fn(|cx| body.as_mut().poll_next(cx))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.ok(),
|
||||||
|
Some(Bytes::from("1")),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
poll_fn(|cx| body.as_mut().poll_next(cx))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.ok(),
|
||||||
|
Some(Bytes::from("2")),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn read_to_bytes() {
|
||||||
|
let body = SizedStream::new(
|
||||||
|
2,
|
||||||
|
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user