use std::marker::PhantomData; use std::rc::Rc; use std::{io, mem}; use futures::sync::oneshot; use futures::{Async, Future, Poll, Stream}; use log::Level::Debug; use body::{Body, BodyStream}; use context::{ActorHttpContext, Frame}; use error::Error; use handler::{AsyncResult, AsyncResultItem}; use header::ContentEncoding; use httprequest::HttpRequest; use httpresponse::HttpResponse; use middleware::{Finished, Middleware, Response, Started}; use server::{HttpHandlerTask, Writer, WriterState}; #[doc(hidden)] pub trait PipelineHandler { fn encoding(&self) -> ContentEncoding; fn handle(&self, &HttpRequest) -> AsyncResult; } #[doc(hidden)] pub struct Pipeline( PipelineInfo, PipelineState, Rc>>>, ); enum PipelineState { None, Error, Starting(StartMiddlewares), Handler(WaitingResponse), RunMiddlewares(RunMiddlewares), Response(ProcessResponse), Finishing(FinishingMiddlewares), Completed(Completed), } impl> PipelineState { fn is_response(&self) -> bool { match *self { PipelineState::Response(_) => true, _ => false, } } fn poll( &mut self, info: &mut PipelineInfo, mws: &[Box>], ) -> Option> { match *self { PipelineState::Starting(ref mut state) => state.poll(info, mws), PipelineState::Handler(ref mut state) => state.poll(info, mws), PipelineState::RunMiddlewares(ref mut state) => state.poll(info, mws), PipelineState::Finishing(ref mut state) => state.poll(info, mws), PipelineState::Completed(ref mut state) => state.poll(info), PipelineState::Response(_) | PipelineState::None | PipelineState::Error => { None } } } } struct PipelineInfo { req: HttpRequest, count: u16, context: Option>, error: Option, disconnected: Option, encoding: ContentEncoding, } impl PipelineInfo { fn poll_context(&mut self) -> Poll<(), Error> { if let Some(ref mut context) = self.context { match context.poll() { Err(err) => Err(err), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Ok(Async::Ready(())), } } else { Ok(Async::Ready(())) } } } impl> Pipeline { pub fn new( req: HttpRequest, mws: Rc>>>, handler: Rc, ) -> Pipeline { let mut info = PipelineInfo { req, count: 0, error: None, context: None, disconnected: None, encoding: handler.encoding(), }; let state = StartMiddlewares::init(&mut info, &mws, handler); Pipeline(info, state, mws) } } impl Pipeline { #[inline] fn is_done(&self) -> bool { match self.1 { PipelineState::None | PipelineState::Error | PipelineState::Starting(_) | PipelineState::Handler(_) | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, PipelineState::Finishing(_) | PipelineState::Completed(_) => false, } } } impl> HttpHandlerTask for Pipeline { fn disconnected(&mut self) { self.0.disconnected = Some(true); } fn poll_io(&mut self, io: &mut Writer) -> Poll { let mut state = mem::replace(&mut self.1, PipelineState::None); loop { if state.is_response() { if let PipelineState::Response(st) = state { match st.poll_io(io, &mut self.0, &self.2) { Ok(state) => { self.1 = state; if let Some(error) = self.0.error.take() { return Err(error); } else { return Ok(Async::Ready(self.is_done())); } } Err(state) => { self.1 = state; return Ok(Async::NotReady); } } } } match state { PipelineState::None => return Ok(Async::Ready(true)), PipelineState::Error => { return Err( io::Error::new(io::ErrorKind::Other, "Internal error").into() ) } _ => (), } match state.poll(&mut self.0, &self.2) { Some(st) => state = st, None => { return { self.1 = state; Ok(Async::NotReady) } } } } } fn poll_completed(&mut self) -> Poll<(), Error> { let mut state = mem::replace(&mut self.1, PipelineState::None); loop { match state { PipelineState::None | PipelineState::Error => { return Ok(Async::Ready(())) } _ => (), } if let Some(st) = state.poll(&mut self.0, &self.2) { state = st; } else { self.1 = state; return Ok(Async::NotReady); } } } } type Fut = Box, Error = Error>>; /// Middlewares start executor struct StartMiddlewares { hnd: Rc, fut: Option, _s: PhantomData, } impl> StartMiddlewares { fn init( info: &mut PipelineInfo, mws: &[Box>], hnd: Rc, ) -> PipelineState { // execute middlewares, we need this stage because middlewares could be // non-async and we can move to next state immediately let len = mws.len() as u16; loop { if info.count == len { let reply = hnd.handle(&info.req); return WaitingResponse::init(info, mws, reply); } else { match mws[info.count as usize].start(&info.req) { Ok(Started::Done) => info.count += 1, Ok(Started::Response(resp)) => { return RunMiddlewares::init(info, mws, resp); } Ok(Started::Future(fut)) => { return PipelineState::Starting(StartMiddlewares { hnd, fut: Some(fut), _s: PhantomData, }) } Err(err) => { return RunMiddlewares::init(info, mws, err.into()); } } } } } fn poll( &mut self, info: &mut PipelineInfo, mws: &[Box>], ) -> Option> { let len = mws.len() as u16; 'outer: loop { match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => { return None; } Ok(Async::Ready(resp)) => { info.count += 1; if let Some(resp) = resp { return Some(RunMiddlewares::init(info, mws, resp)); } loop { if info.count == len { let reply = self.hnd.handle(&info.req); return Some(WaitingResponse::init(info, mws, reply)); } else { let res = mws[info.count as usize].start(&info.req); match res { Ok(Started::Done) => info.count += 1, Ok(Started::Response(resp)) => { return Some(RunMiddlewares::init(info, mws, resp)); } Ok(Started::Future(fut)) => { self.fut = Some(fut); continue 'outer; } Err(err) => { return Some(RunMiddlewares::init( info, mws, err.into(), )); } } } } } Err(err) => { return Some(RunMiddlewares::init(info, mws, err.into())); } } } } } // waiting for response struct WaitingResponse { fut: Box>, _s: PhantomData, _h: PhantomData, } impl WaitingResponse { #[inline] fn init( info: &mut PipelineInfo, mws: &[Box>], reply: AsyncResult, ) -> PipelineState { match reply.into() { AsyncResultItem::Ok(resp) => RunMiddlewares::init(info, mws, resp), AsyncResultItem::Err(err) => RunMiddlewares::init(info, mws, err.into()), AsyncResultItem::Future(fut) => PipelineState::Handler(WaitingResponse { fut, _s: PhantomData, _h: PhantomData, }), } } fn poll( &mut self, info: &mut PipelineInfo, mws: &[Box>], ) -> Option> { match self.fut.poll() { Ok(Async::NotReady) => None, Ok(Async::Ready(resp)) => Some(RunMiddlewares::init(info, mws, resp)), Err(err) => Some(RunMiddlewares::init(info, mws, err.into())), } } } /// Middlewares response executor struct RunMiddlewares { curr: usize, fut: Option>>, _s: PhantomData, _h: PhantomData, } impl RunMiddlewares { #[inline] fn init( info: &mut PipelineInfo, mws: &[Box>], mut resp: HttpResponse, ) -> PipelineState { if info.count == 0 { return ProcessResponse::init(resp); } let mut curr = 0; let len = mws.len(); loop { let state = mws[curr].response(&info.req, resp); resp = match state { Err(err) => { info.count = (curr + 1) as u16; return ProcessResponse::init(err.into()); } Ok(Response::Done(r)) => { curr += 1; if curr == len { return ProcessResponse::init(r); } else { r } } Ok(Response::Future(fut)) => { return PipelineState::RunMiddlewares(RunMiddlewares { curr, fut: Some(fut), _s: PhantomData, _h: PhantomData, }); } }; } } fn poll( &mut self, info: &mut PipelineInfo, mws: &[Box>], ) -> Option> { let len = mws.len(); loop { // poll latest fut let mut resp = match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => return None, Ok(Async::Ready(resp)) => { self.curr += 1; resp } Err(err) => return Some(ProcessResponse::init(err.into())), }; loop { if self.curr == len { return Some(ProcessResponse::init(resp)); } else { let state = mws[self.curr].response(&info.req, resp); match state { Err(err) => return Some(ProcessResponse::init(err.into())), Ok(Response::Done(r)) => { self.curr += 1; resp = r } Ok(Response::Future(fut)) => { self.fut = Some(fut); break; } } } } } } } struct ProcessResponse { resp: HttpResponse, iostate: IOState, running: RunningState, drain: Option>, _s: PhantomData, _h: PhantomData, } #[derive(PartialEq, Debug)] enum RunningState { Running, Paused, Done, } impl RunningState { #[inline] fn pause(&mut self) { if *self != RunningState::Done { *self = RunningState::Paused } } #[inline] fn resume(&mut self) { if *self != RunningState::Done { *self = RunningState::Running } } } enum IOState { Response, Payload(BodyStream), Actor(Box), Done, } impl ProcessResponse { #[inline] fn init(resp: HttpResponse) -> PipelineState { PipelineState::Response(ProcessResponse { resp, iostate: IOState::Response, running: RunningState::Running, drain: None, _s: PhantomData, _h: PhantomData, }) } fn poll_io( mut self, io: &mut Writer, info: &mut PipelineInfo, mws: &[Box>], ) -> Result, PipelineState> { loop { if self.drain.is_none() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full 'inner: loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { let encoding = self.resp.content_encoding().unwrap_or(info.encoding); let result = match io.start(&info.req, &mut self.resp, encoding) { Ok(res) => res, Err(err) => { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( info, mws, self.resp, )); } }; if let Some(err) = self.resp.error() { if self.resp.status().is_server_error() { error!( "Error occured during request handling, status: {} {}", self.resp.status(), err ); } else { warn!( "Error occured during request handling: {}", err ); } if log_enabled!(Debug) { debug!("{:?}", err); } } // always poll stream or actor for the first time match self.resp.replace_body(Body::Empty) { Body::Streaming(stream) => { self.iostate = IOState::Payload(stream); continue 'inner; } Body::Actor(ctx) => { self.iostate = IOState::Actor(ctx); continue 'inner; } _ => (), } result } IOState::Payload(mut body) => match body.poll() { Ok(Async::Ready(None)) => { if let Err(err) = io.write_eof() { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( info, mws, self.resp, )); } break; } Ok(Async::Ready(Some(chunk))) => { self.iostate = IOState::Payload(body); match io.write(&chunk.into()) { Err(err) => { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( info, mws, self.resp, )); } Ok(result) => result, } } Ok(Async::NotReady) => { self.iostate = IOState::Payload(body); break; } Err(err) => { info.error = Some(err); return Ok(FinishingMiddlewares::init( info, mws, self.resp, )); } }, IOState::Actor(mut ctx) => { if info.disconnected.take().is_some() { ctx.disconnected(); } match ctx.poll() { Ok(Async::Ready(Some(vec))) => { if vec.is_empty() { self.iostate = IOState::Actor(ctx); break; } let mut res = None; for frame in vec { match frame { Frame::Chunk(None) => { info.context = Some(ctx); if let Err(err) = io.write_eof() { info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( info, mws, self.resp, ), ); } break 'inner; } Frame::Chunk(Some(chunk)) => { match io.write(&chunk) { Err(err) => { info.context = Some(ctx); info.error = Some(err.into()); return Ok( FinishingMiddlewares::init( info, mws, self.resp, ), ); } Ok(result) => res = Some(result), } } Frame::Drain(fut) => self.drain = Some(fut), } } self.iostate = IOState::Actor(ctx); if self.drain.is_some() { self.running.resume(); break 'inner; } res.unwrap() } Ok(Async::Ready(None)) => break, Ok(Async::NotReady) => { self.iostate = IOState::Actor(ctx); break; } Err(err) => { info.context = Some(ctx); info.error = Some(err); return Ok(FinishingMiddlewares::init( info, mws, self.resp, )); } } } IOState::Done => break, }; match result { WriterState::Pause => { self.running.pause(); break; } WriterState::Done => self.running.resume(), } } } // flush io but only if we need to if self.running == RunningState::Paused || self.drain.is_some() { match io.poll_completed(false) { Ok(Async::Ready(_)) => { self.running.resume(); // resolve drain futures if let Some(tx) = self.drain.take() { let _ = tx.send(()); } // restart io processing continue; } Ok(Async::NotReady) => return Err(PipelineState::Response(self)), Err(err) => { if let IOState::Actor(mut ctx) = mem::replace(&mut self.iostate, IOState::Done) { ctx.disconnected(); info.context = Some(ctx); } info.error = Some(err.into()); return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } } } break; } // response is completed match self.iostate { IOState::Done => { match io.write_eof() { Ok(_) => (), Err(err) => { info.error = Some(err.into()); return Ok(FinishingMiddlewares::init(info, mws, self.resp)); } } self.resp.set_response_size(io.written()); Ok(FinishingMiddlewares::init(info, mws, self.resp)) } _ => Err(PipelineState::Response(self)), } } } /// Middlewares start executor struct FinishingMiddlewares { resp: Option, fut: Option>>, _s: PhantomData, _h: PhantomData, } impl FinishingMiddlewares { #[inline] fn init( info: &mut PipelineInfo, mws: &[Box>], resp: HttpResponse, ) -> PipelineState { if info.count == 0 { resp.release(); Completed::init(info) } else { let mut state = FinishingMiddlewares { resp: Some(resp), fut: None, _s: PhantomData, _h: PhantomData, }; if let Some(st) = state.poll(info, mws) { st } else { PipelineState::Finishing(state) } } } fn poll( &mut self, info: &mut PipelineInfo, mws: &[Box>], ) -> Option> { loop { // poll latest fut let not_ready = if let Some(ref mut fut) = self.fut { match fut.poll() { Ok(Async::NotReady) => true, Ok(Async::Ready(())) => false, Err(err) => { error!("Middleware finish error: {}", err); false } } } else { false }; if not_ready { return None; } self.fut = None; if info.count == 0 { self.resp.take().unwrap().release(); return Some(Completed::init(info)); } info.count -= 1; let state = mws[info.count as usize].finish(&info.req, self.resp.as_ref().unwrap()); match state { Finished::Done => { if info.count == 0 { self.resp.take().unwrap().release(); return Some(Completed::init(info)); } } Finished::Future(fut) => { self.fut = Some(fut); } } } } } #[derive(Debug)] struct Completed(PhantomData, PhantomData); impl Completed { #[inline] fn init(info: &mut PipelineInfo) -> PipelineState { if let Some(ref err) = info.error { error!("Error occurred during request handling: {}", err); } if info.context.is_none() { PipelineState::None } else { match info.poll_context() { Ok(Async::NotReady) => { PipelineState::Completed(Completed(PhantomData, PhantomData)) } Ok(Async::Ready(())) => PipelineState::None, Err(_) => PipelineState::Error, } } } #[inline] fn poll(&mut self, info: &mut PipelineInfo) -> Option> { match info.poll_context() { Ok(Async::NotReady) => None, Ok(Async::Ready(())) => Some(PipelineState::None), Err(_) => Some(PipelineState::Error), } } }