From f6393728c7d4a820ab9cfff81a1cf627a2007889 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 24 Feb 2021 09:08:56 +0000 Subject: [PATCH] remove usage of actix_utils::mpsc (#2023) --- actix-multipart/Cargo.toml | 4 +++- actix-multipart/src/server.rs | 15 +++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 67b2698b..f3bb9233 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -22,7 +22,7 @@ actix-utils = "3.0.0-beta.2" bytes = "1" derive_more = "0.99.5" httparse = "1.3" -futures-util = { version = "0.3.7", default-features = false } +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" mime = "0.3" twoway = "0.2" @@ -30,3 +30,5 @@ twoway = "0.2" [dev-dependencies] actix-rt = "2" actix-http = "3.0.0-beta.3" +tokio = { version = "1", features = ["sync"] } +tokio-stream = "0.1" diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 8cd1c8e0..d9ff3d57 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -804,12 +804,13 @@ mod tests { use super::*; use actix_http::h1::Payload; - use actix_utils::mpsc; use actix_web::http::header::{DispositionParam, DispositionType}; use actix_web::test::TestRequest; use actix_web::FromRequest; use bytes::Bytes; use futures_util::future::lazy; + use tokio::sync::mpsc; + use tokio_stream::wrappers::UnboundedReceiverStream; #[actix_rt::test] async fn test_boundary() { @@ -855,13 +856,17 @@ mod tests { } fn create_stream() -> ( - mpsc::Sender>, + mpsc::UnboundedSender>, impl Stream>, ) { - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::unbounded_channel(); - (tx, rx.map(|res| res.map_err(|_| panic!()))) + ( + tx, + UnboundedReceiverStream::new(rx).map(|res| res.map_err(|_| panic!())), + ) } + // Stream that returns from a Bytes, one char at a time and Pending every other poll() struct SlowStream { bytes: Bytes, @@ -889,9 +894,11 @@ mod tests { cx.waker().wake_by_ref(); return Poll::Pending; } + if this.pos == this.bytes.len() { return Poll::Ready(None); } + let res = Poll::Ready(Some(Ok(this.bytes.slice(this.pos..(this.pos + 1))))); this.pos += 1; this.ready = false;