1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

refactor HttpHandlerTask trait

This commit is contained in:
Nikolay Kim 2018-06-18 05:45:54 +06:00
parent ef15646bd7
commit 26f37ec2e3
6 changed files with 128 additions and 58 deletions

View File

@ -26,7 +26,8 @@ pub struct HttpApplication<S = ()> {
middlewares: Rc<RefCell<Vec<Box<Middleware<S>>>>>, middlewares: Rc<RefCell<Vec<Box<Middleware<S>>>>>,
} }
pub(crate) struct Inner<S> { #[doc(hidden)]
pub struct Inner<S> {
prefix: usize, prefix: usize,
default: ResourceHandler<S>, default: ResourceHandler<S>,
encoding: ContentEncoding, encoding: ContentEncoding,
@ -136,7 +137,11 @@ impl<S: 'static> HttpApplication<S> {
} }
impl<S: 'static> HttpHandler for HttpApplication<S> { impl<S: 'static> HttpHandler for HttpApplication<S> {
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> { type Task = Pipeline<S, Inner<S>>;
fn handle(
&mut self, req: HttpRequest,
) -> Result<Pipeline<S, Inner<S>>, HttpRequest> {
let m = { let m = {
let path = req.path(); let path = req.path();
path.starts_with(&self.prefix) path.starts_with(&self.prefix)
@ -157,12 +162,7 @@ impl<S: 'static> HttpHandler for HttpApplication<S> {
let tp = self.get_handler(&mut req2); let tp = self.get_handler(&mut req2);
let inner = Rc::clone(&self.inner); let inner = Rc::clone(&self.inner);
Ok(Box::new(Pipeline::new( Ok(Pipeline::new(req2, Rc::clone(&self.middlewares), inner, tp))
req2,
Rc::clone(&self.middlewares),
inner,
tp,
)))
} else { } else {
Err(req) Err(req)
} }
@ -679,8 +679,23 @@ where
/// # }); /// # });
/// } /// }
/// ``` /// ```
pub fn boxed(mut self) -> Box<HttpHandler> { pub fn boxed(mut self) -> Box<HttpHandler<Task = Box<HttpHandlerTask>>> {
Box::new(self.finish()) Box::new(BoxedApplication { app: self.finish() })
}
}
struct BoxedApplication<S> {
app: HttpApplication<S>,
}
impl<S: 'static> HttpHandler for BoxedApplication<S> {
type Task = Box<HttpHandlerTask>;
fn handle(&mut self, req: HttpRequest) -> Result<Self::Task, HttpRequest> {
self.app.handle(req).map(|t| {
let task: Self::Task = Box::new(t);
task
})
} }
} }
@ -798,9 +813,7 @@ mod tests {
#[test] #[test]
fn test_handler() { fn test_handler() {
let mut app = App::new() let mut app = App::new().handler("/test", |_| HttpResponse::Ok()).finish();
.handler("/test", |_| HttpResponse::Ok())
.finish();
let req = TestRequest::with_uri("/test").finish(); let req = TestRequest::with_uri("/test").finish();
let resp = app.run(req); let resp = app.run(req);
@ -825,9 +838,7 @@ mod tests {
#[test] #[test]
fn test_handler2() { fn test_handler2() {
let mut app = App::new() let mut app = App::new().handler("test", |_| HttpResponse::Ok()).finish();
.handler("test", |_| HttpResponse::Ok())
.finish();
let req = TestRequest::with_uri("/test").finish(); let req = TestRequest::with_uri("/test").finish();
let resp = app.run(req); let resp = app.run(req);
@ -881,29 +892,21 @@ mod tests {
#[test] #[test]
fn test_route() { fn test_route() {
let mut app = App::new() let mut app = App::new()
.route("/test", Method::GET, |_: HttpRequest| { .route("/test", Method::GET, |_: HttpRequest| HttpResponse::Ok())
HttpResponse::Ok()
})
.route("/test", Method::POST, |_: HttpRequest| { .route("/test", Method::POST, |_: HttpRequest| {
HttpResponse::Created() HttpResponse::Created()
}) })
.finish(); .finish();
let req = TestRequest::with_uri("/test") let req = TestRequest::with_uri("/test").method(Method::GET).finish();
.method(Method::GET)
.finish();
let resp = app.run(req); let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK); assert_eq!(resp.as_msg().status(), StatusCode::OK);
let req = TestRequest::with_uri("/test") let req = TestRequest::with_uri("/test").method(Method::POST).finish();
.method(Method::POST)
.finish();
let resp = app.run(req); let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::CREATED); assert_eq!(resp.as_msg().status(), StatusCode::CREATED);
let req = TestRequest::with_uri("/test") let req = TestRequest::with_uri("/test").method(Method::HEAD).finish();
.method(Method::HEAD)
.finish();
let resp = app.run(req); let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND); assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND);
} }
@ -972,9 +975,6 @@ mod tests {
let req = TestRequest::with_uri("/some").finish(); let req = TestRequest::with_uri("/some").finish();
let resp = app.run(req); let resp = app.run(req);
assert_eq!(resp.as_msg().status(), StatusCode::OK); assert_eq!(resp.as_msg().status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.as_msg().body(), &Body::Binary(Binary::Slice(b"some")));
resp.as_msg().body(),
&Body::Binary(Binary::Slice(b"some"))
);
} }
} }

