From 6f3e70a92a39501c8655c9c8e45e4004e424efa6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 9 Sep 2018 14:33:45 -0700 Subject: [PATCH] simplify application factory --- src/server/h1.rs | 75 +++++++++++++++++++++++------------------- src/server/h2.rs | 26 +++++---------- src/server/http.rs | 60 ++++++++++----------------------- src/server/mod.rs | 7 ++-- src/server/settings.rs | 10 +++--- src/test.rs | 12 +++---- 6 files changed, 80 insertions(+), 110 deletions(-) diff --git a/src/server/h1.rs b/src/server/h1.rs index 1d2ddbe2..739c6651 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -410,45 +410,52 @@ where self.keepalive_timer.take(); // search handler for request - for h in self.settings.handlers().iter() { - msg = match h.handle(msg) { - Ok(mut pipe) => { - if self.tasks.is_empty() { - match pipe.poll_io(&mut self.stream) { - Ok(Async::Ready(ready)) => { - // override keep-alive state - if self.stream.keepalive() { - self.flags.insert(Flags::KEEPALIVE); - } else { - self.flags.remove(Flags::KEEPALIVE); - } - // prepare stream for next response - self.stream.reset(); + match self.settings.handler().handle(msg) { + Ok(mut pipe) => { + if self.tasks.is_empty() { + match pipe.poll_io(&mut self.stream) { + Ok(Async::Ready(ready)) => { + // override keep-alive state + if self.stream.keepalive() { + self.flags.insert(Flags::KEEPALIVE); + } else { + self.flags.remove(Flags::KEEPALIVE); + } + // prepare stream for next response + self.stream.reset(); - if !ready { - let item = Entry { - pipe: EntryPipe::Task(pipe), - flags: EntryFlags::EOF, - }; - self.tasks.push_back(item); - } - continue 'outer; - } - Ok(Async::NotReady) => {} - Err(err) => { - error!("Unhandled error: {}", err); - self.flags.insert(Flags::ERROR); - return; + if !ready { + let item = Entry { + pipe: EntryPipe::Task(pipe), + flags: EntryFlags::EOF, + }; + self.tasks.push_back(item); } + continue 'outer; + } + Ok(Async::NotReady) => {} + Err(err) => { + error!("Unhandled error: {}", err); + self.flags.insert(Flags::ERROR); + return; } } - self.tasks.push_back(Entry { - pipe: EntryPipe::Task(pipe), - flags: EntryFlags::empty(), - }); - continue 'outer; } - Err(msg) => msg, + self.tasks.push_back(Entry { + pipe: EntryPipe::Task(pipe), + flags: EntryFlags::empty(), + }); + continue 'outer; + } + Err(msg) => { + // handler is not found + self.tasks.push_back(Entry { + pipe: EntryPipe::Error(ServerError::err( + Version::HTTP_11, + StatusCode::NOT_FOUND, + )), + flags: EntryFlags::empty(), + }); } } diff --git a/src/server/h2.rs b/src/server/h2.rs index ba52a884..a7cf8aec 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -368,28 +368,20 @@ impl Entry { let psender = PayloadType::new(msg.headers(), psender); // start request processing - let mut task = None; - for h in settings.handlers().iter() { - msg = match h.handle(msg) { - Ok(t) => { - task = Some(t); - break; - } - Err(msg) => msg, - } - } + let task = match settings.handler().handle(msg) { + Ok(task) => EntryPipe::Task(task), + Err(msg) => EntryPipe::Error(ServerError::err( + Version::HTTP_2, + StatusCode::NOT_FOUND, + )), + }; Entry { - task: task.map(EntryPipe::Task).unwrap_or_else(|| { - EntryPipe::Error(ServerError::err( - Version::HTTP_2, - StatusCode::NOT_FOUND, - )) - }), + task, + recv, payload: psender, stream: H2Writer::new(resp, settings), flags: EntryFlags::empty(), - recv, } } diff --git a/src/server/http.rs b/src/server/http.rs index 5cdeb564..faee041c 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -39,7 +39,7 @@ struct Socket { pub struct HttpServer where H: IntoHttpHandler + 'static, - F: Fn() -> Vec + Send + Clone, + F: Fn() -> H + Send + Clone, { factory: F, host: Option, @@ -58,33 +58,10 @@ where impl HttpServer where H: IntoHttpHandler + 'static, - F: Fn() -> Vec + Send + Clone + 'static, + F: Fn() -> H + Send + Clone + 'static, { /// Create new http server with application factory - pub fn new(factory: F1) -> HttpServer Vec + Send + Clone> - where - F1: Fn() -> U + Send + Clone, - U: IntoIterator + 'static, - { - let f = move || (factory.clone())().into_iter().collect(); - - HttpServer { - threads: num_cpus::get(), - factory: f, - host: None, - backlog: 2048, - keep_alive: KeepAlive::Os, - shutdown_timeout: 30, - exit: false, - no_http2: false, - no_signals: false, - maxconn: 25_600, - maxconnrate: 256, - sockets: Vec::new(), - } - } - - pub(crate) fn with_factory(factory: F) -> HttpServer { + pub fn new(factory: F) -> HttpServer { HttpServer { factory, threads: num_cpus::get(), @@ -489,7 +466,7 @@ where // } } -impl Vec + Send + Clone> HttpServer { +impl H + Send + Clone> HttpServer { /// Start listening for incoming connections. /// /// This method starts number of http workers in separate threads. @@ -629,7 +606,7 @@ impl Vec + Send + Clone> HttpServer { struct HttpService where - F: Fn() -> Vec, + F: Fn() -> H, H: IntoHttpHandler, Io: IoStream, { @@ -642,7 +619,7 @@ where impl NewService for HttpService where - F: Fn() -> Vec, + F: Fn() -> H, H: IntoHttpHandler, Io: IoStream, { @@ -655,12 +632,9 @@ where fn new_service(&self) -> Self::Future { let s = ServerSettings::new(Some(self.addr), &self.host, false); - let apps: Vec<_> = (self.factory)() - .into_iter() - .map(|h| h.into_handler()) - .collect(); + let app = (self.factory)().into_handler(); - ok(HttpServiceHandler::new(apps, self.keep_alive, s)) + ok(HttpServiceHandler::new(app, self.keep_alive, s)) } } @@ -680,14 +654,14 @@ where Io: IoStream, { fn new( - apps: Vec, keep_alive: KeepAlive, settings: ServerSettings, + app: H, keep_alive: KeepAlive, settings: ServerSettings, ) -> HttpServiceHandler { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) } else { None }; - let settings = WorkerSettings::new(apps, keep_alive, settings); + let settings = WorkerSettings::new(app, keep_alive, settings); HttpServiceHandler { tcp_ka, @@ -733,7 +707,7 @@ where struct SimpleFactory where H: IntoHttpHandler, - F: Fn() -> Vec + Send + Clone, + F: Fn() -> H + Send + Clone, P: HttpPipelineFactory, { pub addr: net::SocketAddr, @@ -744,7 +718,7 @@ where impl Clone for SimpleFactory where P: HttpPipelineFactory, - F: Fn() -> Vec + Send + Clone, + F: Fn() -> H + Send + Clone, { fn clone(&self) -> Self { SimpleFactory { @@ -758,7 +732,7 @@ where impl ServiceFactory for SimpleFactory where H: IntoHttpHandler + 'static, - F: Fn() -> Vec + Send + Clone + 'static, + F: Fn() -> H + Send + Clone + 'static, P: HttpPipelineFactory, { fn register(&self, server: Server, lst: net::TcpListener) -> Server { @@ -894,7 +868,7 @@ where struct DefaultPipelineFactory where - F: Fn() -> Vec + Send + Clone, + F: Fn() -> H + Send + Clone, { factory: F, host: Option, @@ -906,7 +880,7 @@ where impl DefaultPipelineFactory where Io: IoStream + Send, - F: Fn() -> Vec + Send + Clone + 'static, + F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler + 'static, { fn new( @@ -925,7 +899,7 @@ where impl Clone for DefaultPipelineFactory where Io: IoStream, - F: Fn() -> Vec + Send + Clone, + F: Fn() -> H + Send + Clone, H: IntoHttpHandler, { fn clone(&self) -> Self { @@ -942,7 +916,7 @@ where impl HttpPipelineFactory for DefaultPipelineFactory where Io: IoStream + Send, - F: Fn() -> Vec + Send + Clone + 'static, + F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler + 'static, { type Io = Io; diff --git a/src/server/mod.rs b/src/server/mod.rs index 6ba03376..ec7e8e4e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -174,13 +174,12 @@ const HW_BUFFER_SIZE: usize = 32_768; /// sys.run(); /// } /// ``` -pub fn new(factory: F) -> HttpServer Vec + Send + Clone> +pub fn new(factory: F) -> HttpServer where - F: Fn() -> U + Send + Clone + 'static, - U: IntoIterator, + F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler + 'static, { - HttpServer::with_factory(move || (factory.clone())().into_iter().collect()) + HttpServer::new(factory) } #[doc(hidden)] diff --git a/src/server/settings.rs b/src/server/settings.rs index 47da515a..18a8c095 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -136,7 +136,7 @@ const DATE_VALUE_LENGTH: usize = 29; pub(crate) struct WorkerSettings(Rc>); struct Inner { - h: Vec, + handler: H, keep_alive: u64, ka_enabled: bool, bytes: Rc, @@ -153,7 +153,7 @@ impl Clone for WorkerSettings { impl WorkerSettings { pub(crate) fn new( - h: Vec, keep_alive: KeepAlive, settings: ServerSettings, + handler: H, keep_alive: KeepAlive, settings: ServerSettings, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -162,7 +162,7 @@ impl WorkerSettings { }; WorkerSettings(Rc::new(Inner { - h, + handler, keep_alive, ka_enabled, bytes: Rc::new(SharedBytesPool::new()), @@ -176,8 +176,8 @@ impl WorkerSettings { self.0.node.borrow_mut() } - pub fn handlers(&self) -> &Vec { - &self.0.h + pub fn handler(&self) -> &H { + &self.0.handler } pub fn keep_alive_timer(&self) -> Option { diff --git a/src/test.rs b/src/test.rs index c589ea4b..b9d64f27 100644 --- a/src/test.rs +++ b/src/test.rs @@ -103,14 +103,12 @@ impl TestServer { } /// Start new test server with application factory - pub fn with_factory(factory: F) -> Self + pub fn with_factory(factory: F) -> Self where - F: Fn() -> U + Send + Clone + 'static, - U: IntoIterator, + F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler + 'static, { let (tx, rx) = mpsc::channel(); - let factory = move || (factory.clone())().into_iter().collect(); // run server in separate thread thread::spawn(move || { @@ -118,7 +116,7 @@ impl TestServer { let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - let _ = HttpServer::with_factory(factory) + let _ = HttpServer::new(factory) .disable_signals() .listen(tcp) .keep_alive(5) @@ -328,10 +326,10 @@ where let sys = System::new("actix-test-server"); let state = self.state; - let mut srv = HttpServer::with_factory(move || { + let mut srv = HttpServer::new(move || { let mut app = TestApp::new(state()); config(&mut app); - vec![app] + app }).workers(1) .keep_alive(5) .disable_signals();