1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-09-01 17:27:18 +02:00

process inactive tasks

This commit is contained in:
Nikolay Kim
2017-10-20 23:12:36 -07:00
parent 71dc9edf8e
commit 0bfe07b371
9 changed files with 238 additions and 34 deletions

View File

@@ -115,7 +115,7 @@ impl<T, A> Handler<(T, A), io::Error> for HttpServer<T, A>
reader: Reader::new(),
error: false,
items: VecDeque::new(),
inactive: Vec::new(),
inactive: VecDeque::new(),
keepalive: true,
keepalive_timer: None,
});
@@ -143,7 +143,7 @@ pub struct HttpChannel<T: 'static, A: 'static> {
reader: Reader,
error: bool,
items: VecDeque<Entry>,
inactive: Vec<Entry>,
inactive: VecDeque<Entry>,
keepalive: bool,
keepalive_timer: Option<Timeout>,
}
@@ -192,20 +192,22 @@ impl<T, A> Future for HttpChannel<T, A>
};
match self.items[idx].task.poll_io(&mut self.stream, req)
{
Ok(Async::Ready(val)) => {
Ok(Async::Ready(ready)) => {
let mut item = self.items.pop_front().unwrap();
// overide keep-alive state
if self.keepalive {
self.keepalive = item.task.keepalive();
}
if !val {
if !ready {
item.eof = true;
self.inactive.push(item);
self.inactive.push_back(item);
}
// no keep-alive
if !self.keepalive && self.items.is_empty() {
if ready && !self.keepalive &&
self.items.is_empty() && self.inactive.is_empty()
{
return Ok(Async::Ready(()))
}
continue
@@ -217,11 +219,11 @@ impl<T, A> Future for HttpChannel<T, A>
return Err(())
}
}
} else if !self.items[idx].finished {
} else if !self.items[idx].finished && !self.items[idx].error {
match self.items[idx].task.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) =>
self.items[idx].finished = true,
Ok(Async::NotReady) => (),
Err(_) =>
self.items[idx].error = true,
}
@@ -229,6 +231,26 @@ impl<T, A> Future for HttpChannel<T, A>
idx += 1;
}
// check inactive tasks
let mut idx = 0;
while idx < self.inactive.len() {
if idx == 0 && self.inactive[idx].error && self.inactive[idx].finished {
let _ = self.inactive.pop_front();
continue
}
if !self.inactive[idx].finished && !self.inactive[idx].error {
match self.inactive[idx].task.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) =>
self.inactive[idx].finished = true,
Err(_) =>
self.inactive[idx].error = true,
}
}
idx += 1;
}
// read incoming data
if !self.error && self.items.len() < MAX_PIPELINED_MESSAGES {
match self.reader.parse(&mut self.stream) {
@@ -251,7 +273,7 @@ impl<T, A> Future for HttpChannel<T, A>
self.keepalive_timer.take();
// on parse error, stop reading stream but
// complete tasks
// tasks need to be completed
self.error = true;
if let ReaderError::Error(err) = err {
@@ -265,7 +287,7 @@ impl<T, A> Future for HttpChannel<T, A>
}
Ok(Async::NotReady) => {
// start keep-alive timer, this is also slow request timeout
if self.items.is_empty() {
if self.items.is_empty() && self.inactive.is_empty() {
if self.keepalive {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
@@ -287,7 +309,7 @@ impl<T, A> Future for HttpChannel<T, A>
}
// check for parse error
if self.items.is_empty() && self.error {
if self.items.is_empty() && self.inactive.is_empty() && self.error {
return Ok(Async::Ready(()))
}
}