View File

@ -18,14 +18,16 @@ use httpresponse::HttpResponse;
use middleware::{Finished, Middleware, Response, Started}; use middleware::{Finished, Middleware, Response, Started};
use server::{HttpHandlerTask, Writer, WriterState}; use server::{HttpHandlerTask, Writer, WriterState};
#[doc(hidden)]
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) enum HandlerType { pub enum HandlerType {
Normal(usize), Normal(usize),
Handler(usize), Handler(usize),
Default, Default,
} }
pub(crate) trait PipelineHandler<S> { #[doc(hidden)]
pub trait PipelineHandler<S> {
fn encoding(&self) -> ContentEncoding; fn encoding(&self) -> ContentEncoding;
fn handle( fn handle(
@ -33,7 +35,8 @@ pub(crate) trait PipelineHandler<S> {
) -> AsyncResult<HttpResponse>; ) -> AsyncResult<HttpResponse>;
} }
pub(crate) struct Pipeline<S, H>(PipelineInfo<S>, PipelineState<S, H>); #[doc(hidden)]
pub struct Pipeline<S, H>(PipelineInfo<S>, PipelineState<S, H>);
enum PipelineState<S, H> { enum PipelineState<S, H> {
None, None,
@ -207,7 +210,7 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
} }
} }
fn poll(&mut self) -> Poll<(), Error> { fn poll_completed(&mut self) -> Poll<(), Error> {
let info: &mut PipelineInfo<_> = unsafe { &mut *(&mut self.0 as *mut _) }; let info: &mut PipelineInfo<_> = unsafe { &mut *(&mut self.0 as *mut _) };
loop { loop {

View File

@ -11,7 +11,7 @@ use super::{h1, h2, utils, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
enum HttpProtocol<T: IoStream, H: 'static> { enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> {
H1(h1::Http1<T, H>), H1(h1::Http1<T, H>),
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut), Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),

View File

@ -8,7 +8,7 @@ use bytes::{BufMut, BytesMut};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_timer::Delay; use tokio_timer::Delay;
use error::PayloadError; use error::{Error, PayloadError};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{Payload, PayloadStatus, PayloadWriter}; use payload::{Payload, PayloadStatus, PayloadWriter};
@ -43,7 +43,7 @@ bitflags! {
} }
} }
pub(crate) struct Http1<T: IoStream, H: 'static> { pub(crate) struct Http1<T: IoStream, H: HttpHandler + 'static> {
flags: Flags, flags: Flags,
settings: Rc<WorkerSettings<H>>, settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
@ -51,12 +51,38 @@ pub(crate) struct Http1<T: IoStream, H: 'static> {
decoder: H1Decoder, decoder: H1Decoder,
payload: Option<PayloadType>, payload: Option<PayloadType>,
buf: BytesMut, buf: BytesMut,
tasks: VecDeque<Entry>, tasks: VecDeque<Entry<H>>,
keepalive_timer: Option<Delay>, keepalive_timer: Option<Delay>,
} }
struct Entry { enum EntryPipe<H: HttpHandler> {
pipe: Box<HttpHandlerTask>, Task(H::Task),
Error(Box<HttpHandlerTask>),
}
impl<H: HttpHandler> EntryPipe<H> {
fn disconnected(&mut self) {
match *self {
EntryPipe::Task(ref mut task) => task.disconnected(),
EntryPipe::Error(ref mut task) => task.disconnected(),
}
}
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
match *self {
EntryPipe::Task(ref mut task) => task.poll_io(io),
EntryPipe::Error(ref mut task) => task.poll_io(io),
}
}
fn poll_completed(&mut self) -> Poll<(), Error> {
match *self {
EntryPipe::Task(ref mut task) => task.poll_completed(),
EntryPipe::Error(ref mut task) => task.poll_completed(),
}
}
}
struct Entry<H: HttpHandler> {
pipe: EntryPipe<H>,
flags: EntryFlags, flags: EntryFlags,
} }
@ -181,7 +207,7 @@ where
let mut io = false; let mut io = false;
let mut idx = 0; let mut idx = 0;
while idx < self.tasks.len() { while idx < self.tasks.len() {
let item: &mut Entry = unsafe { &mut *(&mut self.tasks[idx] as *mut _) }; let item: &mut Entry<H> = unsafe { &mut *(&mut self.tasks[idx] as *mut _) };
// only one task can do io operation in http/1 // only one task can do io operation in http/1
if !io && !item.flags.contains(EntryFlags::EOF) { if !io && !item.flags.contains(EntryFlags::EOF) {
@ -232,7 +258,7 @@ where
} }
} }
} else if !item.flags.contains(EntryFlags::FINISHED) { } else if !item.flags.contains(EntryFlags::FINISHED) {
match item.pipe.poll() { match item.pipe.poll_completed() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED), Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED),
Err(err) => { Err(err) => {
@ -342,7 +368,7 @@ where
if !ready { if !ready {
let item = Entry { let item = Entry {
pipe, pipe: EntryPipe::Task(pipe),
flags: EntryFlags::EOF, flags: EntryFlags::EOF,
}; };
self.tasks.push_back(item); self.tasks.push_back(item);
@ -358,7 +384,7 @@ where
} }
} }
self.tasks.push_back(Entry { self.tasks.push_back(Entry {
pipe, pipe: EntryPipe::Task(pipe),
flags: EntryFlags::empty(), flags: EntryFlags::empty(),
}); });
continue 'outer; continue 'outer;
@ -369,7 +395,9 @@ where
// handler is not found // handler is not found
self.tasks.push_back(Entry { self.tasks.push_back(Entry {
pipe: Pipeline::error(HttpResponse::NotFound()), pipe: EntryPipe::Error(
Pipeline::error(HttpResponse::NotFound()),
),
flags: EntryFlags::empty(), flags: EntryFlags::empty(),
}); });
} }

