1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

fix and refactor middleware runner

This commit is contained in:
Nikolay Kim 2017-12-29 01:01:31 -08:00
parent 308df19865
commit d87fafb563
5 changed files with 123 additions and 82 deletions

View File

@ -1,4 +1,5 @@
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use handler::Reply; use handler::Reply;
@ -6,7 +7,7 @@ use router::{Router, Pattern};
use resource::Resource; use resource::Resource;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask}; use channel::{HttpHandler, IntoHttpHandler, HttpHandlerTask};
use pipeline::Pipeline; use pipeline::{Pipeline, PipelineHandler};
use middleware::Middleware; use middleware::Middleware;
use server::ServerSettings; use server::ServerSettings;
@ -14,19 +15,20 @@ use server::ServerSettings;
pub struct HttpApplication<S=()> { pub struct HttpApplication<S=()> {
state: Rc<S>, state: Rc<S>,
prefix: String, prefix: String,
default: Resource<S>,
router: Router, router: Router,
resources: Vec<Resource<S>>, inner: Rc<RefCell<Inner<S>>>,
middlewares: Rc<Vec<Box<Middleware<S>>>>, middlewares: Rc<Vec<Box<Middleware<S>>>>,
} }
impl<S: 'static> HttpApplication<S> { pub(crate) struct Inner<S> {
default: Resource<S>,
router: Router,
resources: Vec<Resource<S>>,
}
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> { impl<S: 'static> PipelineHandler<S> for Inner<S> {
req.with_state(Rc::clone(&self.state), self.router.clone())
}
pub(crate) fn run(&mut self, mut req: HttpRequest<S>) -> Reply { fn handle(&mut self, mut req: HttpRequest<S>) -> Reply {
if let Some(idx) = self.router.recognize(&mut req) { if let Some(idx) = self.router.recognize(&mut req) {
self.resources[idx].handle(req.clone(), Some(&mut self.default)) self.resources[idx].handle(req.clone(), Some(&mut self.default))
} else { } else {
@ -35,14 +37,25 @@ impl<S: 'static> HttpApplication<S> {
} }
} }
impl<S: 'static> HttpApplication<S> {
#[cfg(test)]
pub(crate) fn run(&mut self, req: HttpRequest<S>) -> Reply {
self.inner.borrow_mut().handle(req)
}
#[cfg(test)]
pub(crate) fn prepare_request(&self, req: HttpRequest) -> HttpRequest<S> {
req.with_state(Rc::clone(&self.state), self.router.clone())
}
}
impl<S: 'static> HttpHandler for HttpApplication<S> { impl<S: 'static> HttpHandler for HttpApplication<S> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> { fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
if req.path().starts_with(&self.prefix) { if req.path().starts_with(&self.prefix) {
let req = self.prepare_request(req); let inner = Rc::clone(&self.inner);
// TODO: redesign run callback let req = req.with_state(Rc::clone(&self.state), self.router.clone());
Ok(Box::new(Pipeline::new(req, Rc::clone(&self.middlewares),
&mut |req: HttpRequest<S>| self.run(req)))) Ok(Box::new(Pipeline::new(req, Rc::clone(&self.middlewares), inner)))
} else { } else {
Err(req) Err(req)
} }
@ -267,12 +280,19 @@ impl<S> Application<S> where S: 'static {
} }
let (router, resources) = Router::new(prefix, resources); let (router, resources) = Router::new(prefix, resources);
let inner = Rc::new(RefCell::new(
Inner {
default: parts.default,
router: router.clone(),
resources: resources }
));
HttpApplication { HttpApplication {
state: Rc::new(parts.state), state: Rc::new(parts.state),
prefix: prefix.to_owned(), prefix: prefix.to_owned(),
default: parts.default, inner: inner,
router: router, router: router.clone(),
resources: resources,
middlewares: Rc::new(parts.middlewares), middlewares: Rc::new(parts.middlewares),
} }
} }

