From 6ae131ce298330c7388b426ec06e7a5969023b75 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 6 Jul 2024 23:38:37 +0100 Subject: [PATCH] test(multipart): replace SlowStream helper --- actix-multipart/Cargo.toml | 1 + actix-multipart/src/multipart.rs | 58 ++++++++------------------------ 2 files changed, 15 insertions(+), 44 deletions(-) diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index e5d1b5b1..19761bb2 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -66,6 +66,7 @@ assert_matches = "1" awc = "3" env_logger = "0.11" futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-test = "0.3" multer = "3" tokio = { version = "1.24.2", features = ["sync"] } tokio-stream = "0.1" diff --git a/actix-multipart/src/multipart.rs b/actix-multipart/src/multipart.rs index dc6a9ecb..e511cb6d 100644 --- a/actix-multipart/src/multipart.rs +++ b/actix-multipart/src/multipart.rs @@ -482,7 +482,8 @@ mod tests { FromRequest, }; use assert_matches::assert_matches; - use futures_util::{future::lazy, StreamExt as _}; + use futures_test::stream::StreamTestExt as _; + use futures_util::{future::lazy, stream, StreamExt as _}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -545,45 +546,6 @@ mod tests { ) } - // Stream that returns from a Bytes, one char at a time and Pending every other poll() - struct SlowStream { - bytes: Bytes, - pos: usize, - ready: bool, - } - - impl SlowStream { - fn new(bytes: Bytes) -> SlowStream { - SlowStream { - bytes, - pos: 0, - ready: false, - } - } - } - - impl Stream for SlowStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - if !this.ready { - this.ready = true; - cx.waker().wake_by_ref(); - return Poll::Pending; - } - - if this.pos == this.bytes.len() { - return Poll::Ready(None); - } - - let res = Poll::Ready(Some(Ok(this.bytes.slice(this.pos..(this.pos + 1))))); - this.pos += 1; - this.ready = false; - res - } - } - fn create_simple_request_with_header() -> (Bytes, HeaderMap) { let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary( BOUNDARY, @@ -721,7 +683,9 @@ mod tests { #[actix_rt::test] async fn test_stream() { let (bytes, headers) = create_double_request_with_header(); - let payload = SlowStream::new(bytes); + let payload = stream::iter(bytes) + .map(|byte| Ok(Bytes::copy_from_slice(&[byte]))) + .interleave_pending(); let mut multipart = Multipart::new(&headers, payload); match multipart.next().await.unwrap() { @@ -899,7 +863,9 @@ mod tests { "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"", ), ); - let payload = SlowStream::new(bytes); + let payload = stream::iter(bytes) + .map(|byte| Ok(Bytes::copy_from_slice(&[byte]))) + .interleave_pending(); let mut multipart = Multipart::new(&headers, payload); let res = multipart.next().await.unwrap(); @@ -929,7 +895,9 @@ mod tests { "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"", ), ); - let payload = SlowStream::new(bytes); + let payload = stream::iter(bytes) + .map(|byte| Ok(Bytes::copy_from_slice(&[byte]))) + .interleave_pending(); let mut multipart = Multipart::new(&headers, payload); let res = multipart.next().await.unwrap(); @@ -955,7 +923,9 @@ mod tests { "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"", ), ); - let payload = SlowStream::new(bytes); + let payload = stream::iter(bytes) + .map(|byte| Ok(Bytes::copy_from_slice(&[byte]))) + .interleave_pending(); let mut multipart = Multipart::new(&headers, payload); let res = multipart.next().await.unwrap();