1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

Try to separate HTTP/1 read & write disconnect handling, to fix #511. (#514)

This commit is contained in:
sapir 2018-09-28 04:15:02 +03:00 committed by Nikolay Kim
parent 782eeb5ded
commit 59deb4b40d

View File

@ -136,7 +136,7 @@ where
} }
} }
fn notify_disconnect(&mut self) { fn write_disconnected(&mut self) {
self.flags.insert(Flags::WRITE_DISCONNECTED); self.flags.insert(Flags::WRITE_DISCONNECTED);
// notify all tasks // notify all tasks
@ -144,17 +144,18 @@ where
for task in &mut self.tasks { for task in &mut self.tasks {
task.pipe.disconnected(); task.pipe.disconnected();
} }
}
fn client_disconnect(&mut self) {
// notify all tasks
self.notify_disconnect();
// kill keepalive // kill keepalive
self.keepalive_timer.take(); self.keepalive_timer.take();
}
fn read_disconnected(&mut self) {
self.flags.insert(
Flags::READ_DISCONNECTED
// on parse error, stop reading stream but tasks need to be // on parse error, stop reading stream but tasks need to be
// completed // completed
self.flags.insert(Flags::ERROR); | Flags::ERROR,
);
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete); payload.set_error(PayloadError::Incomplete);
@ -225,16 +226,17 @@ where
self.parse(); self.parse();
} }
if disconnected { if disconnected {
self.read_disconnected();
// delay disconnect until all tasks have finished. // delay disconnect until all tasks have finished.
self.flags.insert(Flags::READ_DISCONNECTED);
if self.tasks.is_empty() { if self.tasks.is_empty() {
self.client_disconnect(); self.write_disconnected();
} }
} }
} }
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(_) => { Err(_) => {
self.client_disconnect(); self.read_disconnected();
self.write_disconnected();
} }
} }
} }
@ -291,7 +293,8 @@ where
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
self.notify_disconnect(); self.read_disconnected();
self.write_disconnected();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error1: {}", err); error!("Unhandled error1: {}", err);
continue; continue;
@ -304,7 +307,8 @@ where
self.tasks[idx].flags.insert(EntryFlags::FINISHED) self.tasks[idx].flags.insert(EntryFlags::FINISHED)
} }
Err(err) => { Err(err) => {
self.notify_disconnect(); self.read_disconnected();
self.write_disconnected();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
continue; continue;
@ -332,7 +336,8 @@ where
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
self.notify_disconnect(); self.read_disconnected();
self.write_disconnected();
return Err(()); return Err(());
} }
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
@ -472,10 +477,11 @@ where
} }
} }
Ok(None) => { Ok(None) => {
if self.flags.contains(Flags::READ_DISCONNECTED) if self.flags.contains(Flags::READ_DISCONNECTED) {
&& self.tasks.is_empty() self.read_disconnected();
{ if self.tasks.is_empty() {
self.client_disconnect(); self.write_disconnected();
}
} }
break; break;
} }