View File

@ -48,15 +48,13 @@ extern crate regex;
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;
#[macro_use] #[macro_use]
extern crate failure;
#[macro_use]
extern crate futures; extern crate futures;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate mio; extern crate mio;
extern crate net2; extern crate net2;
extern crate failure;
#[macro_use] extern crate failure_derive;
extern crate cookie; extern crate cookie;
extern crate http; extern crate http;
extern crate httparse; extern crate httparse;

View File

@ -276,7 +276,7 @@ impl CookieSessionInner {
fn new(key: &[u8]) -> CookieSessionInner { fn new(key: &[u8]) -> CookieSessionInner {
CookieSessionInner { CookieSessionInner {
key: Key::from_master(key), key: Key::from_master(key),
name: "actix_session".to_owned(), name: "actix-session".to_owned(),
path: "/".to_owned(), path: "/".to_owned(),
domain: None, domain: None,
secure: true } secure: true }

View File

@ -1,5 +1,6 @@
use std::{io, mem}; use std::{io, mem};
use std::rc::Rc; use std::rc::Rc;
use std::cell::RefCell;
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
@ -14,21 +15,23 @@ use h1writer::{Writer, WriterState};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use middleware::{Middleware, Finished, Started, Response}; use middleware::{Middleware, Finished, Started, Response};
use application::Inner;
type Handler<S> = FnMut(HttpRequest<S>) -> Reply; pub trait PipelineHandler<S> {
pub(crate) type PipelineHandler<'a, S> = &'a mut FnMut(HttpRequest<S>) -> Reply; fn handle(&mut self, req: HttpRequest<S>) -> Reply;
}
pub struct Pipeline<S>(PipelineInfo<S>, PipelineState<S>); pub struct Pipeline<S, H>(PipelineInfo<S>, PipelineState<S, H>);
enum PipelineState<S> { enum PipelineState<S, H> {
None, None,
Error, Error,
Starting(StartMiddlewares<S>), Starting(StartMiddlewares<S, H>),
Handler(WaitingResponse<S>), Handler(WaitingResponse<S, H>),
RunMiddlewares(RunMiddlewares<S>), RunMiddlewares(RunMiddlewares<S, H>),
Response(ProcessResponse<S>), Response(ProcessResponse<S, H>),
Finishing(FinishingMiddlewares<S>), Finishing(FinishingMiddlewares<S, H>),
Completed(Completed<S>), Completed(Completed<S, H>),
} }
struct PipelineInfo<S> { struct PipelineInfo<S> {
@ -75,11 +78,11 @@ enum PipelineResponse {
Response(Box<Future<Item=HttpResponse, Error=Error>>), Response(Box<Future<Item=HttpResponse, Error=Error>>),
} }
impl<S> Pipeline<S> { impl<S, H: PipelineHandler<S>> Pipeline<S, H> {
pub fn new(req: HttpRequest<S>, pub fn new(req: HttpRequest<S>,
mws: Rc<Vec<Box<Middleware<S>>>>, mws: Rc<Vec<Box<Middleware<S>>>>,
handler: PipelineHandler<S>) -> Pipeline<S> handler: Rc<RefCell<H>>) -> Pipeline<S, H>
{ {
let mut info = PipelineInfo { let mut info = PipelineInfo {
req: req, req: req,
@ -94,15 +97,14 @@ impl<S> Pipeline<S> {
} }
} }
impl Pipeline<()> { impl Pipeline<(), Inner<()>> {
pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> { pub fn error<R: Into<HttpResponse>>(err: R) -> Box<HttpHandlerTask> {
Box::new(Pipeline( Box::new(Pipeline::<(), Inner<()>>(
PipelineInfo::new( PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into())))
HttpRequest::default()), ProcessResponse::init(err.into())))
} }
} }
impl<S> Pipeline<S> { impl<S, H> Pipeline<S, H> {
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
match self.1 { match self.1 {
@ -115,7 +117,7 @@ impl<S> Pipeline<S> {
} }
} }
impl<S> HttpHandlerTask for Pipeline<S> { impl<S, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
fn disconnected(&mut self) { fn disconnected(&mut self) {
if let Some(ref mut context) = self.0.context { if let Some(ref mut context) = self.0.context {
@ -274,20 +276,22 @@ impl<S> HttpHandlerTask for Pipeline<S> {
type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>; type Fut = Box<Future<Item=Option<HttpResponse>, Error=Error>>;
/// Middlewares start executor /// Middlewares start executor
struct StartMiddlewares<S> { struct StartMiddlewares<S, H> {
hnd: *mut Handler<S>, hnd: Rc<RefCell<H>>,
fut: Option<Fut>, fut: Option<Fut>,
_s: PhantomData<S>,
} }
impl<S> StartMiddlewares<S> { impl<S, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, handler: PipelineHandler<S>) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>, handler: Rc<RefCell<H>>) -> PipelineState<S, H>
{
// execute middlewares, we need this stage because middlewares could be non-async // execute middlewares, we need this stage because middlewares could be non-async
// and we can move to next state immidietly // and we can move to next state immidietly
let len = info.mws.len(); let len = info.mws.len();
loop { loop {
if info.count == len { if info.count == len {
let reply = (&mut *handler)(info.req.clone()); let reply = handler.borrow_mut().handle(info.req.clone());
return WaitingResponse::init(info, reply) return WaitingResponse::init(info, reply)
} else { } else {
match info.mws[info.count].start(&mut info.req) { match info.mws[info.count].start(&mut info.req) {
@ -299,8 +303,9 @@ impl<S> StartMiddlewares<S> {
match fut.poll() { match fut.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) =>
return PipelineState::Starting(StartMiddlewares { return PipelineState::Starting(StartMiddlewares {
hnd: handler as *const _ as *mut _, hnd: handler,
fut: Some(fut)}), fut: Some(fut),
_s: PhantomData}),
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
if let Some(resp) = resp { if let Some(resp) = resp {
return RunMiddlewares::init(info, resp); return RunMiddlewares::init(info, resp);
@ -317,7 +322,8 @@ impl<S> StartMiddlewares<S> {
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
let len = info.mws.len(); let len = info.mws.len();
'outer: loop { 'outer: loop {
match self.fut.as_mut().unwrap().poll() { match self.fut.as_mut().unwrap().poll() {
@ -329,7 +335,7 @@ impl<S> StartMiddlewares<S> {
return Ok(RunMiddlewares::init(info, resp)); return Ok(RunMiddlewares::init(info, resp));
} }
if info.count == len { if info.count == len {
let reply = (unsafe{&mut *self.hnd})(info.req.clone()); let reply = (*self.hnd.borrow_mut()).handle(info.req.clone());
return Ok(WaitingResponse::init(info, reply)); return Ok(WaitingResponse::init(info, reply));
} else { } else {
loop { loop {
@ -357,29 +363,33 @@ impl<S> StartMiddlewares<S> {
} }
// waiting for response // waiting for response
struct WaitingResponse<S> { struct WaitingResponse<S, H> {
stream: PipelineResponse, stream: PipelineResponse,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> WaitingResponse<S> { impl<S, H> WaitingResponse<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, reply: Reply) -> PipelineState<S, H>
{ {
match reply.into() { match reply.into() {
ReplyItem::Message(resp) => ReplyItem::Message(resp) =>
RunMiddlewares::init(info, resp), RunMiddlewares::init(info, resp),
ReplyItem::Actor(ctx) => ReplyItem::Actor(ctx) =>
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Context(ctx), _s: PhantomData }), WaitingResponse { stream: PipelineResponse::Context(ctx),
_s: PhantomData, _h: PhantomData }),
ReplyItem::Future(fut) => ReplyItem::Future(fut) =>
PipelineState::Handler( PipelineState::Handler(
WaitingResponse { stream: PipelineResponse::Response(fut), _s: PhantomData }), WaitingResponse { stream: PipelineResponse::Response(fut),
_s: PhantomData, _h: PhantomData }),
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
let stream = mem::replace(&mut self.stream, PipelineResponse::None); let stream = mem::replace(&mut self.stream, PipelineResponse::None);
match stream { match stream {
@ -430,15 +440,16 @@ impl<S> WaitingResponse<S> {
} }
/// Middlewares response executor /// Middlewares response executor
struct RunMiddlewares<S> { struct RunMiddlewares<S, H> {
curr: usize, curr: usize,
fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>, fut: Option<Box<Future<Item=HttpResponse, Error=Error>>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> RunMiddlewares<S> { impl<S, H> RunMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S> fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S, H>
{ {
if info.count == 0 { if info.count == 0 {
return ProcessResponse::init(resp); return ProcessResponse::init(resp);
@ -462,20 +473,23 @@ impl<S> RunMiddlewares<S> {
}, },
Response::Future(fut) => { Response::Future(fut) => {
return PipelineState::RunMiddlewares( return PipelineState::RunMiddlewares(
RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData }) RunMiddlewares { curr: curr, fut: Some(fut),
_s: PhantomData, _h: PhantomData })
}, },
}; };
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S,H>, PipelineState<S, H>>
{
let len = info.mws.len(); let len = info.mws.len();
loop { loop {
// poll latest fut // poll latest fut
let mut resp = match self.fut.as_mut().unwrap().poll() { let mut resp = match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady) => {
return Ok(PipelineState::RunMiddlewares(self)), return Err(PipelineState::RunMiddlewares(self))
}
Ok(Async::Ready(resp)) => { Ok(Async::Ready(resp)) => {
self.curr += 1; self.curr += 1;
resp resp
@ -506,12 +520,13 @@ impl<S> RunMiddlewares<S> {
} }
} }
struct ProcessResponse<S> { struct ProcessResponse<S, H> {
resp: HttpResponse, resp: HttpResponse,
iostate: IOState, iostate: IOState,
running: RunningState, running: RunningState,
drain: Option<oneshot::Sender<()>>, drain: Option<oneshot::Sender<()>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
#[derive(PartialEq)] #[derive(PartialEq)]
@ -543,21 +558,21 @@ enum IOState {
Done, Done,
} }
impl<S> ProcessResponse<S> { impl<S, H> ProcessResponse<S, H> {
#[inline] #[inline]
fn init(resp: HttpResponse) -> PipelineState<S> fn init(resp: HttpResponse) -> PipelineState<S, H>
{ {
PipelineState::Response( PipelineState::Response(
ProcessResponse{ resp: resp, ProcessResponse{ resp: resp,
iostate: IOState::Response, iostate: IOState::Response,
running: RunningState::Running, running: RunningState::Running,
drain: None, drain: None,
_s: PhantomData}) _s: PhantomData, _h: PhantomData})
} }
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S>, PipelineState<S>> -> Result<PipelineState<S, H>, PipelineState<S, H>>
{ {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
// if task is paused, write buffer is probably full // if task is paused, write buffer is probably full
@ -725,25 +740,28 @@ impl<S> ProcessResponse<S> {
} }
/// Middlewares start executor /// Middlewares start executor
struct FinishingMiddlewares<S> { struct FinishingMiddlewares<S, H> {
resp: HttpResponse, resp: HttpResponse,
fut: Option<Box<Future<Item=(), Error=Error>>>, fut: Option<Box<Future<Item=(), Error=Error>>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>,
} }
impl<S> FinishingMiddlewares<S> { impl<S, H> FinishingMiddlewares<S, H> {
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)
} else { } else {
match (FinishingMiddlewares{resp: resp, fut: None, _s: PhantomData}).poll(info) { match (FinishingMiddlewares{resp: resp, fut: None,
_s: PhantomData, _h: PhantomData}).poll(info) {
Ok(st) | Err(st) => st, Ok(st) | Err(st) => st,
} }
} }
} }
fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(mut self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>>
{
loop { loop {
// poll latest fut // poll latest fut
let not_ready = if let Some(ref mut fut) = self.fut { let not_ready = if let Some(ref mut fut) = self.fut {
@ -782,24 +800,26 @@ impl<S> FinishingMiddlewares<S> {
} }
} }
struct Completed<S>(PhantomData<S>); struct Completed<S, H>(PhantomData<S>, PhantomData<H>);
impl<S> Completed<S> { impl<S, H> Completed<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S> { fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> {
if info.context.is_none() { if info.context.is_none() {
PipelineState::None PipelineState::None
} else { } else {
PipelineState::Completed(Completed(PhantomData)) PipelineState::Completed(Completed(PhantomData, PhantomData))
} }
} }
#[inline] #[inline]
fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S>, PipelineState<S>> { fn poll(self, info: &mut PipelineInfo<S>) -> Result<PipelineState<S, H>, PipelineState<S, H>> {
match info.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => Ok(PipelineState::Completed(Completed(PhantomData))), Ok(Async::NotReady) =>
Ok(Async::Ready(())) => Ok(PipelineState::None), Ok(PipelineState::Completed(Completed(PhantomData, PhantomData))),
Ok(Async::Ready(())) =>
Ok(PipelineState::None),
Err(_) => Ok(PipelineState::Error), Err(_) => Ok(PipelineState::Error),
} }
} }
@ -813,11 +833,11 @@ mod tests {
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use futures::future::{lazy, result}; use futures::future::{lazy, result};
impl<S> PipelineState<S> { impl<S, H> PipelineState<S, H> {
fn is_none(&self) -> Option<bool> { fn is_none(&self) -> Option<bool> {
if let PipelineState::None = *self { Some(true) } else { None } if let PipelineState::None = *self { Some(true) } else { None }
} }
fn completed(self) -> Option<Completed<S>> { fn completed(self) -> Option<Completed<S, H>> {
if let PipelineState::Completed(c) = self { Some(c) } else { None } if let PipelineState::Completed(c) = self { Some(c) } else { None }
} }
} }
@ -831,14 +851,14 @@ mod tests {
fn test_completed() { fn test_completed() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let mut info = PipelineInfo::new(HttpRequest::default()); let mut info = PipelineInfo::new(HttpRequest::default());
Completed::init(&mut info).is_none().unwrap(); Completed::<(), Inner<()>>::init(&mut info).is_none().unwrap();
let req = HttpRequest::default(); let req = HttpRequest::default();
let mut ctx = HttpContext::new(req.clone(), MyActor); let mut ctx = HttpContext::new(req.clone(), MyActor);
let addr: Address<_> = ctx.address(); let addr: Address<_> = ctx.address();
let mut info = PipelineInfo::new(req); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::init(&mut info).completed().unwrap(); let mut state = Completed::<(), Inner<()>>::init(&mut info).completed().unwrap();
let st = state.poll(&mut info).ok().unwrap(); let st = state.poll(&mut info).ok().unwrap();
let pp = Pipeline(info, st); let pp = Pipeline(info, st);

View File

@ -54,8 +54,11 @@ impl Router {
srv: ServerSettings::default() })), resources) srv: ServerSettings::default() })), resources)
} }
#[allow(mutable_transmutes)]
pub(crate) fn set_server_settings(&mut self, settings: ServerSettings) { pub(crate) fn set_server_settings(&mut self, settings: ServerSettings) {
Rc::get_mut(&mut self.0).unwrap().srv = settings; let inner: &Inner = self.0.as_ref();
let inner: &mut Inner = unsafe{mem::transmute(inner)};
inner.srv = settings;
} }
/// Router prefix /// Router prefix