1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 06:39:22 +02:00

use objects pool for HttpRequest; optimize nested services call

This commit is contained in:
Nikolay Kim
2019-04-07 23:06:21 -07:00
parent 75b213a6f0
commit aa78565453
20 changed files with 343 additions and 295 deletions

View File

@ -104,9 +104,8 @@ where
let (parts, body) = resp.into_parts();
let payload = if head_req { Payload::None } else { body.into() };
let mut head = ResponseHead::default();
let mut head = ResponseHead::new(parts.status);
head.version = parts.version;
head.status = parts.status;
head.headers = parts.headers.into();
Ok((head, payload))

View File

@ -21,6 +21,7 @@ use tokio_timer::{sleep, Delay};
use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError;
#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq)]
pub enum Protocol {
Http1,

View File

@ -73,78 +73,75 @@ pub(crate) trait MessageType: Sized {
let headers = self.headers_mut();
for idx in raw_headers.iter() {
if let Ok(name) = HeaderName::from_bytes(&slice[idx.name.0..idx.name.1])
{
// Unsafe: httparse check header value for valid utf-8
let value = unsafe {
HeaderValue::from_shared_unchecked(
slice.slice(idx.value.0, idx.value.1),
)
};
match name {
header::CONTENT_LENGTH => {
if let Ok(s) = value.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len != 0 {
content_length = Some(len);
}
} else {
debug!("illegal Content-Length: {:?}", s);
return Err(ParseError::Header);
let name =
HeaderName::from_bytes(&slice[idx.name.0..idx.name.1]).unwrap();
// Unsafe: httparse check header value for valid utf-8
let value = unsafe {
HeaderValue::from_shared_unchecked(
slice.slice(idx.value.0, idx.value.1),
)
};
match name {
header::CONTENT_LENGTH => {
if let Ok(s) = value.to_str() {
if let Ok(len) = s.parse::<u64>() {
if len != 0 {
content_length = Some(len);
}
} else {
debug!("illegal Content-Length: {:?}", value);
debug!("illegal Content-Length: {:?}", s);
return Err(ParseError::Header);
}
} else {
debug!("illegal Content-Length: {:?}", value);
return Err(ParseError::Header);
}
// transfer-encoding
header::TRANSFER_ENCODING => {
if let Ok(s) = value.to_str().map(|s| s.trim()) {
chunked = s.eq_ignore_ascii_case("chunked");
} else {
return Err(ParseError::Header);
}
}
// transfer-encoding
header::TRANSFER_ENCODING => {
if let Ok(s) = value.to_str().map(|s| s.trim()) {
chunked = s.eq_ignore_ascii_case("chunked");
} else {
return Err(ParseError::Header);
}
// connection keep-alive state
header::CONNECTION => {
ka = if let Ok(conn) = value.to_str().map(|conn| conn.trim())
{
if conn.eq_ignore_ascii_case("keep-alive") {
Some(ConnectionType::KeepAlive)
} else if conn.eq_ignore_ascii_case("close") {
Some(ConnectionType::Close)
} else if conn.eq_ignore_ascii_case("upgrade") {
Some(ConnectionType::Upgrade)
} else {
None
}
}
// connection keep-alive state
header::CONNECTION => {
ka = if let Ok(conn) = value.to_str().map(|conn| conn.trim()) {
if conn.eq_ignore_ascii_case("keep-alive") {
Some(ConnectionType::KeepAlive)
} else if conn.eq_ignore_ascii_case("close") {
Some(ConnectionType::Close)
} else if conn.eq_ignore_ascii_case("upgrade") {
Some(ConnectionType::Upgrade)
} else {
None
};
}
header::UPGRADE => {
has_upgrade = true;
// check content-length, some clients (dart)
// sends "content-length: 0" with websocket upgrade
if let Ok(val) = value.to_str().map(|val| val.trim()) {
if val.eq_ignore_ascii_case("websocket") {
content_length = None;
}
}
}
header::EXPECT => {
let bytes = value.as_bytes();
if bytes.len() >= 4 && &bytes[0..4] == b"100-" {
expect = true;
}
}
_ => (),
} else {
None
};
}
headers.append(name, value);
} else {
return Err(ParseError::Header);
header::UPGRADE => {
has_upgrade = true;
// check content-length, some clients (dart)
// sends "content-length: 0" with websocket upgrade
if let Ok(val) = value.to_str().map(|val| val.trim()) {
if val.eq_ignore_ascii_case("websocket") {
content_length = None;
}
}
}
header::EXPECT => {
let bytes = value.as_bytes();
if bytes.len() >= 4 && &bytes[0..4] == b"100-" {
expect = true;
}
}
_ => (),
}
headers.append(name, value);
}
}
self.set_connection_type(ka);
@ -217,10 +214,10 @@ impl MessageType for Request {
let mut msg = Request::new();
// convert headers
let len = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?;
let length = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?;
// payload decoder
let decoder = match len {
let decoder = match length {
PayloadLength::Payload(pl) => pl,
PayloadLength::Upgrade => {
// upgrade(websocket)
@ -287,13 +284,14 @@ impl MessageType for ResponseHead {
}
};
let mut msg = ResponseHead::default();
let mut msg = ResponseHead::new(status);
msg.version = ver;
// convert headers
let len = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?;
let length = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len])?;
// message payload
let decoder = if let PayloadLength::Payload(pl) = len {
let decoder = if let PayloadLength::Payload(pl) = length {
pl
} else if status == StatusCode::SWITCHING_PROTOCOLS {
// switching protocol or connect
@ -305,9 +303,6 @@ impl MessageType for ResponseHead {
PayloadType::None
};
msg.status = status;
msg.version = ver;
Ok(Some((msg, decoder)))
}
}

View File

@ -218,7 +218,7 @@ where
{
fn can_read(&self) -> bool {
if self.flags.contains(Flags::READ_DISCONNECT) {
return false;
false
} else if let Some(ref info) = self.payload {
info.need_read() == PayloadStatus::Read
} else {

View File

@ -8,7 +8,7 @@ macro_rules! STATIC_RESP {
($name:ident, $status:expr) => {
#[allow(non_snake_case, missing_docs)]
pub fn $name() -> ResponseBuilder {
Response::build($status)
ResponseBuilder::new($status)
}
};
}

View File

@ -1,5 +1,9 @@
//! Basic http primitives for actix-net framework.
#![allow(clippy::type_complexity, clippy::new_without_default)]
#![allow(
clippy::type_complexity,
clippy::new_without_default,
clippy::borrow_interior_mutable_const
)]
#[macro_use]
extern crate log;

View File

@ -1,5 +1,4 @@
use std::cell::{Ref, RefCell, RefMut};
use std::collections::VecDeque;
use std::rc::Rc;
use bitflags::bitflags;
@ -171,20 +170,20 @@ pub struct ResponseHead {
flags: Flags,
}
impl Default for ResponseHead {
fn default() -> ResponseHead {
impl ResponseHead {
/// Create new instance of `ResponseHead` type
#[inline]
pub fn new(status: StatusCode) -> ResponseHead {
ResponseHead {
status,
version: Version::default(),
status: StatusCode::OK,
headers: HeaderMap::with_capacity(12),
reason: None,
flags: Flags::empty(),
extensions: RefCell::new(Extensions::new()),
}
}
}
impl ResponseHead {
/// Message extensions
#[inline]
pub fn extensions(&self) -> Ref<Extensions> {
@ -335,8 +334,8 @@ pub(crate) struct BoxedResponseHead {
impl BoxedResponseHead {
/// Get new message from the pool of objects
pub fn new() -> Self {
RESPONSE_POOL.with(|p| p.get_message())
pub fn new(status: StatusCode) -> Self {
RESPONSE_POOL.with(|p| p.get_message(status))
}
}
@ -362,25 +361,25 @@ impl Drop for BoxedResponseHead {
#[doc(hidden)]
/// Request's objects pool
pub struct MessagePool<T: Head>(RefCell<VecDeque<Rc<T>>>);
pub struct MessagePool<T: Head>(RefCell<Vec<Rc<T>>>);
#[doc(hidden)]
/// Request's objects pool
pub struct BoxedResponsePool(RefCell<VecDeque<Box<ResponseHead>>>);
pub struct BoxedResponsePool(RefCell<Vec<Box<ResponseHead>>>);
thread_local!(static REQUEST_POOL: &'static MessagePool<RequestHead> = MessagePool::<RequestHead>::create());
thread_local!(static RESPONSE_POOL: &'static BoxedResponsePool = BoxedResponsePool::create());
impl<T: Head> MessagePool<T> {
fn create() -> &'static MessagePool<T> {
let pool = MessagePool(RefCell::new(VecDeque::with_capacity(128)));
let pool = MessagePool(RefCell::new(Vec::with_capacity(128)));
Box::leak(Box::new(pool))
}
/// Get message from the pool
#[inline]
fn get_message(&'static self) -> Message<T> {
if let Some(mut msg) = self.0.borrow_mut().pop_front() {
if let Some(mut msg) = self.0.borrow_mut().pop() {
if let Some(r) = Rc::get_mut(&mut msg) {
r.clear();
}
@ -397,28 +396,29 @@ impl<T: Head> MessagePool<T> {
fn release(&self, msg: Rc<T>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
v.push_front(msg);
v.push(msg);
}
}
}
impl BoxedResponsePool {
fn create() -> &'static BoxedResponsePool {
let pool = BoxedResponsePool(RefCell::new(VecDeque::with_capacity(128)));
let pool = BoxedResponsePool(RefCell::new(Vec::with_capacity(128)));
Box::leak(Box::new(pool))
}
/// Get message from the pool
#[inline]
fn get_message(&'static self) -> BoxedResponseHead {
if let Some(mut head) = self.0.borrow_mut().pop_front() {
fn get_message(&'static self, status: StatusCode) -> BoxedResponseHead {
if let Some(mut head) = self.0.borrow_mut().pop() {
head.reason = None;
head.status = status;
head.headers.clear();
head.flags = Flags::empty();
BoxedResponseHead { head: Some(head) }
} else {
BoxedResponseHead {
head: Some(Box::alloc().init(ResponseHead::default())),
head: Some(Box::alloc().init(ResponseHead::new(status))),
}
}
}
@ -428,7 +428,7 @@ impl BoxedResponsePool {
fn release(&self, msg: Box<ResponseHead>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
v.push_front(msg);
v.push(msg);
}
}
}

View File

@ -41,11 +41,8 @@ impl Response<Body> {
/// Constructs a response
#[inline]
pub fn new(status: StatusCode) -> Response {
let mut head = BoxedResponseHead::new();
head.status = status;
Response {
head,
head: BoxedResponseHead::new(status),
body: ResponseBody::Body(Body::Empty),
error: None,
}
@ -78,6 +75,16 @@ impl Response<Body> {
}
impl<B> Response<B> {
/// Constructs a response with body
#[inline]
pub fn with_body(status: StatusCode, body: B) -> Response<B> {
Response {
head: BoxedResponseHead::new(status),
body: ResponseBody::Body(body),
error: None,
}
}
#[inline]
/// Http message part of the response
pub fn head(&self) -> &ResponseHead {
@ -90,18 +97,6 @@ impl<B> Response<B> {
&mut *self.head
}
/// Constructs a response with body
#[inline]
pub fn with_body(status: StatusCode, body: B) -> Response<B> {
let mut head = BoxedResponseHead::new();
head.status = status;
Response {
head,
body: ResponseBody::Body(body),
error: None,
}
}
/// The source `error` for this response
#[inline]
pub fn error(&self) -> Option<&Error> {
@ -325,13 +320,11 @@ pub struct ResponseBuilder {
}
impl ResponseBuilder {
#[inline]
/// Create response builder
pub fn new(status: StatusCode) -> Self {
let mut head = BoxedResponseHead::new();
head.status = status;
ResponseBuilder {
head: Some(head),
head: Some(BoxedResponseHead::new(status)),
err: None,
cookies: None,
}
@ -555,15 +548,13 @@ impl ResponseBuilder {
/// }
/// ```
pub fn del_cookie<'a>(&mut self, cookie: &Cookie<'a>) -> &mut Self {
{
if self.cookies.is_none() {
self.cookies = Some(CookieJar::new())
}
let jar = self.cookies.as_mut().unwrap();
let cookie = cookie.clone().into_owned();
jar.add_original(cookie.clone());
jar.remove(cookie);
if self.cookies.is_none() {
self.cookies = Some(CookieJar::new())
}
let jar = self.cookies.as_mut().unwrap();
let cookie = cookie.clone().into_owned();
jar.add_original(cookie.clone());
jar.remove(cookie);
self
}
@ -605,6 +596,7 @@ impl ResponseBuilder {
head.extensions.borrow_mut()
}
#[inline]
/// Set a body and generate `Response`.
///
/// `ResponseBuilder` can not be used after this call.
@ -625,9 +617,7 @@ impl ResponseBuilder {
if let Some(ref jar) = self.cookies {
for cookie in jar.delta() {
match HeaderValue::from_str(&cookie.to_string()) {
Ok(val) => {
let _ = response.headers.append(header::SET_COOKIE, val);
}
Ok(val) => response.headers.append(header::SET_COOKIE, val),
Err(e) => return Response::from(Error::from(e)).into_body(),
};
}
@ -652,6 +642,7 @@ impl ResponseBuilder {
self.body(Body::from_message(BodyStream::new(stream)))
}
#[inline]
/// Set a json body and generate `Response`
///
/// `ResponseBuilder` can not be used after this call.
@ -751,11 +742,12 @@ impl<'a> From<&'a ResponseHead> for ResponseBuilder {
}
}
let mut msg = BoxedResponseHead::new();
let mut msg = BoxedResponseHead::new(head.status);
msg.version = head.version;
msg.status = head.status;
msg.reason = head.reason;
// msg.headers = head.headers.clone();
for (k, v) in &head.headers {
msg.headers.append(k.clone(), v.clone());
}
msg.no_chunking(!head.chunked());
ResponseBuilder {

View File

@ -178,6 +178,6 @@ impl TestRequest {
}
#[inline]
fn parts<'a>(parts: &'a mut Option<Inner>) -> &'a mut Inner {
fn parts(parts: &mut Option<Inner>) -> &mut Inner {
parts.as_mut().expect("cannot reuse test request builder")
}