diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 3e62ac2d1..7081361e4 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -18,6 +18,7 @@ * `Request::take_conn_data()`. [#2491] * `Request::take_req_data()`. [#2487] * `impl Clone` for `RequestHead`. [#2487] +* New methods on `MessageBody` trait, `is_complete_body` and `take_complete_body`, both with default implementations, for optimisations on body types that are done in exactly one poll/chunk. [#2497] ### Changed * Rename `body::BoxBody::{from_body => new}`. [#2468] @@ -45,6 +46,7 @@ [#2487]: https://github.com/actix/actix-web/pull/2487 [#2488]: https://github.com/actix/actix-web/pull/2488 [#2491]: https://github.com/actix/actix-web/pull/2491 +[#2497]: https://github.com/actix/actix-web/pull/2497 ## 3.0.0-beta.14 - 2021-11-30 diff --git a/actix-http/src/body/boxed.rs b/actix-http/src/body/boxed.rs index c39da10c0..d2469e986 100644 --- a/actix-http/src/body/boxed.rs +++ b/actix-http/src/body/boxed.rs @@ -51,6 +51,34 @@ impl MessageBody for BoxBody { .poll_next(cx) .map_err(|err| Error::new_body().with_cause(err)) } + + fn is_complete_body(&self) -> bool { + self.0.is_complete_body() + } + + fn take_complete_body(&mut self) -> Bytes { + debug_assert!( + self.is_complete_body(), + "boxed type does not allow taking complete body; caller should make sure to \ + call `is_complete_body` first", + ); + + // we do not have DerefMut access to call take_complete_body directly but since + // is_complete_body is true we should expect the entire bytes chunk in one poll_next + + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + match self.as_pin_mut().poll_next(&mut cx) { + Poll::Ready(Some(Ok(data))) => data, + _ => { + panic!( + "boxed type indicated it allows taking complete body but failed to \ + return Bytes when polled", + ); + } + } + } } #[cfg(test)] diff --git a/actix-http/src/body/either.rs b/actix-http/src/body/either.rs index 6169ee627..6135d834d 100644 --- a/actix-http/src/body/either.rs +++ b/actix-http/src/body/either.rs @@ -67,6 +67,20 @@ where .map_err(|err| Error::new_body().with_cause(err)), } } + + fn is_complete_body(&self) -> bool { + match self { + EitherBody::Left { body } => body.is_complete_body(), + EitherBody::Right { body } => body.is_complete_body(), + } + } + + fn take_complete_body(&mut self) -> Bytes { + match self { + EitherBody::Left { body } => body.take_complete_body(), + EitherBody::Right { body } => body.take_complete_body(), + } + } } #[cfg(test)] diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 053b6f286..e4020d2af 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -25,10 +25,58 @@ pub trait MessageBody { fn size(&self) -> BodySize; /// Attempt to pull out the next chunk of body bytes. + // TODO: expand documentation fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>>; + + /// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`. + /// + /// This method's implementation should agree with [`take_complete_body`] and should always be + /// checked before taking the body. + /// + /// The default implementation returns `false. + /// + /// [`take_complete_body`]: MessageBody::take_complete_body + fn is_complete_body(&self) -> bool { + false + } + + /// Returns the complete chunk of body bytes. + /// + /// Implementors of this method should note the following: + /// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of + /// performing this check is delegated to the caller. + /// - If the result of [`is_complete_body`] is conditional, that condition should be given + /// equivalent attention here. + /// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic. + /// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless + /// the chunk is guaranteed to be empty. + /// + /// The default implementation panics unconditionally, indicating a control flow bug in the + /// calling code. + /// + /// # Panics + /// With a correct implementation, panics if called without first checking [`is_complete_body`]. + /// + /// [`is_complete_body`]: MessageBody::is_complete_body + /// [`take_complete_body`]: MessageBody::take_complete_body + /// [`poll_next`]: MessageBody::poll_next + fn take_complete_body(&mut self) -> Bytes { + assert!( + self.is_complete_body(), + "type ({}) allows taking complete body but did not provide an implementation \ + of `take_complete_body`", + std::any::type_name::() + ); + + unimplemented!( + "type ({}) does not allow taking complete body; caller should make sure to \ + check `is_complete_body` first", + std::any::type_name::() + ); + } } mod foreign_impls { @@ -49,6 +97,14 @@ mod foreign_impls { ) -> Poll>> { match *self {} } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + match *self {} + } } impl MessageBody for () { @@ -66,6 +122,16 @@ mod foreign_impls { ) -> Poll>> { Poll::Ready(None) } + + #[inline] + fn is_complete_body(&self) -> bool { + true + } + + #[inline] + fn take_complete_body(&mut self) -> Bytes { + Bytes::new() + } } impl MessageBody for Box @@ -86,6 +152,16 @@ mod foreign_impls { ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } + + #[inline] + fn is_complete_body(&self) -> bool { + self.as_ref().is_complete_body() + } + + #[inline] + fn take_complete_body(&mut self) -> Bytes { + self.as_mut().take_complete_body() + } } impl MessageBody for Pin> @@ -106,6 +182,38 @@ mod foreign_impls { ) -> Poll>> { self.as_mut().poll_next(cx) } + + #[inline] + fn is_complete_body(&self) -> bool { + self.as_ref().is_complete_body() + } + + #[inline] + fn take_complete_body(&mut self) -> Bytes { + debug_assert!( + self.is_complete_body(), + "inner type \"{}\" does not allow taking complete body; caller should make sure to \ + call `is_complete_body` first", + std::any::type_name::(), + ); + + // we do not have DerefMut access to call take_complete_body directly but since + // is_complete_body is true we should expect the entire bytes chunk in one poll_next + + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + match self.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(Ok(data))) => data, + _ => { + panic!( + "inner type \"{}\" indicated it allows taking complete body but failed to \ + return Bytes when polled", + std::any::type_name::() + ); + } + } + } } impl MessageBody for &'static [u8] { @@ -116,17 +224,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - let bytes = Bytes::from_static(bytes); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from_static(mem::take(self)) + } } impl MessageBody for Bytes { @@ -137,16 +251,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self) + } } impl MessageBody for BytesMut { @@ -157,16 +278,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()).freeze(); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self).freeze() + } } impl MessageBody for Vec { @@ -177,16 +305,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - Poll::Ready(Some(Ok(Bytes::from(bytes)))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from(mem::take(self)) + } } impl MessageBody for &'static str { @@ -208,6 +343,14 @@ mod foreign_impls { Poll::Ready(Some(Ok(bytes))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from_static(mem::take(self).as_bytes()) + } } impl MessageBody for String { @@ -228,6 +371,14 @@ mod foreign_impls { Poll::Ready(Some(Ok(Bytes::from(string)))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from(mem::take(self)) + } } impl MessageBody for bytestring::ByteString { @@ -244,6 +395,14 @@ mod foreign_impls { let string = mem::take(self.get_mut()); Poll::Ready(Some(Ok(string.into_bytes()))) } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self).into_bytes() + } } } @@ -406,6 +565,51 @@ mod tests { assert_poll_next!(pl, Bytes::from("test")); } + #[test] + fn take_string() { + let mut data = "test".repeat(2); + let data_bytes = Bytes::from(data.clone()); + assert!(data.is_complete_body()); + assert_eq!(data.take_complete_body(), data_bytes); + + let mut big_data = "test".repeat(64 * 1024); + let data_bytes = Bytes::from(big_data.clone()); + assert!(big_data.is_complete_body()); + assert_eq!(big_data.take_complete_body(), data_bytes); + } + + #[test] + fn take_boxed_equivalence() { + let mut data = Bytes::from_static(b"test"); + assert!(data.is_complete_body()); + assert_eq!(data.take_complete_body(), b"test".as_ref()); + + let mut data = Box::new(Bytes::from_static(b"test")); + assert!(data.is_complete_body()); + assert_eq!(data.take_complete_body(), b"test".as_ref()); + + let mut data = Box::pin(Bytes::from_static(b"test")); + assert!(data.is_complete_body()); + assert_eq!(data.take_complete_body(), b"test".as_ref()); + } + + #[test] + fn take_policy() { + let mut data = Bytes::from_static(b"test"); + // first call returns chunk + assert_eq!(data.take_complete_body(), b"test".as_ref()); + // second call returns empty + assert_eq!(data.take_complete_body(), b"".as_ref()); + + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + let mut data = Bytes::from_static(b"test"); + // take returns whole chunk + assert_eq!(data.take_complete_body(), b"test".as_ref()); + // subsequent poll_next returns None + assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None)); + } + // down-casting used to be done with a method on MessageBody trait // test is kept to demonstrate equivalence of Any trait #[actix_rt::test] diff --git a/actix-http/src/body/none.rs b/actix-http/src/body/none.rs index 0fc7c8c9f..bb494078f 100644 --- a/actix-http/src/body/none.rs +++ b/actix-http/src/body/none.rs @@ -40,4 +40,14 @@ impl MessageBody for None { ) -> Poll>> { Poll::Ready(Option::None) } + + #[inline] + fn is_complete_body(&self) -> bool { + true + } + + #[inline] + fn take_complete_body(&mut self) -> Bytes { + Bytes::new() + } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 0886221cc..fa294ab0d 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -53,32 +53,32 @@ impl Encoder { } } - pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self { + pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self { let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) || head.status == StatusCode::SWITCHING_PROTOCOLS || head.status == StatusCode::NO_CONTENT || encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto); - match body.size() { - // no need to compress an empty body - BodySize::None => return Self::none(), - - // we cannot assume that Sized is not a stream - BodySize::Sized(_) | BodySize::Stream => {} + // no need to compress an empty body + if matches!(body.size(), BodySize::None) { + return Self::none(); } - // TODO potentially some optimisation for single-chunk responses here by trying to read the - // payload eagerly, stopping after 2 polls if the first is a chunk and the second is None + let body = if body.is_complete_body() { + let body = body.take_complete_body(); + EncoderBody::Full { body } + } else { + EncoderBody::Stream { body } + }; if can_encode { // Modify response body only if encoder is set if let Some(enc) = ContentEncoder::encoder(encoding) { update_head(encoding, head); - head.no_chunking(false); return Encoder { - body: EncoderBody::Stream { body }, + body, encoder: Some(enc), fut: None, eof: false, @@ -87,7 +87,7 @@ impl Encoder { } Encoder { - body: EncoderBody::Stream { body }, + body, encoder: None, fut: None, eof: false, @@ -99,6 +99,7 @@ pin_project! { #[project = EncoderBodyProj] enum EncoderBody { None, + Full { body: Bytes }, Stream { #[pin] body: B }, } } @@ -112,6 +113,7 @@ where fn size(&self) -> BodySize { match self { EncoderBody::None => BodySize::None, + EncoderBody::Full { body } => body.size(), EncoderBody::Stream { body } => body.size(), } } @@ -122,12 +124,32 @@ where ) -> Poll>> { match self.project() { EncoderBodyProj::None => Poll::Ready(None), - + EncoderBodyProj::Full { body } => { + Pin::new(body).poll_next(cx).map_err(|err| match err {}) + } EncoderBodyProj::Stream { body } => body .poll_next(cx) .map_err(|err| EncoderError::Body(err.into())), } } + + fn is_complete_body(&self) -> bool { + match self { + EncoderBody::None => true, + EncoderBody::Full { .. } => true, + EncoderBody::Stream { .. } => false, + } + } + + fn take_complete_body(&mut self) -> Bytes { + match self { + EncoderBody::None => Bytes::new(), + EncoderBody::Full { body } => body.take_complete_body(), + EncoderBody::Stream { .. } => { + panic!("EncoderBody::Stream variant cannot be taken") + } + } + } } impl MessageBody for Encoder @@ -137,10 +159,10 @@ where type Error = EncoderError; fn size(&self) -> BodySize { - if self.encoder.is_none() { - self.body.size() - } else { + if self.encoder.is_some() { BodySize::Stream + } else { + self.body.size() } } @@ -211,6 +233,22 @@ where } } } + + fn is_complete_body(&self) -> bool { + if self.encoder.is_some() { + false + } else { + self.body.is_complete_body() + } + } + + fn take_complete_body(&mut self) -> Bytes { + if self.encoder.is_some() { + panic!("compressed body stream cannot be taken") + } else { + self.body.take_complete_body() + } + } } fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { @@ -218,6 +256,8 @@ fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { header::CONTENT_ENCODING, HeaderValue::from_static(encoding.as_str()), ); + + head.no_chunking(false); } enum ContentEncoder { diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index 0c373b8b2..1e371473f 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -101,7 +101,7 @@ async fn test_h2_1() -> io::Result<()> { #[actix_rt::test] async fn test_h2_body() -> io::Result<()> { - let data = "HELLOWORLD".to_owned().repeat(64 * 1024); + let data = "HELLOWORLD".to_owned().repeat(64 * 1024); // 640 KiB let mut srv = test_server(move || { HttpService::build() .h2(|mut req: Request<_>| async move {