1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

cleanup srver pipeline

This commit is contained in:
Nikolay Kim 2018-06-21 23:51:25 +06:00
parent c9069e9a3c
commit 50fbef88fc

View File

@ -1,4 +1,3 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::rc::Rc; use std::rc::Rc;
use std::{io, mem}; use std::{io, mem};
@ -36,7 +35,11 @@ pub trait PipelineHandler<S> {
} }
#[doc(hidden)] #[doc(hidden)]
pub struct Pipeline<S, H>(PipelineInfo<S>, PipelineState<S, H>); pub struct Pipeline<S, H>(
PipelineInfo<S>,
PipelineState<S, H>,
Rc<Vec<Box<Middleware<S>>>>,
);
enum PipelineState<S, H> { enum PipelineState<S, H> {
None, None,
@ -57,12 +60,14 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
} }
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(
&mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
match *self { match *self {
PipelineState::Starting(ref mut state) => state.poll(info), PipelineState::Starting(ref mut state) => state.poll(info, mws),
PipelineState::Handler(ref mut state) => state.poll(info), PipelineState::Handler(ref mut state) => state.poll(info, mws),
PipelineState::RunMiddlewares(ref mut state) => state.poll(info), PipelineState::RunMiddlewares(ref mut state) => state.poll(info, mws),
PipelineState::Finishing(ref mut state) => state.poll(info), PipelineState::Finishing(ref mut state) => state.poll(info, mws),
PipelineState::Completed(ref mut state) => state.poll(info), PipelineState::Completed(ref mut state) => state.poll(info),
PipelineState::Response(_) | PipelineState::None | PipelineState::Error => { PipelineState::Response(_) | PipelineState::None | PipelineState::Error => {
None None
@ -72,9 +77,8 @@ impl<S: 'static, H: PipelineHandler<S>> PipelineState<S, H> {
} }
struct PipelineInfo<S> { struct PipelineInfo<S> {
req: UnsafeCell<HttpRequest<S>>, req: HttpRequest<S>,
count: u16, count: u16,
mws: Rc<Vec<Box<Middleware<S>>>>,
context: Option<Box<ActorHttpContext>>, context: Option<Box<ActorHttpContext>>,
error: Option<Error>, error: Option<Error>,
disconnected: Option<bool>, disconnected: Option<bool>,
@ -84,9 +88,8 @@ struct PipelineInfo<S> {
impl<S> PipelineInfo<S> { impl<S> PipelineInfo<S> {
fn new(req: HttpRequest<S>) -> PipelineInfo<S> { fn new(req: HttpRequest<S>) -> PipelineInfo<S> {
PipelineInfo { PipelineInfo {
req: UnsafeCell::new(req), req,
count: 0, count: 0,
mws: Rc::new(Vec::new()),
error: None, error: None,
context: None, context: None,
disconnected: None, disconnected: None,
@ -94,20 +97,6 @@ impl<S> PipelineInfo<S> {
} }
} }
#[inline]
fn req(&self) -> &HttpRequest<S> {
unsafe { &*self.req.get() }
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
fn req_mut(&self) -> &mut HttpRequest<S> {
#[allow(mutable_transmutes)]
unsafe {
&mut *self.req.get()
}
}
fn poll_context(&mut self) -> Poll<(), Error> { fn poll_context(&mut self) -> Poll<(), Error> {
if let Some(ref mut context) = self.context { if let Some(ref mut context) = self.context {
match context.poll() { match context.poll() {
@ -127,17 +116,16 @@ impl<S: 'static, H: PipelineHandler<S>> Pipeline<S, H> {
htype: HandlerType, htype: HandlerType,
) -> Pipeline<S, H> { ) -> Pipeline<S, H> {
let mut info = PipelineInfo { let mut info = PipelineInfo {
mws, req,
req: UnsafeCell::new(req),
count: 0, count: 0,
error: None, error: None,
context: None, context: None,
disconnected: None, disconnected: None,
encoding: handler.encoding(), encoding: handler.encoding(),
}; };
let state = StartMiddlewares::init(&mut info, handler, htype); let state = StartMiddlewares::init(&mut info, &mws, handler, htype);
Pipeline(info, state) Pipeline(info, state, mws)
} }
} }
@ -146,6 +134,7 @@ impl Pipeline<(), Inner<()>> {
Box::new(Pipeline::<(), Inner<()>>( Box::new(Pipeline::<(), Inner<()>>(
PipelineInfo::new(HttpRequest::default()), PipelineInfo::new(HttpRequest::default()),
ProcessResponse::init(err.into()), ProcessResponse::init(err.into()),
Rc::new(Vec::new()),
)) ))
} }
} }
@ -176,7 +165,7 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
loop { loop {
if state.is_response() { if state.is_response() {
if let PipelineState::Response(st) = state { if let PipelineState::Response(st) = state {
match st.poll_io(io, &mut self.0) { match st.poll_io(io, &mut self.0, &self.2) {
Ok(state) => { Ok(state) => {
self.1 = state; self.1 = state;
if let Some(error) = self.0.error.take() { if let Some(error) = self.0.error.take() {
@ -202,7 +191,7 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
_ => (), _ => (),
} }
match state.poll(&mut self.0) { match state.poll(&mut self.0, &self.2) {
Some(st) => state = st, Some(st) => state = st,
None => { None => {
return { return {
@ -224,7 +213,7 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
_ => (), _ => (),
} }
if let Some(st) = state.poll(&mut self.0) { if let Some(st) = state.poll(&mut self.0, &self.2) {
state = st; state = st;
} else { } else {
self.1 = state; self.1 = state;
@ -246,21 +235,22 @@ struct StartMiddlewares<S, H> {
impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> { impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init( fn init(
info: &mut PipelineInfo<S>, hnd: Rc<H>, htype: HandlerType, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], hnd: Rc<H>,
htype: HandlerType,
) -> PipelineState<S, H> { ) -> PipelineState<S, H> {
// execute middlewares, we need this stage because middlewares could be // execute middlewares, we need this stage because middlewares could be
// non-async and we can move to next state immediately // non-async and we can move to next state immediately
let len = info.mws.len() as u16; let len = mws.len() as u16;
loop { loop {
if info.count == len { if info.count == len {
let reply = hnd.handle(info.req().clone(), htype); let reply = hnd.handle(info.req.clone(), htype);
return WaitingResponse::init(info, reply); return WaitingResponse::init(info, mws, reply);
} else { } else {
let state = info.mws[info.count as usize].start(info.req_mut()); let state = mws[info.count as usize].start(&mut info.req);
match state { match state {
Ok(Started::Done) => info.count += 1, Ok(Started::Done) => info.count += 1,
Ok(Started::Response(resp)) => { Ok(Started::Response(resp)) => {
return RunMiddlewares::init(info, resp) return RunMiddlewares::init(info, mws, resp)
} }
Ok(Started::Future(fut)) => { Ok(Started::Future(fut)) => {
return PipelineState::Starting(StartMiddlewares { return PipelineState::Starting(StartMiddlewares {
@ -270,46 +260,51 @@ impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
_s: PhantomData, _s: PhantomData,
}) })
} }
Err(err) => return RunMiddlewares::init(info, err.into()), Err(err) => return RunMiddlewares::init(info, mws, err.into()),
} }
} }
} }
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(
let len = info.mws.len() as u16; &mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
let len = mws.len() as u16;
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return None, Ok(Async::NotReady) => return None,
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
info.count += 1; info.count += 1;
if let Some(resp) = resp { if let Some(resp) = resp {
return Some(RunMiddlewares::init(info, resp)); return Some(RunMiddlewares::init(info, mws, resp));
} }
loop { loop {
if info.count == len { if info.count == len {
let reply = self.hnd.handle(info.req().clone(), self.htype); let reply = self.hnd.handle(info.req.clone(), self.htype);
return Some(WaitingResponse::init(info, reply)); return Some(WaitingResponse::init(info, mws, reply));
} else { } else {
let state = let state = mws[info.count as usize].start(&mut info.req);
info.mws[info.count as usize].start(info.req_mut());
match state { match state {
Ok(Started::Done) => info.count += 1, Ok(Started::Done) => info.count += 1,
Ok(Started::Response(resp)) => { Ok(Started::Response(resp)) => {
return Some(RunMiddlewares::init(info, resp)); return Some(RunMiddlewares::init(info, mws, resp));
} }
Ok(Started::Future(fut)) => { Ok(Started::Future(fut)) => {
self.fut = Some(fut); self.fut = Some(fut);
continue 'outer; continue 'outer;
} }
Err(err) => { Err(err) => {
return Some(RunMiddlewares::init(info, err.into())) return Some(RunMiddlewares::init(
info,
mws,
err.into(),
))
} }
} }
} }
} }
} }
Err(err) => return Some(RunMiddlewares::init(info, err.into())), Err(err) => return Some(RunMiddlewares::init(info, mws, err.into())),
} }
} }
} }
@ -325,11 +320,12 @@ struct WaitingResponse<S, H> {
impl<S: 'static, H> WaitingResponse<S, H> { impl<S: 'static, H> WaitingResponse<S, H> {
#[inline] #[inline]
fn init( fn init(
info: &mut PipelineInfo<S>, reply: AsyncResult<HttpResponse>, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
reply: AsyncResult<HttpResponse>,
) -> PipelineState<S, H> { ) -> PipelineState<S, H> {
match reply.into() { match reply.into() {
AsyncResultItem::Err(err) => RunMiddlewares::init(info, err.into()), AsyncResultItem::Err(err) => RunMiddlewares::init(info, mws, err.into()),
AsyncResultItem::Ok(resp) => RunMiddlewares::init(info, resp), AsyncResultItem::Ok(resp) => RunMiddlewares::init(info, mws, resp),
AsyncResultItem::Future(fut) => PipelineState::Handler(WaitingResponse { AsyncResultItem::Future(fut) => PipelineState::Handler(WaitingResponse {
fut, fut,
_s: PhantomData, _s: PhantomData,
@ -338,11 +334,15 @@ impl<S: 'static, H> WaitingResponse<S, H> {
} }
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(
&mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
match self.fut.poll() { match self.fut.poll() {
Ok(Async::NotReady) => None, Ok(Async::NotReady) => None,
Ok(Async::Ready(response)) => Some(RunMiddlewares::init(info, response)), Ok(Async::Ready(response)) => {
Err(err) => Some(RunMiddlewares::init(info, err.into())), Some(RunMiddlewares::init(info, mws, response))
}
Err(err) => Some(RunMiddlewares::init(info, mws, err.into())),
} }
} }
} }
@ -357,15 +357,17 @@ struct RunMiddlewares<S, H> {
impl<S: 'static, H> RunMiddlewares<S, H> { impl<S: 'static, H> RunMiddlewares<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S, H> { fn init(
info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], mut resp: HttpResponse,
) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
return ProcessResponse::init(resp); return ProcessResponse::init(resp);
} }
let mut curr = 0; let mut curr = 0;
let len = info.mws.len(); let len = mws.len();
loop { loop {
let state = info.mws[curr].response(info.req_mut(), resp); let state = mws[curr].response(&mut info.req, resp);
resp = match state { resp = match state {
Err(err) => { Err(err) => {
info.count = (curr + 1) as u16; info.count = (curr + 1) as u16;
@ -391,8 +393,10 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
} }
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(
let len = info.mws.len(); &mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
let len = mws.len();
loop { loop {
// poll latest fut // poll latest fut
@ -409,7 +413,7 @@ impl<S: 'static, H> RunMiddlewares<S, H> {
if self.curr == len { if self.curr == len {
return Some(ProcessResponse::init(resp)); return Some(ProcessResponse::init(resp));
} else { } else {
let state = info.mws[self.curr].response(info.req_mut(), resp); let state = mws[self.curr].response(&mut info.req, resp);
match state { match state {
Err(err) => return Some(ProcessResponse::init(err.into())), Err(err) => return Some(ProcessResponse::init(err.into())),
Ok(Response::Done(r)) => { Ok(Response::Done(r)) => {
@ -480,6 +484,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
fn poll_io( fn poll_io(
mut self, io: &mut Writer, info: &mut PipelineInfo<S>, mut self, io: &mut Writer, info: &mut PipelineInfo<S>,
mws: &[Box<Middleware<S>>],
) -> Result<PipelineState<S, H>, PipelineState<S, H>> { ) -> Result<PipelineState<S, H>, PipelineState<S, H>> {
loop { loop {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
@ -491,7 +496,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
self.resp.content_encoding().unwrap_or(info.encoding); self.resp.content_encoding().unwrap_or(info.encoding);
let result = match io.start( let result = match io.start(
info.req_mut().as_mut(), info.req.as_mut(),
&mut self.resp, &mut self.resp,
encoding, encoding,
) { ) {
@ -499,7 +504,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
)); ));
} }
}; };
@ -541,7 +546,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
if let Err(err) = io.write_eof() { if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
)); ));
} }
break; break;
@ -552,7 +557,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
)); ));
} }
Ok(result) => result, Ok(result) => result,
@ -564,7 +569,9 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
Err(err) => { Err(err) => {
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp)); return Ok(FinishingMiddlewares::init(
info, mws, self.resp,
));
} }
}, },
IOState::Actor(mut ctx) => { IOState::Actor(mut ctx) => {
@ -586,7 +593,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
), ),
); );
} }
@ -598,7 +605,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init( FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
), ),
); );
} }
@ -623,7 +630,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Err(err) => { Err(err) => {
info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
info, self.resp, info, mws, self.resp,
)); ));
} }
} }
@ -657,7 +664,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp));
} }
} }
} }
@ -671,11 +678,11 @@ impl<S: 'static, H> ProcessResponse<S, H> {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)); return Ok(FinishingMiddlewares::init(info, mws, self.resp));
} }
} }
self.resp.set_response_size(io.written()); self.resp.set_response_size(io.written());
Ok(FinishingMiddlewares::init(info, self.resp)) Ok(FinishingMiddlewares::init(info, mws, self.resp))
} }
_ => Err(PipelineState::Response(self)), _ => Err(PipelineState::Response(self)),
} }
@ -692,7 +699,9 @@ struct FinishingMiddlewares<S, H> {
impl<S: 'static, H> FinishingMiddlewares<S, H> { impl<S: 'static, H> FinishingMiddlewares<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> { fn init(
info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], resp: HttpResponse,
) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)
} else { } else {
@ -702,7 +711,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
_s: PhantomData, _s: PhantomData,
_h: PhantomData, _h: PhantomData,
}; };
if let Some(st) = state.poll(info) { if let Some(st) = state.poll(info, mws) {
st st
} else { } else {
PipelineState::Finishing(state) PipelineState::Finishing(state)
@ -710,7 +719,9 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
} }
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(
&mut self, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>],
) -> Option<PipelineState<S, H>> {
loop { loop {
// poll latest fut // poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut { let not_ready = if let Some(ref mut fut) = self.fut {
@ -734,7 +745,7 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
info.count -= 1; info.count -= 1;
let state = info.mws[info.count as usize].finish(info.req_mut(), &self.resp); let state = mws[info.count as usize].finish(&mut info.req, &self.resp);
match state { match state {
Finished::Done => { Finished::Done => {
if info.count == 0 { if info.count == 0 {
@ -826,10 +837,11 @@ mod tests {
.unwrap(); .unwrap();
assert!(state.poll(&mut info).is_none()); assert!(state.poll(&mut info).is_none());
let pp = Pipeline(info, PipelineState::Completed(state)); let pp =
Pipeline(info, PipelineState::Completed(state), Rc::new(Vec::new()));
assert!(!pp.is_done()); assert!(!pp.is_done());
let Pipeline(mut info, st) = pp; let Pipeline(mut info, st, mws) = pp;
let mut st = st.completed().unwrap(); let mut st = st.completed().unwrap();
drop(addr); drop(addr);