From f55ef3a05969b47ba1138534d1a28986e8a86037 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 7 Mar 2018 14:56:53 -0800 Subject: [PATCH] create default CpuPool --- Cargo.toml | 1 + src/application.rs | 5 +++-- src/httprequest.rs | 30 +++++++++++++++------------- src/lib.rs | 1 + src/route.rs | 6 +++--- src/server/settings.rs | 45 +++++++++++++++++++++++++++++++++++++++--- src/test.rs | 8 +++++--- 7 files changed, 71 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c64d72e49..7c756e298 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ net2 = "0.2" bytes = "0.4" byteorder = "1" futures = "0.1" +futures-cpupool = "0.1" tokio-io = "0.1" tokio-core = "0.1" trust-dns-resolver = "0.8" diff --git a/src/application.rs b/src/application.rs index 9f0e399b3..4588659b0 100644 --- a/src/application.rs +++ b/src/application.rs @@ -44,8 +44,9 @@ impl PipelineHandler for Inner { for &mut (ref prefix, ref mut handler) in &mut self.handlers { let m = { let path = &req.path()[self.prefix..]; - path.starts_with(prefix) && (path.len() == prefix.len() || - path.split_at(prefix.len()).1.starts_with('/')) + path.starts_with(prefix) && ( + path.len() == prefix.len() || + path.split_at(prefix.len()).1.starts_with('/')) }; if m { let path: &'static str = unsafe { diff --git a/src/httprequest.rs b/src/httprequest.rs index 688bea7a4..6f30dc010 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use bytes::Bytes; use cookie::Cookie; use futures::{Async, Stream, Poll}; +use futures_cpupool::CpuPool; use failure; use url::{Url, form_urlencoded}; use http::{header, Uri, Method, Version, HeaderMap, Extensions}; @@ -129,12 +130,6 @@ impl HttpRequest<()> { pub fn with_state(self, state: Rc, router: Router) -> HttpRequest { HttpRequest(self.0, Some(state), Some(router)) } - - #[cfg(test)] - /// Construct new http request with state. - pub(crate) fn with_state_no_router(self, state: Rc) -> HttpRequest { - HttpRequest(self.0, Some(state), None) - } } @@ -156,7 +151,7 @@ impl HttpRequest { #[inline] /// Construct new http request without state. pub(crate) fn without_state(&self) -> HttpRequest { - HttpRequest(self.0.clone(), None, None) + HttpRequest(self.0.clone(), None, self.2.clone()) } /// get mutable reference for inner message @@ -184,12 +179,20 @@ impl HttpRequest { self.1.as_ref().unwrap() } - /// Protocol extensions. + /// Request extensions #[inline] pub fn extensions(&mut self) -> &mut Extensions { &mut self.as_mut().extensions } + /// Default `CpuPool` + #[inline] + #[doc(hidden)] + pub fn cpu_pool(&mut self) -> &CpuPool { + self.router().expect("HttpRequest has to have Router instance") + .server_settings().cpu_pool() + } + #[doc(hidden)] pub fn prefix_len(&self) -> usize { if let Some(router) = self.router() { router.prefix().len() } else { 0 } @@ -567,8 +570,9 @@ mod tests { #[test] fn test_url_for() { - let req = TestRequest::with_header(header::HOST, "www.rust-lang.org") - .finish_no_router(); + let req2 = HttpRequest::default(); + assert_eq!(req2.url_for("unknown", &["test"]), + Err(UrlGenerationError::RouterNotAvailable)); let mut resource = Resource::<()>::default(); resource.name("index"); @@ -577,10 +581,8 @@ mod tests { assert!(router.has_route("/user/test.html")); assert!(!router.has_route("/test/unknown")); - assert_eq!(req.url_for("unknown", &["test"]), - Err(UrlGenerationError::RouterNotAvailable)); - - let req = req.with_state(Rc::new(()), router); + let req = TestRequest::with_header(header::HOST, "www.rust-lang.org") + .finish_with_router(router); assert_eq!(req.url_for("unknown", &["test"]), Err(UrlGenerationError::ResourceNotFound)); diff --git a/src/lib.rs b/src/lib.rs index 098c68559..7bf9a598b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,7 @@ extern crate bitflags; extern crate failure; #[macro_use] extern crate futures; +extern crate futures_cpupool; extern crate tokio_io; extern crate tokio_core; extern crate mio; diff --git a/src/route.rs b/src/route.rs index 856d6fa85..d57f1c332 100644 --- a/src/route.rs +++ b/src/route.rs @@ -84,13 +84,13 @@ impl Route { } /// Set handler object. Usually call to this method is last call - /// during route configuration, because it does not return reference to self. + /// during route configuration, so it does not return reference to self. pub fn h>(&mut self, handler: H) { self.handler = InnerHandler::new(handler); } /// Set handler function. Usually call to this method is last call - /// during route configuration, because it does not return reference to self. + /// during route configuration, so it does not return reference to self. pub fn f(&mut self, handler: F) where F: Fn(HttpRequest) -> R + 'static, R: Responder + 'static, @@ -133,7 +133,7 @@ impl InnerHandler { #[inline] pub fn handle(&self, req: HttpRequest) -> Reply { // reason: handler is unique per thread, - // handler get called from async code, and handler doesn't have side effects + // handler get called from async code only #[allow(mutable_transmutes)] #[cfg_attr(feature = "cargo-clippy", allow(borrowed_box))] let h: &mut Box> = unsafe { mem::transmute(self.0.as_ref()) }; diff --git a/src/server/settings.rs b/src/server/settings.rs index b0b8eb552..7c7299ec8 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -1,6 +1,8 @@ -use std::net; +use std::{fmt, net}; use std::rc::Rc; -use std::cell::{Cell, RefCell, RefMut}; +use std::sync::Arc; +use std::cell::{Cell, RefCell, RefMut, UnsafeCell}; +use futures_cpupool::{Builder, CpuPool}; use helpers; use super::channel::Node; @@ -12,14 +14,45 @@ pub struct ServerSettings { addr: Option, secure: bool, host: String, + cpu_pool: Arc, } +struct InnerCpuPool { + cpu_pool: UnsafeCell>, +} + +impl fmt::Debug for InnerCpuPool { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "CpuPool") + } +} + +impl InnerCpuPool { + fn new() -> Self { + InnerCpuPool { + cpu_pool: UnsafeCell::new(None), + } + } + fn cpu_pool(&self) -> &CpuPool { + unsafe { + let val = &mut *self.cpu_pool.get(); + if val.is_none() { + *val = Some(Builder::new().create()); + } + val.as_ref().unwrap() + } + } +} + +unsafe impl Sync for InnerCpuPool {} + impl Default for ServerSettings { fn default() -> Self { ServerSettings { addr: None, secure: false, host: "localhost:8080".to_owned(), + cpu_pool: Arc::new(InnerCpuPool::new()), } } } @@ -36,7 +69,8 @@ impl ServerSettings { } else { "localhost".to_owned() }; - ServerSettings { addr, secure, host } + let cpu_pool = Arc::new(InnerCpuPool::new()); + ServerSettings { addr, secure, host, cpu_pool } } /// Returns the socket address of the local half of this TCP connection @@ -53,6 +87,11 @@ impl ServerSettings { pub fn host(&self) -> &str { &self.host } + + /// Returns default `CpuPool` for server + pub fn cpu_pool(&self) -> &CpuPool { + self.cpu_pool.cpu_pool() + } } diff --git a/src/test.rs b/src/test.rs index 7f400f947..7173f9c32 100644 --- a/src/test.rs +++ b/src/test.rs @@ -431,12 +431,14 @@ impl TestRequest { #[cfg(test)] /// Complete request creation and generate `HttpRequest` instance - pub(crate) fn finish_no_router(self) -> HttpRequest { - let TestRequest { state, method, uri, version, headers, params, cookies, payload } = self; + pub(crate) fn finish_with_router(self, router: Router) -> HttpRequest { + let TestRequest { state, method, uri, + version, headers, params, cookies, payload } = self; + let req = HttpRequest::new(method, uri, version, headers, payload); req.as_mut().cookies = cookies; req.as_mut().params = params; - req.with_state_no_router(Rc::new(state)) + req.with_state(Rc::new(state), router) } /// This method generates `HttpRequest` instance and runs handler