1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00
actix-extras/src/pipeline.rs

915 lines
31 KiB
Rust
Raw Normal View History

2017-12-02 00:45:15 +01:00
use std::{io, mem};
2017-11-25 07:15:52 +01:00
use std::rc::Rc;
2017-12-02 00:45:15 +01:00
use std::cell::RefCell;
2017-12-09 14:54:04 +01:00
use std::marker::PhantomData;
2017-11-25 07:15:52 +01:00
2017-12-02 00:45:15 +01:00
use futures::{Async, Poll, Future, Stream};
use futures::task::{Task as FutureTask, current as current_task};
2017-11-25 07:15:52 +01:00
2017-12-09 13:33:40 +01:00
use channel::HttpHandlerTask;
2017-12-02 00:45:15 +01:00
use body::{Body, BodyStream};
use context::{Frame, IoContext};
use error::{Error, UnexpectedTaskFrame};
2017-12-04 23:53:40 +01:00
use handler::{Reply, ReplyItem};
2017-12-02 00:45:15 +01:00
use h1writer::{Writer, WriterState};
2017-11-25 07:15:52 +01:00
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
2017-12-01 00:13:56 +01:00
use middlewares::{Middleware, Finished, Started, Response};
2017-11-25 07:15:52 +01:00
2017-12-09 13:33:40 +01:00
/// Unsized handler function type; `StartMiddlewares` keeps a raw pointer to it.
type Handler<S> = Fn(HttpRequest<S>) -> Reply;
/// Borrowed handler passed to `Pipeline::new`; it is called with a clone of the
/// request once every middleware has started.
pub(crate) type PipelineHandler<'a, S> = &'a Fn(HttpRequest<S>) -> Reply;
2017-11-25 07:15:52 +01:00
2017-12-09 14:54:04 +01:00
/// Request processing task: per-request info (`.0`) paired with the current
/// processing stage (`.1`).
pub struct Pipeline<S>(PipelineInfo<S>, PipelineState<S>);
2017-11-25 07:15:52 +01:00
2017-12-09 13:33:40 +01:00
enum PipelineState<S> {
2017-11-25 07:15:52 +01:00
None,
2017-12-02 00:45:15 +01:00
Error,
2017-12-09 13:33:40 +01:00
Starting(StartMiddlewares<S>),
Handler(WaitingResponse<S>),
RunMiddlewares(RunMiddlewares<S>),
Response(ProcessResponse<S>),
Finishing(FinishingMiddlewares<S>),
Completed(Completed<S>),
2017-11-25 07:15:52 +01:00
}
2017-12-09 13:33:40 +01:00
struct PipelineInfo<S> {
req: HttpRequest<S>,
2017-12-02 00:45:15 +01:00
count: usize,
2017-12-09 13:33:40 +01:00
mws: Rc<Vec<Box<Middleware<S>>>>,
2017-12-02 00:45:15 +01:00
context: Option<Box<IoContext>>,
error: Option<Error>,
}
2017-12-09 13:33:40 +01:00
impl<S> PipelineInfo<S> {
fn new(req: HttpRequest<S>) -> PipelineInfo<S> {
2017-12-02 00:45:15 +01:00
PipelineInfo {
req: req,
count: 0,
mws: Rc::new(Vec::new()),
error: None,
context: None,
}
}
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
2017-12-09 13:33:40 +01:00
fn req_mut(&self) -> &mut HttpRequest<S> {
2017-12-02 00:45:15 +01:00
#[allow(mutable_transmutes)]
unsafe{mem::transmute(&self.req)}
}
fn poll_context(&mut self) -> Poll<(), Error> {
if let Some(ref mut context) = self.context {
match context.poll() {
Err(err) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
} else {
Ok(Async::Ready(()))
2017-11-25 07:15:52 +01:00
}
}
2017-12-02 00:45:15 +01:00
}
2017-11-25 07:15:52 +01:00
2017-12-02 00:45:15 +01:00
/// Handler reply that `WaitingResponse` is still waiting on.
enum PipelineResponse {
/// Placeholder left in place while the real value is moved out for polling.
None,
/// Actor context that is polled for frames.
Context(Box<IoContext>),
/// Future that resolves to the response.
Response(Box<Future<Item=HttpResponse, Error=Error>>),
}
/// Future that resolves once all buffered data has been sent
#[doc(hidden)]
#[derive(Debug)]
pub struct DrainFut {
/// Set by `set()`; once true, `poll` returns ready.
drained: bool,
/// Task recorded by the last pending `poll`; `set()` notifies it.
task: Option<FutureTask>,
}
impl Default for DrainFut {
fn default() -> DrainFut {
DrainFut {
drained: false,
task: None,
}
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
}
2017-11-25 07:15:52 +01:00
2017-12-02 00:45:15 +01:00
impl DrainFut {
fn set(&mut self) {
self.drained = true;
if let Some(task) = self.task.take() {
task.notify()
2017-11-25 07:15:52 +01:00
}
}
2017-12-02 00:45:15 +01:00
}
impl Future for DrainFut {
    type Item = ();
    type Error = ();

    /// Ready once `set()` has been called; otherwise records the current task so
    /// that `set()` can notify it later.
    fn poll(&mut self) -> Poll<(), ()> {
        if !self.drained {
            self.task = Some(current_task());
            return Ok(Async::NotReady);
        }
        Ok(Async::Ready(()))
    }
}
2017-12-09 13:33:40 +01:00
impl<S> Pipeline<S> {
2017-12-02 00:45:15 +01:00
2017-12-09 13:33:40 +01:00
pub fn new(req: HttpRequest<S>,
2017-12-09 14:54:04 +01:00
mws: Rc<Vec<Box<Middleware<S>>>>,
2017-12-09 13:33:40 +01:00
handler: PipelineHandler<S>) -> Pipeline<S>
2017-12-02 00:45:15 +01:00
{
2017-12-09 14:54:04 +01:00
let mut info = PipelineInfo {
req: req,
count: 0,
mws: mws,
error: None,
context: None,
};
let state = StartMiddlewares::init(&mut info, handler);
Pipeline(info, state)
2017-12-02 00:45:15 +01:00
}
2017-12-09 13:33:40 +01:00
}
2017-12-02 00:45:15 +01:00
2017-12-09 13:33:40 +01:00
impl Pipeline<()> {
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
2017-12-09 14:54:04 +01:00
Box::new(Pipeline(
PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into())))
}
}
impl<S> Pipeline<S> {
fn is_done(&self) -> bool {
match self.1 {
PipelineState::None | PipelineState::Error
| PipelineState::Starting(_) | PipelineState::Handler(_)
| PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true,
PipelineState::Finishing(_) => self.0.context.is_none(),
PipelineState::Completed(_) => false,
}
2017-12-02 00:45:15 +01:00
}
2017-12-09 13:33:40 +01:00
}
impl<S> HttpHandlerTask for Pipeline<S> {
2017-12-02 00:45:15 +01:00
2017-12-09 13:33:40 +01:00
fn disconnected(&mut self) {
2017-12-09 14:54:04 +01:00
if let Some(ref mut context) = self.0.context {
context.disconnected();
}
2017-12-02 00:45:15 +01:00
}
2017-11-25 07:15:52 +01:00
2017-12-09 13:33:40 +01:00
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
2017-11-25 07:15:52 +01:00
loop {
2017-12-09 14:54:04 +01:00
let state = mem::replace(&mut self.1, PipelineState::None);
2017-12-02 00:45:15 +01:00
match state {
PipelineState::None =>
return Ok(Async::Ready(true)),
PipelineState::Error =>
return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()),
PipelineState::Starting(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
PipelineState::Handler(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
}
PipelineState::RunMiddlewares(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-11-25 07:15:52 +01:00
return Ok(Async::NotReady)
}
}
}
2017-12-02 00:45:15 +01:00
PipelineState::Response(st) => {
2017-12-09 14:54:04 +01:00
match st.poll_io(io, &mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
if let Some(error) = self.0.error.take() {
2017-12-02 00:45:15 +01:00
return Err(error)
} else {
2017-12-09 14:54:04 +01:00
return Ok(Async::Ready(self.is_done()))
2017-12-02 00:45:15 +01:00
}
}
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
2017-11-25 07:15:52 +01:00
}
}
}
2017-12-02 00:45:15 +01:00
PipelineState::Finishing(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
}
PipelineState::Completed(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::Ready(true));
}
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
2017-11-25 07:15:52 +01:00
}
}
}
}
2017-12-09 13:33:40 +01:00
fn poll(&mut self) -> Poll<(), Error> {
2017-11-25 07:15:52 +01:00
loop {
2017-12-09 14:54:04 +01:00
let state = mem::replace(&mut self.1, PipelineState::None);
2017-12-02 00:45:15 +01:00
match state {
PipelineState::None | PipelineState::Error => {
return Ok(Async::Ready(()))
}
PipelineState::Starting(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-11-25 07:15:52 +01:00
return Ok(Async::NotReady)
}
2017-12-02 00:45:15 +01:00
}
}
PipelineState::Handler(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
2017-11-25 07:15:52 +01:00
}
}
}
2017-12-02 00:45:15 +01:00
PipelineState::RunMiddlewares(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
PipelineState::Response(_) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady);
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
PipelineState::Finishing(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) =>
2017-12-09 14:54:04 +01:00
self.1 = state,
2017-12-02 00:45:15 +01:00
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
PipelineState::Completed(st) => {
2017-12-09 14:54:04 +01:00
match st.poll(&mut self.0) {
2017-12-02 00:45:15 +01:00
Ok(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::Ready(()));
}
Err(state) => {
2017-12-09 14:54:04 +01:00
self.1 = state;
2017-12-02 00:45:15 +01:00
return Ok(Async::NotReady)
}
}
2017-11-25 07:15:52 +01:00
}
}
}
}
}
2017-11-27 06:47:33 +01:00
/// Boxed future stored by `StartMiddlewares` for an asynchronous middleware start;
/// it resolves to an optional early response.
type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>;
2017-11-25 07:15:52 +01:00
/// Middlewares start executor
2017-12-09 13:33:40 +01:00
struct StartMiddlewares<S> {
hnd: *mut Handler<S>,
2017-11-25 07:15:52 +01:00
fut: Option<Fut>,
}
2017-12-09 13:33:40 +01:00
impl<S> StartMiddlewares<S> {
2017-11-25 07:15:52 +01:00
2017-12-09 14:54:04 +01:00
fn init(info: &mut PipelineInfo<S>, handler: PipelineHandler<S>) -> PipelineState<S> {
2017-12-02 00:45:15 +01:00
// execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly
let len = info.mws.len();
2017-11-25 07:15:52 +01:00
loop {
2017-12-02 00:45:15 +01:00
if info.count == len {
let reply = (&*handler)(info.req.clone());
2017-12-09 13:33:40 +01:00
return WaitingResponse::init(info, reply)
2017-11-25 07:15:52 +01:00
} else {
2017-12-02 00:45:15 +01:00
match info.mws[info.count].start(&mut info.req) {
2017-11-27 06:47:33 +01:00
Started::Done =>
2017-12-02 00:45:15 +01:00
info.count += 1,
2017-11-27 06:47:33 +01:00
Started::Response(resp) =>
2017-12-09 13:33:40 +01:00
return RunMiddlewares::init(info, resp),
2017-11-27 06:47:33 +01:00
Started::Future(mut fut) =>
2017-11-25 07:15:52 +01:00
match fut.poll() {
2017-12-02 00:45:15 +01:00
Ok(Async::NotReady) =>
return PipelineState::Starting(StartMiddlewares {
hnd: handler as *const _ as *mut _,
2017-12-09 14:54:04 +01:00
fut: Some(fut)}),
2017-11-27 06:47:33 +01:00
Ok(Async::Ready(resp)) => {
2017-11-25 07:15:52 +01:00
if let Some(resp) = resp {
2017-12-09 13:33:40 +01:00
return RunMiddlewares::init(info, resp);
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
info.count += 1;
2017-11-25 07:15:52 +01:00
}
2017-12-02 00:45:15 +01:00
Err(err) =>
2017-12-09 14:54:04 +01:00
return ProcessResponse::init(err.into()),
2017-11-27 06:47:33 +01:00
},
2017-12-02 00:45:15 +01:00
Started::Err(err) =>
2017-12-09 14:54:04 +01:00
return ProcessResponse::init(err.into()),
2017-11-25 07:15:52 +01:00
}
}
}
}
2017-12-09 14:54:04 +01:00
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
let len = info.mws.len();
2017-11-25 07:15:52 +01:00
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
2017-12-02 00:45:15 +01:00
Ok(Async::NotReady) =>
return Err(PipelineState::Starting(self)),
2017-11-27 06:47:33 +01:00
Ok(Async::Ready(resp)) => {
2017-12-09 14:54:04 +01:00
info.count += 1;
2017-11-25 07:15:52 +01:00
if let Some(resp) = resp {
2017-12-09 14:54:04 +01:00
return Ok(RunMiddlewares::init(info, resp));
2017-11-25 07:15:52 +01:00
}
2017-12-09 14:54:04 +01:00
if info.count == len {
let reply = (unsafe{&*self.hnd})(info.req.clone());
return Ok(WaitingResponse::init(info, reply));
2017-11-25 07:15:52 +01:00
} else {
loop {
2017-12-09 14:54:04 +01:00
match info.mws[info.count].start(info.req_mut()) {
2017-11-27 06:47:33 +01:00
Started::Done =>
2017-12-09 14:54:04 +01:00
info.count += 1,
2017-11-27 06:47:33 +01:00
Started::Response(resp) => {
2017-12-09 14:54:04 +01:00
return Ok(RunMiddlewares::init(info, resp));
2017-11-25 07:15:52 +01:00
},
2017-11-25 19:52:43 +01:00
Started::Future(fut) => {
2017-11-25 07:15:52 +01:00
self.fut = Some(fut);
continue 'outer
},
2017-12-02 00:45:15 +01:00
Started::Err(err) =>
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into()))
2017-11-25 07:15:52 +01:00
}
}
}
}
2017-12-02 00:45:15 +01:00
Err(err) =>
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into()))
2017-11-25 07:15:52 +01:00
}
}
}
}
2017-12-02 00:45:15 +01:00
// waiting for response
2017-12-09 13:33:40 +01:00
struct WaitingResponse<S> {
2017-12-02 00:45:15 +01:00
stream: PipelineResponse,
2017-12-09 14:54:04 +01:00
_s: PhantomData<S>,
2017-11-25 21:05:27 +01:00
}
2017-12-09 13:33:40 +01:00
impl<S> WaitingResponse<S> {
2017-11-25 21:05:27 +01:00
2017-12-09 14:54:04 +01:00
fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S>
2017-12-02 00:45:15 +01:00
{
let stream = match reply.into() {
ReplyItem::Message(resp) =>
return RunMiddlewares::init(info, resp),
ReplyItem::Actor(ctx) =>
PipelineResponse::Context(ctx),
ReplyItem::Future(fut) =>
PipelineResponse::Response(fut),
};
PipelineState::Handler(
2017-12-09 14:54:04 +01:00
WaitingResponse { stream: stream, _s: PhantomData })
2017-11-25 21:05:27 +01:00
}
2017-12-09 14:54:04 +01:00
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
2017-12-02 00:45:15 +01:00
let stream = mem::replace(&mut self.stream, PipelineResponse::None);
match stream {
PipelineResponse::Context(mut context) => {
loop {
match context.poll() {
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(resp) => {
2017-12-09 14:54:04 +01:00
info.context = Some(context);
return Ok(RunMiddlewares::init(info, resp))
2017-12-02 00:45:15 +01:00
}
Frame::Payload(_) | Frame::Drain(_) => (),
}
},
Ok(Async::Ready(None)) => {
error!("Unexpected eof");
let err: Error = UnexpectedTaskFrame.into();
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into()))
2017-12-02 00:45:15 +01:00
},
Ok(Async::NotReady) => {
self.stream = PipelineResponse::Context(context);
return Err(PipelineState::Handler(self))
},
Err(err) =>
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into()))
2017-11-25 21:05:27 +01:00
}
}
2017-12-02 00:45:15 +01:00
},
PipelineResponse::Response(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.stream = PipelineResponse::Response(fut);
Err(PipelineState::Handler(self))
2017-11-25 21:05:27 +01:00
}
2017-12-02 00:45:15 +01:00
Ok(Async::Ready(response)) =>
2017-12-09 14:54:04 +01:00
Ok(RunMiddlewares::init(info, response)),
2017-12-02 00:45:15 +01:00
Err(err) =>
2017-12-09 14:54:04 +01:00
Ok(ProcessResponse::init(err.into())),
2017-11-25 21:05:27 +01:00
}
2017-12-02 00:45:15 +01:00
}
PipelineResponse::None => {
unreachable!("Broken internal state")
2017-11-25 21:05:27 +01:00
}
}
2017-12-02 00:45:15 +01:00
2017-11-25 21:05:27 +01:00
}
}
2017-11-25 07:15:52 +01:00
/// Middlewares response executor
2017-12-09 13:33:40 +01:00
struct RunMiddlewares<S> {
2017-12-02 00:45:15 +01:00
curr: usize,
2017-11-25 18:28:25 +01:00
fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>,
2017-12-09 14:54:04 +01:00
_s: PhantomData<S>,
2017-11-25 07:15:52 +01:00
}
2017-12-09 13:33:40 +01:00
impl<S> RunMiddlewares<S> {
2017-11-25 07:15:52 +01:00
2017-12-09 14:54:04 +01:00
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S>
2017-11-25 07:15:52 +01:00
{
2017-12-02 00:45:15 +01:00
if info.count == 0 {
2017-12-09 14:54:04 +01:00
return ProcessResponse::init(resp);
2017-12-02 00:45:15 +01:00
}
let mut curr = 0;
let len = info.mws.len();
2017-11-25 07:15:52 +01:00
loop {
2017-12-02 00:45:15 +01:00
resp = match info.mws[curr].response(info.req_mut(), resp) {
Response::Err(err) => {
info.count = curr + 1;
2017-12-09 14:54:04 +01:00
return ProcessResponse::init(err.into())
2017-12-02 00:45:15 +01:00
}
2017-11-25 19:24:45 +01:00
Response::Done(r) => {
2017-12-02 00:45:15 +01:00
curr += 1;
if curr == len {
2017-12-09 14:54:04 +01:00
return ProcessResponse::init(r)
2017-11-25 07:15:52 +01:00
} else {
r
}
},
Response::Future(fut) => {
2017-12-02 00:45:15 +01:00
return PipelineState::RunMiddlewares(
2017-12-09 14:54:04 +01:00
RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData })
2017-11-25 07:15:52 +01:00
},
};
}
}
2017-12-09 14:54:04 +01:00
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
let len = info.mws.len();
2017-12-02 00:45:15 +01:00
2017-11-25 07:15:52 +01:00
loop {
// poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() {
2017-11-25 18:28:25 +01:00
Ok(Async::NotReady) =>
2017-12-02 00:45:15 +01:00
return Ok(PipelineState::RunMiddlewares(self)),
2017-11-25 18:28:25 +01:00
Ok(Async::Ready(resp)) => {
2017-12-02 00:45:15 +01:00
self.curr += 1;
2017-11-25 07:15:52 +01:00
resp
}
2017-12-02 00:45:15 +01:00
Err(err) =>
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into())),
2017-11-25 07:15:52 +01:00
};
loop {
2017-12-02 00:45:15 +01:00
if self.curr == len {
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(resp));
2017-11-25 07:15:52 +01:00
} else {
2017-12-09 14:54:04 +01:00
match info.mws[self.curr].response(info.req_mut(), resp) {
2017-11-25 19:24:45 +01:00
Response::Err(err) =>
2017-12-09 14:54:04 +01:00
return Ok(ProcessResponse::init(err.into())),
2017-11-25 19:24:45 +01:00
Response::Done(r) => {
2017-12-02 00:45:15 +01:00
self.curr += 1;
2017-11-25 07:15:52 +01:00
resp = r
},
Response::Future(fut) => {
self.fut = Some(fut);
break
},
}
}
}
}
}
}
2017-12-02 00:45:15 +01:00
2017-12-09 13:33:40 +01:00
struct ProcessResponse<S> {
2017-12-02 00:45:15 +01:00
resp: HttpResponse,
iostate: IOState,
running: RunningState,
drain: DrainVec,
2017-12-09 14:54:04 +01:00
_s: PhantomData<S>,
2017-12-02 00:45:15 +01:00
}
/// Write-side state tracked while a response is being sent.
#[derive(PartialEq)]
enum RunningState {
    Running,
    Paused,
    Done,
}

impl RunningState {
    /// Switches to `Paused`, unless the state is already `Done`.
    #[inline]
    fn pause(&mut self) {
        match *self {
            RunningState::Done => (),
            _ => *self = RunningState::Paused,
        }
    }

    /// Switches to `Running`, unless the state is already `Done`.
    #[inline]
    fn resume(&mut self) {
        match *self {
            RunningState::Done => (),
            _ => *self = RunningState::Running,
        }
    }
}
/// Part of the response that is produced next.
enum IOState {
    /// The response head has not been written yet.
    Response,
    /// Body chunks come from a stream.
    Payload(BodyStream),
    /// Body chunks come from the actor context.
    Context,
    /// Nothing more to produce.
    Done,
}

impl IOState {
    /// Returns `true` only for `IOState::Done`.
    fn is_done(&self) -> bool {
        if let IOState::Done = *self {
            true
        } else {
            false
        }
    }
}
/// Drain futures collected while writing a response; every one of them is
/// resolved when the vector is dropped.
struct DrainVec(Vec<Rc<RefCell<DrainFut>>>);
impl Drop for DrainVec {
    /// Marks every remaining drain future as drained so that tasks waiting on
    /// them are notified.
    fn drop(&mut self) {
        for pending in self.0.iter() {
            pending.borrow_mut().set();
        }
    }
}
2017-12-09 13:33:40 +01:00
impl<S> ProcessResponse<S> {
2017-12-02 00:45:15 +01:00
2017-12-09 14:54:04 +01:00
fn init(resp: HttpResponse) -> PipelineState<S>
2017-12-02 00:45:15 +01:00
{
PipelineState::Response(
ProcessResponse{ resp: resp,
iostate: IOState::Response,
running: RunningState::Running,
drain: DrainVec(Vec::new()),
2017-12-09 14:54:04 +01:00
_s: PhantomData})
2017-12-02 00:45:15 +01:00
}
2017-12-09 14:54:04 +01:00
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S>, PipelineState<S>>
{
2017-12-02 00:45:15 +01:00
if self.drain.0.is_empty() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full
loop {
let result = match mem::replace(&mut self.iostate, IOState::Done) {
IOState::Response => {
2017-12-09 14:54:04 +01:00
let result = match io.start(info.req_mut().get_inner(),
2017-12-09 13:33:40 +01:00
&mut self.resp) {
2017-12-02 00:45:15 +01:00
Ok(res) => res,
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
};
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) | Body::Upgrade(stream) =>
self.iostate = IOState::Payload(stream),
Body::StreamingContext | Body::UpgradeContext =>
self.iostate = IOState::Context,
_ => (),
}
result
},
IOState::Payload(mut body) => {
// always poll context
if self.running == RunningState::Running {
2017-12-09 14:54:04 +01:00
match info.poll_context() {
2017-12-02 00:45:15 +01:00
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) =>
self.running = RunningState::Done,
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
}
}
match body.poll() {
Ok(Async::Ready(None)) => {
self.iostate = IOState::Done;
if let Err(err) = io.write_eof() {
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
break
},
Ok(Async::Ready(Some(chunk))) => {
self.iostate = IOState::Payload(body);
match io.write(chunk.as_ref()) {
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
},
Ok(result) => result
}
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
}
},
IOState::Context => {
2017-12-09 14:54:04 +01:00
match info.context.as_mut().unwrap().poll() {
2017-12-02 00:45:15 +01:00
Ok(Async::Ready(Some(frame))) => {
match frame {
Frame::Message(msg) => {
error!("Unexpected message frame {:?}", msg);
2017-12-09 14:54:04 +01:00
info.error = Some(UnexpectedTaskFrame.into());
2017-12-02 00:45:15 +01:00
return Ok(
2017-12-09 14:54:04 +01:00
FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
},
Frame::Payload(None) => {
self.iostate = IOState::Done;
if let Err(err) = io.write_eof() {
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
2017-12-02 00:45:15 +01:00
return Ok(
2017-12-09 14:54:04 +01:00
FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
break
},
Frame::Payload(Some(chunk)) => {
self.iostate = IOState::Context;
match io.write(chunk.as_ref()) {
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
2017-12-02 00:45:15 +01:00
return Ok(FinishingMiddlewares::init(
2017-12-09 14:54:04 +01:00
info, self.resp))
2017-12-02 00:45:15 +01:00
},
Ok(result) => result
}
},
Frame::Drain(fut) => {
self.drain.0.push(fut);
break
}
}
},
Ok(Async::Ready(None)) => {
self.iostate = IOState::Done;
2017-12-09 14:54:04 +01:00
info.context.take();
2017-12-02 00:45:15 +01:00
break
}
Ok(Async::NotReady) => {
self.iostate = IOState::Context;
break
}
Err(err) => {
2017-12-09 14:54:04 +01:00
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
}
}
IOState::Done => break,
};
match result {
WriterState::Pause => {
self.running.pause();
break
}
WriterState::Done => {
self.running.resume()
},
}
}
}
// flush io
match io.poll_complete() {
Ok(Async::Ready(_)) =>
self.running.resume(),
Ok(Async::NotReady) =>
return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
2017-12-09 14:54:04 +01:00
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
}
}
// drain futures
if !self.drain.0.is_empty() {
for fut in &mut self.drain.0 {
fut.borrow_mut().set()
}
self.drain.0.clear();
}
// response is completed
if self.iostate.is_done() {
self.resp.set_response_size(io.written());
2017-12-09 14:54:04 +01:00
Ok(FinishingMiddlewares::init(info, self.resp))
2017-12-02 00:45:15 +01:00
} else {
Err(PipelineState::Response(self))
}
}
}
/// Middlewares start executor
2017-12-09 13:33:40 +01:00
struct FinishingMiddlewares<S> {
2017-12-02 00:45:15 +01:00
resp: HttpResponse,
fut: Option<Box<Future<Item=(), Error=Error>>>,
2017-12-09 14:54:04 +01:00
_s: PhantomData<S>,
2017-12-02 00:45:15 +01:00
}
2017-12-09 13:33:40 +01:00
impl<S> FinishingMiddlewares<S> {
2017-12-02 00:45:15 +01:00
2017-12-09 14:54:04 +01:00
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> {
2017-12-02 00:45:15 +01:00
if info.count == 0 {
Completed::init(info)
} else {
2017-12-09 14:54:04 +01:00
match (FinishingMiddlewares{resp: resp, fut: None, _s: PhantomData}).poll(info) {
2017-12-02 00:45:15 +01:00
Ok(st) | Err(st) => st,
}
}
}
2017-12-09 14:54:04 +01:00
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
2017-12-02 00:45:15 +01:00
loop {
// poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut {
match fut.poll() {
Ok(Async::NotReady) => {
true
},
Ok(Async::Ready(())) => {
false
},
Err(err) => {
error!("Middleware finish error: {}", err);
false
}
}
} else {
false
};
if not_ready {
return Ok(PipelineState::Finishing(self))
}
self.fut = None;
2017-12-09 14:54:04 +01:00
info.count -= 1;
2017-12-02 00:45:15 +01:00
2017-12-09 14:54:04 +01:00
match info.mws[info.count].finish(info.req_mut(), &self.resp) {
2017-12-02 00:45:15 +01:00
Finished::Done => {
2017-12-09 14:54:04 +01:00
if info.count == 0 {
return Ok(Completed::init(info))
2017-12-02 00:45:15 +01:00
}
}
Finished::Future(fut) => {
self.fut = Some(fut);
},
}
}
}
}
2017-12-09 14:54:04 +01:00
/// Final stage, used while an actor context is still attached to the pipeline.
struct Completed<S>(PhantomData<S>);
2017-12-02 00:45:15 +01:00
2017-12-09 13:33:40 +01:00
impl<S> Completed<S> {
2017-12-02 00:45:15 +01:00
2017-12-09 14:54:04 +01:00
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S> {
2017-12-02 00:45:15 +01:00
if info.context.is_none() {
PipelineState::None
} else {
2017-12-09 14:54:04 +01:00
PipelineState::Completed(Completed(PhantomData))
2017-12-02 00:45:15 +01:00
}
}
2017-12-09 14:54:04 +01:00
fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> {
match info.poll_context() {
Ok(Async::NotReady) => Ok(PipelineState::Completed(Completed(PhantomData))),
2017-12-02 00:45:15 +01:00
Ok(Async::Ready(())) => Ok(PipelineState::None),
Err(_) => Ok(PipelineState::Error),
}
}
}
2017-12-02 01:10:01 +01:00
#[cfg(test)]
mod tests {
    use super::*;
    use actix::*;
    use context::HttpContext;
    use tokio_core::reactor::Core;
    use futures::future::{lazy, result};

