From 50fbef88fce113a2d0c2972509f1f5ba6f7e9913 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 21 Jun 2018 23:51:25 +0600 Subject: [PATCH] cleanup srver pipeline --- src/pipeline.rs | 168 ++++++++++++++++++++++++++---------------------- 1 file changed, 90 insertions(+), 78 deletions(-) diff --git a/src/pipeline.rs b/src/pipeline.rs index fe5e1d02a..458436820 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,4 +1,3 @@ -use std::cell::UnsafeCell; use std::marker::PhantomData; use std::rc::Rc; use std::{io, mem}; @@ -36,7 +35,11 @@ pub trait PipelineHandler { } #[doc(hidden)] -pub struct Pipeline(PipelineInfo, PipelineState); +pub struct Pipeline( + PipelineInfo, + PipelineState, + Rc>>>, +); enum PipelineState { None, @@ -57,12 +60,14 @@ impl> PipelineState { } } - fn poll(&mut self, info: &mut PipelineInfo) -> Option> { + fn poll( + &mut self, info: &mut PipelineInfo, mws: &[Box>], + ) -> Option> { match *self { - PipelineState::Starting(ref mut state) => state.poll(info), - PipelineState::Handler(ref mut state) => state.poll(info), - PipelineState::RunMiddlewares(ref mut state) => state.poll(info), - PipelineState::Finishing(ref mut state) => state.poll(info), + PipelineState::Starting(ref mut state) => state.poll(info, mws), + PipelineState::Handler(ref mut state) => state.poll(info, mws), + PipelineState::RunMiddlewares(ref mut state) => state.poll(info, mws), + PipelineState::Finishing(ref mut state) => state.poll(info, mws), PipelineState::Completed(ref mut state) => state.poll(info), PipelineState::Response(_) | PipelineState::None | PipelineState::Error => { None @@ -72,9 +77,8 @@ impl> PipelineState { } struct PipelineInfo { - req: UnsafeCell>, + req: HttpRequest, count: u16, - mws: Rc>>>, context: Option>, error: Option, disconnected: Option, @@ -84,9 +88,8 @@ struct PipelineInfo { impl PipelineInfo { fn new(req: HttpRequest) -> PipelineInfo { PipelineInfo { - req: UnsafeCell::new(req), + req, count: 0, - mws: Rc::new(Vec::new()), error: None, context: None, disconnected: None, @@ -94,20 +97,6 @@ impl PipelineInfo { } } - #[inline] - fn req(&self) -> &HttpRequest { - unsafe { &*self.req.get() } - } - - #[inline] - #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] - fn req_mut(&self) -> &mut HttpRequest { - #[allow(mutable_transmutes)] - unsafe { - &mut *self.req.get() - } - } - fn poll_context(&mut self) -> Poll<(), Error> { if let Some(ref mut context) = self.context { match context.poll() { @@ -127,17 +116,16 @@ impl> Pipeline { htype: HandlerType, ) -> Pipeline { let mut info = PipelineInfo { - mws, - req: UnsafeCell::new(req), + req, count: 0, error: None, context: None, disconnected: None, encoding: handler.encoding(), }; - let state = StartMiddlewares::init(&mut info, handler, htype); + let state = StartMiddlewares::init(&mut info, &mws, handler, htype); - Pipeline(info, state) + Pipeline(info, state, mws) } } @@ -146,6 +134,7 @@ impl Pipeline<(), Inner<()>> { Box::new(Pipeline::<(), Inner<()>>( PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into()), + Rc::new(Vec::new()), )) } } @@ -176,7 +165,7 @@ impl> HttpHandlerTask for Pipeline { loop { if state.is_response() { if let PipelineState::Response(st) = state { - match st.poll_io(io, &mut self.0) { + match st.poll_io(io, &mut self.0, &self.2) { Ok(state) => { self.1 = state; if let Some(error) = self.0.error.take() { @@ -202,7 +191,7 @@ impl> HttpHandlerTask for Pipeline { _ => (), } - match state.poll(&mut self.0) { + match state.poll(&mut self.0, &self.2) { Some(st) => state = st, None => { return { @@ -224,7 +213,7 @@ impl> HttpHandlerTask for Pipeline { _ => (), } - if let Some(st) = state.poll(&mut self.0) { + if let Some(st) = state.poll(&mut self.0, &self.2) { state = st; } else { self.1 = state; @@ -246,21 +235,22 @@ struct StartMiddlewares { impl> StartMiddlewares { fn init( - info: &mut PipelineInfo, hnd: Rc, htype: HandlerType, + info: &mut PipelineInfo, mws: &[Box>], hnd: Rc, + htype: HandlerType, ) -> PipelineState { // execute middlewares, we need this stage because middlewares could be // non-async and we can move to next state immediately - let len = info.mws.len() as u16; + let len = mws.len() as u16; loop { if info.count == len { - let reply = hnd.handle(info.req().clone(), htype); - return WaitingResponse::init(info, reply); + let reply = hnd.handle(info.req.clone(), htype); + return WaitingResponse::init(info, mws, reply); } else { - let state = info.mws[info.count as usize].start(info.req_mut()); + let state = mws[info.count as usize].start(&mut info.req); match state { Ok(Started::Done) => info.count += 1, Ok(Started::Response(resp)) => { - return RunMiddlewares::init(info, resp) + return RunMiddlewares::init(info, mws, resp) } Ok(Started::Future(fut)) => { return PipelineState::Starting(StartMiddlewares { @@ -270,46 +260,51 @@ impl> StartMiddlewares { _s: PhantomData, }) } - Err(err) => return RunMiddlewares::init(info, err.into()), + Err(err) => return RunMiddlewares::init(info, mws, err.into()), } } } } - fn poll(&mut self, info: &mut PipelineInfo) -> Option> { - let len = info.mws.len() as u16; + fn poll( + &mut self, info: &mut PipelineInfo, mws: &[Box>], + ) -> Option> { + let len = mws.len() as u16; 'outer: loop { match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => return None, Ok(Async::Ready(resp)) => { info.count += 1; if let Some(resp) = resp { - return Some(RunMiddlewares::init(info, resp)); + return Some(RunMiddlewares::init(info, mws, resp)); } loop { if info.count == len { - let reply = self.hnd.handle(info.req().clone(), self.htype); - return Some(WaitingResponse::init(info, reply)); + let reply = self.hnd.handle(info.req.clone(), self.htype); + return Some(WaitingResponse::init(info, mws, reply)); } else { - let state = - info.mws[info.count as usize].start(info.req_mut()); + let state = mws[info.count as usize].start(&mut info.req); match state { Ok(Started::Done) => info.count += 1, Ok(Started::Response(resp)) => { - return Some(RunMiddlewares::init(info, resp)); + return Some(RunMiddlewares::init(info, mws, resp)); } Ok(Started::Future(fut)) => { self.fut = Some(fut); continue 'outer; } Err(err) => { - return Some(RunMiddlewares::init(info, err.into())) + return Some(RunMiddlewares::init( + info, + mws, + err.into(), + )) } } } } } - Err(err) => return Some(RunMiddlewares::init(info, err.into())), + Err(err) => return Some(RunMiddlewares::init(info, mws, err.into())), } } } @@ -325,11 +320,12 @@ struct WaitingResponse { impl WaitingResponse { #[inline] fn init( - info: &mut PipelineInfo, reply: AsyncResult, + info: &mut PipelineInfo, mws: &[Box>], + reply: AsyncResult, ) -> PipelineState { match reply.into() { - AsyncResultItem::Err(err) => RunMiddlewares::init(info, err.into()), - AsyncResultItem::Ok(resp) => RunMiddlewares::init(info, resp), + AsyncResultItem::Err(err) => RunMiddlewares::init(info, mws, err.into()), + AsyncResultItem::Ok(resp) => RunMiddlewares::init(info, mws, resp), AsyncResultItem::Future(fut) => PipelineState::Handler(WaitingResponse { fut, _s: PhantomData, @@ -338,11 +334,15 @@ impl WaitingResponse { } } - fn poll(&mut self, info: &mut PipelineInfo) -> Option> { + fn poll( + &mut self, info: &mut PipelineInfo, mws: &[Box>], + ) -> Option> { match self.fut.poll() { Ok(Async::NotReady) => None, - Ok(Async::Ready(response)) => Some(RunMiddlewares::init(info, response)), - Err(err) => Some(RunMiddlewares::init(info, err.into())), + Ok(Async::Ready(response)) => { + Some(RunMiddlewares::init(info, mws, response)) + } + Err(err) => Some(RunMiddlewares::init(info, mws, err.into())), } } } @@ -357,15 +357,17 @@ struct RunMiddlewares { impl RunMiddlewares { #[inline] - fn init(info: &mut PipelineInfo, mut resp: HttpResponse) -> PipelineState { + fn init( + info: &mut PipelineInfo, mws: &[Box>], mut resp: HttpResponse, + ) -> PipelineState { if info.count == 0 { return ProcessResponse::init(resp); } let mut curr = 0; - let len = info.mws.len(); + let len = mws.len(); loop { - let state = info.mws[curr].response(info.req_mut(), resp); + let state = mws[curr].response(&mut info.req, resp); resp = match state { Err(err) => { info.count = (curr + 1) as u16; @@ -391,8 +393,10 @@ impl RunMiddlewares { } } - fn poll(&mut self, info: &mut PipelineInfo) -> Option> { - let len = info.mws.len(); + fn poll( + &mut self, info: &mut PipelineInfo, mws: &[Box>], + ) -> Option> { + let len = mws.len(); loop { // poll latest fut @@ -409,7 +413,7 @@ impl RunMiddlewares { if self.curr == len { return Some(ProcessResponse::init(resp)); } else { - let state = info.mws[self.curr].response(info.req_mut(), resp); + let state = mws[self.curr].response(&mut info.req, resp); match state { Err(err) => return Some(ProcessResponse::init(err.into())), Ok(Response::Done(r)) => { @@ -480,6 +484,7 @@ impl ProcessResponse { fn poll_io( mut self, io: &mut Writer, info: &mut PipelineInfo, + mws: &[Box>], ) -> Result, PipelineState> { loop { if self.drain.is_none() && self.running != RunningState::Paused { @@ -491,7 +496,7 @@ impl ProcessResponse { self.resp.content_encoding().unwrap_or(info.encoding); let result = match io.start( - info.req_mut().as_mut(), + info.req.as_mut(), &mut self.resp, encoding, ) { @@ -499,7 +504,7 @@ impl ProcessResponse { Err(err) => { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, )); } }; @@ -541,7 +546,7 @@ impl ProcessResponse { if let Err(err) = io.write_eof() { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, )); } break; @@ -552,7 +557,7 @@ impl ProcessResponse { Err(err) => { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, )); } Ok(result) => result, @@ -564,7 +569,9 @@ impl ProcessResponse { } Err(err) => { info.error = Some(err); - return Ok(FinishingMiddlewares::init(info, self.resp)); + return Ok(FinishingMiddlewares::init( + info, mws, self.resp, + )); } }, IOState::Actor(mut ctx) => { @@ -586,7 +593,7 @@ impl ProcessResponse { info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, ), ); } @@ -598,7 +605,7 @@ impl ProcessResponse { info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, ), ); } @@ -623,7 +630,7 @@ impl ProcessResponse { Err(err) => { info.error = Some(err); return Ok(FinishingMiddlewares::init( - info, self.resp, + info, mws, self.resp, )); } } @@ -657,7 +664,7 @@ impl ProcessResponse { Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Err(err) => { info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(info, self.resp)); + return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } } } @@ -671,11 +678,11 @@ impl ProcessResponse { Ok(_) => (), Err(err) => { info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(info, self.resp)); + return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } } self.resp.set_response_size(io.written()); - Ok(FinishingMiddlewares::init(info, self.resp)) + Ok(FinishingMiddlewares::init(info, mws, self.resp)) } _ => Err(PipelineState::Response(self)), } @@ -692,7 +699,9 @@ struct FinishingMiddlewares { impl FinishingMiddlewares { #[inline] - fn init(info: &mut PipelineInfo, resp: HttpResponse) -> PipelineState { + fn init( + info: &mut PipelineInfo, mws: &[Box>], resp: HttpResponse, + ) -> PipelineState { if info.count == 0 { Completed::init(info) } else { @@ -702,7 +711,7 @@ impl FinishingMiddlewares { _s: PhantomData, _h: PhantomData, }; - if let Some(st) = state.poll(info) { + if let Some(st) = state.poll(info, mws) { st } else { PipelineState::Finishing(state) @@ -710,7 +719,9 @@ impl FinishingMiddlewares { } } - fn poll(&mut self, info: &mut PipelineInfo) -> Option> { + fn poll( + &mut self, info: &mut PipelineInfo, mws: &[Box>], + ) -> Option> { loop { // poll latest fut let not_ready = if let Some(ref mut fut) = self.fut { @@ -734,7 +745,7 @@ impl FinishingMiddlewares { } info.count -= 1; - let state = info.mws[info.count as usize].finish(info.req_mut(), &self.resp); + let state = mws[info.count as usize].finish(&mut info.req, &self.resp); match state { Finished::Done => { if info.count == 0 { @@ -826,10 +837,11 @@ mod tests { .unwrap(); assert!(state.poll(&mut info).is_none()); - let pp = Pipeline(info, PipelineState::Completed(state)); + let pp = + Pipeline(info, PipelineState::Completed(state), Rc::new(Vec::new())); assert!(!pp.is_done()); - let Pipeline(mut info, st) = pp; + let Pipeline(mut info, st, mws) = pp; let mut st = st.completed().unwrap(); drop(addr);