1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

better pipeline error handling

This commit is contained in:
Nikolay Kim 2017-11-25 09:28:25 -08:00
parent 64ade803f9
commit 1fc64bc83d
4 changed files with 40 additions and 42 deletions

View File

@ -106,8 +106,7 @@ impl<T, H> Http1<T, H>
} }
// this is anoying // this is anoying
match item.task.poll_io(&mut self.stream) match item.task.poll_io(&mut self.stream) {
{
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
not_ready = false; not_ready = false;
@ -126,9 +125,10 @@ impl<T, H> Http1<T, H>
// no more IO for this iteration // no more IO for this iteration
io = true; io = true;
}, },
Err(_) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during task handling, so just drop connection // during task handling, so just drop connection
error!("Unhandled error: {}", err);
return Err(()) return Err(())
} }
} }
@ -139,8 +139,10 @@ impl<T, H> Http1<T, H>
not_ready = false; not_ready = false;
item.finished = true; item.finished = true;
}, },
Err(_) => Err(err) => {
item.error = true, item.error = true;
error!("Unhandled error: {}", err);
}
} }
} }
idx += 1; idx += 1;

View File

@ -92,7 +92,8 @@ impl<T, H> Http2<T, H>
not_ready = false; not_ready = false;
}, },
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(_) => { Err(err) => {
error!("Unhandled error: {}", err);
item.eof = true; item.eof = true;
item.error = true; item.error = true;
item.stream.reset(Reason::INTERNAL_ERROR); item.stream.reset(Reason::INTERNAL_ERROR);
@ -105,9 +106,10 @@ impl<T, H> Http2<T, H>
not_ready = false; not_ready = false;
item.finished = true; item.finished = true;
}, },
Err(_) => { Err(err) => {
item.error = true; item.error = true;
item.finished = true; item.finished = true;
error!("Unhandled error: {}", err);
} }
} }
} }

View File

