1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-23 23:51:06 +01:00

refactor request pipeline

This commit is contained in:
Nikolay Kim 2017-11-24 22:15:52 -08:00
parent 59b8214685
commit 7569036dd4
12 changed files with 548 additions and 290 deletions

View File

@ -1,3 +1,5 @@
#![allow(unused_imports, dead_code)]
use std::rc::Rc; use std::rc::Rc;
use std::string::ToString; use std::string::ToString;
use std::collections::HashMap; use std::collections::HashMap;
@ -10,6 +12,7 @@ use recognizer::{RouteRecognizer, check_pattern};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use channel::HttpHandler; use channel::HttpHandler;
use pipeline::Pipeline;
use middlewares::Middleware; use middlewares::Middleware;
@ -48,14 +51,9 @@ impl<S: 'static> HttpHandler for Application<S> {
&self.prefix &self.prefix
} }
fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task { fn handle(&self, req: HttpRequest, payload: Payload) -> Pipeline {
let mut task = self.run(req, payload); Pipeline::new(req, payload, Rc::clone(&self.middlewares),
&|req: &mut HttpRequest, payload: Payload| {self.run(req, payload)})
// init middlewares
if !self.middlewares.is_empty() {
task.set_middlewares(Rc::clone(&self.middlewares));
}
task
} }
} }

View File

@ -8,8 +8,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use h1; use h1;
use h2; use h2;
use task::Task;
use payload::Payload; use payload::Payload;
use pipeline::Pipeline;
use httprequest::HttpRequest; use httprequest::HttpRequest;
/// Low level http request handler /// Low level http request handler
@ -17,7 +17,7 @@ pub trait HttpHandler: 'static {
/// Http handler prefix /// Http handler prefix
fn prefix(&self) -> &str; fn prefix(&self) -> &str;
/// Handle request /// Handle request
fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task; fn handle(&self, req: HttpRequest, payload: Payload) -> Pipeline;
} }
enum HttpProtocol<T, H> enum HttpProtocol<T, H>

View File

@ -12,4 +12,5 @@ pub use super::*;
// dev specific // dev specific
pub use task::Task; pub use task::Task;
pub use pipeline::Pipeline;
pub use recognizer::RouteRecognizer; pub use recognizer::RouteRecognizer;

View File