View File

@ -15,7 +15,7 @@ use modhttp::request::Parts;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::Delay;
use error::PayloadError; use error::{Error, PayloadError};
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
@ -38,7 +38,7 @@ bitflags! {
pub(crate) struct Http2<T, H> pub(crate) struct Http2<T, H>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
H: 'static, H: HttpHandler + 'static,
{ {
flags: Flags, flags: Flags,
settings: Rc<WorkerSettings<H>>, settings: Rc<WorkerSettings<H>>,
@ -142,7 +142,7 @@ where
break; break;
} }
} else if !item.flags.contains(EntryFlags::FINISHED) { } else if !item.flags.contains(EntryFlags::FINISHED) {
match item.task.poll() { match item.task.poll_completed() {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
not_ready = false; not_ready = false;
@ -288,15 +288,41 @@ bitflags! {
} }
} }
struct Entry<H: 'static> { enum EntryPipe<H: HttpHandler> {
task: Box<HttpHandlerTask>, Task(H::Task),
Error(Box<HttpHandlerTask>),
}
impl<H: HttpHandler> EntryPipe<H> {
fn disconnected(&mut self) {
match *self {
EntryPipe::Task(ref mut task) => task.disconnected(),
EntryPipe::Error(ref mut task) => task.disconnected(),
}
}
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
match *self {
EntryPipe::Task(ref mut task) => task.poll_io(io),
EntryPipe::Error(ref mut task) => task.poll_io(io),
}
}
fn poll_completed(&mut self) -> Poll<(), Error> {
match *self {
EntryPipe::Task(ref mut task) => task.poll_completed(),
EntryPipe::Error(ref mut task) => task.poll_completed(),
}
}
}
struct Entry<H: HttpHandler + 'static> {
task: EntryPipe<H>,
payload: PayloadType, payload: PayloadType,
recv: RecvStream, recv: RecvStream,
stream: H2Writer<H>, stream: H2Writer<H>,
flags: EntryFlags, flags: EntryFlags,
} }
impl<H: 'static> Entry<H> { impl<H: HttpHandler + 'static> Entry<H> {
fn new( fn new(
parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>, parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>,
addr: Option<SocketAddr>, settings: &Rc<WorkerSettings<H>>, addr: Option<SocketAddr>, settings: &Rc<WorkerSettings<H>>,
@ -333,7 +359,9 @@ impl<H: 'static> Entry<H> {
} }
Entry { Entry {
task: task.unwrap_or_else(|| Pipeline::error(HttpResponse::NotFound())), task: task.map(EntryPipe::Task).unwrap_or_else(|| {
EntryPipe::Error(Pipeline::error(HttpResponse::NotFound()))
}),
payload: psender, payload: psender,
stream: H2Writer::new( stream: H2Writer::new(
resp, resp,

View File

@ -122,20 +122,25 @@ impl Message for StopServer {
/// Low level http request handler /// Low level http request handler
#[allow(unused_variables)] #[allow(unused_variables)]
pub trait HttpHandler: 'static { pub trait HttpHandler: 'static {
/// Request handling task
type Task: HttpHandlerTask;
/// Handle request /// Handle request
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest>; fn handle(&mut self, req: HttpRequest) -> Result<Self::Task, HttpRequest>;
} }
impl HttpHandler for Box<HttpHandler> { impl HttpHandler for Box<HttpHandler<Task = Box<HttpHandlerTask>>> {
type Task = Box<HttpHandlerTask>;
fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> { fn handle(&mut self, req: HttpRequest) -> Result<Box<HttpHandlerTask>, HttpRequest> {
self.as_mut().handle(req) self.as_mut().handle(req)
} }
} }
#[doc(hidden)] /// Low level http request handler
pub trait HttpHandlerTask { pub trait HttpHandlerTask {
/// Poll task, this method is used before or after *io* object is available /// Poll task, this method is used before or after *io* object is available
fn poll(&mut self) -> Poll<(), Error> { fn poll_completed(&mut self) -> Poll<(), Error> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
@ -146,6 +151,12 @@ pub trait HttpHandlerTask {
fn disconnected(&mut self) {} fn disconnected(&mut self) {}
} }
impl HttpHandlerTask for Box<HttpHandlerTask> {
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error> {
self.as_mut().poll_io(io)
}
}
/// Conversion helper trait /// Conversion helper trait
pub trait IntoHttpHandler { pub trait IntoHttpHandler {
/// The associated type which is result of conversion. /// The associated type which is result of conversion.