1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

wake up io task when next chunk of data is needed

This commit is contained in:
Nikolay Kim 2018-03-20 11:37:13 -07:00
parent 8198f5e10a
commit 978091cedb

View File

@ -159,6 +159,12 @@ impl PayloadWriter for PayloadSender {
if shared.borrow().need_read { if shared.borrow().need_read {
PayloadStatus::Read PayloadStatus::Read
} else { } else {
#[cfg(not(test))]
{
if shared.borrow_mut().io_task.is_none() {
shared.borrow_mut().io_task = Some(current_task());
}
}
PayloadStatus::Pause PayloadStatus::Pause
} }
} else { } else {
@ -176,6 +182,7 @@ struct Inner {
items: VecDeque<Bytes>, items: VecDeque<Bytes>,
capacity: usize, capacity: usize,
task: Option<Task>, task: Option<Task>,
io_task: Option<Task>,
} }
impl Inner { impl Inner {
@ -189,6 +196,7 @@ impl Inner {
need_read: true, need_read: true,
capacity: MAX_BUFFER_SIZE, capacity: MAX_BUFFER_SIZE,
task: None, task: None,
io_task: None,
} }
} }
@ -248,6 +256,9 @@ impl Inner {
if self.need_read && self.task.is_none() { if self.need_read && self.task.is_none() {
self.task = Some(current_task()); self.task = Some(current_task());
} }
if let Some(task) = self.io_task.take() {
task.notify()
}
} }
Ok(Async::Ready(Some(data))) Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
@ -261,6 +272,9 @@ impl Inner {
if self.task.is_none() { if self.task.is_none() {
self.task = Some(current_task()); self.task = Some(current_task());
} }
if let Some(task) = self.io_task.take() {
task.notify()
}
} }
Ok(Async::NotReady) Ok(Async::NotReady)
} }