1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

refactor pipeline

This commit is contained in:
Nikolay Kim 2017-12-09 05:54:04 -08:00
parent b98ab2eebe
commit 273de2260d

View File

@ -1,6 +1,7 @@
use std::{io, mem}; use std::{io, mem};
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell; use std::cell::RefCell;
use std::marker::PhantomData;
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
use futures::task::{Task as FutureTask, current as current_task}; use futures::task::{Task as FutureTask, current as current_task};
@ -18,7 +19,7 @@ use middlewares::{Middleware, Finished, Started, Response};
type Handler<S> = Fn(HttpRequest<S>) -> Reply; type Handler<S> = Fn(HttpRequest<S>) -> Reply;
pub(crate) type PipelineHandler<'a, S> = &'a Fn(HttpRequest<S>) -> Reply; pub(crate) type PipelineHandler<'a, S> = &'a Fn(HttpRequest<S>) -> Reply;
pub struct Pipeline<S>(PipelineState<S>); pub struct Pipeline<S>(PipelineInfo<S>, PipelineState<S>);
enum PipelineState<S> { enum PipelineState<S> {
None, None,
@ -31,47 +32,6 @@ enum PipelineState<S> {
Completed(Completed<S>), Completed(Completed<S>),
} }
impl<S> PipelineState<S> {
fn is_done(&self) -> bool {
match *self {
PipelineState::None | PipelineState::Error
| PipelineState::Starting(_) | PipelineState::Handler(_)
| PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true,
PipelineState::Finishing(ref st) => st.info.context.is_none(),
PipelineState::Completed(_) => false,
}
}
fn disconnect(&mut self) {
let info = match *self {
PipelineState::None | PipelineState::Error => return,
PipelineState::Starting(ref mut st) => &mut st.info,
PipelineState::Handler(ref mut st) => &mut st.info,
PipelineState::RunMiddlewares(ref mut st) => &mut st.info,
PipelineState::Response(ref mut st) => &mut st.info,
PipelineState::Finishing(ref mut st) => &mut st.info,
PipelineState::Completed(ref mut st) => &mut st.0,
};
if let Some(ref mut context) = info.context {
context.disconnected();
}
}
fn error(&mut self) -> Option<Error> {
let info = match *self {
PipelineState::None | PipelineState::Error => return None,
PipelineState::Starting(ref mut st) => &mut st.info,
PipelineState::Handler(ref mut st) => &mut st.info,
PipelineState::RunMiddlewares(ref mut st) => &mut st.info,
PipelineState::Response(ref mut st) => &mut st.info,
PipelineState::Finishing(ref mut st) => &mut st.info,
PipelineState::Completed(ref mut st) => &mut st.0,
};
info.error.take()
}
}
struct PipelineInfo<S> { struct PipelineInfo<S> {
req: HttpRequest<S>, req: HttpRequest<S>,
count: usize, count: usize,
@ -162,98 +122,122 @@ impl Future for DrainFut {
impl<S> Pipeline<S> { impl<S> Pipeline<S> {
pub fn new(req: HttpRequest<S>, pub fn new(req: HttpRequest<S>,
mw: Rc<Vec<Box<Middleware<S>>>>, mws: Rc<Vec<Box<Middleware<S>>>>,
handler: PipelineHandler<S>) -> Pipeline<S> handler: PipelineHandler<S>) -> Pipeline<S>
{ {
Pipeline(StartMiddlewares::init(mw, req, handler)) let mut info = PipelineInfo {
req: req,
count: 0,
mws: mws,
error: None,
context: None,
};
let state = StartMiddlewares::init(&mut info, handler);
Pipeline(info, state)
} }
} }
impl Pipeline<()> { impl Pipeline<()> {
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> { pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
Box::new(Pipeline(ProcessResponse::init( Box::new(Pipeline(
PipelineInfo::new(HttpRequest::default()), err.into()))) PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into())))
}
}
impl<S> Pipeline<S> {
fn is_done(&self) -> bool {
match self.1 {
PipelineState::None | PipelineState::Error
| PipelineState::Starting(_) | PipelineState::Handler(_)
| PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true,
PipelineState::Finishing(_) => self.0.context.is_none(),
PipelineState::Completed(_) => false,
}
} }
} }
impl<S> HttpHandlerTask for Pipeline<S> { impl<S> HttpHandlerTask for Pipeline<S> {
fn disconnected(&mut self) { fn disconnected(&mut self) {
self.0.disconnect() if let Some(ref mut context) = self.0.context {
context.disconnected();
}
} }
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> { fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
loop { loop {
let state = mem::replace(&mut self.0, PipelineState::None); let state = mem::replace(&mut self.1, PipelineState::None);
match state { match state {
PipelineState::None => PipelineState::None =>
return Ok(Async::Ready(true)), return Ok(Async::Ready(true)),
PipelineState::Error => PipelineState::Error =>
return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()), return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()),
PipelineState::Starting(st) => { PipelineState::Starting(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Handler(st) => { PipelineState::Handler(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::RunMiddlewares(st) => { PipelineState::RunMiddlewares(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Response(st) => { PipelineState::Response(st) => {
match st.poll_io(io) { match st.poll_io(io, &mut self.0) {
Ok(state) => { Ok(state) => {
self.0 = state; self.1 = state;
if let Some(error) = self.0.error() { if let Some(error) = self.0.error.take() {
return Err(error) return Err(error)
} else { } else {
return Ok(Async::Ready(self.0.is_done())) return Ok(Async::Ready(self.is_done()))
} }
} }
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Finishing(st) => { PipelineState::Finishing(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Completed(st) => { PipelineState::Completed(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => { Ok(state) => {
self.0 = state; self.1 = state;
return Ok(Async::Ready(true)); return Ok(Async::Ready(true));
} }
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
@ -264,63 +248,63 @@ impl<S> HttpHandlerTask for Pipeline<S> {
fn poll(&mut self) -> Poll<(), Error> { fn poll(&mut self) -> Poll<(), Error> {
loop { loop {
let state = mem::replace(&mut self.0, PipelineState::None); let state = mem::replace(&mut self.1, PipelineState::None);
match state { match state {
PipelineState::None | PipelineState::Error => { PipelineState::None | PipelineState::Error => {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
PipelineState::Starting(st) => { PipelineState::Starting(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Handler(st) => { PipelineState::Handler(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::RunMiddlewares(st) => { PipelineState::RunMiddlewares(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Response(_) => { PipelineState::Response(_) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
PipelineState::Finishing(st) => { PipelineState::Finishing(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => Ok(state) =>
self.0 = state, self.1 = state,
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
} }
PipelineState::Completed(st) => { PipelineState::Completed(st) => {
match st.poll() { match st.poll(&mut self.0) {
Ok(state) => { Ok(state) => {
self.0 = state; self.1 = state;
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
Err(state) => { Err(state) => {
self.0 = state; self.1 = state;
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
} }
@ -336,21 +320,11 @@ type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>;
struct StartMiddlewares<S> { struct StartMiddlewares<S> {
hnd: *mut Handler<S>, hnd: *mut Handler<S>,
fut: Option<Fut>, fut: Option<Fut>,
info: PipelineInfo<S>,
} }
impl<S> StartMiddlewares<S> { impl<S> StartMiddlewares<S> {
fn init(mws: Rc<Vec<Box<Middleware<S>>>>, fn init(info: &mut PipelineInfo<S>, handler: PipelineHandler<S>) -> PipelineState<S> {
req: HttpRequest<S>, handler: PipelineHandler<S>) -> PipelineState<S> {
let mut info = PipelineInfo {
req: req,
count: 0,
mws: mws,
error: None,
context: None,
};
// execute middlewares, we need this stage because middlewares could be non-async // execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly // and we can move to next state immidietly
let len = info.mws.len(); let len = info.mws.len();
@ -369,8 +343,7 @@ impl<S> StartMiddlewares<S> {
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return PipelineState::Starting(StartMiddlewares { return PipelineState::Starting(StartMiddlewares {
hnd: handler as *const _ as *mut _, hnd: handler as *const _ as *mut _,
fut: Some(fut), fut: Some(fut)}),
info: info}),
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
if let Some(resp) = resp { if let Some(resp) = resp {
return RunMiddlewares::init(info, resp); return RunMiddlewares::init(info, resp);
@ -378,49 +351,49 @@ impl<S> StartMiddlewares<S> {
info.count += 1; info.count += 1;
} }
Err(err) => Err(err) =>
return ProcessResponse::init(info, err.into()), return ProcessResponse::init(err.into()),
}, },
Started::Err(err) => Started::Err(err) =>
return ProcessResponse::init(info, err.into()), return ProcessResponse::init(err.into()),
} }
} }
} }
} }
fn poll(mut self) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
let len = self.info.mws.len(); let len = info.mws.len();
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return Err(PipelineState::Starting(self)), return Err(PipelineState::Starting(self)),
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
self.info.count += 1; info.count += 1;
if let Some(resp) = resp { if let Some(resp) = resp {
return Ok(RunMiddlewares::init(self.info, resp)); return Ok(RunMiddlewares::init(info, resp));
} }
if self.info.count == len { if info.count == len {
let reply = (unsafe{&*self.hnd})(self.info.req.clone()); let reply = (unsafe{&*self.hnd})(info.req.clone());
return Ok(WaitingResponse::init(self.info, reply)); return Ok(WaitingResponse::init(info, reply));
} else { } else {
loop { loop {
match self.info.mws[self.info.count].start(self.info.req_mut()) { match info.mws[info.count].start(info.req_mut()) {
Started::Done => Started::Done =>
self.info.count += 1, info.count += 1,
Started::Response(resp) => { Started::Response(resp) => {
return Ok(RunMiddlewares::init(self.info, resp)); return Ok(RunMiddlewares::init(info, resp));
}, },
Started::Future(fut) => { Started::Future(fut) => {
self.fut = Some(fut); self.fut = Some(fut);
continue 'outer continue 'outer
}, },
Started::Err(err) => Started::Err(err) =>
return Ok(ProcessResponse::init(self.info, err.into())) return Ok(ProcessResponse::init(err.into()))
} }
} }
} }
} }
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(self.info, err.into())) return Ok(ProcessResponse::init(err.into()))
} }
} }
} }
@ -428,13 +401,13 @@ impl<S> StartMiddlewares<S> {
// waiting for response // waiting for response
struct WaitingResponse<S> { struct WaitingResponse<S> {
info: PipelineInfo<S>,
stream: PipelineResponse, stream: PipelineResponse,
_s: PhantomData<S>,
} }
impl<S> WaitingResponse<S> { impl<S> WaitingResponse<S> {
fn init(info: PipelineInfo<S>, reply: Reply) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S>
{ {
let stream = match reply.into() { let stream = match reply.into() {
ReplyItem::Message(resp) => ReplyItem::Message(resp) =>
@ -446,10 +419,10 @@ impl<S> WaitingResponse<S> {
}; };
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { info: info, stream: stream }) WaitingResponse { stream: stream, _s: PhantomData })
} }
fn poll(mut self) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
let stream = mem::replace(&mut self.stream, PipelineResponse::None); let stream = mem::replace(&mut self.stream, PipelineResponse::None);
match stream { match stream {
@ -459,8 +432,8 @@ impl<S> WaitingResponse<S> {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
match frame { match frame {
Frame::Message(resp) => { Frame::Message(resp) => {
self.info.context = Some(context); info.context = Some(context);
return Ok(RunMiddlewares::init(self.info, resp)) return Ok(RunMiddlewares::init(info, resp))
} }
Frame::Payload(_) | Frame::Drain(_) => (), Frame::Payload(_) | Frame::Drain(_) => (),
} }
@ -468,14 +441,14 @@ impl<S> WaitingResponse<S> {
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
error!("Unexpected eof"); error!("Unexpected eof");
let err: Error = UnexpectedTaskFrame.into(); let err: Error = UnexpectedTaskFrame.into();
return Ok(ProcessResponse::init(self.info, err.into())) return Ok(ProcessResponse::init(err.into()))
}, },
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.stream = PipelineResponse::Context(context); self.stream = PipelineResponse::Context(context);
return Err(PipelineState::Handler(self)) return Err(PipelineState::Handler(self))
}, },
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(self.info, err.into())) return Ok(ProcessResponse::init(err.into()))
} }
} }
}, },
@ -486,9 +459,9 @@ impl<S> WaitingResponse<S> {
Err(PipelineState::Handler(self)) Err(PipelineState::Handler(self))
} }
Ok(Async::Ready(response)) => Ok(Async::Ready(response)) =>
Ok(RunMiddlewares::init(self.info, response)), Ok(RunMiddlewares::init(info, response)),
Err(err) => Err(err) =>
Ok(ProcessResponse::init(self.info, err.into())), Ok(ProcessResponse::init(err.into())),
} }
} }
PipelineResponse::None => { PipelineResponse::None => {
@ -501,17 +474,17 @@ impl<S> WaitingResponse<S> {
/// Middlewares response executor /// Middlewares response executor
struct RunMiddlewares<S> { struct RunMiddlewares<S> {
info: PipelineInfo<S>,
curr: usize, curr: usize,
fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>, fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>,
_s: PhantomData<S>,
} }
impl<S> RunMiddlewares<S> { impl<S> RunMiddlewares<S> {
fn init(mut info: PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S>
{ {
if info.count == 0 { if info.count == 0 {
return ProcessResponse::init(info, resp); return ProcessResponse::init(resp);
} }
let mut curr = 0; let mut curr = 0;
let len = info.mws.len(); let len = info.mws.len();
@ -520,26 +493,26 @@ impl<S> RunMiddlewares<S> {
resp = match info.mws[curr].response(info.req_mut(), resp) { resp = match info.mws[curr].response(info.req_mut(), resp) {
Response::Err(err) => { Response::Err(err) => {
info.count = curr + 1; info.count = curr + 1;
return ProcessResponse::init(info, err.into()) return ProcessResponse::init(err.into())
} }
Response::Done(r) => { Response::Done(r) => {
curr += 1; curr += 1;
if curr == len { if curr == len {
return ProcessResponse::init(info, r) return ProcessResponse::init(r)
} else { } else {
r r
} }
}, },
Response::Future(fut) => { Response::Future(fut) => {
return PipelineState::RunMiddlewares( return PipelineState::RunMiddlewares(
RunMiddlewares { info: info, curr: curr, fut: Some(fut) }) RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData })
}, },
}; };
} }
} }
fn poll(mut self) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
let len = self.info.mws.len(); let len = info.mws.len();
loop { loop {
// poll latest fut // poll latest fut
@ -551,16 +524,16 @@ impl<S> RunMiddlewares<S> {
resp resp
} }
Err(err) => Err(err) =>
return Ok(ProcessResponse::init(self.info, err.into())), return Ok(ProcessResponse::init(err.into())),
}; };
loop { loop {
if self.curr == len { if self.curr == len {
return Ok(ProcessResponse::init(self.info, resp)); return Ok(ProcessResponse::init(resp));
} else { } else {
match self.info.mws[self.curr].response(self.info.req_mut(), resp) { match info.mws[self.curr].response(info.req_mut(), resp) {
Response::Err(err) => Response::Err(err) =>
return Ok(ProcessResponse::init(self.info, err.into())), return Ok(ProcessResponse::init(err.into())),
Response::Done(r) => { Response::Done(r) => {
self.curr += 1; self.curr += 1;
resp = r resp = r
@ -581,7 +554,7 @@ struct ProcessResponse<S> {
iostate: IOState, iostate: IOState,
running: RunningState, running: RunningState,
drain: DrainVec, drain: DrainVec,
info: PipelineInfo<S>, _s: PhantomData<S>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -633,29 +606,31 @@ impl Drop for DrainVec {
impl<S> ProcessResponse<S> { impl<S> ProcessResponse<S> {
fn init(info: PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> fn init(resp: HttpResponse) -> PipelineState<S>
{ {
PipelineState::Response( PipelineState::Response(
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: DrainVec(Vec::new()), drain: DrainVec(Vec::new()),
info: info}) _s: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer) -> Result<PipelineState<S>, PipelineState<S>> { fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S>, PipelineState<S>>
{
if self.drain.0.is_empty() && self.running != RunningState::Paused { if self.drain.0.is_empty() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
loop { loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) { let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => { IOState::Response => {
let result = match io.start(self.info.req_mut().get_inner(), let result = match io.start(info.req_mut().get_inner(),
&mut self.resp) { &mut self.resp) {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
}; };
@ -672,13 +647,13 @@ impl<S> ProcessResponse<S> {
IOState::Payload(mut body) => { IOState::Payload(mut body) => {
// always poll context // always poll context
if self.running == RunningState::Running { if self.running == RunningState::Running {
match self.info.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => Ok(Async::Ready(_)) =>
self.running = RunningState::Done, self.running = RunningState::Done,
Err(err) => { Err(err) => {
self.info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
} }
@ -687,8 +662,8 @@ impl<S> ProcessResponse<S> {
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
if let Err(err) = io.write_eof() { if let Err(err) = io.write_eof() {
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
break break
}, },
@ -696,9 +671,8 @@ impl<S> ProcessResponse<S> {
self.iostate = IOState::Payload(body); self.iostate = IOState::Payload(body);
match io.write(chunk.as_ref()) { match io.write(chunk.as_ref()) {
Err(err) => { Err(err) => {
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(info, self.resp))
self.info, self.resp))
}, },
Ok(result) => result Ok(result) => result
} }
@ -708,27 +682,27 @@ impl<S> ProcessResponse<S> {
break break
}, },
Err(err) => { Err(err) => {
self.info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
}, },
IOState::Context => { IOState::Context => {
match self.info.context.as_mut().unwrap().poll() { match info.context.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
match frame { match frame {
Frame::Message(msg) => { Frame::Message(msg) => {
error!("Unexpected message frame {:?}", msg); error!("Unexpected message frame {:?}", msg);
self.info.error = Some(UnexpectedTaskFrame.into()); info.error = Some(UnexpectedTaskFrame.into());
return Ok( return Ok(
FinishingMiddlewares::init(self.info, self.resp)) FinishingMiddlewares::init(info, self.resp))
}, },
Frame::Payload(None) => { Frame::Payload(None) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
if let Err(err) = io.write_eof() { if let Err(err) = io.write_eof() {
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init(self.info, self.resp)) FinishingMiddlewares::init(info, self.resp))
} }
break break
}, },
@ -736,9 +710,9 @@ impl<S> ProcessResponse<S> {
self.iostate = IOState::Context; self.iostate = IOState::Context;
match io.write(chunk.as_ref()) { match io.write(chunk.as_ref()) {
Err(err) => { Err(err) => {
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init( return Ok(FinishingMiddlewares::init(
self.info, self.resp)) info, self.resp))
}, },
Ok(result) => result Ok(result) => result
} }
@ -751,7 +725,7 @@ impl<S> ProcessResponse<S> {
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.iostate = IOState::Done; self.iostate = IOState::Done;
self.info.context.take(); info.context.take();
break break
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
@ -759,8 +733,8 @@ impl<S> ProcessResponse<S> {
break break
} }
Err(err) => { Err(err) => {
self.info.error = Some(err); info.error = Some(err);
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
} }
@ -787,8 +761,8 @@ impl<S> ProcessResponse<S> {
return Err(PipelineState::Response(self)), return Err(PipelineState::Response(self)),
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
self.info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(self.info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
@ -803,7 +777,7 @@ impl<S> ProcessResponse<S> {
// response is completed // response is completed
if self.iostate.is_done() { if self.iostate.is_done() {
self.resp.set_response_size(io.written()); self.resp.set_response_size(io.written());
Ok(FinishingMiddlewares::init(self.info, self.resp)) Ok(FinishingMiddlewares::init(info, self.resp))
} else { } else {
Err(PipelineState::Response(self)) Err(PipelineState::Response(self))
} }
@ -812,24 +786,24 @@ impl<S> ProcessResponse<S> {
/// Middlewares start executor /// Middlewares start executor
struct FinishingMiddlewares<S> { struct FinishingMiddlewares<S> {
info: PipelineInfo<S>,
resp: HttpResponse, resp: HttpResponse,
fut: Option<Box<Future<Item=(), Error=Error>>>, fut: Option<Box<Future<Item=(), Error=Error>>>,
_s: PhantomData<S>,
} }
impl<S> FinishingMiddlewares<S> { impl<S> FinishingMiddlewares<S> {
fn init(info: PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)
} else { } else {
match (FinishingMiddlewares{info: info, resp: resp, fut: None}).poll() { match (FinishingMiddlewares{resp: resp, fut: None, _s: PhantomData}).poll(info) {
Ok(st) | Err(st) => st, Ok(st) | Err(st) => st,
} }
} }
} }
fn poll(mut self) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
loop { loop {
// poll latest fut // poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut { let not_ready = if let Some(ref mut fut) = self.fut {
@ -852,12 +826,12 @@ impl<S> FinishingMiddlewares<S> {
return Ok(PipelineState::Finishing(self)) return Ok(PipelineState::Finishing(self))
} }
self.fut = None; self.fut = None;
self.info.count -= 1; info.count -= 1;
match self.info.mws[self.info.count].finish(self.info.req_mut(), &self.resp) { match info.mws[info.count].finish(info.req_mut(), &self.resp) {
Finished::Done => { Finished::Done => {
if self.info.count == 0 { if info.count == 0 {
return Ok(Completed::init(self.info)) return Ok(Completed::init(info))
} }
} }
Finished::Future(fut) => { Finished::Future(fut) => {
@ -868,21 +842,21 @@ impl<S> FinishingMiddlewares<S> {
} }
} }
struct Completed<S>(PipelineInfo<S>); struct Completed<S>(PhantomData<S>);
impl<S> Completed<S> { impl<S> Completed<S> {
fn init(info: PipelineInfo<S>) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>) -> PipelineState<S> {
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {
PipelineState::Completed(Completed(info)) PipelineState::Completed(Completed(PhantomData))
} }
} }
fn poll(mut self) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
match self.0.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => Ok(PipelineState::Completed(self)), Ok(Async::NotReady) => Ok(PipelineState::Completed(Completed(PhantomData))),
Ok(Async::Ready(())) => Ok(PipelineState::None), Ok(Async::Ready(())) => Ok(PipelineState::None),
Err(_) => Ok(PipelineState::Error), Err(_) => Ok(PipelineState::Error),
} }
@ -914,23 +888,25 @@ mod tests {
#[test] #[test]
fn test_completed() { fn test_completed() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let info = Box::new(PipelineInfo::new(HttpRequest::default())); let mut info = PipelineInfo::new(HttpRequest::default());
Completed::init(info).is_none().unwrap(); Completed::init(&mut info).is_none().unwrap();
let req = HttpRequest::default(); let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor); let mut ctx = HttpContext::new(req.clone(), MyActor);
let addr: Address<_> = ctx.address(); let addr: Address<_> = ctx.address();
let mut info = Box::new(PipelineInfo::new(req)); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::init(info).completed().unwrap(); let mut state = Completed::init(&mut info).completed().unwrap();
let st = state.poll().ok().unwrap(); let st = state.poll(&mut info).ok().unwrap();
assert!(!st.is_done()); let pp = Pipeline(info, st);
assert!(!pp.is_done());
let Pipeline(mut info, st) = pp;
state = st.completed().unwrap(); state = st.completed().unwrap();
drop(addr); drop(addr);
state.poll().ok().unwrap().is_none().unwrap(); state.poll(&mut info).ok().unwrap().is_none().unwrap();
result(Ok::<_, ()>(())) result(Ok::<_, ()>(()))
})).unwrap() })).unwrap()