1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

Wake payload reading task when data is available

This commit is contained in:
Nikolay Kim 2018-03-12 16:29:13 -07:00
parent 05ff35d383
commit b4b0deb7fa
2 changed files with 21 additions and 0 deletions

View File

@ -2,8 +2,11 @@
## 0.4.9 (2018-03-xx) ## 0.4.9 (2018-03-xx)
* Wake payload reading task when data is available
* Fix server keep-alive handling * Fix server keep-alive handling
## 0.4.8 (2018-03-12) ## 0.4.8 (2018-03-12)
* Allow to set read buffer capacity for server request * Allow to set read buffer capacity for server request

View File

@ -5,6 +5,7 @@ use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
use error::PayloadError; use error::PayloadError;
@ -174,6 +175,7 @@ struct Inner {
need_read: bool, need_read: bool,
items: VecDeque<Bytes>, items: VecDeque<Bytes>,
capacity: usize, capacity: usize,
task: Option<Task>,
} }
impl Inner { impl Inner {
@ -186,6 +188,7 @@ impl Inner {
items: VecDeque::new(), items: VecDeque::new(),
need_read: true, need_read: true,
capacity: MAX_BUFFER_SIZE, capacity: MAX_BUFFER_SIZE,
task: None,
} }
} }
@ -204,6 +207,9 @@ impl Inner {
self.len += data.len(); self.len += data.len();
self.items.push_back(data); self.items.push_back(data);
self.need_read = self.len < self.capacity; self.need_read = self.len < self.capacity;
if let Some(task) = self.task.take() {
task.notify()
}
} }
#[inline] #[inline]
@ -237,6 +243,12 @@ impl Inner {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
self.need_read = self.len < self.capacity; self.need_read = self.len < self.capacity;
#[cfg(not(test))]
{
if self.need_read && self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::Ready(Some(data))) Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
Err(err) Err(err)
@ -244,6 +256,12 @@ impl Inner {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
self.need_read = true; self.need_read = true;
#[cfg(not(test))]
{
if self.task.is_none() {
self.task = Some(current_task());
}
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }