diff --git a/src/context.rs b/src/context.rs index ae6211d9e..c9f770147 100644 --- a/src/context.rs +++ b/src/context.rs @@ -12,12 +12,16 @@ use actix::fut::ActorFuture; use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle, Envelope, ToEnvelope, RemoteEnvelope}; -use task::{IoContext, DrainFut}; use body::{Body, Binary}; use error::Error; use httprequest::HttpRequest; use httpresponse::HttpResponse; +use pipeline::DrainFut; +pub(crate) trait IoContext: 'static { + fn disconnected(&mut self); + fn poll(&mut self) -> Poll, Error>; +} #[derive(Debug)] pub(crate) enum Frame { @@ -45,6 +49,7 @@ impl ActorContext for HttpContext where A: Actor { /// Stop actor execution fn stop(&mut self) { + self.stream.push_back(Frame::Payload(None)); self.items.stop(); self.address.close(); if self.state == ActorState::Running { @@ -150,6 +155,11 @@ impl HttpContext where A: Actor { } } + /// Indicate end of streamimng payload. Also this method calls `Self::close`. + pub fn write_eof(&mut self) { + self.stop(); + } + /// Returns drain future pub fn drain(&mut self) -> Drain { let fut = Rc::new(RefCell::new(DrainFut::default())); diff --git a/src/h1.rs b/src/h1.rs index d4c86a8ac..ff358f154 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -54,7 +54,7 @@ pub(crate) struct Http1 { } struct Entry { - task: Pipeline, + pipe: Pipeline, eof: bool, error: bool, finished: bool, @@ -108,7 +108,7 @@ impl Http1 return Err(()) } - match item.task.poll_io(&mut self.stream) { + match item.pipe.poll_io(&mut self.stream) { Ok(Async::Ready(ready)) => { not_ready = false; @@ -129,13 +129,13 @@ impl Http1 }, Err(err) => { // it is not possible to recover from error - // during task handling, so just drop connection + // during pipe handling, so just drop connection error!("Unhandled error: {}", err); return Err(()) } } } else if !item.finished { - match item.task.poll() { + match item.pipe.poll() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { not_ready = false; @@ -181,11 +181,11 @@ impl Http1 self.keepalive_timer.take(); // start request processing - let mut task = None; + let mut pipe = None; for h in self.router.iter() { req = match h.handle(req) { Ok(t) => { - task = Some(t); + pipe = Some(t); break }, Err(req) => req, @@ -193,7 +193,7 @@ impl Http1 } self.tasks.push_back( - Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), + Entry {pipe: pipe.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), eof: false, error: false, finished: false}); @@ -206,7 +206,7 @@ impl Http1 self.error = true; self.stream.disconnected(); for entry in &mut self.tasks { - entry.task.disconnected() + entry.pipe.disconnected() } }, Err(err) => { @@ -214,7 +214,7 @@ impl Http1 not_ready = false; self.stream.disconnected(); for entry in &mut self.tasks { - entry.task.disconnected() + entry.pipe.disconnected() } // kill keepalive @@ -227,7 +227,7 @@ impl Http1 if self.tasks.is_empty() { if let ReaderError::Error(err) = err { self.tasks.push_back( - Entry {task: Pipeline::error(err.error_response()), + Entry {pipe: Pipeline::error(err.error_response()), eof: false, error: false, finished: false}); @@ -888,7 +888,7 @@ mod tests { self.buf = b.take().freeze(); } } - + impl AsyncRead for Buffer {} impl io::Read for Buffer { fn read(&mut self, dst: &mut [u8]) -> Result { diff --git a/src/lib.rs b/src/lib.rs index 53b30a374..fb2b1a3aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,7 @@ mod payload; mod resource; mod recognizer; mod route; -mod task; +//mod task; mod pipeline; mod staticfiles; mod server; diff --git a/src/multipart.rs b/src/multipart.rs index af83e3fda..f09c135fd 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -706,7 +706,6 @@ mod tests { "abbc761f78ff4d7cb7573b5a23f96ef0".to_owned(), payload); match multipart.poll() { Ok(Async::Ready(Some(item))) => { - println!("{:?}", item); match item { MultipartItem::Field(mut field) => { assert_eq!(field.content_type().type_(), mime::TEXT); diff --git a/src/pipeline.rs b/src/pipeline.rs index d286c956b..1bb4fb4cf 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,12 +1,15 @@ -use std::mem; +use std::{io, mem}; use std::rc::Rc; +use std::cell::RefCell; -use futures::{Async, Poll, Future}; +use futures::{Async, Poll, Future, Stream}; +use futures::task::{Task as FutureTask, current as current_task}; -use task::Task; -use route::Reply; -use error::Error; -use h1writer::Writer; +use body::{Body, BodyStream}; +use context::{Frame, IoContext}; +use error::{Error, UnexpectedTaskFrame}; +use route::{Reply, ReplyItem}; +use h1writer::{Writer, WriterState}; use httprequest::HttpRequest; use httpresponse::HttpResponse; use middlewares::{Middleware, Finished, Started, Response}; @@ -18,130 +21,303 @@ pub struct Pipeline(PipelineState); enum PipelineState { None, - Starting(Start), - Handle(Box), - Finishing(Box), - Error(Box<(Task, HttpRequest)>), - Task(Box<(Task, HttpRequest)>), + Error, + Starting(StartMiddlewares), + Handler(WaitingResponse), + RunMiddlewares(RunMiddlewares), + Response(ProcessResponse), + Finishing(FinishingMiddlewares), + Completed(Completed), } +impl PipelineState { + + fn is_done(&self) -> bool { + match *self { + PipelineState::None | PipelineState::Error + | PipelineState::Starting(_) | PipelineState::Handler(_) + | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, + PipelineState::Finishing(ref st) => st.info.context.is_none(), + PipelineState::Completed(_) => false, + } + } + + fn disconnect(&mut self) { + let info = match *self { + PipelineState::None | PipelineState::Error => return, + PipelineState::Starting(ref mut st) => &mut st.info, + PipelineState::Handler(ref mut st) => &mut st.info, + PipelineState::RunMiddlewares(ref mut st) => &mut st.info, + PipelineState::Response(ref mut st) => &mut st.info, + PipelineState::Finishing(ref mut st) => &mut st.info, + PipelineState::Completed(ref mut st) => &mut st.0, + }; + if let Some(ref mut context) = info.context { + context.disconnected(); + } + } + + fn error(&mut self) -> Option { + let info = match *self { + PipelineState::None | PipelineState::Error => return None, + PipelineState::Starting(ref mut st) => &mut st.info, + PipelineState::Handler(ref mut st) => &mut st.info, + PipelineState::RunMiddlewares(ref mut st) => &mut st.info, + PipelineState::Response(ref mut st) => &mut st.info, + PipelineState::Finishing(ref mut st) => &mut st.info, + PipelineState::Completed(ref mut st) => &mut st.0, + }; + info.error.take() + } +} + +struct PipelineInfo { + req: HttpRequest, + count: usize, + mws: Rc>>, + context: Option>, + error: Option, +} + +impl PipelineInfo { + fn new(req: HttpRequest) -> PipelineInfo { + PipelineInfo { + req: req, + count: 0, + mws: Rc::new(Vec::new()), + error: None, + context: None, + } + } + + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] + fn req_mut(&self) -> &mut HttpRequest { + #[allow(mutable_transmutes)] + unsafe{mem::transmute(&self.req)} + } + + fn poll_context(&mut self) -> Poll<(), Error> { + if let Some(ref mut context) = self.context { + match context.poll() { + Err(err) => Err(err), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => Ok(Async::Ready(())), + } + } else { + Ok(Async::Ready(())) + } + } +} + +enum PipelineResponse { + None, + Context(Box), + Response(Box>), +} + +/// Future that resolves when all buffered data get sent +#[doc(hidden)] +#[derive(Debug)] +pub struct DrainFut { + drained: bool, + task: Option, +} + +impl Default for DrainFut { + + fn default() -> DrainFut { + DrainFut { + drained: false, + task: None, + } + } +} + +impl DrainFut { + + fn set(&mut self) { + self.drained = true; + if let Some(task) = self.task.take() { + task.notify() + } + } +} + +impl Future for DrainFut { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.drained { + Ok(Async::Ready(())) + } else { + self.task = Some(current_task()); + Ok(Async::NotReady) + } + } +} + + impl Pipeline { - pub fn new(req: HttpRequest, mw: Rc>>, handler: PipelineHandler) -> Pipeline + pub fn new(req: HttpRequest, + mw: Rc>>, + handler: PipelineHandler) -> Pipeline { - if mw.is_empty() { - let task = Task::new((handler)(req.clone())); - Pipeline(PipelineState::Task(Box::new((task, req)))) - } else { - match Start::init(mw, req, handler) { - Ok(StartResult::Ready(res)) => - Pipeline(PipelineState::Handle(res)), - Ok(StartResult::NotReady(res)) => - Pipeline(PipelineState::Starting(res)), - Err(err) => - Pipeline(PipelineState::Error( - Box::new((Task::from_error(err), HttpRequest::default())))) - } - } + Pipeline(StartMiddlewares::init(mw, req, handler)) } - pub fn error>(resp: R) -> Self { - Pipeline(PipelineState::Error( - Box::new((Task::from_response(resp), HttpRequest::default())))) + pub fn error>(err: R) -> Self { + Pipeline(ProcessResponse::init( + Box::new(PipelineInfo::new(HttpRequest::default())), err.into())) } pub(crate) fn disconnected(&mut self) { - match self.0 { - PipelineState::Starting(ref mut st) => - st.disconnected(), - PipelineState::Handle(ref mut st) => - st.task.disconnected(), - PipelineState::Task(ref mut st) | PipelineState::Error(ref mut st) => - st.0.disconnected(), - _ =>(), - } + self.0.disconnect() } pub(crate) fn poll_io(&mut self, io: &mut T) -> Poll { loop { - match mem::replace(&mut self.0, PipelineState::None) { - PipelineState::Task(mut st) => { - let req:&mut HttpRequest = unsafe{mem::transmute(&mut st.1)}; - let res = st.0.poll_io(io, req); - self.0 = PipelineState::Task(st); - return res - } - PipelineState::Starting(mut st) => { + let state = mem::replace(&mut self.0, PipelineState::None); + match state { + PipelineState::None => + return Ok(Async::Ready(true)), + PipelineState::Error => + return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()), + PipelineState::Starting(st) => { match st.poll() { - Ok(Async::NotReady) => { - self.0 = PipelineState::Starting(st); + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; return Ok(Async::NotReady) } - Ok(Async::Ready(h)) => - self.0 = PipelineState::Handle(h), - Err(err) => - self.0 = PipelineState::Error( - Box::new((Task::from_error(err), HttpRequest::default()))) } } - PipelineState::Handle(mut st) => { - let res = st.poll_io(io); - if let Ok(Async::Ready(r)) = res { - if r { - self.0 = PipelineState::Finishing(st.finish()); - return Ok(Async::Ready(false)) - } else { - self.0 = PipelineState::Handle(st); - return res + PipelineState::Handler(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) } - } else { - self.0 = PipelineState::Handle(st); - return res } } - PipelineState::Error(mut st) => { - let req:&mut HttpRequest = unsafe{mem::transmute(&mut st.1)}; - let res = st.0.poll_io(io, req); - self.0 = PipelineState::Error(st); - return res + PipelineState::RunMiddlewares(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } + } + PipelineState::Response(st) => { + match st.poll_io(io) { + Ok(state) => { + self.0 = state; + if let Some(error) = self.0.error() { + return Err(error) + } else { + return Ok(Async::Ready(self.0.is_done())) + } + } + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } + } + PipelineState::Finishing(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } + } + PipelineState::Completed(st) => { + match st.poll() { + Ok(state) => { + self.0 = state; + return Ok(Async::Ready(true)); + } + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } } - PipelineState::Finishing(_) | PipelineState::None => unreachable!(), } } } pub(crate) fn poll(&mut self) -> Poll<(), Error> { loop { - match mem::replace(&mut self.0, PipelineState::None) { - PipelineState::Handle(mut st) => { - let res = st.poll(); - match res { - Ok(Async::NotReady) => { - self.0 = PipelineState::Handle(st); + let state = mem::replace(&mut self.0, PipelineState::None); + match state { + PipelineState::None | PipelineState::Error => { + return Ok(Async::Ready(())) + } + PipelineState::Starting(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; return Ok(Async::NotReady) } - Ok(Async::Ready(())) | Err(_) => { - self.0 = PipelineState::Finishing(st.finish()); - } } } - PipelineState::Finishing(mut st) => { - let res = st.poll(); - self.0 = PipelineState::Finishing(st); - return Ok(res) + PipelineState::Handler(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } } - PipelineState::Error(mut st) => { - let res = st.0.poll(); - self.0 = PipelineState::Error(st); - return res + PipelineState::RunMiddlewares(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } } - PipelineState::Task(mut st) => { - let res = st.0.poll(); - self.0 = PipelineState::Task(st); - return res + PipelineState::Response(_) => { + self.0 = state; + return Ok(Async::NotReady); } - _ => { - return Ok(Async::Ready(())) + PipelineState::Finishing(st) => { + match st.poll() { + Ok(state) => + self.0 = state, + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } + } + PipelineState::Completed(st) => { + match st.poll() { + Ok(state) => { + self.0 = state; + return Ok(Async::Ready(())); + } + Err(state) => { + self.0 = state; + return Ok(Async::NotReady) + } + } } } } @@ -151,266 +327,236 @@ impl Pipeline { type Fut = Box, Error=Error>>; /// Middlewares start executor -struct Start { - idx: usize, +struct StartMiddlewares { hnd: *mut Handler, - disconnected: bool, - req: HttpRequest, fut: Option, - middlewares: Rc>>, + info: Box, } -enum StartResult { - Ready(Box), - NotReady(Start), -} +impl StartMiddlewares { -impl Start { - - fn init(mw: Rc>>, - req: HttpRequest, handler: PipelineHandler) -> Result { - Start { - idx: 0, - fut: None, + fn init(mws: Rc>>, + req: HttpRequest, handler: PipelineHandler) -> PipelineState { + let mut info = PipelineInfo { req: req, - disconnected: false, - hnd: handler as *const _ as *mut _, - middlewares: mw, - }.start() - } + count: 0, + mws: mws, + error: None, + context: None, + }; - fn disconnected(&mut self) { - self.disconnected = true; - } - - fn prepare(&self, mut task: Task) -> Task { - if self.disconnected { - task.disconnected() - } - task.set_middlewares(MiddlewaresResponse::new(self.idx-1, Rc::clone(&self.middlewares))); - task - } - - fn start(mut self) -> Result { - let len = self.middlewares.len(); + // execute middlewares, we need this stage because middlewares could be non-async + // and we can move to next state immidietly + let len = info.mws.len(); loop { - if self.idx == len { - let task = Task::new((unsafe{&*self.hnd})(self.req.clone())); - return Ok(StartResult::Ready( - Box::new(Handle::new(self.idx-1, self.req.clone(), - self.prepare(task), self.middlewares)))) + if info.count == len { + let reply = (&*handler)(info.req.clone()); + return WaitingResponse::init(Box::new(info), reply) } else { - match self.middlewares[self.idx].start(&mut self.req) { + match info.mws[info.count].start(&mut info.req) { Started::Done => - self.idx += 1, + info.count += 1, Started::Response(resp) => - return Ok(StartResult::Ready( - Box::new(Handle::new( - self.idx, self.req.clone(), - self.prepare(Task::from_response(resp)), self.middlewares)))), + return RunMiddlewares::init(Box::new(info), resp), Started::Future(mut fut) => match fut.poll() { - Ok(Async::NotReady) => { - self.fut = Some(fut); - return Ok(StartResult::NotReady(self)) - } + Ok(Async::NotReady) => + return PipelineState::Starting(StartMiddlewares { + hnd: handler as *const _ as *mut _, + fut: Some(fut), + info: Box::new(info)}), Ok(Async::Ready(resp)) => { if let Some(resp) = resp { - return Ok(StartResult::Ready( - Box::new(Handle::new( - self.idx, self.req.clone(), - self.prepare(Task::from_response(resp)), - self.middlewares)))) + return RunMiddlewares::init(Box::new(info), resp); } - self.idx += 1; + info.count += 1; } - Err(err) => return Err(err) + Err(err) => + return ProcessResponse::init(Box::new(info), err.into()), }, - Started::Err(err) => return Err(err), + Started::Err(err) => + return ProcessResponse::init(Box::new(info), err.into()), } } } } - fn poll(&mut self) -> Poll, Error> { - let len = self.middlewares.len(); + fn poll(mut self) -> Result { + let len = self.info.mws.len(); 'outer: loop { match self.fut.as_mut().unwrap().poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::NotReady) => + return Err(PipelineState::Starting(self)), Ok(Async::Ready(resp)) => { - self.idx += 1; + self.info.count += 1; if let Some(resp) = resp { - return Ok(Async::Ready(Box::new(Handle::new( - self.idx-1, self.req.clone(), - self.prepare(Task::from_response(resp)), - Rc::clone(&self.middlewares))))) + return Ok(RunMiddlewares::init(self.info, resp)); } - if self.idx == len { - let task = Task::new((unsafe{&*self.hnd})(self.req.clone())); - return Ok(Async::Ready(Box::new(Handle::new( - self.idx-1, self.req.clone(), - self.prepare(task), Rc::clone(&self.middlewares))))) + if self.info.count == len { + let reply = (unsafe{&*self.hnd})(self.info.req.clone()); + return Ok(WaitingResponse::init(self.info, reply)); } else { loop { - match self.middlewares[self.idx].start(&mut self.req) { + match self.info.mws[self.info.count].start(self.info.req_mut()) { Started::Done => - self.idx += 1, + self.info.count += 1, Started::Response(resp) => { - self.idx += 1; - return Ok(Async::Ready(Box::new(Handle::new( - self.idx-1, self.req.clone(), - self.prepare(Task::from_response(resp)), - Rc::clone(&self.middlewares))))) + return Ok(RunMiddlewares::init(self.info, resp)); }, Started::Future(fut) => { self.fut = Some(fut); continue 'outer }, - Started::Err(err) => return Err(err), + Started::Err(err) => + return Ok(ProcessResponse::init(self.info, err.into())) } } } } - Err(err) => return Err(err) + Err(err) => + return Ok(ProcessResponse::init(self.info, err.into())) } } } } -struct Handle { - idx: usize, - req: HttpRequest, - task: Task, - middlewares: Rc>>, +// waiting for response +struct WaitingResponse { + info: Box, + stream: PipelineResponse, } -impl Handle { - fn new(idx: usize, req: HttpRequest, task: Task, mw: Rc>>) -> Handle { - Handle { idx: idx, req: req, task:task, middlewares: mw } +impl WaitingResponse { + + fn init(info: Box, reply: Reply) -> PipelineState + { + let stream = match reply.into() { + ReplyItem::Message(resp) => + return RunMiddlewares::init(info, resp), + ReplyItem::Actor(ctx) => + PipelineResponse::Context(ctx), + ReplyItem::Future(fut) => + PipelineResponse::Response(fut), + }; + + PipelineState::Handler( + WaitingResponse { info: info, stream: stream }) } - fn poll_io(&mut self, io: &mut T) -> Poll { - self.task.poll_io(io, &mut self.req) - } + fn poll(mut self) -> Result { + let stream = mem::replace(&mut self.stream, PipelineResponse::None); - fn poll(&mut self) -> Poll<(), Error> { - self.task.poll() - } - - fn finish(mut self) -> Box { - Box::new(Finish { - idx: self.idx, - req: self.req, - fut: None, - resp: self.task.response(), - middlewares: self.middlewares - }) - } -} - -/// Middlewares start executor -struct Finish { - idx: usize, - req: HttpRequest, - resp: HttpResponse, - fut: Option>>, - middlewares: Rc>>, -} - -impl Finish { - - pub fn poll(&mut self) -> Async<()> { - loop { - // poll latest fut - if let Some(ref mut fut) = self.fut { + match stream { + PipelineResponse::Context(mut context) => { + loop { + match context.poll() { + Ok(Async::Ready(Some(frame))) => { + match frame { + Frame::Message(resp) => { + self.info.context = Some(context); + return Ok(RunMiddlewares::init(self.info, resp)) + } + Frame::Payload(_) | Frame::Drain(_) => (), + } + }, + Ok(Async::Ready(None)) => { + error!("Unexpected eof"); + let err: Error = UnexpectedTaskFrame.into(); + return Ok(ProcessResponse::init(self.info, err.into())) + }, + Ok(Async::NotReady) => { + self.stream = PipelineResponse::Context(context); + return Err(PipelineState::Handler(self)) + }, + Err(err) => + return Ok(ProcessResponse::init(self.info, err.into())) + } + } + }, + PipelineResponse::Response(mut fut) => { match fut.poll() { - Ok(Async::NotReady) => return Async::NotReady, - Ok(Async::Ready(())) => self.idx -= 1, - Err(err) => { - error!("Middleware finish error: {}", err); - self.idx -= 1; + Ok(Async::NotReady) => { + self.stream = PipelineResponse::Response(fut); + Err(PipelineState::Handler(self)) } + Ok(Async::Ready(response)) => + Ok(RunMiddlewares::init(self.info, response)), + Err(err) => + Ok(ProcessResponse::init(self.info, err.into())), } } - self.fut = None; - - match self.middlewares[self.idx].finish(&mut self.req, &self.resp) { - Finished::Done => { - if self.idx == 0 { - return Async::Ready(()) - } else { - self.idx -= 1 - } - } - Finished::Future(fut) => { - self.fut = Some(fut); - }, + PipelineResponse::None => { + unreachable!("Broken internal state") } } + } } /// Middlewares response executor -pub(crate) struct MiddlewaresResponse { - idx: usize, +pub(crate) struct RunMiddlewares { + info: Box, + curr: usize, fut: Option>>, - middlewares: Rc>>, } -impl MiddlewaresResponse { +impl RunMiddlewares { - fn new(idx: usize, mw: Rc>>) -> MiddlewaresResponse { - MiddlewaresResponse { - idx: idx, - fut: None, - middlewares: mw } - } - - pub fn response(&mut self, req: &mut HttpRequest, mut resp: HttpResponse) - -> Result, Error> + fn init(mut info: Box, mut resp: HttpResponse) -> PipelineState { + if info.count == 0 { + return ProcessResponse::init(info, resp); + } + let mut curr = 0; + let len = info.mws.len(); + loop { - resp = match self.middlewares[self.idx].response(req, resp) { - Response::Err(err) => - return Err(err), + resp = match info.mws[curr].response(info.req_mut(), resp) { + Response::Err(err) => { + info.count = curr + 1; + return ProcessResponse::init(info, err.into()) + } Response::Done(r) => { - if self.idx == 0 { - return Ok(Some(r)) + curr += 1; + if curr == len { + return ProcessResponse::init(info, r) } else { - self.idx -= 1; r } }, Response::Future(fut) => { - self.fut = Some(fut); - return Ok(None) + return PipelineState::RunMiddlewares( + RunMiddlewares { info: info, curr: curr, fut: Some(fut) }) }, }; } } - pub fn poll(&mut self, req: &mut HttpRequest) -> Poll { + fn poll(mut self) -> Result { + let len = self.info.mws.len(); + loop { // poll latest fut let mut resp = match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => - return Ok(Async::NotReady), + return Ok(PipelineState::RunMiddlewares(self)), Ok(Async::Ready(resp)) => { - self.idx -= 1; + self.curr += 1; resp } - Err(err) => return Err(err) + Err(err) => + return Ok(ProcessResponse::init(self.info, err.into())), }; loop { - if self.idx == 0 { - return Ok(Async::Ready(resp)) + if self.curr == len { + return Ok(ProcessResponse::init(self.info, resp)); } else { - match self.middlewares[self.idx].response(req, resp) { + match self.info.mws[self.curr].response(self.info.req_mut(), resp) { Response::Err(err) => - return Err(err), + return Ok(ProcessResponse::init(self.info, err.into())), Response::Done(r) => { - self.idx -= 1; + self.curr += 1; resp = r }, Response::Future(fut) => { @@ -423,3 +569,315 @@ impl MiddlewaresResponse { } } } + +struct ProcessResponse { + resp: HttpResponse, + iostate: IOState, + running: RunningState, + drain: DrainVec, + info: Box, +} + +#[derive(PartialEq)] +enum RunningState { + Running, + Paused, + Done, +} + +impl RunningState { + #[inline] + fn pause(&mut self) { + if *self != RunningState::Done { + *self = RunningState::Paused + } + } + #[inline] + fn resume(&mut self) { + if *self != RunningState::Done { + *self = RunningState::Running + } + } +} + +enum IOState { + Response, + Payload(BodyStream), + Context, + Done, +} + +impl IOState { + fn is_done(&self) -> bool { + match *self { + IOState::Done => true, + _ => false + } + } +} + +struct DrainVec(Vec>>); +impl Drop for DrainVec { + fn drop(&mut self) { + for drain in &mut self.0 { + drain.borrow_mut().set() + } + } +} + +impl ProcessResponse { + + fn init(info: Box, resp: HttpResponse) -> PipelineState + { + PipelineState::Response( + ProcessResponse{ resp: resp, + iostate: IOState::Response, + running: RunningState::Running, + drain: DrainVec(Vec::new()), + info: info}) + } + + fn poll_io(mut self, io: &mut T) -> Result { + if self.drain.0.is_empty() && self.running != RunningState::Paused { + // if task is paused, write buffer is probably full + + loop { + let result = match mem::replace(&mut self.iostate, IOState::Done) { + IOState::Response => { + let result = match io.start(self.info.req_mut(), &mut self.resp) { + Ok(res) => res, + Err(err) => { + self.info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + }; + + match self.resp.replace_body(Body::Empty) { + Body::Streaming(stream) | Body::Upgrade(stream) => + self.iostate = IOState::Payload(stream), + Body::StreamingContext | Body::UpgradeContext => + self.iostate = IOState::Context, + _ => (), + } + + result + }, + IOState::Payload(mut body) => { + // always poll context + if self.running == RunningState::Running { + match self.info.poll_context() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => + self.running = RunningState::Done, + Err(err) => { + self.info.error = Some(err); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + } + } + + match body.poll() { + Ok(Async::Ready(None)) => { + self.iostate = IOState::Done; + if let Err(err) = io.write_eof() { + self.info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + break + }, + Ok(Async::Ready(Some(chunk))) => { + self.iostate = IOState::Payload(body); + match io.write(chunk.as_ref()) { + Err(err) => { + self.info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init( + self.info, self.resp)) + }, + Ok(result) => result + } + } + Ok(Async::NotReady) => { + self.iostate = IOState::Payload(body); + break + }, + Err(err) => { + self.info.error = Some(err); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + } + }, + IOState::Context => { + match self.info.context.as_mut().unwrap().poll() { + Ok(Async::Ready(Some(frame))) => { + match frame { + Frame::Message(msg) => { + error!("Unexpected message frame {:?}", msg); + self.info.error = Some(UnexpectedTaskFrame.into()); + return Ok( + FinishingMiddlewares::init(self.info, self.resp)) + }, + Frame::Payload(None) => { + self.iostate = IOState::Done; + if let Err(err) = io.write_eof() { + self.info.error = Some(err.into()); + return Ok( + FinishingMiddlewares::init(self.info, self.resp)) + } + break + }, + Frame::Payload(Some(chunk)) => { + self.iostate = IOState::Context; + match io.write(chunk.as_ref()) { + Err(err) => { + self.info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init( + self.info, self.resp)) + }, + Ok(result) => result + } + }, + Frame::Drain(fut) => { + self.drain.0.push(fut); + break + } + } + }, + Ok(Async::Ready(None)) => { + self.iostate = IOState::Done; + self.info.context.take(); + break + } + Ok(Async::NotReady) => { + self.iostate = IOState::Context; + break + } + Err(err) => { + self.info.error = Some(err); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + } + } + IOState::Done => break, + }; + + match result { + WriterState::Pause => { + self.running.pause(); + break + } + WriterState::Done => { + self.running.resume() + }, + } + } + } + + // flush io + match io.poll_complete() { + Ok(Async::Ready(_)) => + self.running.resume(), + Ok(Async::NotReady) => + return Err(PipelineState::Response(self)), + Err(err) => { + debug!("Error sending data: {}", err); + self.info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(self.info, self.resp)) + } + } + + // drain futures + if !self.drain.0.is_empty() { + for fut in &mut self.drain.0 { + fut.borrow_mut().set() + } + self.drain.0.clear(); + } + + // response is completed + if self.iostate.is_done() { + self.resp.set_response_size(io.written()); + Ok(FinishingMiddlewares::init(self.info, self.resp)) + } else { + Err(PipelineState::Response(self)) + } + } +} + +/// Middlewares start executor +struct FinishingMiddlewares { + info: Box, + resp: HttpResponse, + fut: Option>>, +} + +impl FinishingMiddlewares { + + fn init(info: Box, resp: HttpResponse) -> PipelineState { + if info.count == 0 { + Completed::init(info) + } else { + match (FinishingMiddlewares{info: info, resp: resp, fut: None}).poll() { + Ok(st) | Err(st) => st, + } + } + } + + fn poll(mut self) -> Result { + loop { + // poll latest fut + let not_ready = if let Some(ref mut fut) = self.fut { + match fut.poll() { + Ok(Async::NotReady) => { + true + }, + Ok(Async::Ready(())) => { + false + }, + Err(err) => { + error!("Middleware finish error: {}", err); + false + } + } + } else { + false + }; + if not_ready { + return Ok(PipelineState::Finishing(self)) + } + self.fut = None; + self.info.count -= 1; + + match self.info.mws[self.info.count].finish(self.info.req_mut(), &self.resp) { + Finished::Done => { + if self.info.count == 0 { + return Ok(Completed::init(self.info)) + } + } + Finished::Future(fut) => { + self.fut = Some(fut); + }, + } + } + } +} + +struct Completed(Box); + +impl Completed { + + fn init(info: Box) -> PipelineState { + if info.context.is_none() { + PipelineState::None + } else { + PipelineState::Completed(Completed(info)) + } + } + + fn poll(mut self) -> Result { + match self.0.poll_context() { + Ok(Async::NotReady) => Ok(PipelineState::Completed(self)), + Ok(Async::Ready(())) => Ok(PipelineState::None), + Err(_) => Ok(PipelineState::Error), + } + } +} diff --git a/src/route.rs b/src/route.rs index 8e6249838..17c952db0 100644 --- a/src/route.rs +++ b/src/route.rs @@ -5,8 +5,7 @@ use actix::Actor; use futures::Future; use error::Error; -use task::IoContext; -use context::HttpContext; +use context::{HttpContext, IoContext}; use httprequest::HttpRequest; use httpresponse::HttpResponse; diff --git a/src/task.rs b/src/task.rs deleted file mode 100644 index 457519c4c..000000000 --- a/src/task.rs +++ /dev/null @@ -1,483 +0,0 @@ -use std::{fmt, mem}; -use std::rc::Rc; -use std::cell::RefCell; - -use futures::{Async, Future, Poll}; -use futures::task::{Task as FutureTask, current as current_task}; - -use route::{Reply, ReplyItem}; -use body::{Body, BodyStream, Binary}; -use context::Frame; -use h1writer::{Writer, WriterState}; -use error::{Error, UnexpectedTaskFrame}; -use pipeline::MiddlewaresResponse; -use httprequest::HttpRequest; -use httpresponse::HttpResponse; - -#[derive(PartialEq, Debug)] -enum TaskRunningState { - Paused, - Running, - Done, -} - -impl TaskRunningState { - fn is_done(&self) -> bool { - *self == TaskRunningState::Done - } - fn pause(&mut self) { - if *self != TaskRunningState::Done { - *self = TaskRunningState::Paused - } - } - fn resume(&mut self) { - if *self != TaskRunningState::Done { - *self = TaskRunningState::Running - } - } -} - -enum ResponseState { - Reading, - Ready(HttpResponse), - Middlewares(MiddlewaresResponse), - Prepared(Option), -} - -enum IOState { - Response, - Payload(BodyStream), - Context, - Done, -} - -enum TaskStream { - None, - Context(Box), - Response(Box>), -} - -impl IOState { - fn is_done(&self) -> bool { - match *self { - IOState::Done => true, - _ => false - } - } -} - -impl ResponseState { - fn is_reading(&self) -> bool { - match *self { - ResponseState::Reading => true, - _ => false - } - } -} - -impl fmt::Debug for ResponseState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - ResponseState::Reading => write!(f, "ResponseState::Reading"), - ResponseState::Ready(_) => write!(f, "ResponseState::Ready"), - ResponseState::Middlewares(_) => write!(f, "ResponseState::Middlewares"), - ResponseState::Prepared(_) => write!(f, "ResponseState::Prepared"), - } - } -} - -impl fmt::Debug for IOState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - IOState::Response => write!(f, "IOState::Response"), - IOState::Payload(_) => write!(f, "IOState::Payload"), - IOState::Context => write!(f, "IOState::Context"), - IOState::Done => write!(f, "IOState::Done"), - } - } -} - -pub(crate) trait IoContext: 'static { - fn disconnected(&mut self); - fn poll(&mut self) -> Poll, Error>; -} - -/// Future that resolves when all buffered data get sent -#[doc(hidden)] -#[derive(Debug)] -pub struct DrainFut { - drained: bool, - task: Option, -} - -impl Default for DrainFut { - - fn default() -> DrainFut { - DrainFut { - drained: false, - task: None, - } - } -} - -impl DrainFut { - - fn set(&mut self) { - self.drained = true; - if let Some(task) = self.task.take() { - task.notify() - } - } -} - -impl Future for DrainFut { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - if self.drained { - Ok(Async::Ready(())) - } else { - self.task = Some(current_task()); - Ok(Async::NotReady) - } - } -} - -pub(crate) struct Task { - running: TaskRunningState, - response: ResponseState, - iostate: IOState, - stream: TaskStream, - drain: Vec>>, - middlewares: Option, -} - -impl Task { - - pub(crate) fn new(reply: Reply) -> Task { - match reply.into() { - ReplyItem::Message(msg) => { - Task::from_response(msg) - }, - ReplyItem::Actor(ctx) => { - Task { running: TaskRunningState::Running, - response: ResponseState::Reading, - iostate: IOState::Response, - drain: Vec::new(), - stream: TaskStream::Context(ctx), - middlewares: None } - } - ReplyItem::Future(fut) => { - Task { running: TaskRunningState::Running, - response: ResponseState::Reading, - iostate: IOState::Response, - drain: Vec::new(), - stream: TaskStream::Response(fut), - middlewares: None } - } - } - } - - pub(crate) fn from_response>(response: R) -> Task { - Task { running: TaskRunningState::Running, - response: ResponseState::Ready(response.into()), - iostate: IOState::Response, - drain: Vec::new(), - stream: TaskStream::None, - middlewares: None } - } - - pub(crate) fn from_error>(err: E) -> Task { - Task::from_response(err.into()) - } - - pub(crate) fn response(&mut self) -> HttpResponse { - match self.response { - ResponseState::Prepared(ref mut state) => state.take().unwrap(), - _ => panic!("Internal state is broken"), - } - } - - pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) { - self.middlewares = Some(middlewares) - } - - pub(crate) fn disconnected(&mut self) { - if let TaskStream::Context(ref mut ctx) = self.stream { - ctx.disconnected(); - } - } - - pub(crate) fn poll_io(&mut self, io: &mut T, req: &mut HttpRequest) -> Poll - where T: Writer - { - trace!("POLL-IO frames resp: {:?}, io: {:?}, running: {:?}", - self.response, self.iostate, self.running); - - if self.iostate.is_done() { // response is completed - return Ok(Async::Ready(self.running.is_done())); - } else if self.drain.is_empty() && self.running != TaskRunningState::Paused { - // if task is paused, write buffer is probably full - - loop { - let result = match mem::replace(&mut self.iostate, IOState::Done) { - IOState::Response => { - match self.poll_response(req) { - Ok(Async::Ready(mut resp)) => { - let result = io.start(req, &mut resp)?; - - match resp.replace_body(Body::Empty) { - Body::Streaming(stream) | Body::Upgrade(stream) => - self.iostate = IOState::Payload(stream), - Body::StreamingContext | Body::UpgradeContext => - self.iostate = IOState::Context, - _ => (), - } - self.response = ResponseState::Prepared(Some(resp)); - result - }, - Ok(Async::NotReady) => { - self.iostate = IOState::Response; - return Ok(Async::NotReady) - } - Err(err) => { - let mut resp = err.into(); - let result = io.start(req, &mut resp)?; - - match resp.replace_body(Body::Empty) { - Body::Streaming(stream) | Body::Upgrade(stream) => - self.iostate = IOState::Payload(stream), - _ => (), - } - self.response = ResponseState::Prepared(Some(resp)); - result - } - } - }, - IOState::Payload(mut body) => { - // always poll stream - if self.running == TaskRunningState::Running { - match self.poll()? { - Async::Ready(_) => - self.running = TaskRunningState::Done, - Async::NotReady => (), - } - } - - match body.poll() { - Ok(Async::Ready(None)) => { - self.iostate = IOState::Done; - io.write_eof()?; - break - }, - Ok(Async::Ready(Some(chunk))) => { - self.iostate = IOState::Payload(body); - io.write(chunk.as_ref())? - } - Ok(Async::NotReady) => { - self.iostate = IOState::Payload(body); - break - }, - Err(err) => return Err(err), - } - } - IOState::Context => { - match self.poll_context() { - Ok(Async::Ready(None)) => { - self.iostate = IOState::Done; - self.running = TaskRunningState::Done; - io.write_eof()?; - break - }, - Ok(Async::Ready(Some(chunk))) => { - self.iostate = IOState::Context; - io.write(chunk.as_ref())? - } - Ok(Async::NotReady) => { - self.iostate = IOState::Context; - break - } - Err(err) => return Err(err), - } - } - IOState::Done => break, - }; - - match result { - WriterState::Pause => { - self.running.pause(); - break - } - WriterState::Done => - self.running.resume(), - } - } - } - - // flush io - match io.poll_complete() { - Ok(Async::Ready(_)) => self.running.resume(), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => { - debug!("Error sending data: {}", err); - return Err(err.into()) - } - } - - // drain futures - if !self.drain.is_empty() { - for fut in &mut self.drain { - fut.borrow_mut().set() - } - self.drain.clear(); - } - - // response is completed - if self.iostate.is_done() { - if let ResponseState::Prepared(Some(ref mut resp)) = self.response { - resp.set_response_size(io.written()) - } - Ok(Async::Ready(self.running.is_done())) - } else { - Ok(Async::NotReady) - } - } - - pub(crate) fn poll_response(&mut self, req: &mut HttpRequest) -> Poll { - loop { - let state = mem::replace(&mut self.response, ResponseState::Prepared(None)); - match state { - ResponseState::Ready(response) => { - // run middlewares - if let Some(mut middlewares) = self.middlewares.take() { - match middlewares.response(req, response) { - Ok(Some(response)) => - return Ok(Async::Ready(response)), - Ok(None) => { - // middlewares need to run some futures - self.response = ResponseState::Middlewares(middlewares); - continue - } - Err(err) => return Err(err), - } - } else { - return Ok(Async::Ready(response)) - } - } - ResponseState::Middlewares(mut middlewares) => { - // process middlewares - match middlewares.poll(req) { - Ok(Async::NotReady) => { - self.response = ResponseState::Middlewares(middlewares); - return Ok(Async::NotReady) - }, - Ok(Async::Ready(response)) => - return Ok(Async::Ready(response)), - Err(err) => - return Err(err), - } - } - _ => (), - } - self.response = state; - - match mem::replace(&mut self.stream, TaskStream::None) { - TaskStream::None => - return Ok(Async::NotReady), - TaskStream::Context(mut context) => { - loop { - match context.poll() { - Ok(Async::Ready(Some(frame))) => { - match frame { - Frame::Message(msg) => { - if !self.response.is_reading() { - error!("Unexpected message frame {:?}", msg); - return Err(UnexpectedTaskFrame.into()) - } - self.stream = TaskStream::Context(context); - self.response = ResponseState::Ready(msg); - break - }, - Frame::Payload(_) | Frame::Drain(_) => (), - } - }, - Ok(Async::Ready(None)) => { - error!("Unexpected eof"); - return Err(UnexpectedTaskFrame.into()) - }, - Ok(Async::NotReady) => { - self.stream = TaskStream::Context(context); - return Ok(Async::NotReady) - }, - Err(err) => - return Err(err), - } - } - }, - TaskStream::Response(mut fut) => { - match fut.poll() { - Ok(Async::NotReady) => { - self.stream = TaskStream::Response(fut); - return Ok(Async::NotReady); - }, - Ok(Async::Ready(response)) => { - self.response = ResponseState::Ready(response); - } - Err(err) => - return Err(err) - } - } - } - } - } - - pub(crate) fn poll(&mut self) -> Poll<(), Error> { - match self.stream { - TaskStream::None | TaskStream::Response(_) => - Ok(Async::Ready(())), - TaskStream::Context(ref mut context) => { - loop { - match context.poll() { - Ok(Async::Ready(Some(_))) => (), - Ok(Async::Ready(None)) => - return Ok(Async::Ready(())), - Ok(Async::NotReady) => - return Ok(Async::NotReady), - Err(err) => - return Err(err), - } - } - }, - } - } - - fn poll_context(&mut self) -> Poll, Error> { - match self.stream { - TaskStream::None | TaskStream::Response(_) => - Err(UnexpectedTaskFrame.into()), - TaskStream::Context(ref mut context) => { - match context.poll() { - Ok(Async::Ready(Some(frame))) => { - match frame { - Frame::Message(msg) => { - error!("Unexpected message frame {:?}", msg); - Err(UnexpectedTaskFrame.into()) - }, - Frame::Payload(payload) => { - Ok(Async::Ready(payload)) - }, - Frame::Drain(fut) => { - self.drain.push(fut); - Ok(Async::NotReady) - } - } - }, - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), - } - }, - } - } -}