@ -86,6 +86,13 @@ default impl<T: StdError + Sync + Send + 'static> ErrorResponse for T {
/// `InternalServerError` for `JsonError` /// `InternalServerError` for `JsonError`
impl ErrorResponse for JsonError {} impl ErrorResponse for JsonError {}
/// Internal error
#[derive(Fail, Debug)]
#[fail(display="Unexpected task frame")]
pub struct UnexpectedTaskFrame;
impl ErrorResponse for UnexpectedTaskFrame {}
/// A set of errors that can occur during parsing HTTP streams /// A set of errors that can occur during parsing HTTP streams
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum ParseError { pub enum ParseError {

View File

@ -1,6 +1,5 @@
use std::{self, io, ptr}; use std::{self, io, ptr};
use std::rc::Rc; use std::rc::Rc;
use std::cell::UnsafeCell;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -15,13 +14,13 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Timeout; use tokio_core::reactor::Timeout;
use percent_encoding; use percent_encoding;
use task::Task; use pipeline::Pipeline;
use encoding::PayloadType;
use channel::HttpHandler; use channel::HttpHandler;
use error::{ParseError, PayloadError, ErrorResponse};
use h1writer::H1Writer; use h1writer::H1Writer;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use encoding::PayloadType; use error::{ParseError, PayloadError, ErrorResponse};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
const KEEPALIVE_PERIOD: u64 = 15; // seconds const KEEPALIVE_PERIOD: u64 = 15; // seconds
@ -51,8 +50,8 @@ pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> {
} }
struct Entry { struct Entry {
task: Task, task: Pipeline,
req: UnsafeCell<HttpRequest>, //req: UnsafeCell<HttpRequest>,
eof: bool, eof: bool,
error: bool, error: bool,
finished: bool, finished: bool,
@ -107,8 +106,7 @@ impl<T, H> Http1<T, H>
} }
// this is anoying // this is anoying
let req = unsafe {item.req.get().as_mut().unwrap()}; match item.task.poll_io(&mut self.stream)
match item.task.poll_io(&mut self.stream, req)
{ {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
not_ready = false; not_ready = false;
@ -182,14 +180,14 @@ impl<T, H> Http1<T, H>
let mut task = None; let mut task = None;
for h in self.router.iter() { for h in self.router.iter() {
if req.path().starts_with(h.prefix()) { if req.path().starts_with(h.prefix()) {
task = Some(h.handle(&mut req, payload)); task = Some(h.handle(req, payload));
break break
} }
} }
self.tasks.push_back( self.tasks.push_back(
Entry {task: task.unwrap_or_else(|| Task::reply(HTTPNotFound)), Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
req: UnsafeCell::new(req), //req: UnsafeCell::new(req),
eof: false, eof: false,
error: false, error: false,
finished: false}); finished: false});
@ -217,15 +215,13 @@ impl<T, H> Http1<T, H>
self.keepalive = false; self.keepalive = false;
self.keepalive_timer.take(); self.keepalive_timer.take();
// on parse error, stop reading stream but // on parse error, stop reading stream but tasks need to be completed
// tasks need to be completed
self.error = true; self.error = true;
if self.tasks.is_empty() { if self.tasks.is_empty() {
if let ReaderError::Error(err) = err { if let ReaderError::Error(err) = err {
self.tasks.push_back( self.tasks.push_back(
Entry {task: Task::reply(err.error_response()), Entry {task: Pipeline::error(err.error_response()),
req: UnsafeCell::new(HttpRequest::for_error()),
eof: false, eof: false,
error: false, error: false,
finished: false}); finished: false});

View File

@ -1,7 +1,6 @@
use std::{io, cmp, mem}; use std::{io, cmp, mem};
use std::rc::Rc; use std::rc::Rc;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::cell::UnsafeCell;
use std::time::Duration; use std::time::Duration;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -15,13 +14,13 @@ use futures::{Async, Poll, Future, Stream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Timeout; use tokio_core::reactor::Timeout;
use task::Task; use pipeline::Pipeline;
use h2writer::H2Writer; use h2writer::H2Writer;
use channel::HttpHandler; use channel::HttpHandler;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use error::PayloadError; use error::PayloadError;
use encoding::PayloadType; use encoding::PayloadType;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use payload::{Payload, PayloadWriter}; use payload::{Payload, PayloadWriter};
const KEEPALIVE_PERIOD: u64 = 15; // seconds const KEEPALIVE_PERIOD: u64 = 15; // seconds
@ -83,8 +82,8 @@ impl<T, H> Http2<T, H>
item.poll_payload(); item.poll_payload();
if !item.eof { if !item.eof {
let req = unsafe {item.req.get().as_mut().unwrap()}; //let req = unsafe {item.req.get().as_mut().unwrap()};
match item.task.poll_io(&mut item.stream, req) { match item.task.poll_io(&mut item.stream) {
Ok(Async::Ready(ready)) => { Ok(Async::Ready(ready)) => {
item.eof = true; item.eof = true;
if ready { if ready {
@ -198,8 +197,7 @@ impl<T, H> Http2<T, H>
} }
struct Entry { struct Entry {
task: Task, task: Pipeline,
req: UnsafeCell<HttpRequest>,
payload: PayloadType, payload: PayloadType,
recv: RecvStream, recv: RecvStream,
stream: H2Writer, stream: H2Writer,
@ -230,18 +228,19 @@ impl Entry {
// Payload and Content-Encoding // Payload and Content-Encoding
let (psender, payload) = Payload::new(false); let (psender, payload) = Payload::new(false);
// Payload sender
let psender = PayloadType::new(req.headers(), psender);
// start request processing // start request processing
let mut task = None; let mut task = None;
for h in router.iter() { for h in router.iter() {
if req.path().starts_with(h.prefix()) { if req.path().starts_with(h.prefix()) {
task = Some(h.handle(&mut req, payload)); task = Some(h.handle(req, payload));
break break
} }
} }
let psender = PayloadType::new(req.headers(), psender);
Entry {task: task.unwrap_or_else(|| Task::reply(HTTPNotFound)), Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
req: UnsafeCell::new(req),
payload: psender, payload: psender,
recv: recv, recv: recv,
stream: H2Writer::new(resp), stream: H2Writer::new(resp),

View File

@ -58,6 +58,7 @@ mod resource;
mod recognizer; mod recognizer;
mod route; mod route;
mod task; mod task;
mod pipeline;
mod staticfiles; mod staticfiles;
mod server; mod server;
mod channel; mod channel;

View File

@ -101,9 +101,9 @@ impl Logger {
impl Middleware for Logger { impl Middleware for Logger {
fn start(&self, req: &mut HttpRequest) -> Started { fn start(&self, mut req: HttpRequest) -> Started {
req.extensions().insert(StartTime(time::now())); req.extensions().insert(StartTime(time::now()));
Started::Done Started::Done(req)
} }
fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished { fn finish(&self, req: &mut HttpRequest, resp: &HttpResponse) -> Finished {
@ -298,16 +298,16 @@ mod tests {
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
headers.insert(header::USER_AGENT, header::HeaderValue::from_static("ACTIX-WEB")); headers.insert(header::USER_AGENT, header::HeaderValue::from_static("ACTIX-WEB"));
let mut req = HttpRequest::new( let req = HttpRequest::new(
Method::GET, "/".to_owned(), Version::HTTP_11, headers, String::new()); Method::GET, "/".to_owned(), Version::HTTP_11, headers, String::new());
let resp = HttpResponse::builder(StatusCode::OK) let resp = HttpResponse::builder(StatusCode::OK)
.header("X-Test", "ttt") .header("X-Test", "ttt")
.force_close().body(Body::Empty).unwrap(); .force_close().body(Body::Empty).unwrap();
match logger.start(&mut req) { let mut req = match logger.start(req) {
Started::Done => (), Started::Done(req) => req,
_ => panic!(), _ => panic!(),
} };
match logger.finish(&mut req, &resp) { match logger.finish(&mut req, &resp) {
Finished::Done => (), Finished::Done => (),
_ => panic!(), _ => panic!(),

View File

@ -1,7 +1,10 @@
//! Middlewares //! Middlewares
#![allow(unused_imports, dead_code)]
use std::rc::Rc; use std::rc::Rc;
use std::error::Error;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use error::Error;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -11,12 +14,12 @@ pub use self::logger::Logger;
/// Middleware start result /// Middleware start result
pub enum Started { pub enum Started {
/// Execution completed /// Execution completed
Done, Done(HttpRequest),
/// New http response got generated. If middleware generates response /// New http response got generated. If middleware generates response
/// handler execution halts. /// handler execution halts.
Response(HttpResponse), Response(HttpRequest, HttpResponse),
/// Execution completed, but run future to completion. /// Execution completed, runs future to completion.
Future(Box<Future<Item=(), Error=HttpResponse>>), Future(Box<Future<Item=(HttpRequest, Option<HttpResponse>), Error=(HttpRequest, HttpResponse)>>),
} }
/// Middleware execution result /// Middleware execution result
@ -32,7 +35,7 @@ pub enum Finished {
/// Execution completed /// Execution completed
Done, Done,
/// Execution completed, but run future to completion /// Execution completed, but run future to completion
Future(Box<Future<Item=(), Error=Box<Error>>>), Future(Box<Future<Item=(), Error=Error>>),
} }
/// Middleware definition /// Middleware definition
@ -41,8 +44,8 @@ pub trait Middleware {
/// Method is called when request is ready. It may return /// Method is called when request is ready. It may return
/// future, which should resolve before next middleware get called. /// future, which should resolve before next middleware get called.
fn start(&self, req: &mut HttpRequest) -> Started { fn start(&self, req: HttpRequest) -> Started {
Started::Done Started::Done(req)
} }
/// Method is called when handler returns response, /// Method is called when handler returns response,
@ -56,192 +59,3 @@ pub trait Middleware {
Finished::Done Finished::Done
} }
} }
/// Middlewares executor
pub(crate) struct MiddlewaresExecutor {
state: ExecutorState,
fut: Option<Box<Future<Item=HttpResponse, Error=HttpResponse>>>,
started: Option<Box<Future<Item=(), Error=HttpResponse>>>,
finished: Option<Box<Future<Item=(), Error=Box<Error>>>>,
middlewares: Option<Rc<Vec<Box<Middleware>>>>,
}
enum ExecutorState {
None,
Starting(usize),
Started(usize),
Processing(usize, usize),
Finishing(usize),
}
impl Default for MiddlewaresExecutor {
fn default() -> MiddlewaresExecutor {
MiddlewaresExecutor {
fut: None,
started: None,
finished: None,
state: ExecutorState::None,
middlewares: None,
}
}
}
impl MiddlewaresExecutor {
pub fn start(&mut self, mw: Rc<Vec<Box<Middleware>>>) {
self.state = ExecutorState::Starting(0);
self.middlewares = Some(mw);
}
pub fn starting(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
if let ExecutorState::Starting(mut idx) = *state {
loop {
// poll latest fut
if let Some(ref mut fut) = self.started {
match fut.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => idx += 1,
Err(response) => {
*state = ExecutorState::Started(idx);
return Ok(Async::Ready(Some(response)))
}
}
}
self.started = None;
if idx >= middlewares.len() {
*state = ExecutorState::Started(idx-1);
return Ok(Async::Ready(None))
} else {
match middlewares[idx].start(req) {
Started::Done => idx += 1,
Started::Response(resp) => {
*state = ExecutorState::Started(idx);
return Ok(Async::Ready(Some(resp)))
},
Started::Future(fut) => {
self.started = Some(fut);
},
}
}
}
}
}
Ok(Async::Ready(None))
}
pub fn processing(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
match *state {
ExecutorState::Processing(mut idx, total) => {
loop {
// poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(response)) | Err(response) => {
idx += 1;
response
}
};
self.fut = None;
loop {
if idx == 0 {
*state = ExecutorState::Finishing(total);
return Ok(Async::Ready(Some(resp)))
} else {
match middlewares[idx].response(req, resp) {
Response::Response(r) => {
idx -= 1;
resp = r
},
Response::Future(fut) => {
self.fut = Some(fut);
break
},
}
}
}
}
}
_ => Ok(Async::Ready(None))
}
} else {
Ok(Async::Ready(None))
}
}
pub fn finishing(&mut self, req: &mut HttpRequest, resp: &HttpResponse) -> Poll<(), ()> {
if let Some(ref middlewares) = self.middlewares {
let state = &mut self.state;
if let ExecutorState::Finishing(mut idx) = *state {
loop {
// poll latest fut
if let Some(ref mut fut) = self.finished {
match fut.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => idx -= 1,
Err(err) => {
error!("Middleware finish error: {}", err);
}
}
}
self.finished = None;
match middlewares[idx].finish(req, resp) {
Finished::Done => {
if idx == 0 {
return Ok(Async::Ready(()))
} else {
idx -= 1
}
}
Finished::Future(fut) => {
self.finished = Some(fut);
},
}
}
}
}
Ok(Async::Ready(()))
}
pub fn response(&mut self, req: &mut HttpRequest, resp: HttpResponse)
-> Option<HttpResponse>
{
if let Some(ref middlewares) = self.middlewares {
let mut resp = resp;
let state = &mut self.state;
match *state {
ExecutorState::Started(mut idx) => {
let total = idx;
loop {
resp = match middlewares[idx].response(req, resp) {
Response::Response(r) => {
if idx == 0 {
*state = ExecutorState::Finishing(total);
return Some(r)
} else {
idx -= 1;
r
}
},
Response::Future(fut) => {
*state = ExecutorState::Processing(idx, total);
self.fut = Some(fut);
return None
},
};
}
}
_ => Some(resp)
}
} else {
Some(resp)
}
}
}

437
src/pipeline.rs Normal file
View File

@ -0,0 +1,437 @@
use std::mem;
use std::rc::Rc;
use futures::{Async, Poll, Future};
use task::Task;
use error::Error;
use payload::Payload;
use middlewares::{Middleware, Finished, Started, Response};
use h1writer::Writer;
use httprequest::HttpRequest;
use httpresponse::HttpResponse;
type Handler = Fn(&mut HttpRequest, Payload) -> Task;
pub(crate) type PipelineHandler<'a> = &'a Fn(&mut HttpRequest, Payload) -> Task;
pub struct Pipeline(PipelineState);
enum PipelineState {
None,
Starting(Start),
Handle(Box<Handle>),
Finishing(Box<Finish>),
Error(Box<(Task, HttpRequest)>),
Task(Box<(Task, HttpRequest)>),
}
impl Pipeline {
pub fn new(mut req: HttpRequest, payload: Payload,
mw: Rc<Vec<Box<Middleware>>>, handler: PipelineHandler) -> Pipeline {
if mw.is_empty() {
let task = (handler)(&mut req, payload);
Pipeline(PipelineState::Task(Box::new((task, req))))
} else {
match Start::init(mw, req, handler, payload) {
StartResult::Ready(res) => {
Pipeline(PipelineState::Handle(res))
},
StartResult::NotReady(res) => {
Pipeline(PipelineState::Starting(res))
},
}
}
}
pub fn error<R: Into<HttpResponse>>(resp: R) -> Self {
Pipeline(PipelineState::Error(Box::new((Task::reply(resp), HttpRequest::for_error()))))
}
pub(crate) fn disconnected(&mut self) {
match self.0 {
PipelineState::Starting(ref mut st) =>
st.disconnected(),
PipelineState::Handle(ref mut st) =>
st.task.disconnected(),
_ =>(),
}
}
pub(crate) fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, ()> {
loop {
let state = mem::replace(&mut self.0, PipelineState::None);
match state {
PipelineState::Task(mut st) => {
let req:&mut HttpRequest = unsafe{mem::transmute(&mut st.1)};
let res = st.0.poll_io(io, req);
self.0 = PipelineState::Task(st);
return res
}
PipelineState::Starting(mut st) => {
match st.poll() {
Async::NotReady => {
self.0 = PipelineState::Starting(st);
return Ok(Async::NotReady)
}
Async::Ready(h) =>
self.0 = PipelineState::Handle(h),
}
}
PipelineState::Handle(mut st) => {
let res = st.poll_io(io);
if let Ok(Async::Ready(r)) = res {
if r {
self.0 = PipelineState::Finishing(st.finish());
return Ok(Async::Ready(false))
} else {
self.0 = PipelineState::Handle(st);
return res
}
} else {
self.0 = PipelineState::Handle(st);
return res
}
}
PipelineState::Error(mut st) => {
let req:&mut HttpRequest = unsafe{mem::transmute(&mut st.1)};
let res = st.0.poll_io(io, req);
self.0 = PipelineState::Error(st);
return res
}
PipelineState::Finishing(_) | PipelineState::None => unreachable!(),
}
}
}
pub(crate) fn poll(&mut self) -> Poll<(), Error> {
loop {
let state = mem::replace(&mut self.0, PipelineState::None);
match state {
PipelineState::Handle(mut st) => {
let res = st.poll();
match res {
Ok(Async::NotReady) => {
self.0 = PipelineState::Handle(st);
return Ok(Async::NotReady)
}
Ok(Async::Ready(())) | Err(_) => {
self.0 = PipelineState::Finishing(st.finish());
}
}
}
PipelineState::Finishing(mut st) => {
let res = st.poll();
self.0 = PipelineState::Finishing(st);
return Ok(res)
}
PipelineState::Error(mut st) => {
let res = st.0.poll();
self.0 = PipelineState::Error(st);
return res
}
PipelineState::Task(mut st) => {
let res = st.0.poll();
self.0 = PipelineState::Task(st);
return res
}
_ => {
self.0 = state;
return Ok(Async::Ready(()))
}
}
}
}
}
struct Handle {
idx: usize,
req: HttpRequest,
task: Task,
middlewares: Rc<Vec<Box<Middleware>>>,
}
impl Handle {
fn new(idx: usize,
req: HttpRequest,
task: Task,
mw: Rc<Vec<Box<Middleware>>>) -> Handle
{
Handle {
idx: idx, req: req, task:task, middlewares: mw }
}
fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, ()> {
self.task.poll_io(io, &mut self.req)
}
fn poll(&mut self) -> Poll<(), Error> {
self.task.poll()
}
fn finish(mut self) -> Box<Finish> {
Box::new(Finish {
idx: self.idx,
req: self.req,
fut: None,
resp: self.task.response(),
middlewares: self.middlewares
})
}
}
/// Middlewares start executor
struct Finish {
idx: usize,
req: HttpRequest,
resp: HttpResponse,
fut: Option<Box<Future<Item=(), Error=Error>>>,
middlewares: Rc<Vec<Box<Middleware>>>,
}
impl Finish {
pub fn poll(&mut self) -> Async<()> {
loop {
// poll latest fut
if let Some(ref mut fut) = self.fut {
match fut.poll() {
Ok(Async::NotReady) => return Async::NotReady,
Ok(Async::Ready(())) => self.idx -= 1,
Err(err) => {
error!("Middleware finish error: {}", err);
self.idx -= 1;
}
}
}
self.fut = None;
match self.middlewares[self.idx].finish(&mut self.req, &self.resp) {
Finished::Done => {
if self.idx == 0 {
return Async::Ready(())
} else {
self.idx -= 1
}
}
Finished::Future(fut) => {
self.fut = Some(fut);
},
}
}
}
}
type Fut = Box<Future<Item=(HttpRequest, Option<HttpResponse>), Error=(HttpRequest, HttpResponse)>>;
/// Middlewares start executor
struct Start {
idx: usize,
hnd: *mut Handler,
disconnected: bool,
payload: Option<Payload>,
fut: Option<Fut>,
middlewares: Rc<Vec<Box<Middleware>>>,
}
enum StartResult {
Ready(Box<Handle>),
NotReady(Start),
}
impl Start {
fn init(mw: Rc<Vec<Box<Middleware>>>,
req: HttpRequest, handler: PipelineHandler, payload: Payload) -> StartResult
{
Start {
idx: 0,
fut: None,
disconnected: false,
hnd: handler as *const _ as *mut _,
payload: Some(payload),
middlewares: mw,
}.start(req)
}
fn disconnected(&mut self) {
self.disconnected = true;
}
fn prepare(&self, mut task: Task) -> Task {
if self.disconnected {
task.disconnected()
}
task.set_middlewares(
MiddlewaresResponse::new(self.idx, Rc::clone(&self.middlewares)));
task
}
fn start(mut self, mut req: HttpRequest) -> StartResult {
loop {
if self.idx >= self.middlewares.len() {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return StartResult::Ready(
Box::new(Handle::new(self.idx-1, req, self.prepare(task), self.middlewares)))
} else {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
return StartResult::Ready(
Box::new(Handle::new(
self.idx, req, self.prepare(Task::reply(resp)), self.middlewares)))
},
Started::Future(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.fut = Some(fut);
return StartResult::NotReady(self)
}
Ok(Async::Ready((req, resp))) => {
self.idx += 1;
if let Some(resp) = resp {
return StartResult::Ready(
Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), self.middlewares)))
}
req
}
Err((req, resp)) => {
return StartResult::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), self.middlewares)))
}
}
},
}
}
}
}
fn poll(&mut self) -> Async<Box<Handle>> {
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Async::NotReady,
Ok(Async::Ready((mut req, resp))) => {
self.idx += 1;
if let Some(resp) = resp {
return Async::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares))))
}
if self.idx >= self.middlewares.len() {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return Async::Ready(Box::new(Handle::new(
self.idx-1, req,
self.prepare(task), Rc::clone(&self.middlewares))))
} else {
loop {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
return Async::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)),
Rc::clone(&self.middlewares))))
},
Started::Future(mut fut) => {
self.fut = Some(fut);
continue 'outer
},
}
}
}
}
Err((req, resp)) => {
return Async::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)),
Rc::clone(&self.middlewares))))
}
}
}
}
}
/// Middlewares response executor
pub(crate) struct MiddlewaresResponse {
idx: usize,
fut: Option<Box<Future<Item=HttpResponse, Error=HttpResponse>>>,
middlewares: Rc<Vec<Box<Middleware>>>,
}
impl MiddlewaresResponse {
fn new(idx: usize, mw: Rc<Vec<Box<Middleware>>>) -> MiddlewaresResponse {
let idx = if idx == 0 { 0 } else { idx - 1 };
MiddlewaresResponse {
idx: idx,
fut: None,
middlewares: mw }
}
pub fn response(&mut self, req: &mut HttpRequest, mut resp: HttpResponse)
-> Option<HttpResponse>
{
loop {
resp = match self.middlewares[self.idx].response(req, resp) {
Response::Response(r) => {
if self.idx == 0 {
return Some(r)
} else {
self.idx -= 1;
r
}
},
Response::Future(fut) => {
self.fut = Some(fut);
return None
},
};
}
}
pub fn poll(&mut self, req: &mut HttpRequest) -> Poll<Option<HttpResponse>, ()> {
if self.fut.is_none() {
return Ok(Async::Ready(None))
}
loop {
// poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(resp)) | Err(resp) => {
self.idx += 1;
resp
}
};
loop {
if self.idx == 0 {
return Ok(Async::Ready(Some(resp)))
} else {
match self.middlewares[self.idx].response(req, resp) {
Response::Response(r) => {
self.idx -= 1;
resp = r
},
Response::Future(fut) => {
self.fut = Some(fut);
break
},
}
}
}
}
}
}

