1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

h1 cleanups

This commit is contained in:
Nikolay Kim 2017-12-15 22:49:48 -08:00
parent 1daf50095a
commit ed8bd3d6a3
6 changed files with 83 additions and 130 deletions

View File

@ -54,7 +54,7 @@ Each result is best of five runs. All measurements are req/sec.
Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline
---- | -------- | ---------- | -------- | ---------- | -------- | ----------
Actix | 89.300 | 871.200 | 122.100 | 1.877.000 | 107.400 | 2.560.000
Actix | 91.200 | 912.000 | 122.100 | 2.083.000 | 107.400 | 2.650.000
Gotham | 61.000 | 178.000 | | | |
Iron | | | | | 94.500 | 78.000
Rocket | | | | | 95.500 | failed

View File

@ -20,6 +20,7 @@ use h1writer::{Writer, H1Writer};
use server::WorkerSettings;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use helpers::SharedHttpMessage;
use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
@ -527,19 +528,19 @@ impl Reader {
}
// Parse http message
let mut headers_indices: [HeaderIndices; MAX_HEADERS] =
let msg = {
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
unsafe{std::mem::uninitialized()};
let (len, method, path, version, headers_len) = {
let mut headers: [httparse::Header; MAX_HEADERS] =
unsafe{std::mem::uninitialized()};
let b = unsafe{ let b: &[u8] = buf; std::mem::transmute(b) };
let mut req = httparse::Request::new(&mut headers);
match try!(req.parse(buf)) {
match req.parse(b)? {
httparse::Status::Complete(len) => {
let method = Method::try_from(req.method.unwrap())
.map_err(|_| ParseError::Method)?;
let path = req.path.unwrap();
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let path_start = path.as_ptr() as usize - bytes_ptr;
let path_end = path_start + path.len();
let path = (path_start, path_end);
@ -549,38 +550,38 @@ impl Reader {
} else {
Version::HTTP_10
};
record_header_indices(buf.as_ref(), req.headers, &mut headers_indices);
let headers_len = req.headers.len();
(len, method, path, version, headers_len)
(len, method, path, version, req.headers.len())
}
httparse::Status::Partial => return Ok(Message::NotReady),
}
};
let slice = buf.split_to(len).freeze();
let path = slice.slice(path.0, path.1);
// path was found to be utf8 by httparse
let uri = Uri::from_shared(path).map_err(ParseError::Uri)?;
// convert headers
let msg = settings.get_http_message();
msg.get_mut().headers.reserve(headers_len);
for header in headers_indices[..headers_len].iter() {
if let Ok(name) = HeaderName::try_from(slice.slice(header.name.0, header.name.1)) {
if let Ok(value) = HeaderValue::try_from(
slice.slice(header.value.0, header.value.1))
{
for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::try_from(header.name) {
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
HeaderValue::from_shared_unchecked(slice.slice(v_start, v_end)) };
msg.get_mut().headers.append(name, value);
} else {
return Err(ParseError::Header)
}
} else {
return Err(ParseError::Header)
}
}
let decoder = if upgrade(&method, &msg.get_mut().headers) {
let path = slice.slice(path.0, path.1);
let uri = Uri::from_shared(path).map_err(ParseError::Uri)?;
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
msg
};
let decoder = if upgrade(&msg) {
Decoder::eof()
} else {
let has_len = msg.get_mut().headers.contains_key(header::CONTENT_LENGTH);
@ -593,9 +594,6 @@ impl Reader {
Decoder::chunked()
} else {
if !has_len {
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
return Ok(Message::Http1(HttpRequest::from_message(msg), None))
}
@ -620,45 +618,21 @@ impl Reader {
tx: PayloadType::new(&msg.get_mut().headers, psender),
decoder: decoder,
};
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
msg.get_mut().payload = Some(payload);
Ok(Message::Http1(HttpRequest::from_message(msg), Some(info)))
}
}
#[derive(Clone, Copy)]
struct HeaderIndices {
name: (usize, usize),
value: (usize, usize),
}
fn record_header_indices(bytes: &[u8],
headers: &[httparse::Header],
indices: &mut [HeaderIndices])
{
let bytes_ptr = bytes.as_ptr() as usize;
for (header, indices) in headers.iter().zip(indices.iter_mut()) {
let name_start = header.name.as_ptr() as usize - bytes_ptr;
let name_end = name_start + header.name.len();
indices.name = (name_start, name_end);
let value_start = header.value.as_ptr() as usize - bytes_ptr;
let value_end = value_start + header.value.len();
indices.value = (value_start, value_end);
}
}
/// Check if request is UPGRADE
fn upgrade(method: &Method, headers: &HeaderMap) -> bool {
if let Some(conn) = headers.get(header::CONNECTION) {
fn upgrade(msg: &SharedHttpMessage) -> bool {
if let Some(conn) = msg.get_ref().headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() {
s.to_lowercase().contains("upgrade")
} else {
*method == Method::CONNECT
msg.get_ref().method == Method::CONNECT
}
} else {
*method == Method::CONNECT
msg.get_ref().method == Method::CONNECT
}
}
@ -735,18 +709,6 @@ enum ChunkedState {
End,
}
impl Decoder {
/*pub fn is_eof(&self) -> bool {
trace!("is_eof? {:?}", self);
match self.kind {
Kind::Length(0) |
Kind::Chunked(ChunkedState::End, _) |
Kind::Eof(true) => true,
_ => false,
}
}*/
}
impl Decoder {
pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> {
match self.kind {

View File

@ -168,8 +168,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
buffer.extend_from_slice(b"\r\n");
for (key, value) in msg.headers() {
let t: &[u8] = key.as_ref();
buffer.extend_from_slice(t);
buffer.extend_from_slice(key.as_str().as_bytes());
buffer.extend_from_slice(b": ");
buffer.extend_from_slice(value.as_ref());
buffer.extend_from_slice(b"\r\n");

View File

@ -74,9 +74,10 @@ impl SharedBytesPool {
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
if self.0.borrow().len() < 128 {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
self.0.borrow_mut().push_front(bytes);
v.push_front(bytes);
}
}
}
@ -107,9 +108,9 @@ impl SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline]
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
@ -150,9 +151,10 @@ impl SharedMessagePool {
}
pub fn release(&self, mut msg: Rc<HttpMessage>) {
if self.0.borrow().len() < 128 {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut msg).unwrap().reset();
self.0.borrow_mut().push_front(msg);
v.push_front(msg);
}
}
}
@ -219,7 +221,8 @@ impl SharedHttpMessage {
unsafe{mem::transmute(r)}
}
#[inline]
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
pub fn get_ref(&self) -> &HttpMessage {
self.0.as_ref().unwrap()
}
@ -234,7 +237,7 @@ const DEC_DIGITS_LUT: &[u8] =
pub(crate) fn convert_u16(mut n: u16, bytes: &mut BytesMut) {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
let mut curr = buf.len() as isize;
let mut curr: isize = 39;
let buf_ptr = buf.as_mut_ptr();
let lut_ptr = DEC_DIGITS_LUT.as_ptr();

View File

@ -40,7 +40,7 @@ impl Default for HttpMessage {
method: Method::GET,
uri: Uri::default(),
version: Version::HTTP_11,
headers: HeaderMap::new(),
headers: HeaderMap::with_capacity(16),
params: Params::default(),
cookies: None,
addr: None,
@ -54,6 +54,7 @@ impl Default for HttpMessage {
impl HttpMessage {
/// Checks if a connection should be kept alive.
#[inline]
pub fn keep_alive(&self) -> bool {
if let Some(conn) = self.headers.get(header::CONNECTION) {
if let Ok(conn) = conn.to_str() {
@ -71,14 +72,15 @@ impl HttpMessage {
}
}
#[inline]
pub(crate) fn reset(&mut self) {
self.headers.clear();
self.extensions.clear();
self.params.clear();
self.cookies.take();
self.addr.take();
self.payload.take();
self.info.take();
self.cookies = None;
self.addr = None;
self.payload = None;
self.info = None;
}
}
@ -109,6 +111,8 @@ impl HttpRequest<()> {
)
}
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
pub(crate) fn from_message(msg: SharedHttpMessage) -> HttpRequest {
HttpRequest(msg, None, None)
}
@ -138,6 +142,7 @@ impl HttpRequest<()> {
)
}
#[inline]
/// Construct new http request with state.
pub fn with_state<S>(self, state: Rc<S>, router: Router<S>) -> HttpRequest<S> {
HttpRequest(self.0, Some(state), Some(router))
@ -146,6 +151,7 @@ impl HttpRequest<()> {
impl<S> HttpRequest<S> {
#[inline]
/// Construct new http request without state.
pub fn clone_without_state(&self) -> HttpRequest {
HttpRequest(self.0.clone(), None, None)
@ -153,13 +159,14 @@ impl<S> HttpRequest<S> {
// get mutable reference for inner message
// mutable reference should not be returned as result for request's method
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
fn as_mut(&self) -> &mut HttpMessage {
self.0.get_mut()
}
#[inline]
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
fn as_ref(&self) -> &HttpMessage {
self.0.get_ref()
}
@ -183,11 +190,7 @@ impl<S> HttpRequest<S> {
#[doc(hidden)]
pub fn prefix_len(&self) -> usize {
if let Some(router) = self.router() {
router.prefix().len()
} else {
0
}
if let Some(router) = self.router() { router.prefix().len() } else { 0 }
}
/// Read the Request Uri.
@ -288,7 +291,6 @@ impl<S> HttpRequest<S> {
}
/// Load request cookies.
#[inline]
pub fn cookies(&self) -> Result<&Vec<Cookie<'static>>, CookieParseError> {
if self.as_ref().cookies.is_none() {
let msg = self.as_mut();
@ -334,20 +336,7 @@ impl<S> HttpRequest<S> {
/// Checks if a connection should be kept alive.
pub fn keep_alive(&self) -> bool {
if let Some(conn) = self.headers().get(header::CONNECTION) {
if let Ok(conn) = conn.to_str() {
if self.as_ref().version == Version::HTTP_10 && conn.contains("keep-alive") {
true
} else {
self.as_ref().version == Version::HTTP_11 &&
!(conn.contains("close") || conn.contains("upgrade"))
}
} else {
false
}
} else {
self.as_ref().version != Version::HTTP_10
}
self.as_ref().keep_alive()
}
/// Read the request content type

View File

@ -584,7 +584,7 @@ impl InnerHttpResponse {
fn new(status: StatusCode, body: Body) -> InnerHttpResponse {
InnerHttpResponse {
version: None,
headers: HeaderMap::with_capacity(8),
headers: HeaderMap::with_capacity(16),
status: status,
reason: None,
body: body,
@ -595,19 +595,17 @@ impl InnerHttpResponse {
error: None,
}
}
}
/// Internal use only! unsafe
struct Pool(VecDeque<Box<InnerHttpResponse>>);
thread_local!(static POOL: RefCell<Pool> = RefCell::new(Pool::new()));
thread_local!(static POOL: RefCell<Pool> =
RefCell::new(Pool(VecDeque::with_capacity(128))));
impl Pool {
fn new() -> Pool {
Pool(VecDeque::with_capacity(128))
}
#[inline]
fn get(status: StatusCode) -> Box<InnerHttpResponse> {
POOL.with(|pool| {
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
@ -620,6 +618,7 @@ impl Pool {
})
}
#[inline]
fn with_body(status: StatusCode, body: Body) -> Box<InnerHttpResponse> {
POOL.with(|pool| {
if let Some(mut resp) = pool.borrow_mut().0.pop_front() {
@ -632,7 +631,8 @@ impl Pool {
})
}
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local))]
#[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))]
fn release(mut inner: Box<InnerHttpResponse>) {
POOL.with(|pool| {
let v = &mut pool.borrow_mut().0;