1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

stop actor context on error #311

This commit is contained in:
Nikolay Kim 2018-07-08 09:41:55 +06:00
parent 00c97504b6
commit 110605f50b
2 changed files with 22 additions and 10 deletions

View File

@ -580,6 +580,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Frame::Chunk(Some(chunk)) => { Frame::Chunk(Some(chunk)) => {
match io.write(&chunk) { match io.write(&chunk) {
Err(err) => { Err(err) => {
info.context = Some(ctx);
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
@ -606,6 +607,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
break; break;
} }
Err(err) => { Err(err) => {
info.context = Some(ctx);
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, mws, self.resp, info, mws, self.resp,
@ -641,6 +643,12 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
if let IOState::Actor(mut ctx) =
mem::replace(&mut self.iostate, IOState::Done)
{
ctx.disconnected();
info.context = Some(ctx);
}
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, mws, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp));
} }
@ -755,8 +763,14 @@ impl<S, H> Completed<S, H> {
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {
match info.poll_context() {
Ok(Async::NotReady) => {
PipelineState::Completed(Completed(PhantomData, PhantomData)) PipelineState::Completed(Completed(PhantomData, PhantomData))
} }
Ok(Async::Ready(())) => PipelineState::None,
Err(_) => PipelineState::Error,
}
}
} }
#[inline] #[inline]

View File

@ -127,8 +127,8 @@ where
fn notify_disconnect(&mut self) { fn notify_disconnect(&mut self) {
// notify all tasks // notify all tasks
self.stream.disconnected(); self.stream.disconnected();
for entry in &mut self.tasks { for task in &mut self.tasks {
entry.pipe.disconnected() task.pipe.disconnected();
} }
} }
@ -239,6 +239,7 @@ where
if let Ok(Async::NotReady) = self.stream.poll_completed(true) { if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
self.flags.insert(Flags::ERROR);
return Err(()); return Err(());
} }
@ -272,14 +273,10 @@ where
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
error!("Unhandled error: {}", err); self.notify_disconnect();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error1: {}", err);
// check stream state, we still can have valid data in buffer continue;
if let Ok(Async::NotReady) = self.stream.poll_completed(true) {
return Ok(Async::NotReady);
}
return Err(());
} }
} }
} else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) { } else if !self.tasks[idx].flags.contains(EntryFlags::FINISHED) {
@ -292,6 +289,7 @@ where
self.notify_disconnect(); self.notify_disconnect();
self.tasks[idx].flags.insert(EntryFlags::ERROR); self.tasks[idx].flags.insert(EntryFlags::ERROR);
error!("Unhandled error: {}", err); error!("Unhandled error: {}", err);
continue;
} }
} }
} }