@ -58,7 +58,7 @@ impl Pipeline {
} }
} }
pub(crate) fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, ()> { pub(crate) fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, Error> {
loop { loop {
let state = mem::replace(&mut self.0, PipelineState::None); let state = mem::replace(&mut self.0, PipelineState::None);
match state { match state {
@ -161,7 +161,7 @@ impl Handle {
idx: idx, req: req, task:task, middlewares: mw } idx: idx, req: req, task:task, middlewares: mw }
} }
fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, ()> { fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, Error> {
self.task.poll_io(io, &mut self.req) self.task.poll_io(io, &mut self.req)
} }
@ -262,8 +262,7 @@ impl Start {
if self.disconnected { if self.disconnected {
task.disconnected() task.disconnected()
} }
task.set_middlewares( task.set_middlewares(MiddlewaresResponse::new(Rc::clone(&self.middlewares)));
MiddlewaresResponse::new(self.idx, Rc::clone(&self.middlewares)));
task task
} }
@ -366,16 +365,15 @@ impl Start {
/// Middlewares response executor /// Middlewares response executor
pub(crate) struct MiddlewaresResponse { pub(crate) struct MiddlewaresResponse {
idx: usize, idx: usize,
fut: Option<Box<Future<Item=HttpResponse, Error=HttpResponse>>>, fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>,
middlewares: Rc<Vec<Box<Middleware>>>, middlewares: Rc<Vec<Box<Middleware>>>,
} }
impl MiddlewaresResponse { impl MiddlewaresResponse {
fn new(idx: usize, mw: Rc<Vec<Box<Middleware>>>) -> MiddlewaresResponse { fn new(mw: Rc<Vec<Box<Middleware>>>) -> MiddlewaresResponse {
let idx = if idx == 0 { 0 } else { idx - 1 };
MiddlewaresResponse { MiddlewaresResponse {
idx: idx, idx: 0,
fut: None, fut: None,
middlewares: mw } middlewares: mw }
} }
@ -401,7 +399,7 @@ impl MiddlewaresResponse {
} }
} }
pub fn poll(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> { pub fn poll(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, Error> {
if self.fut.is_none() { if self.fut.is_none() {
return Ok(Async::Ready(None)) return Ok(Async::Ready(None))
} }
@ -409,11 +407,13 @@ impl MiddlewaresResponse {
loop { loop {
// poll latest fut // poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() { let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) =>
Ok(Async::Ready(resp)) | Err(resp) => { return Ok(Async::NotReady),
Ok(Async::Ready(resp)) => {
self.idx += 1; self.idx += 1;
resp resp
} }
Err(err) => return Err(err)
}; };
loop { loop {

View File

@ -170,7 +170,7 @@ impl Task {
} }
} }
pub(crate) fn poll_io<T>(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll<bool, ()> pub(crate) fn poll_io<T>(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll<bool, Error>
where T: Writer where T: Writer
{ {
trace!("POLL-IO frames:{:?}", self.frames.len()); trace!("POLL-IO frames:{:?}", self.frames.len());
@ -181,35 +181,30 @@ impl Task {
} else if self.drain.is_empty() { } else if self.drain.is_empty() {
// poll stream // poll stream
if self.state == TaskRunningState::Running { if self.state == TaskRunningState::Running {
match self.poll() { match self.poll()? {
Ok(Async::Ready(_)) => { Async::Ready(_) => {
self.state = TaskRunningState::Done; self.state = TaskRunningState::Done;
}, },
Ok(Async::NotReady) => (), Async::NotReady => (),
Err(_) => return Err(())
} }
} }
// process middlewares response // process middlewares response
if let Some(mut middlewares) = self.middlewares.take() { if let Some(mut middlewares) = self.middlewares.take() {
match middlewares.poll(req) { match middlewares.poll(req)? {
Err(_) => return Err(()), Async::NotReady => {
Ok(Async::NotReady) => {
self.middlewares = Some(middlewares); self.middlewares = Some(middlewares);
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Ok(Async::Ready(None)) => { Async::Ready(None) => {
self.middlewares = Some(middlewares); self.middlewares = Some(middlewares);
} }
Ok(Async::Ready(Some(mut response))) => { Async::Ready(Some(mut response)) => {
let result = io.start(req, &mut response); let result = io.start(req, &mut response)?;
self.prepared = Some(response); self.prepared = Some(response);
match result { match result {
Ok(WriterState::Pause) => { WriterState::Pause => self.state.pause(),
self.state.pause(); WriterState::Done => self.state.resume(),
}
Ok(WriterState::Done) => self.state.resume(),
Err(_) => return Err(())
} }
}, },
} }
@ -225,7 +220,7 @@ impl Task {
// run middlewares // run middlewares
if let Some(mut middlewares) = self.middlewares.take() { if let Some(mut middlewares) = self.middlewares.take() {
if let Some(mut resp) = middlewares.response(req, resp) { if let Some(mut resp) = middlewares.response(req, resp) {
let result = io.start(req, &mut resp); let result = io.start(req, &mut resp)?;
self.prepared = Some(resp); self.prepared = Some(resp);
result result
} else { } else {
@ -234,17 +229,17 @@ impl Task {
return self.poll_io(io, req) return self.poll_io(io, req)
} }
} else { } else {
let result = io.start(req, &mut resp); let result = io.start(req, &mut resp)?;
self.prepared = Some(resp); self.prepared = Some(resp);
result result
} }
} }
Frame::Payload(Some(chunk)) => { Frame::Payload(Some(chunk)) => {
io.write(chunk.as_ref()) io.write(chunk.as_ref())?
}, },
Frame::Payload(None) => { Frame::Payload(None) => {
self.iostate = TaskIOState::Done; self.iostate = TaskIOState::Done;
io.write_eof() io.write_eof()?
}, },
Frame::Drain(fut) => { Frame::Drain(fut) => {
self.drain.push(fut); self.drain.push(fut);
@ -253,12 +248,11 @@ impl Task {
}; };
match res { match res {
Ok(WriterState::Pause) => { WriterState::Pause => {
self.state.pause(); self.state.pause();
break break
} }
Ok(WriterState::Done) => self.state.resume(), WriterState::Done => self.state.resume(),
Err(_) => return Err(())
} }
} }
} }
@ -272,7 +266,7 @@ impl Task {
} }
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
return Err(()) return Err(err.into())
} }
} }