From 26f37ec2e3a804c2f2087200146964ffe6780baf Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 18 Jun 2018 05:45:54 +0600 Subject: [PATCH] refactor HttpHandlerTask trait --- src/application.rs | 64 +++++++++++++++++++++---------------------- src/pipeline.rs | 11 +++++--- src/server/channel.rs | 2 +- src/server/h1.rs | 48 +++++++++++++++++++++++++------- src/server/h2.rs | 42 +++++++++++++++++++++++----- src/server/mod.rs | 19 ++++++++++--- 6 files changed, 128 insertions(+), 58 deletions(-) diff --git a/src/application.rs b/src/application.rs index 5555ee396..e53382de4 100644 --- a/src/application.rs +++ b/src/application.rs @@ -26,7 +26,8 @@ pub struct HttpApplication { middlewares: Rc>>>>, } -pub(crate) struct Inner { +#[doc(hidden)] +pub struct Inner { prefix: usize, default: ResourceHandler, encoding: ContentEncoding, @@ -136,7 +137,11 @@ impl HttpApplication { } impl HttpHandler for HttpApplication { - fn handle(&mut self, req: HttpRequest) -> Result, HttpRequest> { + type Task = Pipeline>; + + fn handle( + &mut self, req: HttpRequest, + ) -> Result>, HttpRequest> { let m = { let path = req.path(); path.starts_with(&self.prefix) @@ -157,12 +162,7 @@ impl HttpHandler for HttpApplication { let tp = self.get_handler(&mut req2); let inner = Rc::clone(&self.inner); - Ok(Box::new(Pipeline::new( - req2, - Rc::clone(&self.middlewares), - inner, - tp, - ))) + Ok(Pipeline::new(req2, Rc::clone(&self.middlewares), inner, tp)) } else { Err(req) } @@ -679,8 +679,23 @@ where /// # }); /// } /// ``` - pub fn boxed(mut self) -> Box { - Box::new(self.finish()) + pub fn boxed(mut self) -> Box>> { + Box::new(BoxedApplication { app: self.finish() }) + } +} + +struct BoxedApplication { + app: HttpApplication, +} + +impl HttpHandler for BoxedApplication { + type Task = Box; + + fn handle(&mut self, req: HttpRequest) -> Result { + self.app.handle(req).map(|t| { + let task: Self::Task = Box::new(t); + task + }) } } @@ -798,9 +813,7 @@ mod tests { #[test] fn test_handler() { - let mut app = App::new() - .handler("/test", |_| HttpResponse::Ok()) - .finish(); + let mut app = App::new().handler("/test", |_| HttpResponse::Ok()).finish(); let req = TestRequest::with_uri("/test").finish(); let resp = app.run(req); @@ -825,9 +838,7 @@ mod tests { #[test] fn test_handler2() { - let mut app = App::new() - .handler("test", |_| HttpResponse::Ok()) - .finish(); + let mut app = App::new().handler("test", |_| HttpResponse::Ok()).finish(); let req = TestRequest::with_uri("/test").finish(); let resp = app.run(req); @@ -881,29 +892,21 @@ mod tests { #[test] fn test_route() { let mut app = App::new() - .route("/test", Method::GET, |_: HttpRequest| { - HttpResponse::Ok() - }) + .route("/test", Method::GET, |_: HttpRequest| HttpResponse::Ok()) .route("/test", Method::POST, |_: HttpRequest| { HttpResponse::Created() }) .finish(); - let req = TestRequest::with_uri("/test") - .method(Method::GET) - .finish(); + let req = TestRequest::with_uri("/test").method(Method::GET).finish(); let resp = app.run(req); assert_eq!(resp.as_msg().status(), StatusCode::OK); - let req = TestRequest::with_uri("/test") - .method(Method::POST) - .finish(); + let req = TestRequest::with_uri("/test").method(Method::POST).finish(); let resp = app.run(req); assert_eq!(resp.as_msg().status(), StatusCode::CREATED); - let req = TestRequest::with_uri("/test") - .method(Method::HEAD) - .finish(); + let req = TestRequest::with_uri("/test").method(Method::HEAD).finish(); let resp = app.run(req); assert_eq!(resp.as_msg().status(), StatusCode::NOT_FOUND); } @@ -972,9 +975,6 @@ mod tests { let req = TestRequest::with_uri("/some").finish(); let resp = app.run(req); assert_eq!(resp.as_msg().status(), StatusCode::OK); - assert_eq!( - resp.as_msg().body(), - &Body::Binary(Binary::Slice(b"some")) - ); + assert_eq!(resp.as_msg().body(), &Body::Binary(Binary::Slice(b"some"))); } } diff --git a/src/pipeline.rs b/src/pipeline.rs index e5eb51d51..d08a739dd 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -18,14 +18,16 @@ use httpresponse::HttpResponse; use middleware::{Finished, Middleware, Response, Started}; use server::{HttpHandlerTask, Writer, WriterState}; +#[doc(hidden)] #[derive(Debug, Clone, Copy)] -pub(crate) enum HandlerType { +pub enum HandlerType { Normal(usize), Handler(usize), Default, } -pub(crate) trait PipelineHandler { +#[doc(hidden)] +pub trait PipelineHandler { fn encoding(&self) -> ContentEncoding; fn handle( @@ -33,7 +35,8 @@ pub(crate) trait PipelineHandler { ) -> AsyncResult; } -pub(crate) struct Pipeline(PipelineInfo, PipelineState); +#[doc(hidden)] +pub struct Pipeline(PipelineInfo, PipelineState); enum PipelineState { None, @@ -207,7 +210,7 @@ impl> HttpHandlerTask for Pipeline { } } - fn poll(&mut self) -> Poll<(), Error> { + fn poll_completed(&mut self) -> Poll<(), Error> { let info: &mut PipelineInfo<_> = unsafe { &mut *(&mut self.0 as *mut _) }; loop { diff --git a/src/server/channel.rs b/src/server/channel.rs index 34f6733c0..a38ecc936 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -11,7 +11,7 @@ use super::{h1, h2, utils, HttpHandler, IoStream}; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; -enum HttpProtocol { +enum HttpProtocol { H1(h1::Http1), H2(h2::Http2), Unknown(Rc>, Option, T, BytesMut), diff --git a/src/server/h1.rs b/src/server/h1.rs index 689d64dc2..ababda6b4 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -8,7 +8,7 @@ use bytes::{BufMut, BytesMut}; use futures::{Async, Future, Poll}; use tokio_timer::Delay; -use error::PayloadError; +use error::{Error, PayloadError}; use httprequest::HttpRequest; use httpresponse::HttpResponse; use payload::{Payload, PayloadStatus, PayloadWriter}; @@ -43,7 +43,7 @@ bitflags! { } } -pub(crate) struct Http1 { +pub(crate) struct Http1 { flags: Flags, settings: Rc>, addr: Option, @@ -51,12 +51,38 @@ pub(crate) struct Http1 { decoder: H1Decoder, payload: Option, buf: BytesMut, - tasks: VecDeque, + tasks: VecDeque>, keepalive_timer: Option, } -struct Entry { - pipe: Box, +enum EntryPipe { + Task(H::Task), + Error(Box), +} + +impl EntryPipe { + fn disconnected(&mut self) { + match *self { + EntryPipe::Task(ref mut task) => task.disconnected(), + EntryPipe::Error(ref mut task) => task.disconnected(), + } + } + fn poll_io(&mut self, io: &mut Writer) -> Poll { + match *self { + EntryPipe::Task(ref mut task) => task.poll_io(io), + EntryPipe::Error(ref mut task) => task.poll_io(io), + } + } + fn poll_completed(&mut self) -> Poll<(), Error> { + match *self { + EntryPipe::Task(ref mut task) => task.poll_completed(), + EntryPipe::Error(ref mut task) => task.poll_completed(), + } + } +} + +struct Entry { + pipe: EntryPipe, flags: EntryFlags, } @@ -181,7 +207,7 @@ where let mut io = false; let mut idx = 0; while idx < self.tasks.len() { - let item: &mut Entry = unsafe { &mut *(&mut self.tasks[idx] as *mut _) }; + let item: &mut Entry = unsafe { &mut *(&mut self.tasks[idx] as *mut _) }; // only one task can do io operation in http/1 if !io && !item.flags.contains(EntryFlags::EOF) { @@ -232,7 +258,7 @@ where } } } else if !item.flags.contains(EntryFlags::FINISHED) { - match item.pipe.poll() { + match item.pipe.poll_completed() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED), Err(err) => { @@ -342,7 +368,7 @@ where if !ready { let item = Entry { - pipe, + pipe: EntryPipe::Task(pipe), flags: EntryFlags::EOF, }; self.tasks.push_back(item); @@ -358,7 +384,7 @@ where } } self.tasks.push_back(Entry { - pipe, + pipe: EntryPipe::Task(pipe), flags: EntryFlags::empty(), }); continue 'outer; @@ -369,7 +395,9 @@ where // handler is not found self.tasks.push_back(Entry { - pipe: Pipeline::error(HttpResponse::NotFound()), + pipe: EntryPipe::Error( + Pipeline::error(HttpResponse::NotFound()), + ), flags: EntryFlags::empty(), }); } diff --git a/src/server/h2.rs b/src/server/h2.rs index e194bc7d2..993376efc 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -15,7 +15,7 @@ use modhttp::request::Parts; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; -use error::PayloadError; +use error::{Error, PayloadError}; use httpmessage::HttpMessage; use httprequest::HttpRequest; use httpresponse::HttpResponse; @@ -38,7 +38,7 @@ bitflags! { pub(crate) struct Http2 where T: AsyncRead + AsyncWrite + 'static, - H: 'static, + H: HttpHandler + 'static, { flags: Flags, settings: Rc>, @@ -142,7 +142,7 @@ where break; } } else if !item.flags.contains(EntryFlags::FINISHED) { - match item.task.poll() { + match item.task.poll_completed() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { not_ready = false; @@ -288,15 +288,41 @@ bitflags! { } } -struct Entry { - task: Box, +enum EntryPipe { + Task(H::Task), + Error(Box), +} + +impl EntryPipe { + fn disconnected(&mut self) { + match *self { + EntryPipe::Task(ref mut task) => task.disconnected(), + EntryPipe::Error(ref mut task) => task.disconnected(), + } + } + fn poll_io(&mut self, io: &mut Writer) -> Poll { + match *self { + EntryPipe::Task(ref mut task) => task.poll_io(io), + EntryPipe::Error(ref mut task) => task.poll_io(io), + } + } + fn poll_completed(&mut self) -> Poll<(), Error> { + match *self { + EntryPipe::Task(ref mut task) => task.poll_completed(), + EntryPipe::Error(ref mut task) => task.poll_completed(), + } + } +} + +struct Entry { + task: EntryPipe, payload: PayloadType, recv: RecvStream, stream: H2Writer, flags: EntryFlags, } -impl Entry { +impl Entry { fn new( parts: Parts, recv: RecvStream, resp: SendResponse, addr: Option, settings: &Rc>, @@ -333,7 +359,9 @@ impl Entry { } Entry { - task: task.unwrap_or_else(|| Pipeline::error(HttpResponse::NotFound())), + task: task.map(EntryPipe::Task).unwrap_or_else(|| { + EntryPipe::Error(Pipeline::error(HttpResponse::NotFound())) + }), payload: psender, stream: H2Writer::new( resp, diff --git a/src/server/mod.rs b/src/server/mod.rs index 82ba24e25..91b4d1fa9 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -122,20 +122,25 @@ impl Message for StopServer { /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { + /// Request handling task + type Task: HttpHandlerTask; + /// Handle request - fn handle(&mut self, req: HttpRequest) -> Result, HttpRequest>; + fn handle(&mut self, req: HttpRequest) -> Result; } -impl HttpHandler for Box { +impl HttpHandler for Box>> { + type Task = Box; + fn handle(&mut self, req: HttpRequest) -> Result, HttpRequest> { self.as_mut().handle(req) } } -#[doc(hidden)] +/// Low level http request handler pub trait HttpHandlerTask { /// Poll task, this method is used before or after *io* object is available - fn poll(&mut self) -> Poll<(), Error> { + fn poll_completed(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } @@ -146,6 +151,12 @@ pub trait HttpHandlerTask { fn disconnected(&mut self) {} } +impl HttpHandlerTask for Box { + fn poll_io(&mut self, io: &mut Writer) -> Poll { + self.as_mut().poll_io(io) + } +} + /// Conversion helper trait pub trait IntoHttpHandler { /// The associated type which is result of conversion.