    // Test-only helpers for inspecting a state.
    impl<S> PipelineState<S> {
        /// `Some(true)` for `PipelineState::None`, `None` for anything else.
        fn is_none(&self) -> Option<bool> {
            match *self {
                PipelineState::None => Some(true),
                _ => None,
            }
        }

        /// Extracts the `Completed` stage, if that is the current state.
        fn completed(self) -> Option<Completed<S>> {
            match self {
                PipelineState::Completed(c) => Some(c),
                _ => None,
            }
        }
    }

    struct MyActor;

    impl Actor for MyActor {
        type Context = HttpContext<MyActor>;
    }

    #[test]
    fn test_completed() {
        Core::new().unwrap().run(lazy(|| {
            // Without a context the final state is `None` right away.
            let mut info = PipelineInfo::new(HttpRequest::default());
            Completed::init(&mut info).is_none().unwrap();

            // With a context attached the pipeline enters `Completed`.
            let req = HttpRequest::default();
            let mut ctx = HttpContext::new(req.clone(), MyActor);
            let addr: Address<_> = ctx.address();
            let mut info = PipelineInfo::new(req);
            info.context = Some(Box::new(ctx));
            let mut state = Completed::init(&mut info).completed().unwrap();

            // While the address is alive the stage stays `Completed`.
            let st = state.poll(&mut info).ok().unwrap();
            let pp = Pipeline(info, st);
            assert!(!pp.is_done());

            // After the address is dropped the next poll is expected to yield `None`.
            let Pipeline(mut info, st) = pp;
            state = st.completed().unwrap();
            drop(addr);

            state.poll(&mut info).ok().unwrap().is_none().unwrap();

            result(Ok::<_, ()>(()))
        })).unwrap()
    }
}