diff --git a/CHANGES.md b/CHANGES.md index 3a6a3797c..dc7dda78d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,8 +2,11 @@ ## 0.4.9 (2018-03-xx) +* Wake payload reading task when data is available + * Fix server keep-alive handling + ## 0.4.8 (2018-03-12) * Allow to set read buffer capacity for server request diff --git a/src/payload.rs b/src/payload.rs index 13e48efca..695a2a03f 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -5,6 +5,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; +use futures::task::{Task, current as current_task}; use error::PayloadError; @@ -174,6 +175,7 @@ struct Inner { need_read: bool, items: VecDeque, capacity: usize, + task: Option, } impl Inner { @@ -186,6 +188,7 @@ impl Inner { items: VecDeque::new(), need_read: true, capacity: MAX_BUFFER_SIZE, + task: None, } } @@ -204,6 +207,9 @@ impl Inner { self.len += data.len(); self.items.push_back(data); self.need_read = self.len < self.capacity; + if let Some(task) = self.task.take() { + task.notify() + } } #[inline] @@ -237,6 +243,12 @@ impl Inner { if let Some(data) = self.items.pop_front() { self.len -= data.len(); self.need_read = self.len < self.capacity; + #[cfg(not(test))] + { + if self.need_read && self.task.is_none() { + self.task = Some(current_task()); + } + } Ok(Async::Ready(Some(data))) } else if let Some(err) = self.err.take() { Err(err) @@ -244,6 +256,12 @@ impl Inner { Ok(Async::Ready(None)) } else { self.need_read = true; + #[cfg(not(test))] + { + if self.task.is_none() { + self.task = Some(current_task()); + } + } Ok(Async::NotReady) } }