View File

@ -7,9 +7,9 @@ use futures::{Async, Future, Poll, Stream};
use futures::task::{Task as FutureTask, current as current_task}; use futures::task::{Task as FutureTask, current as current_task};
use h1writer::{Writer, WriterState}; use h1writer::{Writer, WriterState};
use error::Error; use error::{Error, UnexpectedTaskFrame};
use route::Frame; use route::Frame;
use middlewares::{Middleware, MiddlewaresExecutor}; use pipeline::MiddlewaresResponse;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -111,7 +111,7 @@ pub struct Task {
drain: Vec<Rc<RefCell<DrainFut>>>, drain: Vec<Rc<RefCell<DrainFut>>>,
prepared: Option<HttpResponse>, prepared: Option<HttpResponse>,
disconnected: bool, disconnected: bool,
middlewares: MiddlewaresExecutor, middlewares: Option<MiddlewaresResponse>,
} }
impl Task { impl Task {
@ -128,7 +128,7 @@ impl Task {
stream: TaskStream::None, stream: TaskStream::None,
prepared: None, prepared: None,
disconnected: false, disconnected: false,
middlewares: MiddlewaresExecutor::default() } middlewares: None }
} }
pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self { pub(crate) fn with_context<C: IoContext>(ctx: C) -> Self {
@ -139,7 +139,7 @@ impl Task {
drain: Vec::new(), drain: Vec::new(),
prepared: None, prepared: None,
disconnected: false, disconnected: false,
middlewares: MiddlewaresExecutor::default() } middlewares: None }
} }
pub(crate) fn with_stream<S>(stream: S) -> Self pub(crate) fn with_stream<S>(stream: S) -> Self
@ -152,11 +152,15 @@ impl Task {
drain: Vec::new(), drain: Vec::new(),
prepared: None, prepared: None,
disconnected: false, disconnected: false,
middlewares: MiddlewaresExecutor::default() } middlewares: None }
} }
pub(crate) fn set_middlewares(&mut self, middlewares: Rc<Vec<Box<Middleware>>>) { pub(crate) fn response(&mut self) -> HttpResponse {
self.middlewares.start(middlewares) self.prepared.take().unwrap()
}
pub(crate) fn set_middlewares(&mut self, middlewares: MiddlewaresResponse) {
self.middlewares = Some(middlewares)
} }
pub(crate) fn disconnected(&mut self) { pub(crate) fn disconnected(&mut self) {
@ -171,16 +175,6 @@ impl Task {
{ {
trace!("POLL-IO frames:{:?}", self.frames.len()); trace!("POLL-IO frames:{:?}", self.frames.len());
// start middlewares
match self.middlewares.starting(req) {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) | Err(_) => (),
Ok(Async::Ready(Some(response))) => {
self.frames.clear();
self.frames.push_front(Frame::Message(response));
},
}
// response is completed // response is completed
if self.frames.is_empty() && self.iostate.is_done() { if self.frames.is_empty() && self.iostate.is_done() {
return Ok(Async::Ready(self.state.is_done())); return Ok(Async::Ready(self.state.is_done()));
@ -197,10 +191,16 @@ impl Task {
} }
// process middlewares response // process middlewares response
match self.middlewares.processing(req) { if let Some(mut middlewares) = self.middlewares.take() {
match middlewares.poll(req) {
Err(_) => return Err(()), Err(_) => return Err(()),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => {
Ok(Async::Ready(None)) => (), self.middlewares = Some(middlewares);
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => {
self.middlewares = Some(middlewares);
}
Ok(Async::Ready(Some(mut response))) => { Ok(Async::Ready(Some(mut response))) => {
let result = io.start(req, &mut response); let result = io.start(req, &mut response);
self.prepared = Some(response); self.prepared = Some(response);
@ -213,6 +213,7 @@ impl Task {
} }
}, },
} }
}
// if task is paused, write buffer probably is full // if task is paused, write buffer probably is full
if self.state != TaskRunningState::Paused { if self.state != TaskRunningState::Paused {
@ -220,16 +221,23 @@ impl Task {
while let Some(frame) = self.frames.pop_front() { while let Some(frame) = self.frames.pop_front() {
trace!("IO Frame: {:?}", frame); trace!("IO Frame: {:?}", frame);
let res = match frame { let res = match frame {
Frame::Message(resp) => { Frame::Message(mut resp) => {
// run middlewares // run middlewares
if let Some(mut resp) = self.middlewares.response(req, resp) { if let Some(mut middlewares) = self.middlewares.take() {
if let Some(mut resp) = middlewares.response(req, resp) {
let result = io.start(req, &mut resp); let result = io.start(req, &mut resp);
self.prepared = Some(resp); self.prepared = Some(resp);
result result
} else { } else {
// middlewares need to run some futures // middlewares need to run some futures
self.middlewares = Some(middlewares);
return self.poll_io(io, req) return self.poll_io(io, req)
} }
} else {
let result = io.start(req, &mut resp);
self.prepared = Some(resp);
result
}
} }
Frame::Payload(Some(chunk)) => { Frame::Payload(Some(chunk)) => {
io.write(chunk.as_ref()) io.write(chunk.as_ref())
@ -278,12 +286,8 @@ impl Task {
// response is completed // response is completed
if self.iostate.is_done() { if self.iostate.is_done() {
// finish middlewares
if let Some(ref mut resp) = self.prepared { if let Some(ref mut resp) = self.prepared {
resp.set_response_size(io.written()); resp.set_response_size(io.written());
if let Ok(Async::NotReady) = self.middlewares.finishing(req, resp) {
return Ok(Async::NotReady)
}
} }
Ok(Async::Ready(self.state.is_done())) Ok(Async::Ready(self.state.is_done()))
} else { } else {
@ -291,8 +295,9 @@ impl Task {
} }
} }
fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), ()> fn poll_stream<S>(&mut self, stream: &mut S) -> Poll<(), Error>
where S: Stream<Item=Frame, Error=Error> { where S: Stream<Item=Frame, Error=Error>
{
loop { loop {
match stream.poll() { match stream.poll() {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
@ -300,7 +305,7 @@ impl Task {
Frame::Message(ref msg) => { Frame::Message(ref msg) => {
if self.iostate != TaskIOState::ReadingMessage { if self.iostate != TaskIOState::ReadingMessage {
error!("Unexpected frame {:?}", frame); error!("Unexpected frame {:?}", frame);
return Err(()) return Err(UnexpectedTaskFrame.into())
} }
let upgrade = msg.upgrade(); let upgrade = msg.upgrade();
if upgrade || msg.body().is_streaming() { if upgrade || msg.body().is_streaming() {
@ -314,7 +319,7 @@ impl Task {
self.iostate = TaskIOState::Done; self.iostate = TaskIOState::Done;
} else if self.iostate != TaskIOState::ReadingPayload { } else if self.iostate != TaskIOState::ReadingPayload {
error!("Unexpected frame {:?}", self.iostate); error!("Unexpected frame {:?}", self.iostate);
return Err(()) return Err(UnexpectedTaskFrame.into())
} }
}, },
_ => (), _ => (),
@ -325,8 +330,8 @@ impl Task {
return Ok(Async::Ready(())), return Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return Ok(Async::NotReady), return Ok(Async::NotReady),
Err(_) => Err(err) =>
return Err(()), return Err(err),
} }
} }
} }
@ -334,7 +339,7 @@ impl Task {
impl Future for Task { impl Future for Task {
type Item = (); type Item = ();
type Error = (); type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut s = mem::replace(&mut self.stream, TaskStream::None); let mut s = mem::replace(&mut self.stream, TaskStream::None);

View File

@ -61,9 +61,9 @@ struct MiddlewareTest {
} }
impl middlewares::Middleware for MiddlewareTest { impl middlewares::Middleware for MiddlewareTest {
fn start(&self, _: &mut HttpRequest) -> middlewares::Started { fn start(&self, req: HttpRequest) -> middlewares::Started {
self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed); self.start.store(self.start.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
middlewares::Started::Done middlewares::Started::Done(req)
} }
fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response { fn response(&self, _: &mut HttpRequest, resp: HttpResponse) -> middlewares::Response {