From cdba30d45fc5814c0cd9657deec55968d04787ad Mon Sep 17 00:00:00 2001 From: Kai Ren Date: Tue, 28 Jan 2020 10:28:09 +0100 Subject: [PATCH] Skip empty chucks for BodyStream and SizedStream (#1308) * Skip empty chucks for BodyStream and SizedStream when streaming response (#1267) * Fix tests to fail on previous implementation Co-authored-by: Yuki Okushi --- CHANGES.md | 4 ++- actix-http/src/body.rs | 76 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 29f78e0b1..4143f78d9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,7 +4,9 @@ ### Changed -* Use `sha-1` crate instead of unmaintained `sha1` crate +* Use `sha-1` crate instead of unmaintained `sha1` crate + +* Skip empty chunks when returning response from a `Stream` #1308 ## [2.0.0] - 2019-12-25 diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 850f97ee4..881764439 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -5,6 +5,7 @@ use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; +use futures_util::ready; use pin_project::{pin_project, project}; use crate::error::Error; @@ -389,12 +390,19 @@ where BodySize::Stream } + /// Attempts to pull out the next value of the underlying [`Stream`]. + /// + /// Empty values are skipped to prevent [`BodyStream`]'s transmission being + /// ended on a zero-length chunk, but rather proceed until the underlying + /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - unsafe { Pin::new_unchecked(self) } - .project() - .stream - .poll_next(cx) - .map(|res| res.map(|res| res.map_err(std::convert::Into::into))) + let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + loop { + return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + Some(Ok(ref bytes)) if bytes.is_empty() => continue, + opt => opt.map(|res| res.map_err(Into::into)), + }); + } } } @@ -424,17 +432,26 @@ where BodySize::Sized64(self.size) } + /// Attempts to pull out the next value of the underlying [`Stream`]. + /// + /// Empty values are skipped to prevent [`SizedStream`]'s transmission being + /// ended on a zero-length chunk, but rather proceed until the underlying + /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - unsafe { Pin::new_unchecked(self) } - .project() - .stream - .poll_next(cx) + let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + loop { + return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + Some(Ok(ref bytes)) if bytes.is_empty() => continue, + val => val, + }); + } } } #[cfg(test)] mod tests { use super::*; + use futures::stream; use futures_util::future::poll_fn; impl Body { @@ -589,4 +606,45 @@ mod tests { BodySize::Sized(25) ); } + + mod body_stream { + use super::*; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let mut body = BodyStream::new(stream::iter( + ["1", "", "2"] + .iter() + .map(|&v| Ok(Bytes::from(v)) as Result), + )); + assert_eq!( + poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from("1")), + ); + assert_eq!( + poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from("2")), + ); + } + } + + mod sized_stream { + use super::*; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let mut body = SizedStream::new( + 2, + stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), + ); + assert_eq!( + poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from("1")), + ); + assert_eq!( + poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + Some(Bytes::from("2")), + ); + } + } }