1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

Fix server keep-alive handling

This commit is contained in:
Nikolay Kim 2018-03-12 16:16:17 -07:00
parent 29c3e8f7ea
commit 05ff35d383
2 changed files with 36 additions and 10 deletions

View File

@ -1,5 +1,9 @@
# Changes # Changes
## 0.4.9 (2018-03-xx)
* Fix server keep-alive handling
## 0.4.8 (2018-03-12) ## 0.4.8 (2018-03-12)
* Allow to set read buffer capacity for server request * Allow to set read buffer capacity for server request

View File

@ -32,8 +32,10 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
const STARTED = 0b0000_0001;
const ERROR = 0b0000_0010; const ERROR = 0b0000_0010;
const KEEPALIVE = 0b0000_0100; const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000;
} }
} }
@ -94,17 +96,32 @@ impl<T, H> Http1<T, H>
match timer.poll() { match timer.poll() {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
trace!("Keep-alive timeout, close connection"); trace!("Keep-alive timeout, close connection");
return Ok(Async::Ready(())) self.flags.insert(Flags::SHUTDOWN);
} }
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(_) => unreachable!(), Err(_) => unreachable!(),
} }
} }
// shutdown
if self.flags.contains(Flags::SHUTDOWN) {
match self.stream.poll_completed(true) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(())
}
}
}
loop { loop {
match self.poll_io()? { match self.poll_io()? {
Async::Ready(true) => (), Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(())), Async::Ready(false) => {
self.flags.insert(Flags::SHUTDOWN);
return self.poll()
},
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
} }
} }
@ -120,6 +137,8 @@ impl<T, H> Http1<T, H>
match self.reader.parse(self.stream.get_mut(), match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings) { &mut self.read_buf, &self.settings) {
Ok(Async::Ready(mut req)) => { Ok(Async::Ready(mut req)) => {
self.flags.insert(Flags::STARTED);
// set remote addr // set remote addr
req.set_peer_addr(self.addr); req.set_peer_addr(self.addr);
@ -260,21 +279,24 @@ impl<T, H> Http1<T, H>
} }
// check stream state // check stream state
match self.stream.poll_completed(self.tasks.is_empty()) { if self.flags.contains(Flags::STARTED) {
Ok(Async::NotReady) => return Ok(Async::NotReady), match self.stream.poll_completed(false) {
Err(err) => { Ok(Async::NotReady) => return Ok(Async::NotReady),
debug!("Error sending data: {}", err); Err(err) => {
return Err(()) debug!("Error sending data: {}", err);
return Err(())
}
_ => (),
} }
_ => (),
} }
// deal with keep-alive // deal with keep-alive
if self.tasks.is_empty() { if self.tasks.is_empty() {
// no keep-alive situations // no keep-alive situations
if self.flags.contains(Flags::ERROR) if (self.flags.contains(Flags::ERROR)
|| !self.flags.contains(Flags::KEEPALIVE) || !self.flags.contains(Flags::KEEPALIVE)
|| !self.settings.keep_alive_enabled() || !self.settings.keep_alive_enabled()) &&
self.flags.contains(Flags::STARTED)
{ {
return Ok(Async::Ready(false)) return Ok(Async::Ready(false))
} }