From ed8bd3d6a3a177fa6ec9f09087764877442c9898 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 15 Dec 2017 22:49:48 -0800 Subject: [PATCH] h1 cleanups --- README.md | 2 +- src/h1.rs | 130 ++++++++++++++++---------------------------- src/h1writer.rs | 3 +- src/helpers.rs | 19 ++++--- src/httprequest.rs | 45 ++++++--------- src/httpresponse.rs | 14 ++--- 6 files changed, 83 insertions(+), 130 deletions(-) diff --git a/README.md b/README.md index fb162fc55..fcb40d8fe 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Each result is best of five runs. All measurements are req/sec. Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline ---- | -------- | ---------- | -------- | ---------- | -------- | ---------- -Actix | 89.300 | 871.200 | 122.100 | 1.877.000 | 107.400 | 2.560.000 +Actix | 91.200 | 912.000 | 122.100 | 2.083.000 | 107.400 | 2.650.000 Gotham | 61.000 | 178.000 | | | | Iron | | | | | 94.500 | 78.000 Rocket | | | | | 95.500 | failed diff --git a/src/h1.rs b/src/h1.rs index 91801e836..fa1a1f2e4 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -20,6 +20,7 @@ use h1writer::{Writer, H1Writer}; use server::WorkerSettings; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; +use helpers::SharedHttpMessage; use error::{ParseError, PayloadError, ResponseError}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; @@ -527,60 +528,60 @@ impl Reader { } // Parse http message - let mut headers_indices: [HeaderIndices; MAX_HEADERS] = - unsafe{std::mem::uninitialized()}; - - let (len, method, path, version, headers_len) = { + let msg = { + let bytes_ptr = buf.as_ref().as_ptr() as usize; let mut headers: [httparse::Header; MAX_HEADERS] = unsafe{std::mem::uninitialized()}; - let mut req = httparse::Request::new(&mut headers); - match try!(req.parse(buf)) { - httparse::Status::Complete(len) => { - let method = Method::try_from(req.method.unwrap()) - .map_err(|_| ParseError::Method)?; - let path = req.path.unwrap(); - let bytes_ptr = buf.as_ref().as_ptr() as usize; - let path_start = path.as_ptr() as usize - bytes_ptr; - let path_end = path_start + path.len(); - let path = (path_start, path_end); - let version = if req.version.unwrap() == 1 { - Version::HTTP_11 - } else { - Version::HTTP_10 - }; + let (len, method, path, version, headers_len) = { + let b = unsafe{ let b: &[u8] = buf; std::mem::transmute(b) }; + let mut req = httparse::Request::new(&mut headers); + match req.parse(b)? { + httparse::Status::Complete(len) => { + let method = Method::try_from(req.method.unwrap()) + .map_err(|_| ParseError::Method)?; + let path = req.path.unwrap(); + let path_start = path.as_ptr() as usize - bytes_ptr; + let path_end = path_start + path.len(); + let path = (path_start, path_end); - record_header_indices(buf.as_ref(), req.headers, &mut headers_indices); - let headers_len = req.headers.len(); - (len, method, path, version, headers_len) + let version = if req.version.unwrap() == 1 { + Version::HTTP_11 + } else { + Version::HTTP_10 + }; + (len, method, path, version, req.headers.len()) + } + httparse::Status::Partial => return Ok(Message::NotReady), } - httparse::Status::Partial => return Ok(Message::NotReady), - } - }; + }; - let slice = buf.split_to(len).freeze(); - let path = slice.slice(path.0, path.1); - // path was found to be utf8 by httparse - let uri = Uri::from_shared(path).map_err(ParseError::Uri)?; + let slice = buf.split_to(len).freeze(); - // convert headers - let msg = settings.get_http_message(); - msg.get_mut().headers.reserve(headers_len); - for header in headers_indices[..headers_len].iter() { - if let Ok(name) = HeaderName::try_from(slice.slice(header.name.0, header.name.1)) { - if let Ok(value) = HeaderValue::try_from( - slice.slice(header.value.0, header.value.1)) - { + // convert headers + let msg = settings.get_http_message(); + for header in headers[..headers_len].iter() { + if let Ok(name) = HeaderName::try_from(header.name) { + let v_start = header.value.as_ptr() as usize - bytes_ptr; + let v_end = v_start + header.value.len(); + let value = unsafe { + HeaderValue::from_shared_unchecked(slice.slice(v_start, v_end)) }; msg.get_mut().headers.append(name, value); } else { return Err(ParseError::Header) } - } else { - return Err(ParseError::Header) } - } - let decoder = if upgrade(&method, &msg.get_mut().headers) { + let path = slice.slice(path.0, path.1); + let uri = Uri::from_shared(path).map_err(ParseError::Uri)?; + + msg.get_mut().uri = uri; + msg.get_mut().method = method; + msg.get_mut().version = version; + msg + }; + + let decoder = if upgrade(&msg) { Decoder::eof() } else { let has_len = msg.get_mut().headers.contains_key(header::CONTENT_LENGTH); @@ -593,9 +594,6 @@ impl Reader { Decoder::chunked() } else { if !has_len { - msg.get_mut().uri = uri; - msg.get_mut().method = method; - msg.get_mut().version = version; return Ok(Message::Http1(HttpRequest::from_message(msg), None)) } @@ -620,45 +618,21 @@ impl Reader { tx: PayloadType::new(&msg.get_mut().headers, psender), decoder: decoder, }; - msg.get_mut().uri = uri; - msg.get_mut().method = method; - msg.get_mut().version = version; msg.get_mut().payload = Some(payload); Ok(Message::Http1(HttpRequest::from_message(msg), Some(info))) } } -#[derive(Clone, Copy)] -struct HeaderIndices { - name: (usize, usize), - value: (usize, usize), -} - -fn record_header_indices(bytes: &[u8], - headers: &[httparse::Header], - indices: &mut [HeaderIndices]) -{ - let bytes_ptr = bytes.as_ptr() as usize; - for (header, indices) in headers.iter().zip(indices.iter_mut()) { - let name_start = header.name.as_ptr() as usize - bytes_ptr; - let name_end = name_start + header.name.len(); - indices.name = (name_start, name_end); - let value_start = header.value.as_ptr() as usize - bytes_ptr; - let value_end = value_start + header.value.len(); - indices.value = (value_start, value_end); - } -} - /// Check if request is UPGRADE -fn upgrade(method: &Method, headers: &HeaderMap) -> bool { - if let Some(conn) = headers.get(header::CONNECTION) { +fn upgrade(msg: &SharedHttpMessage) -> bool { + if let Some(conn) = msg.get_ref().headers.get(header::CONNECTION) { if let Ok(s) = conn.to_str() { s.to_lowercase().contains("upgrade") } else { - *method == Method::CONNECT + msg.get_ref().method == Method::CONNECT } } else { - *method == Method::CONNECT + msg.get_ref().method == Method::CONNECT } } @@ -735,18 +709,6 @@ enum ChunkedState { End, } -impl Decoder { - /*pub fn is_eof(&self) -> bool { - trace!("is_eof? {:?}", self); - match self.kind { - Kind::Length(0) | - Kind::Chunked(ChunkedState::End, _) | - Kind::Eof(true) => true, - _ => false, - } - }*/ -} - impl Decoder { pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { match self.kind { diff --git a/src/h1writer.rs b/src/h1writer.rs index aa1489b7d..dd868d7dc 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -168,8 +168,7 @@ impl Writer for H1Writer { buffer.extend_from_slice(b"\r\n"); for (key, value) in msg.headers() { - let t: &[u8] = key.as_ref(); - buffer.extend_from_slice(t); + buffer.extend_from_slice(key.as_str().as_bytes()); buffer.extend_from_slice(b": "); buffer.extend_from_slice(value.as_ref()); buffer.extend_from_slice(b"\r\n"); diff --git a/src/helpers.rs b/src/helpers.rs index 57ecb70d0..1873df9f7 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -74,9 +74,10 @@ impl SharedBytesPool { } pub fn release_bytes(&self, mut bytes: Rc) { - if self.0.borrow().len() < 128 { + let v = &mut self.0.borrow_mut(); + if v.len() < 128 { Rc::get_mut(&mut bytes).unwrap().take(); - self.0.borrow_mut().push_front(bytes); + v.push_front(bytes); } } } @@ -107,9 +108,9 @@ impl SharedBytes { SharedBytes(Some(bytes), Some(pool)) } - #[inline] + #[inline(always)] #[allow(mutable_transmutes)] - #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] pub fn get_mut(&self) -> &mut BytesMut { let r: &BytesMut = self.0.as_ref().unwrap().as_ref(); unsafe{mem::transmute(r)} @@ -150,9 +151,10 @@ impl SharedMessagePool { } pub fn release(&self, mut msg: Rc) { - if self.0.borrow().len() < 128 { + let v = &mut self.0.borrow_mut(); + if v.len() < 128 { Rc::get_mut(&mut msg).unwrap().reset(); - self.0.borrow_mut().push_front(msg); + v.push_front(msg); } } } @@ -219,7 +221,8 @@ impl SharedHttpMessage { unsafe{mem::transmute(r)} } - #[inline] + #[inline(always)] + #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] pub fn get_ref(&self) -> &HttpMessage { self.0.as_ref().unwrap() } @@ -234,7 +237,7 @@ const DEC_DIGITS_LUT: &[u8] = pub(crate) fn convert_u16(mut n: u16, bytes: &mut BytesMut) { let mut buf: [u8; 39] = unsafe { mem::uninitialized() }; - let mut curr = buf.len() as isize; + let mut curr: isize = 39; let buf_ptr = buf.as_mut_ptr(); let lut_ptr = DEC_DIGITS_LUT.as_ptr(); diff --git a/src/httprequest.rs b/src/httprequest.rs index b06740ebb..0fab3c342 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -40,7 +40,7 @@ impl Default for HttpMessage { method: Method::GET, uri: Uri::default(), version: Version::HTTP_11, - headers: HeaderMap::new(), + headers: HeaderMap::with_capacity(16), params: Params::default(), cookies: None, addr: None, @@ -54,6 +54,7 @@ impl Default for HttpMessage { impl HttpMessage { /// Checks if a connection should be kept alive. + #[inline] pub fn keep_alive(&self) -> bool { if let Some(conn) = self.headers.get(header::CONNECTION) { if let Ok(conn) = conn.to_str() { @@ -71,14 +72,15 @@ impl HttpMessage { } } + #[inline] pub(crate) fn reset(&mut self) { self.headers.clear(); self.extensions.clear(); self.params.clear(); - self.cookies.take(); - self.addr.take(); - self.payload.take(); - self.info.take(); + self.cookies = None; + self.addr = None; + self.payload = None; + self.info = None; } } @@ -109,6 +111,8 @@ impl HttpRequest<()> { ) } + #[inline(always)] + #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] pub(crate) fn from_message(msg: SharedHttpMessage) -> HttpRequest { HttpRequest(msg, None, None) } @@ -138,6 +142,7 @@ impl HttpRequest<()> { ) } + #[inline] /// Construct new http request with state. pub fn with_state(self, state: Rc, router: Router) -> HttpRequest { HttpRequest(self.0, Some(state), Some(router)) @@ -146,6 +151,7 @@ impl HttpRequest<()> { impl HttpRequest { + #[inline] /// Construct new http request without state. pub fn clone_without_state(&self) -> HttpRequest { HttpRequest(self.0.clone(), None, None) @@ -153,13 +159,14 @@ impl HttpRequest { // get mutable reference for inner message // mutable reference should not be returned as result for request's method - #[inline] - #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] + #[inline(always)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] fn as_mut(&self) -> &mut HttpMessage { self.0.get_mut() } - #[inline] + #[inline(always)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] fn as_ref(&self) -> &HttpMessage { self.0.get_ref() } @@ -183,11 +190,7 @@ impl HttpRequest { #[doc(hidden)] pub fn prefix_len(&self) -> usize { - if let Some(router) = self.router() { - router.prefix().len() - } else { - 0 - } + if let Some(router) = self.router() { router.prefix().len() } else { 0 } } /// Read the Request Uri. @@ -288,7 +291,6 @@ impl HttpRequest { } /// Load request cookies. - #[inline] pub fn cookies(&self) -> Result<&Vec>, CookieParseError> { if self.as_ref().cookies.is_none() { let msg = self.as_mut(); @@ -334,20 +336,7 @@ impl HttpRequest { /// Checks if a connection should be kept alive. pub fn keep_alive(&self) -> bool { - if let Some(conn) = self.headers().get(header::CONNECTION) { - if let Ok(conn) = conn.to_str() { - if self.as_ref().version == Version::HTTP_10 && conn.contains("keep-alive") { - true - } else { - self.as_ref().version == Version::HTTP_11 && - !(conn.contains("close") || conn.contains("upgrade")) - } - } else { - false - } - } else { - self.as_ref().version != Version::HTTP_10 - } + self.as_ref().keep_alive() } /// Read the request content type diff --git a/src/httpresponse.rs b/src/httpresponse.rs index b7f9d8301..f8b410877 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -584,7 +584,7 @@ impl InnerHttpResponse { fn new(status: StatusCode, body: Body) -> InnerHttpResponse { InnerHttpResponse { version: None, - headers: HeaderMap::with_capacity(8), + headers: HeaderMap::with_capacity(16), status: status, reason: None, body: body, @@ -595,19 +595,17 @@ impl InnerHttpResponse { error: None, } } - } /// Internal use only! unsafe struct Pool(VecDeque>); -thread_local!(static POOL: RefCell = RefCell::new(Pool::new())); +thread_local!(static POOL: RefCell = + RefCell::new(Pool(VecDeque::with_capacity(128)))); impl Pool { - fn new() -> Pool { - Pool(VecDeque::with_capacity(128)) - } + #[inline] fn get(status: StatusCode) -> Box { POOL.with(|pool| { if let Some(mut resp) = pool.borrow_mut().0.pop_front() { @@ -620,6 +618,7 @@ impl Pool { }) } + #[inline] fn with_body(status: StatusCode, body: Body) -> Box { POOL.with(|pool| { if let Some(mut resp) = pool.borrow_mut().0.pop_front() { @@ -632,7 +631,8 @@ impl Pool { }) } - #[cfg_attr(feature = "cargo-clippy", allow(boxed_local))] + #[inline(always)] + #[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))] fn release(mut inner: Box) { POOL.with(|pool| { let v = &mut pool.borrow_mut().0;