From 110605f50bb08b86d789081554c8ce1fe2db2f62 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 8 Jul 2018 09:41:55 +0600 Subject: [PATCH] stop actor context on error #311 --- src/pipeline.rs | 16 +++++++++++++++- src/server/h1.rs | 16 +++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/pipeline.rs b/src/pipeline.rs index 528680f53..66b2f29a2 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -580,6 +580,7 @@ impl ProcessResponse { Frame::Chunk(Some(chunk)) => { match io.write(&chunk) { Err(err) => { + info.context = Some(ctx); info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( @@ -606,6 +607,7 @@ impl ProcessResponse { break; } Err(err) => { + info.context = Some(ctx); info.error = Some(err); return Ok(FinishingMiddlewares::init( info, mws, self.resp, @@ -641,6 +643,12 @@ impl ProcessResponse { } Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Err(err) => { + if let IOState::Actor(mut ctx) = + mem::replace(&mut self.iostate, IOState::Done) + { + ctx.disconnected(); + info.context = Some(ctx); + } info.error = Some(err.into()); return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } @@ -755,7 +763,13 @@ impl Completed { if info.context.is_none() { PipelineState::None } else { - PipelineState::Completed(Completed(PhantomData, PhantomData)) + match info.poll_context() { + Ok(Async::NotReady) => { + PipelineState::Completed(Completed(PhantomData, PhantomData)) + } + Ok(Async::Ready(())) => PipelineState::None, + Err(_) => PipelineState::Error, + } } } diff --git a/src/server/h1.rs b/src/server/h1.rs index 6b1a5b9c9..5b83dcc08 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -127,8 +127,8 @@ where fn notify_disconnect(&mut self) { // notify all tasks self.stream.disconnected(); - for entry in &mut self.tasks { - entry.pipe.disconnected() + for task in &mut self.tasks { + task.pipe.disconnected(); } } @@ -239,6 +239,7 @@ where if let Ok(Async::NotReady) = self.stream.poll_completed(true) { return Ok(Async::NotReady); } + self.flags.insert(Flags::ERROR); return Err(()); } @@ -272,14 +273,10 @@ where Err(err) => { // it is not possible to recover from error // during pipe handling, so just drop connection - error!("Unhandled error: {}", err); + self.notify_disconnect(); self.tasks[idx].flags.insert(EntryFlags::ERROR); - - // check stream state, we still can have valid data in buffer - if let Ok(Async::NotReady) = self.stream.poll_completed(true) { - return Ok(Async::NotReady); - } - return Err(()); + error!("Unhandled error1: {}", err); + continue; } } } else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) { @@ -292,6 +289,7 @@ where self.notify_disconnect(); self.tasks[idx].flags.insert(EntryFlags::ERROR); error!("Unhandled error: {}", err); + continue; } } }