From d8cbb879dde831f5b6083664660e90ab1ecc584d Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 16 Nov 2021 21:41:35 +0000 Subject: [PATCH] make `AnyBody` generic on `Body` type (#2448) --- CHANGES.md | 10 ++ actix-http/CHANGES.md | 17 ++- actix-http/Cargo.toml | 1 + actix-http/src/body/body.rs | 174 ++++++++++++++++++++------- actix-http/src/body/body_stream.rs | 36 ++++++ actix-http/src/body/mod.rs | 8 +- actix-http/src/body/response_body.rs | 84 ------------- actix-http/src/body/sized_stream.rs | 45 +++++++ actix-http/src/encoding/encoder.rs | 39 +++--- actix-http/src/h1/dispatcher.rs | 2 +- actix-http/src/response_builder.rs | 2 +- awc/src/sender.rs | 4 +- src/dev.rs | 2 +- src/middleware/compat.rs | 4 +- src/middleware/compress.rs | 31 +++-- src/responder.rs | 27 +---- src/response/builder.rs | 4 +- src/response/response.rs | 3 + 18 files changed, 283 insertions(+), 210 deletions(-) delete mode 100644 actix-http/src/body/response_body.rs diff --git a/CHANGES.md b/CHANGES.md index 0cb5ccb2..784500d9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,16 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +* Compress middleware's response type is now `AnyBody>`. [#2448] + +### Fixed +* Relax `Unpin` bound on `S` (stream) parameter of `HttpResponseBuilder::streaming`. [#2448] + +### Removed +* `dev::ResponseBody` re-export; is function is replaced by the new `dev::AnyBody` enum. [#2446] + +[#2423]: https://github.com/actix/actix-web/pull/2423 ## 4.0.0-beta.11 - 2021-11-15 diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2beda3dc..71cdd6d4 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,14 +2,23 @@ ## Unreleased - 2021-xx-xx ### Added -* `AnyBody::empty` for quickly creating an empty body. [#2446] +* `body::AnyBody::empty` for quickly creating an empty body. [#2446] +* `impl Clone` for `body::AnyBody where S: Clone`. [#2448] +* `body::AnyBody::into_boxed` for quickly converting to a type-erased, boxed body type. [#2448] ### Changed -* Rename `AnyBody::{Message => Stream}`. [#2446] +* Rename `body::AnyBody::{Message => Body}`. [#2446] +* Rename `body::AnyBody::{from_message => new_boxed}`. [#2448] +* Rename `body::AnyBody::{from_slice => copy_from_slice}`. [#2448] +* Rename `body::{BoxAnyBody => BoxBody}`. [#2448] +* Change representation of `AnyBody` to include a type parameter in `Body` variant. Defaults to `BoxBody`. [#2448] +* `Encoder::response` now returns `AnyBody>`. [#2448] ### Removed -* `AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446] -* `BodySize::Empty`; an empty body can now only be represented as a `Sized(0)` variant. [#2446] +* `body::AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446] +* `body::BodySize::Empty`; an empty body can now only be represented as a `Sized(0)` variant. [#2446] +* `EncoderError::Boxed`; it is no longer required. [#2446] +* `body::ResponseBody`; is function is replaced by the new `body::AnyBody` enum. [#2446] [#2446]: https://github.com/actix/actix-web/pull/2446 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 78431244..27a14737 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -92,6 +92,7 @@ regex = "1.3" rustls-pemfile = "0.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +static_assertions = "1" tls-openssl = { package = "openssl", version = "0.10.9" } tls-rustls = { package = "rustls", version = "0.20.0" } tokio = { version = "1.2", features = ["net", "rt"] } diff --git a/actix-http/src/body/body.rs b/actix-http/src/body/body.rs index 32b46448..d51173a5 100644 --- a/actix-http/src/body/body.rs +++ b/actix-http/src/body/body.rs @@ -8,6 +8,7 @@ use std::{ use bytes::{Bytes, BytesMut}; use futures_core::Stream; +use pin_project::pin_project; use crate::error::Error; @@ -16,15 +17,17 @@ use super::{BodySize, BodyStream, MessageBody, MessageBodyMapErr, SizedStream}; pub type Body = AnyBody; /// Represents various types of HTTP message body. -pub enum AnyBody { +#[pin_project(project = AnyBodyProj)] +#[derive(Clone)] +pub enum AnyBody { /// Empty response. `Content-Length` header is not set. None, - /// Specific response body. + /// Complete, in-memory response body. Bytes(Bytes), - /// Generic message body. - Stream(BoxAnyBody), + /// Generic / Other message body. + Body(#[pin] B), } impl AnyBody { @@ -33,29 +36,60 @@ impl AnyBody { Self::Bytes(Bytes::new()) } - /// Create body from slice (copy) - pub fn from_slice(s: &[u8]) -> Self { - Self::Bytes(Bytes::copy_from_slice(s)) - } - - /// Create body from generic message body. - pub fn from_message(body: B) -> Self + /// Create boxed body from generic message body. + pub fn new_boxed(body: B) -> Self where B: MessageBody + 'static, B::Error: Into>, { - Self::Stream(BoxAnyBody::from_body(body)) + Self::Body(BoxBody::from_body(body)) + } + + /// Constructs new `AnyBody` instance from a slice of bytes by copying it. + /// + /// If your bytes container is owned, it may be cheaper to use a `From` impl. + pub fn copy_from_slice(s: &[u8]) -> Self { + Self::Bytes(Bytes::copy_from_slice(s)) + } + + #[doc(hidden)] + #[deprecated(since = "4.0.0", note = "Renamed to `copy_from_slice`.")] + pub fn from_slice(s: &[u8]) -> Self { + Self::Bytes(Bytes::copy_from_slice(s)) } } -impl MessageBody for AnyBody { +impl AnyBody +where + B: MessageBody + 'static, + B::Error: Into>, +{ + /// Create body from generic message body. + pub fn new(body: B) -> Self { + Self::Body(body) + } + + pub fn into_boxed(self) -> AnyBody { + match self { + Self::None => AnyBody::None, + Self::Bytes(bytes) => AnyBody::Bytes(bytes), + Self::Body(body) => AnyBody::new_boxed(body), + } + } +} + +impl MessageBody for AnyBody +where + B: MessageBody, + B::Error: Into> + 'static, +{ type Error = Error; fn size(&self) -> BodySize { match self { AnyBody::None => BodySize::None, AnyBody::Bytes(ref bin) => BodySize::Sized(bin.len() as u64), - AnyBody::Stream(ref body) => body.size(), + AnyBody::Body(ref body) => body.size(), } } @@ -63,9 +97,9 @@ impl MessageBody for AnyBody { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - match self.get_mut() { - AnyBody::None => Poll::Ready(None), - AnyBody::Bytes(ref mut bin) => { + match self.project() { + AnyBodyProj::None => Poll::Ready(None), + AnyBodyProj::Bytes(bin) => { let len = bin.len(); if len == 0 { Poll::Ready(None) @@ -74,8 +108,7 @@ impl MessageBody for AnyBody { } } - AnyBody::Stream(body) => body - .as_pin_mut() + AnyBodyProj::Body(body) => body .poll_next(cx) .map_err(|err| Error::new_body().with_cause(err)), } @@ -90,30 +123,30 @@ impl PartialEq for AnyBody { AnyBody::Bytes(ref b2) => b == b2, _ => false, }, - AnyBody::Stream(_) => false, + AnyBody::Body(_) => false, } } } -impl fmt::Debug for AnyBody { +impl fmt::Debug for AnyBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { AnyBody::None => write!(f, "AnyBody::None"), - AnyBody::Bytes(ref b) => write!(f, "AnyBody::Bytes({:?})", b), - AnyBody::Stream(_) => write!(f, "AnyBody::Message(_)"), + AnyBody::Bytes(ref bytes) => write!(f, "AnyBody::Bytes({:?})", bytes), + AnyBody::Body(ref stream) => write!(f, "AnyBody::Message({:?})", stream), } } } impl From<&'static str> for AnyBody { - fn from(s: &'static str) -> Body { - AnyBody::Bytes(Bytes::from_static(s.as_ref())) + fn from(string: &'static str) -> Body { + AnyBody::Bytes(Bytes::from_static(string.as_ref())) } } impl From<&'static [u8]> for AnyBody { - fn from(s: &'static [u8]) -> Body { - AnyBody::Bytes(Bytes::from_static(s)) + fn from(bytes: &'static [u8]) -> Body { + AnyBody::Bytes(Bytes::from_static(bytes)) } } @@ -124,20 +157,20 @@ impl From> for AnyBody { } impl From for AnyBody { - fn from(s: String) -> Body { - s.into_bytes().into() + fn from(string: String) -> Body { + string.into_bytes().into() } } impl From<&'_ String> for AnyBody { - fn from(s: &String) -> Body { - AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s))) + fn from(string: &String) -> Body { + AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&string))) } } impl From> for AnyBody { - fn from(s: Cow<'_, str>) -> Body { - match s { + fn from(string: Cow<'_, str>) -> Body { + match string { Cow::Owned(s) => AnyBody::from(s), Cow::Borrowed(s) => { AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(s))) @@ -147,14 +180,14 @@ impl From> for AnyBody { } impl From for AnyBody { - fn from(s: Bytes) -> Body { - AnyBody::Bytes(s) + fn from(bytes: Bytes) -> Body { + AnyBody::Bytes(bytes) } } impl From for AnyBody { - fn from(s: BytesMut) -> Body { - AnyBody::Bytes(s.freeze()) + fn from(bytes: BytesMut) -> Body { + AnyBody::Bytes(bytes.freeze()) } } @@ -163,8 +196,8 @@ where S: Stream> + 'static, E: Into> + 'static, { - fn from(s: SizedStream) -> Body { - AnyBody::from_message(s) + fn from(stream: SizedStream) -> Body { + AnyBody::new_boxed(stream) } } @@ -173,15 +206,15 @@ where S: Stream> + 'static, E: Into> + 'static, { - fn from(s: BodyStream) -> Body { - AnyBody::from_message(s) + fn from(stream: BodyStream) -> Body { + AnyBody::new_boxed(stream) } } /// A boxed message body with boxed errors. -pub struct BoxAnyBody(Pin>>>); +pub struct BoxBody(Pin>>>); -impl BoxAnyBody { +impl BoxBody { /// Boxes a `MessageBody` and any errors it generates. pub fn from_body(body: B) -> Self where @@ -195,18 +228,18 @@ impl BoxAnyBody { /// Returns a mutable pinned reference to the inner message body type. pub fn as_pin_mut( &mut self, - ) -> Pin<&mut (dyn MessageBody>)> { + ) -> Pin<&mut (dyn MessageBody>)> { self.0.as_mut() } } -impl fmt::Debug for BoxAnyBody { +impl fmt::Debug for BoxBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("BoxAnyBody(dyn MessageBody)") } } -impl MessageBody for BoxAnyBody { +impl MessageBody for BoxBody { type Error = Error; fn size(&self) -> BodySize { @@ -223,3 +256,52 @@ impl MessageBody for BoxAnyBody { .map_err(|err| Error::new_body().with_cause(err)) } } + +#[cfg(test)] +mod tests { + use std::marker::PhantomPinned; + + use static_assertions::{assert_impl_all, assert_not_impl_all}; + + use super::*; + use crate::body::to_bytes; + + struct PinType(PhantomPinned); + + impl MessageBody for PinType { + type Error = crate::Error; + + fn size(&self) -> BodySize { + unimplemented!() + } + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>> { + unimplemented!() + } + } + + assert_impl_all!(AnyBody<()>: MessageBody, fmt::Debug, Send, Sync, Unpin); + assert_impl_all!(AnyBody>: MessageBody, fmt::Debug, Send, Sync, Unpin); + assert_impl_all!(AnyBody: MessageBody, fmt::Debug, Send, Sync, Unpin); + assert_impl_all!(AnyBody: MessageBody, fmt::Debug, Unpin); + assert_impl_all!(BoxBody: MessageBody, fmt::Debug, Unpin); + assert_impl_all!(AnyBody: MessageBody); + + assert_not_impl_all!(AnyBody: Send, Sync, Unpin); + assert_not_impl_all!(BoxBody: Send, Sync, Unpin); + assert_not_impl_all!(AnyBody: Send, Sync, Unpin); + + #[actix_rt::test] + async fn nested_boxed_body() { + let body = AnyBody::copy_from_slice(&[1, 2, 3]); + let boxed_body = BoxBody::from_body(BoxBody::from_body(body)); + + assert_eq!( + to_bytes(boxed_body).await.unwrap(), + Bytes::from(vec![1, 2, 3]), + ); + } +} diff --git a/actix-http/src/body/body_stream.rs b/actix-http/src/body/body_stream.rs index f726f447..31de9b48 100644 --- a/actix-http/src/body/body_stream.rs +++ b/actix-http/src/body/body_stream.rs @@ -75,10 +75,22 @@ mod tests { use derive_more::{Display, Error}; use futures_core::ready; use futures_util::{stream, FutureExt as _}; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; use crate::body::to_bytes; + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + + assert_not_impl_all!(BodyStream>: MessageBody); + assert_not_impl_all!(BodyStream>: MessageBody); + // crate::Error is not Clone + assert_not_impl_all!(BodyStream>>: MessageBody); + #[actix_rt::test] async fn skips_empty_chunks() { let body = BodyStream::new(stream::iter( @@ -124,6 +136,30 @@ mod tests { assert!(matches!(to_bytes(body).await, Err(StreamErr))); } + #[actix_rt::test] + async fn stream_string_error() { + // `&'static str` does not impl `Error` + // but it does impl `Into>` + + let body = BodyStream::new(stream::once(async { Err("stringy error") })); + assert!(matches!(to_bytes(body).await, Err("stringy error"))); + } + + #[actix_rt::test] + async fn stream_boxed_error() { + // `Box` does not impl `Error` + // but it does impl `Into>` + + let body = BodyStream::new(stream::once(async { + Err(Box::::from("stringy error")) + })); + + assert_eq!( + to_bytes(body).await.unwrap_err().to_string(), + "stringy error" + ); + } + #[actix_rt::test] async fn stream_delayed_error() { let body = diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index 0d5b0f07..07e5e67c 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -11,15 +11,13 @@ use futures_core::ready; mod body; mod body_stream; mod message_body; -mod response_body; mod size; mod sized_stream; -pub use self::body::{AnyBody, Body, BoxAnyBody}; +pub use self::body::{AnyBody, Body, BoxBody}; pub use self::body_stream::BodyStream; pub use self::message_body::MessageBody; pub(crate) use self::message_body::MessageBodyMapErr; -pub use self::response_body::ResponseBody; pub use self::size::BodySize; pub use self::sized_stream::SizedStream; @@ -108,10 +106,10 @@ mod tests { assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test"); assert_eq!( - Body::from_slice(b"test".as_ref()).size(), + Body::copy_from_slice(b"test".as_ref()).size(), BodySize::Sized(4) ); - assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test"); + assert_eq!(Body::copy_from_slice(b"test".as_ref()).get_ref(), b"test"); let sb = Bytes::from(&b"test"[..]); pin!(sb); diff --git a/actix-http/src/body/response_body.rs b/actix-http/src/body/response_body.rs deleted file mode 100644 index 699ea938..00000000 --- a/actix-http/src/body/response_body.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::{ - mem, - pin::Pin, - task::{Context, Poll}, -}; - -use bytes::Bytes; -use futures_core::Stream; -use pin_project::pin_project; - -use crate::error::Error; - -use super::{Body, BodySize, MessageBody}; - -#[pin_project(project = ResponseBodyProj)] -pub enum ResponseBody { - Body(#[pin] B), - Other(Body), -} - -impl ResponseBody { - pub fn into_body(self) -> ResponseBody { - match self { - ResponseBody::Body(b) => ResponseBody::Other(b), - ResponseBody::Other(b) => ResponseBody::Other(b), - } - } -} - -impl ResponseBody { - pub fn take_body(&mut self) -> ResponseBody { - mem::replace(self, ResponseBody::Other(Body::None)) - } -} - -impl ResponseBody { - pub fn as_ref(&self) -> Option<&B> { - if let ResponseBody::Body(ref b) = self { - Some(b) - } else { - None - } - } -} - -impl MessageBody for ResponseBody -where - B: MessageBody, - B::Error: Into, -{ - type Error = Error; - - fn size(&self) -> BodySize { - match self { - ResponseBody::Body(ref body) => body.size(), - ResponseBody::Other(ref body) => body.size(), - } - } - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - Stream::poll_next(self, cx) - } -} - -impl Stream for ResponseBody -where - B: MessageBody, - B::Error: Into, -{ - type Item = Result; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match self.project() { - ResponseBodyProj::Body(body) => body.poll_next(cx).map_err(Into::into), - ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), - } - } -} diff --git a/actix-http/src/body/sized_stream.rs b/actix-http/src/body/sized_stream.rs index b6ceb32f..b92de44c 100644 --- a/actix-http/src/body/sized_stream.rs +++ b/actix-http/src/body/sized_stream.rs @@ -72,10 +72,22 @@ mod tests { use actix_rt::pin; use actix_utils::future::poll_fn; use futures_util::stream; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; use crate::body::to_bytes; + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + + assert_not_impl_all!(SizedStream>: MessageBody); + assert_not_impl_all!(SizedStream>: MessageBody); + // crate::Error is not Clone + assert_not_impl_all!(SizedStream>>: MessageBody); + #[actix_rt::test] async fn skips_empty_chunks() { let body = SizedStream::new( @@ -119,4 +131,37 @@ mod tests { assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); } + + #[actix_rt::test] + async fn stream_string_error() { + // `&'static str` does not impl `Error` + // but it does impl `Into>` + + let body = SizedStream::new(0, stream::once(async { Err("stringy error") })); + assert_eq!(to_bytes(body).await, Ok(Bytes::new())); + + let body = SizedStream::new(1, stream::once(async { Err("stringy error") })); + assert!(matches!(to_bytes(body).await, Err("stringy error"))); + } + + #[actix_rt::test] + async fn stream_boxed_error() { + // `Box` does not impl `Error` + // but it does impl `Into>` + + let body = SizedStream::new( + 0, + stream::once(async { Err(Box::::from("stringy error")) }), + ); + assert_eq!(to_bytes(body).await.unwrap(), Bytes::new()); + + let body = SizedStream::new( + 1, + stream::once(async { Err(Box::::from("stringy error")) }), + ); + assert_eq!( + to_bytes(body).await.unwrap_err().to_string(), + "stringy error" + ); + } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 6cb034b7..62100ff1 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -24,7 +24,7 @@ use flate2::write::{GzEncoder, ZlibEncoder}; use zstd::stream::write::Encoder as ZstdEncoder; use crate::{ - body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody}, + body::{AnyBody, BodySize, MessageBody}, http::{ header::{ContentEncoding, CONTENT_ENCODING}, HeaderValue, StatusCode, @@ -50,8 +50,8 @@ impl Encoder { pub fn response( encoding: ContentEncoding, head: &mut ResponseHead, - body: ResponseBody, - ) -> ResponseBody> { + body: AnyBody, + ) -> AnyBody> { let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) || head.status == StatusCode::SWITCHING_PROTOCOLS || head.status == StatusCode::NO_CONTENT @@ -59,18 +59,15 @@ impl Encoder { || encoding == ContentEncoding::Auto); let body = match body { - ResponseBody::Other(b) => match b { - Body::None => return ResponseBody::Other(Body::None), - Body::Bytes(buf) => { - if can_encode { - EncoderBody::Bytes(buf) - } else { - return ResponseBody::Other(Body::Bytes(buf)); - } + AnyBody::None => return AnyBody::None, + AnyBody::Bytes(buf) => { + if can_encode { + EncoderBody::Bytes(buf) + } else { + return AnyBody::Bytes(buf); } - Body::Stream(stream) => EncoderBody::BoxedStream(stream), - }, - ResponseBody::Body(stream) => EncoderBody::Stream(stream), + } + AnyBody::Body(body) => EncoderBody::Stream(body), }; if can_encode { @@ -78,7 +75,8 @@ impl Encoder { if let Some(enc) = ContentEncoder::encoder(encoding) { update_head(encoding, head); head.no_chunking(false); - return ResponseBody::Body(Encoder { + + return AnyBody::Body(Encoder { body, eof: false, fut: None, @@ -87,7 +85,7 @@ impl Encoder { } } - ResponseBody::Body(Encoder { + AnyBody::Body(Encoder { body, eof: false, fut: None, @@ -100,7 +98,6 @@ impl Encoder { enum EncoderBody { Bytes(Bytes), Stream(#[pin] B), - BoxedStream(BoxAnyBody), } impl MessageBody for EncoderBody @@ -113,7 +110,6 @@ where match self { EncoderBody::Bytes(ref b) => b.size(), EncoderBody::Stream(ref b) => b.size(), - EncoderBody::BoxedStream(ref b) => b.size(), } } @@ -130,9 +126,6 @@ where } } EncoderBodyProj::Stream(b) => b.poll_next(cx).map_err(EncoderError::Body), - EncoderBodyProj::BoxedStream(ref mut b) => { - b.as_pin_mut().poll_next(cx).map_err(EncoderError::Boxed) - } } } } @@ -348,9 +341,6 @@ pub enum EncoderError { #[display(fmt = "body")] Body(E), - #[display(fmt = "boxed")] - Boxed(Box), - #[display(fmt = "blocking")] Blocking(BlockingError), @@ -362,7 +352,6 @@ impl StdError for EncoderError { fn source(&self) -> Option<&(dyn StdError + 'static)> { match self { EncoderError::Body(err) => Some(err), - EncoderError::Boxed(err) => Some(&**err), EncoderError::Blocking(err) => Some(err), EncoderError::Io(err) => Some(err), } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 844bc61e..163d84f5 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1077,7 +1077,7 @@ mod tests { fn_service(|req: Request| { let path = req.path().as_bytes(); ready(Ok::<_, Error>( - Response::ok().set_body(AnyBody::from_slice(path)), + Response::ok().set_body(AnyBody::copy_from_slice(path)), )) }) } diff --git a/actix-http/src/response_builder.rs b/actix-http/src/response_builder.rs index a1cb1a42..e934f94d 100644 --- a/actix-http/src/response_builder.rs +++ b/actix-http/src/response_builder.rs @@ -262,7 +262,7 @@ impl ResponseBuilder { S: Stream> + 'static, E: Into> + 'static, { - self.body(AnyBody::from_message(BodyStream::new(stream))) + self.body(AnyBody::new_boxed(BodyStream::new(stream))) } /// Generate response with an empty body. diff --git a/awc/src/sender.rs b/awc/src/sender.rs index fcd0c71a..02870aea 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -9,7 +9,7 @@ use std::{ }; use actix_http::{ - body::{Body, BodyStream}, + body::{AnyBody, Body, BodyStream}, http::{ header::{self, HeaderMap, HeaderName, IntoHeaderValue}, Error as HttpError, @@ -286,7 +286,7 @@ impl RequestSender { response_decompress, timeout, config, - Body::from_message(BodyStream::new(stream)), + AnyBody::new_boxed(BodyStream::new(stream)), ) } diff --git a/src/dev.rs b/src/dev.rs index 4fac207a..27c206e7 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -14,7 +14,7 @@ pub use crate::types::form::UrlEncoded; pub use crate::types::json::JsonBody; pub use crate::types::readlines::Readlines; -pub use actix_http::body::{AnyBody, Body, BodySize, MessageBody, ResponseBody, SizedStream}; +pub use actix_http::body::{AnyBody, Body, BodySize, MessageBody, SizedStream}; #[cfg(feature = "__compress")] pub use actix_http::encoding::Decoder as Decompress; diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 0a6256fe..752e90f9 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -7,7 +7,7 @@ use std::{ task::{Context, Poll}, }; -use actix_http::body::{Body, MessageBody}; +use actix_http::body::{AnyBody, MessageBody}; use actix_service::{Service, Transform}; use futures_core::{future::LocalBoxFuture, ready}; @@ -124,7 +124,7 @@ where B::Error: Into>, { fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| Body::from_message(body)) + self.map_body(|_, body| AnyBody::new_boxed(body)) } } diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 4854f4be..3e85cb84 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -10,13 +10,14 @@ use std::{ }; use actix_http::{ - body::{MessageBody, ResponseBody}, + body::{AnyBody, MessageBody}, encoding::Encoder, http::header::{ContentEncoding, ACCEPT_ENCODING}, StatusCode, }; use actix_service::{Service, Transform}; use actix_utils::future::{ok, Either, Ready}; +use bytes::Bytes; use futures_core::ready; use once_cell::sync::Lazy; use pin_project::pin_project; @@ -61,7 +62,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>>; + type Response = ServiceResponse>>; type Error = Error; type Transform = CompressMiddleware; type InitError = (); @@ -110,7 +111,7 @@ where S: Service, Error = Error>, B: MessageBody, { - type Response = ServiceResponse>>; + type Response = ServiceResponse>>; type Error = Error; type Future = Either, Ready>>; @@ -142,15 +143,19 @@ where // There is an HTTP header but we cannot match what client as asked for Some(Err(_)) => { - let res = HttpResponse::with_body( - StatusCode::NOT_ACCEPTABLE, - SUPPORTED_ALGORITHM_NAMES.as_str(), - ); - let enc = ContentEncoding::Identity; + let res = HttpResponse::new(StatusCode::NOT_ACCEPTABLE); - Either::right(ok(req.into_response(res.map_body(move |head, body| { - Encoder::response(enc, head, ResponseBody::Other(body.into())) - })))) + let res: HttpResponse>> = res.map_body(move |head, _| { + let body_bytes = Bytes::from(SUPPORTED_ALGORITHM_NAMES.as_bytes()); + + Encoder::response( + ContentEncoding::Identity, + head, + AnyBody::Bytes(body_bytes), + ) + }); + + Either::right(ok(req.into_response(res))) } } } @@ -172,7 +177,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Output = Result>>, Error>; + type Output = Result>>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -186,7 +191,7 @@ where }; Poll::Ready(Ok(resp.map_body(move |head, body| { - Encoder::response(enc, head, ResponseBody::Body(body)) + Encoder::response(enc, head, AnyBody::Body(body)) }))) } Err(e) => Poll::Ready(Err(e)), diff --git a/src/responder.rs b/src/responder.rs index 005bff03..4d2e97c3 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -232,7 +232,7 @@ pub(crate) mod tests { use bytes::{Bytes, BytesMut}; use super::*; - use crate::dev::{Body, ResponseBody}; + use crate::dev::AnyBody; use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode}; use crate::test::{init_service, TestRequest}; use crate::{error, web, App}; @@ -264,13 +264,13 @@ pub(crate) mod tests { pub(crate) trait BodyTest { fn bin_ref(&self) -> &[u8]; - fn body(&self) -> &Body; + fn body(&self) -> &AnyBody; } impl BodyTest for Body { fn bin_ref(&self) -> &[u8] { match self { - Body::Bytes(ref bin) => bin, + AnyBody::Bytes(ref bin) => bin, _ => unreachable!("bug in test impl"), } } @@ -279,27 +279,6 @@ pub(crate) mod tests { } } - impl BodyTest for ResponseBody { - fn bin_ref(&self) -> &[u8] { - match self { - ResponseBody::Body(ref b) => match b { - Body::Bytes(ref bin) => bin, - _ => unreachable!("bug in test impl"), - }, - ResponseBody::Other(ref b) => match b { - Body::Bytes(ref bin) => bin, - _ => unreachable!("bug in test impl"), - }, - } - } - fn body(&self) -> &Body { - match self { - ResponseBody::Body(ref b) => b, - ResponseBody::Other(ref b) => b, - } - } - } - #[actix_rt::test] async fn test_responder() { let req = TestRequest::default().to_http_request(); diff --git a/src/response/builder.rs b/src/response/builder.rs index f6099a01..e42d85f5 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -354,10 +354,10 @@ impl HttpResponseBuilder { #[inline] pub fn streaming(&mut self, stream: S) -> HttpResponse where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, E: Into> + 'static, { - self.body(AnyBody::from_message(BodyStream::new(stream))) + self.body(AnyBody::new_boxed(BodyStream::new(stream))) } /// Set a json body and generate `Response` diff --git a/src/response/response.rs b/src/response/response.rs index 09515c83..46360e53 100644 --- a/src/response/response.rs +++ b/src/response/response.rs @@ -227,6 +227,9 @@ impl HttpResponse { } } + // TODO: into_body equivalent + // TODO: into_boxed_body + /// Extract response body pub fn into_body(self) -> B { self.res.into_body()