diff --git a/src/server/h1.rs b/src/server/h1.rs index b715dfb6a..afe143b4a 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -136,7 +136,7 @@ where } } - fn notify_disconnect(&mut self) { + fn write_disconnected(&mut self) { self.flags.insert(Flags::WRITE_DISCONNECTED); // notify all tasks @@ -144,17 +144,18 @@ where for task in &mut self.tasks { task.pipe.disconnected(); } - } - fn client_disconnect(&mut self) { - // notify all tasks - self.notify_disconnect(); // kill keepalive self.keepalive_timer.take(); + } - // on parse error, stop reading stream but tasks need to be - // completed - self.flags.insert(Flags::ERROR); + fn read_disconnected(&mut self) { + self.flags.insert( + Flags::READ_DISCONNECTED + // on parse error, stop reading stream but tasks need to be + // completed + | Flags::ERROR, + ); if let Some(mut payload) = self.payload.take() { payload.set_error(PayloadError::Incomplete); @@ -225,16 +226,17 @@ where self.parse(); } if disconnected { + self.read_disconnected(); // delay disconnect until all tasks have finished. - self.flags.insert(Flags::READ_DISCONNECTED); if self.tasks.is_empty() { - self.client_disconnect(); + self.write_disconnected(); } } } Ok(Async::NotReady) => (), Err(_) => { - self.client_disconnect(); + self.read_disconnected(); + self.write_disconnected(); } } } @@ -291,7 +293,8 @@ where Err(err) => { // it is not possible to recover from error // during pipe handling, so just drop connection - self.notify_disconnect(); + self.read_disconnected(); + self.write_disconnected(); self.tasks[idx].flags.insert(EntryFlags::ERROR); error!("Unhandled error1: {}", err); continue; @@ -304,7 +307,8 @@ where self.tasks[idx].flags.insert(EntryFlags::FINISHED) } Err(err) => { - self.notify_disconnect(); + self.read_disconnected(); + self.write_disconnected(); self.tasks[idx].flags.insert(EntryFlags::ERROR); error!("Unhandled error: {}", err); continue; @@ -332,7 +336,8 @@ where Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { debug!("Error sending data: {}", err); - self.notify_disconnect(); + self.read_disconnected(); + self.write_disconnected(); return Err(()); } Ok(Async::Ready(_)) => { @@ -472,10 +477,11 @@ where } } Ok(None) => { - if self.flags.contains(Flags::READ_DISCONNECTED) - && self.tasks.is_empty() - { - self.client_disconnect(); + if self.flags.contains(Flags::READ_DISCONNECTED) { + self.read_disconnected(); + if self.tasks.is_empty() { + self.write_disconnected(); + } } break; }