From 978091cedbc4e17f641f6d8c0d9524e04d8e6cf1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 20 Mar 2018 11:37:13 -0700 Subject: [PATCH] wake up io task when next chunk of data is needed --- src/payload.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/payload.rs b/src/payload.rs index 695a2a03..6fb63f69 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -159,6 +159,12 @@ impl PayloadWriter for PayloadSender { if shared.borrow().need_read { PayloadStatus::Read } else { + #[cfg(not(test))] + { + if shared.borrow_mut().io_task.is_none() { + shared.borrow_mut().io_task = Some(current_task()); + } + } PayloadStatus::Pause } } else { @@ -176,6 +182,7 @@ struct Inner { items: VecDeque, capacity: usize, task: Option, + io_task: Option, } impl Inner { @@ -189,6 +196,7 @@ impl Inner { need_read: true, capacity: MAX_BUFFER_SIZE, task: None, + io_task: None, } } @@ -248,6 +256,9 @@ impl Inner { if self.need_read && self.task.is_none() { self.task = Some(current_task()); } + if let Some(task) = self.io_task.take() { + task.notify() + } } Ok(Async::Ready(Some(data))) } else if let Some(err) = self.err.take() { @@ -261,6 +272,9 @@ impl Inner { if self.task.is_none() { self.task = Some(current_task()); } + if let Some(task) = self.io_task.take() { + task.notify() + } } Ok(Async::NotReady) }