1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

refactor pipeline impl

This commit is contained in:
Nikolay Kim 2018-01-10 16:45:57 -08:00
parent f7807e43d8
commit e0faf3f69c

View File

@ -34,6 +34,27 @@ enum PipelineState<S, H> {
Completed(Completed<S, H>), Completed(Completed<S, H>),
} }
impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
fn is_response(&self) -> bool {
match *self {
PipelineState::Response(_) => true,
_ => false,
}
}
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
match *self {
PipelineState::Starting(ref mut state) => state.poll(info),
PipelineState::Handler(ref mut state) => state.poll(info),
PipelineState::RunMiddlewares(ref mut state) => state.poll(info),
PipelineState::Finishing(ref mut state) => state.poll(info),
PipelineState::Completed(ref mut state) => state.poll(info),
PipelineState::Response(_) | PipelineState::None | PipelineState::Error => None,
}
}
}
struct PipelineInfo<S> { struct PipelineInfo<S> {
req: HttpRequest<S>, req: HttpRequest<S>,
count: usize, count: usize,
@ -108,8 +129,7 @@ impl<S: 'static, H> Pipeline<S, H> {
PipelineState::None | PipelineState::Error PipelineState::None | PipelineState::Error
| PipelineState::Starting(_) | PipelineState::Handler(_) | PipelineState::Starting(_) | PipelineState::Handler(_)
| PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true,
PipelineState::Finishing(_) => self.0.context.is_none(), PipelineState::Finishing(_) | PipelineState::Completed(_) => false,
PipelineState::Completed(_) => false,
} }
} }
} }
@ -121,45 +141,13 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
} }
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> { fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
let info: &mut PipelineInfo<_> = unsafe{ mem::transmute(&mut self.0) };
loop { loop {
if self.1.is_response() {
let state = mem::replace(&mut self.1, PipelineState::None); let state = mem::replace(&mut self.1, PipelineState::None);
match state { if let PipelineState::Response(st) = state {
PipelineState::None => match st.poll_io(io, info) {
return Ok(Async::Ready(true)),
PipelineState::Error =>
return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()),
PipelineState::Starting(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Handler(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::RunMiddlewares(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Response(st) => {
match st.poll_io(io, &mut self.0) {
Ok(state) => { Ok(state) => {
self.1 = state; self.1 = state;
if let Some(error) = self.0.error.take() { if let Some(error) = self.0.error.take() {
@ -170,100 +158,42 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
} }
Err(state) => { Err(state) => {
self.1 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady);
} }
} }
} }
PipelineState::Finishing(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Completed(st) => {
match st.poll(&mut self.0) {
Ok(state) => {
self.1 = state;
return Ok(Async::Ready(true));
}
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
} }
match self.1 {
PipelineState::None =>
return Ok(Async::Ready(true)),
PipelineState::Error =>
return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()),
_ => (),
} }
match self.1.poll(info) {
Some(state) => self.1 = state,
None => return Ok(Async::NotReady),
} }
} }
} }
fn poll(&mut self) -> Poll<(), Error> { fn poll(&mut self) -> Poll<(), Error> {
let info: &mut PipelineInfo<_> = unsafe{ mem::transmute(&mut self.0) };
loop { loop {
let state = mem::replace(&mut self.1, PipelineState::None); match self.1 {
match state {
PipelineState::None | PipelineState::Error => { PipelineState::None | PipelineState::Error => {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
PipelineState::Starting(st) => { _ => (),
match st.poll(&mut self.0) { }
Ok(state) =>
self.1 = state, if let Some(state) = self.1.poll(info) {
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Handler(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::RunMiddlewares(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Response(_) => {
self.1 = state; self.1 = state;
} else {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
PipelineState::Finishing(st) => {
match st.poll(&mut self.0) {
Ok(state) =>
self.1 = state,
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
PipelineState::Completed(st) => {
match st.poll(&mut self.0) {
Ok(state) => {
self.1 = state;
return Ok(Async::Ready(()));
}
Err(state) => {
self.1 = state;
return Ok(Async::NotReady)
}
}
}
}
} }
} }
} }
@ -317,41 +247,40 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>>
{ {
let len = info.mws.len(); let len = info.mws.len();
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) => return None,
return Err(PipelineState::Starting(self)),
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
info.count += 1; info.count += 1;
if let Some(resp) = resp { if let Some(resp) = resp {
return Ok(RunMiddlewares::init(info, resp)); return Some(RunMiddlewares::init(info, resp));
} }
if info.count == len { if info.count == len {
let reply = (*self.hnd.borrow_mut()).handle(info.req.clone()); let reply = (*self.hnd.borrow_mut()).handle(info.req.clone());
return Ok(WaitingResponse::init(info, reply)); return Some(WaitingResponse::init(info, reply));
} else { } else {
loop { loop {
match info.mws[info.count].start(info.req_mut()) { match info.mws[info.count].start(info.req_mut()) {
Ok(Started::Done) => Ok(Started::Done) =>
info.count += 1, info.count += 1,
Ok(Started::Response(resp)) => { Ok(Started::Response(resp)) => {
return Ok(RunMiddlewares::init(info, resp)); return Some(RunMiddlewares::init(info, resp));
}, },
Ok(Started::Future(fut)) => { Ok(Started::Future(fut)) => {
self.fut = Some(fut); self.fut = Some(fut);
continue 'outer continue 'outer
}, },
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(err.into())) return Some(ProcessResponse::init(err.into()))
} }
} }
} }
} }
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(err.into())) return Some(ProcessResponse::init(err.into()))
} }
} }
} }
@ -378,15 +307,14 @@ impl<S: 'static, H> WaitingResponse<S, H> {
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>>
{ {
match self.fut.poll() { match self.fut.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) => None,
Err(PipelineState::Handler(self)),
Ok(Async::Ready(response)) => Ok(Async::Ready(response)) =>
Ok(RunMiddlewares::init(info, response)), Some(RunMiddlewares::init(info, response)),
Err(err) => Err(err) =>
Ok(ProcessResponse::init(err.into())), Some(ProcessResponse::init(err.into())),
} }
} }
} }
@ -432,31 +360,30 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S,H>, PipelineState<S, H>> fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
{
let len = info.mws.len(); let len = info.mws.len();
loop { loop {
// poll latest fut // poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() { let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
return Err(PipelineState::RunMiddlewares(self)) return None
} }
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
self.curr += 1; self.curr += 1;
resp resp
} }
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(err.into())), return Some(ProcessResponse::init(err.into())),
}; };
loop { loop {
if self.curr == len { if self.curr == len {
return Ok(ProcessResponse::init(resp)); return Some(ProcessResponse::init(resp));
} else { } else {
match info.mws[self.curr].response(info.req_mut(), resp) { match info.mws[self.curr].response(info.req_mut(), resp) {
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(err.into())), return Some(ProcessResponse::init(err.into())),
Ok(Response::Done(r)) => { Ok(Response::Done(r)) => {
self.curr += 1; self.curr += 1;
resp = r resp = r
@ -601,8 +528,8 @@ impl<S: 'static, H> ProcessResponse<S, H> {
match io.write(chunk.as_ref()) { match io.write(chunk.as_ref()) {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(
info, self.resp)) FinishingMiddlewares::init(info, self.resp))
}, },
Ok(result) => result Ok(result) => result
} }
@ -656,8 +583,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
// restart io processing // restart io processing
return self.poll_io(io, info); return self.poll_io(io, info);
}, },
Ok(Async::NotReady) => Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
info.error = Some(err.into()); info.error = Some(err.into());
@ -680,7 +606,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
self.resp.set_response_size(io.written()); self.resp.set_response_size(io.written());
Ok(FinishingMiddlewares::init(info, self.resp)) Ok(FinishingMiddlewares::init(info, self.resp))
} }
_ => Err(PipelineState::Response(self)) _ => Err(PipelineState::Response(self)),
} }
} }
} }
@ -699,15 +625,17 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)
} else { } else {
match (FinishingMiddlewares{resp: resp, fut: None, let mut state = FinishingMiddlewares{resp: resp, fut: None,
_s: PhantomData, _h: PhantomData}).poll(info) { _s: PhantomData, _h: PhantomData};
Ok(st) | Err(st) => st, if let Some(st) = state.poll(info) {
st
} else {
PipelineState::Finishing(state)
} }
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
{
loop { loop {
// poll latest fut // poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut { let not_ready = if let Some(ref mut fut) = self.fut {
@ -727,7 +655,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
false false
}; };
if not_ready { if not_ready {
return Ok(PipelineState::Finishing(self)) return None;
} }
self.fut = None; self.fut = None;
info.count -= 1; info.count -= 1;
@ -735,7 +663,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
match info.mws[info.count].finish(info.req_mut(), &self.resp) { match info.mws[info.count].finish(info.req_mut(), &self.resp) {
Finished::Done => { Finished::Done => {
if info.count == 0 { if info.count == 0 {
return Ok(Completed::init(info)) return Some(Completed::init(info))
} }
} }
Finished::Future(fut) => { Finished::Future(fut) => {
@ -760,13 +688,11 @@ impl<S, H> Completed<S, H> {
} }
#[inline] #[inline]
fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> { fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
match info.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => Ok(Async::NotReady) => None,
Ok(PipelineState::Completed(Completed(PhantomData, PhantomData))), Ok(Async::Ready(())) => Some(PipelineState::None),
Ok(Async::Ready(())) => Err(_) => Some(PipelineState::Error),
Ok(PipelineState::None),
Err(_) => Ok(PipelineState::Error),
} }
} }
} }
@ -806,17 +732,17 @@ mod tests {
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap(); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap();
let st = state.poll(&mut info).ok().unwrap(); assert!(state.poll(&mut info).is_none());
let pp = Pipeline(info, st); let pp = Pipeline(info, PipelineState::Completed(state));
assert!(!pp.is_done()); assert!(!pp.is_done());
let Pipeline(mut info, st) = pp; let Pipeline(mut info, st) = pp;
state = st.completed().unwrap(); let mut st = st.completed().unwrap();
drop(addr); drop(addr);
state.poll(&mut info).ok().unwrap().is_none().unwrap(); assert!(st.poll(&mut info).unwrap().is_none().unwrap());
result(Ok::<_, ()>(())) result(Ok::<_, ()>(()))
})).unwrap() })).unwrap();
} }
} }