From e0faf3f69c38f2e1358ef695a0fa7aedb12b55cb Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 10 Jan 2018 16:45:57 -0800 Subject: [PATCH] refactor pipeline impl --- src/pipeline.rs | 252 +++++++++++++++++------------------------------- 1 file changed, 89 insertions(+), 163 deletions(-) diff --git a/src/pipeline.rs b/src/pipeline.rs index 975b0e710..66be1f55a 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -34,6 +34,27 @@ enum PipelineState { Completed(Completed), } +impl> PipelineState { + + fn is_response(&self) -> bool { + match *self { + PipelineState::Response(_) => true, + _ => false, + } + } + + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { + match *self { + PipelineState::Starting(ref mut state) => state.poll(info), + PipelineState::Handler(ref mut state) => state.poll(info), + PipelineState::RunMiddlewares(ref mut state) => state.poll(info), + PipelineState::Finishing(ref mut state) => state.poll(info), + PipelineState::Completed(ref mut state) => state.poll(info), + PipelineState::Response(_) | PipelineState::None | PipelineState::Error => None, + } + } +} + struct PipelineInfo { req: HttpRequest, count: usize, @@ -108,8 +129,7 @@ impl Pipeline { PipelineState::None | PipelineState::Error | PipelineState::Starting(_) | PipelineState::Handler(_) | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, - PipelineState::Finishing(_) => self.0.context.is_none(), - PipelineState::Completed(_) => false, + PipelineState::Finishing(_) | PipelineState::Completed(_) => false, } } } @@ -121,45 +141,13 @@ impl> HttpHandlerTask for Pipeline { } fn poll_io(&mut self, io: &mut Writer) -> Poll { + let info: &mut PipelineInfo<_> = unsafe{ mem::transmute(&mut self.0) }; + loop { - let state = mem::replace(&mut self.1, PipelineState::None); - match state { - PipelineState::None => - return Ok(Async::Ready(true)), - PipelineState::Error => - return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()), - PipelineState::Starting(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Handler(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::RunMiddlewares(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Response(st) => { - match st.poll_io(io, &mut self.0) { + if self.1.is_response() { + let state = mem::replace(&mut self.1, PipelineState::None); + if let PipelineState::Response(st) = state { + match st.poll_io(io, info) { Ok(state) => { self.1 = state; if let Some(error) = self.0.error.take() { @@ -170,99 +158,41 @@ impl> HttpHandlerTask for Pipeline { } Err(state) => { self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Finishing(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Completed(st) => { - match st.poll(&mut self.0) { - Ok(state) => { - self.1 = state; - return Ok(Async::Ready(true)); - } - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } } } + match self.1 { + PipelineState::None => + return Ok(Async::Ready(true)), + PipelineState::Error => + return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()), + _ => (), + } + + match self.1.poll(info) { + Some(state) => self.1 = state, + None => return Ok(Async::NotReady), + } } } fn poll(&mut self) -> Poll<(), Error> { + let info: &mut PipelineInfo<_> = unsafe{ mem::transmute(&mut self.0) }; + loop { - let state = mem::replace(&mut self.1, PipelineState::None); - match state { + match self.1 { PipelineState::None | PipelineState::Error => { return Ok(Async::Ready(())) } - PipelineState::Starting(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Handler(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::RunMiddlewares(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Response(_) => { - self.1 = state; - return Ok(Async::NotReady); - } - PipelineState::Finishing(st) => { - match st.poll(&mut self.0) { - Ok(state) => - self.1 = state, - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } - PipelineState::Completed(st) => { - match st.poll(&mut self.0) { - Ok(state) => { - self.1 = state; - return Ok(Async::Ready(())); - } - Err(state) => { - self.1 = state; - return Ok(Async::NotReady) - } - } - } + _ => (), + } + + if let Some(state) = self.1.poll(info) { + self.1 = state; + } else { + return Ok(Async::NotReady); } } } @@ -317,41 +247,40 @@ impl> StartMiddlewares { } } - fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { let len = info.mws.len(); 'outer: loop { match self.fut.as_mut().unwrap().poll() { - Ok(Async::NotReady) => - return Err(PipelineState::Starting(self)), + Ok(Async::NotReady) => return None, Ok(Async::Ready(resp)) => { info.count += 1; if let Some(resp) = resp { - return Ok(RunMiddlewares::init(info, resp)); + return Some(RunMiddlewares::init(info, resp)); } if info.count == len { let reply = (*self.hnd.borrow_mut()).handle(info.req.clone()); - return Ok(WaitingResponse::init(info, reply)); + return Some(WaitingResponse::init(info, reply)); } else { loop { match info.mws[info.count].start(info.req_mut()) { Ok(Started::Done) => info.count += 1, Ok(Started::Response(resp)) => { - return Ok(RunMiddlewares::init(info, resp)); + return Some(RunMiddlewares::init(info, resp)); }, Ok(Started::Future(fut)) => { self.fut = Some(fut); continue 'outer }, Err(err) => - return Ok(ProcessResponse::init(err.into())) + return Some(ProcessResponse::init(err.into())) } } } } Err(err) => - return Ok(ProcessResponse::init(err.into())) + return Some(ProcessResponse::init(err.into())) } } } @@ -378,15 +307,14 @@ impl WaitingResponse { } } - fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { match self.fut.poll() { - Ok(Async::NotReady) => - Err(PipelineState::Handler(self)), + Ok(Async::NotReady) => None, Ok(Async::Ready(response)) => - Ok(RunMiddlewares::init(info, response)), + Some(RunMiddlewares::init(info, response)), Err(err) => - Ok(ProcessResponse::init(err.into())), + Some(ProcessResponse::init(err.into())), } } } @@ -432,31 +360,30 @@ impl RunMiddlewares { } } - fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> - { + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { let len = info.mws.len(); loop { // poll latest fut let mut resp = match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => { - return Err(PipelineState::RunMiddlewares(self)) + return None } Ok(Async::Ready(resp)) => { self.curr += 1; resp } Err(err) => - return Ok(ProcessResponse::init(err.into())), + return Some(ProcessResponse::init(err.into())), }; loop { if self.curr == len { - return Ok(ProcessResponse::init(resp)); + return Some(ProcessResponse::init(resp)); } else { match info.mws[self.curr].response(info.req_mut(), resp) { Err(err) => - return Ok(ProcessResponse::init(err.into())), + return Some(ProcessResponse::init(err.into())), Ok(Response::Done(r)) => { self.curr += 1; resp = r @@ -601,8 +528,8 @@ impl ProcessResponse { match io.write(chunk.as_ref()) { Err(err) => { info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init( - info, self.resp)) + return Ok( + FinishingMiddlewares::init(info, self.resp)) }, Ok(result) => result } @@ -656,8 +583,7 @@ impl ProcessResponse { // restart io processing return self.poll_io(io, info); }, - Ok(Async::NotReady) => - return Err(PipelineState::Response(self)), + Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Err(err) => { debug!("Error sending data: {}", err); info.error = Some(err.into()); @@ -680,7 +606,7 @@ impl ProcessResponse { self.resp.set_response_size(io.written()); Ok(FinishingMiddlewares::init(info, self.resp)) } - _ => Err(PipelineState::Response(self)) + _ => Err(PipelineState::Response(self)), } } } @@ -699,15 +625,17 @@ impl FinishingMiddlewares { if info.count == 0 { Completed::init(info) } else { - match (FinishingMiddlewares{resp: resp, fut: None, - _s: PhantomData, _h: PhantomData}).poll(info) { - Ok(st) | Err(st) => st, + let mut state = FinishingMiddlewares{resp: resp, fut: None, + _s: PhantomData, _h: PhantomData}; + if let Some(st) = state.poll(info) { + st + } else { + PipelineState::Finishing(state) } } } - fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> - { + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { loop { // poll latest fut let not_ready = if let Some(ref mut fut) = self.fut { @@ -727,7 +655,7 @@ impl FinishingMiddlewares { false }; if not_ready { - return Ok(PipelineState::Finishing(self)) + return None; } self.fut = None; info.count -= 1; @@ -735,7 +663,7 @@ impl FinishingMiddlewares { match info.mws[info.count].finish(info.req_mut(), &self.resp) { Finished::Done => { if info.count == 0 { - return Ok(Completed::init(info)) + return Some(Completed::init(info)) } } Finished::Future(fut) => { @@ -760,13 +688,11 @@ impl Completed { } #[inline] - fn poll(self, info: &mut PipelineInfo) -> Result, PipelineState> { + fn poll(&mut self, info: &mut PipelineInfo) -> Option> { match info.poll_context() { - Ok(Async::NotReady) => - Ok(PipelineState::Completed(Completed(PhantomData, PhantomData))), - Ok(Async::Ready(())) => - Ok(PipelineState::None), - Err(_) => Ok(PipelineState::Error), + Ok(Async::NotReady) => None, + Ok(Async::Ready(())) => Some(PipelineState::None), + Err(_) => Some(PipelineState::Error), } } } @@ -806,17 +732,17 @@ mod tests { info.context = Some(Box::new(ctx)); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); - let st = state.poll(&mut info).ok().unwrap(); - let pp = Pipeline(info, st); + assert!(state.poll(&mut info).is_none()); + let pp = Pipeline(info, PipelineState::Completed(state)); assert!(!pp.is_done()); let Pipeline(mut info, st) = pp; - state = st.completed().unwrap(); + let mut st = st.completed().unwrap(); drop(addr); - state.poll(&mut info).ok().unwrap().is_none().unwrap(); + assert!(st.poll(&mut info).unwrap().is_none().unwrap()); result(Ok::<_, ()>(())) - })).unwrap() + })).unwrap(); } }