1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

add middleware finished handler for route middleware

This commit is contained in:
Nikolay Kim 2018-04-29 20:50:38 -07:00
parent 91235ac816
commit ab4e889f96
3 changed files with 90 additions and 16 deletions

View File

@ -187,6 +187,9 @@ impl<S: 'static> ResourceHandler<S> {
/// ///
/// This is similar to `App's` middlewares, but /// This is similar to `App's` middlewares, but
/// middlewares get invoked on resource level. /// middlewares get invoked on resource level.
///
/// *Note* `Middleware::finish()` fires right after response get
/// prepared. It does not wait until body get sent to peer.
pub fn middleware<M: Middleware<S>>(&mut self, mw: M) { pub fn middleware<M: Middleware<S>>(&mut self, mw: M) {
Rc::get_mut(&mut self.middlewares) Rc::get_mut(&mut self.middlewares)
.unwrap() .unwrap()

View File

@ -10,8 +10,8 @@ use handler::{AsyncHandler, FromRequest, Handler, Reply, ReplyItem, Responder,
use http::StatusCode; use http::StatusCode;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use middleware::{Middleware, Response as MiddlewareResponse, use middleware::{Finished as MiddlewareFinished, Middleware,
Started as MiddlewareStarted}; Response as MiddlewareResponse, Started as MiddlewareStarted};
use pred::Predicate; use pred::Predicate;
use with::{ExtractorConfig, With, With2, With3}; use with::{ExtractorConfig, With, With2, With3};
@ -274,7 +274,8 @@ enum ComposeState<S: 'static> {
Starting(StartMiddlewares<S>), Starting(StartMiddlewares<S>),
Handler(WaitingResponse<S>), Handler(WaitingResponse<S>),
RunMiddlewares(RunMiddlewares<S>), RunMiddlewares(RunMiddlewares<S>),
Response(Response<S>), Finishing(FinishingMiddlewares<S>),
Completed(Response<S>),
} }
impl<S: 'static> ComposeState<S> { impl<S: 'static> ComposeState<S> {
@ -283,7 +284,8 @@ impl<S: 'static> ComposeState<S> {
ComposeState::Starting(ref mut state) => state.poll(info), ComposeState::Starting(ref mut state) => state.poll(info),
ComposeState::Handler(ref mut state) => state.poll(info), ComposeState::Handler(ref mut state) => state.poll(info),
ComposeState::RunMiddlewares(ref mut state) => state.poll(info), ComposeState::RunMiddlewares(ref mut state) => state.poll(info),
ComposeState::Response(_) => None, ComposeState::Finishing(ref mut state) => state.poll(info),
ComposeState::Completed(_) => None,
} }
} }
} }
@ -310,7 +312,7 @@ impl<S> Future for Compose<S> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
if let ComposeState::Response(ref mut resp) = self.state { if let ComposeState::Completed(ref mut resp) = self.state {
let resp = resp.resp.take().unwrap(); let resp = resp.resp.take().unwrap();
return Ok(Async::Ready(resp)); return Ok(Async::Ready(resp));
} }
@ -357,9 +359,9 @@ impl<S: 'static> StartMiddlewares<S> {
} }
info.count += 1; info.count += 1;
} }
Err(err) => return Response::init(err.into()), Err(err) => return FinishingMiddlewares::init(info, err.into()),
}, },
Err(err) => return Response::init(err.into()), Err(err) => return FinishingMiddlewares::init(info, err.into()),
} }
} }
} }
@ -389,12 +391,17 @@ impl<S: 'static> StartMiddlewares<S> {
self.fut = Some(fut); self.fut = Some(fut);
continue 'outer; continue 'outer;
} }
Err(err) => return Some(Response::init(err.into())), Err(err) => {
return Some(FinishingMiddlewares::init(
info,
err.into(),
))
} }
} }
} }
} }
Err(err) => return Some(Response::init(err.into())), }
Err(err) => return Some(FinishingMiddlewares::init(info, err.into())),
} }
} }
} }
@ -443,12 +450,12 @@ impl<S: 'static> RunMiddlewares<S> {
resp = match info.mws[curr].response(&mut info.req, resp) { resp = match info.mws[curr].response(&mut info.req, resp) {
Err(err) => { Err(err) => {
info.count = curr + 1; info.count = curr + 1;
return Response::init(err.into()); return FinishingMiddlewares::init(info, err.into());
} }
Ok(MiddlewareResponse::Done(r)) => { Ok(MiddlewareResponse::Done(r)) => {
curr += 1; curr += 1;
if curr == len { if curr == len {
return Response::init(r); return FinishingMiddlewares::init(info, r);
} else { } else {
r r
} }
@ -475,15 +482,17 @@ impl<S: 'static> RunMiddlewares<S> {
self.curr += 1; self.curr += 1;
resp resp
} }
Err(err) => return Some(Response::init(err.into())), Err(err) => return Some(FinishingMiddlewares::init(info, err.into())),
}; };
loop { loop {
if self.curr == len { if self.curr == len {
return Some(Response::init(resp)); return Some(FinishingMiddlewares::init(info, resp));
} else { } else {
match info.mws[self.curr].response(&mut info.req, resp) { match info.mws[self.curr].response(&mut info.req, resp) {
Err(err) => return Some(Response::init(err.into())), Err(err) => {
return Some(FinishingMiddlewares::init(info, err.into()))
}
Ok(MiddlewareResponse::Done(r)) => { Ok(MiddlewareResponse::Done(r)) => {
self.curr += 1; self.curr += 1;
resp = r resp = r
@ -499,6 +508,68 @@ impl<S: 'static> RunMiddlewares<S> {
} }
} }
/// Middlewares start executor
struct FinishingMiddlewares<S> {
resp: Option<HttpResponse>,
fut: Option<Box<Future<Item = (), Error = Error>>>,
_s: PhantomData<S>,
}
impl<S: 'static> FinishingMiddlewares<S> {
fn init(info: &mut ComposeInfo<S>, resp: HttpResponse) -> ComposeState<S> {
if info.count == 0 {
Response::init(resp)
} else {
let mut state = FinishingMiddlewares {
resp: Some(resp),
fut: None,
_s: PhantomData,
};
if let Some(st) = state.poll(info) {
st
} else {
ComposeState::Finishing(state)
}
}
}
fn poll(&mut self, info: &mut ComposeInfo<S>) -> Option<ComposeState<S>> {
loop {
// poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut {
match fut.poll() {
Ok(Async::NotReady) => true,
Ok(Async::Ready(())) => false,
Err(err) => {
error!("Middleware finish error: {}", err);
false
}
}
} else {
false
};
if not_ready {
return None;
}
self.fut = None;
info.count -= 1;
match info.mws[info.count as usize]
.finish(&mut info.req, self.resp.as_ref().unwrap())
{
MiddlewareFinished::Done => {
if info.count == 0 {
return Some(Response::init(self.resp.take().unwrap()));
}
}
MiddlewareFinished::Future(fut) => {
self.fut = Some(fut);
}
}
}
}
}
struct Response<S> { struct Response<S> {
resp: Option<HttpResponse>, resp: Option<HttpResponse>,
_s: PhantomData<S>, _s: PhantomData<S>,
@ -506,7 +577,7 @@ struct Response<S> {
impl<S: 'static> Response<S> { impl<S: 'static> Response<S> {
fn init(resp: HttpResponse) -> ComposeState<S> { fn init(resp: HttpResponse) -> ComposeState<S> {
ComposeState::Response(Response { ComposeState::Completed(Response {
resp: Some(resp), resp: Some(resp),
_s: PhantomData, _s: PhantomData,
}) })

View File

@ -169,7 +169,7 @@ impl<S: 'static> Scope<S> {
/// This is similar to `App's` middlewares, but /// This is similar to `App's` middlewares, but
/// middlewares get invoked on scope level. /// middlewares get invoked on scope level.
/// ///
/// *Note* `Middleware::finish()` is fired right after response get /// *Note* `Middleware::finish()` fires right after response get
/// prepared. It does not wait until body get sent to peer. /// prepared. It does not wait until body get sent to peer.
pub fn middleware<M: Middleware<S>>(mut self, mw: M) -> Scope<S> { pub fn middleware<M: Middleware<S>>(mut self, mw: M) -> Scope<S> {
Rc::get_mut(&mut self.middlewares) Rc::get_mut(&mut self.middlewares)