1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-27 17:22:57 +01:00

various optimizations

This commit is contained in:
Nikolay Kim 2018-03-18 11:05:44 -07:00
parent c10dedf7e4
commit e0c8da567c
13 changed files with 159 additions and 141 deletions

View File

@ -1,5 +1,10 @@
# Changes
## 0.4.10 (2018-03-xx)
..
## 0.4.9 (2018-03-16)
* Allow to disable http/2 support

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.4.9"
version = "0.4.10"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic, extremely fast, web framework for Rust."
readme = "README.md"

View File

@ -235,9 +235,10 @@ impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
}
impl AsRef<[u8]> for Binary {
#[inline]
fn as_ref(&self) -> &[u8] {
match *self {
Binary::Bytes(ref bytes) => bytes.as_ref(),
Binary::Bytes(ref bytes) => &bytes[..],
Binary::Slice(slice) => slice,
Binary::SharedString(ref s) => s.as_bytes(),
Binary::ArcSharedString(ref s) => s.as_bytes(),

View File

@ -149,6 +149,8 @@ impl ContentEncoding {
ContentEncoding::Identity | ContentEncoding::Auto => "identity",
}
}
#[inline]
/// default quality value
pub fn quality(&self) -> f64 {
match *self {

View File

@ -1,71 +1,13 @@
use std::{str, mem, ptr, slice};
use std::{mem, ptr, slice};
use std::cell::RefCell;
use std::fmt::{self, Write};
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::collections::VecDeque;
use time;
use bytes::{BufMut, BytesMut};
use http::Version;
use httprequest::HttpInnerMessage;
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
pub(crate) fn date(dst: &mut BytesMut) {
CACHED.with(|cache| {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
buf[..6].copy_from_slice(b"date: ");
buf[6..35].copy_from_slice(cache.borrow().buffer());
buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf);
})
}
pub(crate) fn date_value(dst: &mut BytesMut) {
CACHED.with(|cache| {
dst.extend_from_slice(cache.borrow().buffer());
})
}
pub(crate) fn update_date() {
CACHED.with(|cache| {
cache.borrow_mut().update();
});
}
struct CachedDate {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
}));
impl CachedDate {
fn buffer(&self) -> &[u8] {
&self.bytes[..]
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
assert_eq!(self.pos, DATE_VALUE_LENGTH);
}
}
impl fmt::Write for CachedDate {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpInnerMessage>>>);
@ -202,7 +144,7 @@ pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesM
}
}
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
if four {
bytes.put(b' ');
}
@ -214,7 +156,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
b'n',b't',b'-',b'l',b'e',b'n',b'g',
b't',b'h',b':',b' ',b'0',b'\r',b'\n'];
buf[18] = (n as u8) + b'0';
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else if n < 100 {
let mut buf: [u8; 22] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
b'n',b't',b'-',b'l',b'e',b'n',b'g',
@ -224,7 +166,7 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
ptr::copy_nonoverlapping(
DEC_DIGITS_LUT.as_ptr().offset(d1 as isize), buf.as_mut_ptr().offset(18), 2);
}
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else if n < 1000 {
let mut buf: [u8; 23] = [b'\r',b'\n',b'c',b'o',b'n',b't',b'e',
b'n',b't',b'-',b'l',b'e',b'n',b'g',
@ -238,9 +180,9 @@ pub(crate) fn write_content_length(mut n: usize, bytes: &mut BytesMut) {
// decode last 1
buf[18] = (n as u8) + b'0';
bytes.extend_from_slice(&buf);
bytes.put_slice(&buf);
} else {
bytes.extend_from_slice(b"\r\ncontent-length: ");
bytes.put_slice(b"\r\ncontent-length: ");
convert_usize(n, bytes);
}
}
@ -299,20 +241,6 @@ pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) {
mod tests {
use super::*;
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let mut buf1 = BytesMut::new();
date(&mut buf1);
let mut buf2 = BytesMut::new();
date(&mut buf2);
assert_eq!(buf1, buf2);
}
#[test]
fn test_write_content_length() {
let mut bytes = BytesMut::new();

View File

@ -368,6 +368,7 @@ impl ContentEncoder {
response_encoding: ContentEncoding) -> ContentEncoder
{
let version = resp.version().unwrap_or_else(|| req.version);
let is_head = req.method == Method::HEAD;
let mut body = resp.replace_body(Body::Empty);
let has_body = match body {
Body::Empty => false,
@ -410,7 +411,9 @@ impl ContentEncoder {
TransferEncoding::length(0, buf)
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
if !(encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto)
{
let tmp = SharedBytes::default();
let transfer = TransferEncoding::eof(tmp.clone());
let mut enc = match encoding {
@ -431,13 +434,13 @@ impl ContentEncoder {
*bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity;
}
if req.method == Method::HEAD {
if is_head {
let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len());
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
} else {
resp.headers_mut().remove(CONTENT_LENGTH);
// resp.headers_mut().remove(CONTENT_LENGTH);
}
TransferEncoding::eof(buf)
}
@ -460,7 +463,7 @@ impl ContentEncoder {
}
};
//
if req.method == Method::HEAD {
if is_head {
transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body);

View File

@ -51,7 +51,7 @@ pub(crate) struct Http1<T: IoStream, H: 'static> {
flags: Flags,
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
stream: H1Writer<T>,
stream: H1Writer<T, H>,
reader: Reader,
read_buf: BytesMut,
tasks: VecDeque<Entry>,
@ -72,7 +72,7 @@ impl<T, H> Http1<T, H>
{
let bytes = settings.get_shared_bytes();
Http1{ flags: Flags::KEEPALIVE,
stream: H1Writer::new(stream, bytes),
stream: H1Writer::new(stream, bytes, Rc::clone(&settings)),
reader: Reader::new(),
tasks: VecDeque::new(),
keepalive_timer: None,
@ -353,7 +353,7 @@ impl Reader {
PayloadStatus::Read
}
}
#[inline]
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
-> Result<Decoding, ReaderError>
@ -502,10 +502,12 @@ impl Reader {
httparse::Status::Complete(len) => {
let method = Method::try_from(req.method.unwrap())
.map_err(|_| ParseError::Method)?;
let path = req.path.unwrap();
let path_start = path.as_ptr() as usize - bytes_ptr;
let path_end = path_start + path.len();
let path = (path_start, path_end);
//let path = req.path.unwrap();
//let path_start = path.as_ptr() as usize - bytes_ptr;
//let path_end = path_start + path.len();
//let path = (path_start, path_end);
let path = Uri::try_from(req.path.unwrap()).unwrap();
//.map_err(|_| ParseError::Uri)?;
let version = if req.version.unwrap() == 1 {
Version::HTTP_11
@ -525,9 +527,7 @@ impl Reader {
{
let msg_mut = msg.get_mut();
for header in headers[..headers_len].iter() {
let n_start = header.name.as_ptr() as usize - bytes_ptr;
let n_end = n_start + header.name.len();
if let Ok(name) = HeaderName::try_from(slice.slice(n_start, n_end)) {
if let Ok(name) = HeaderName::try_from(header.name) {
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
@ -539,8 +539,9 @@ impl Reader {
}
}
msg_mut.uri = Uri::from_shared(
slice.slice(path.0, path.1)).map_err(ParseError::Uri)?;
msg_mut.uri = path;
//msg_mut.uri = Uri::from_shared(
//slice.slice(path.0, path.1)).map_err(ParseError::Uri)?;
msg_mut.method = method;
msg_mut.version = version;
}

View File

@ -1,11 +1,12 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::{io, mem};
use std::rc::Rc;
use bytes::BufMut;
use futures::{Async, Poll};
use tokio_io::AsyncWrite;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, DATE};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use helpers;
use body::{Body, Binary};
@ -15,6 +16,7 @@ use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::ContentEncoder;
use super::settings::WorkerSettings;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -27,7 +29,7 @@ bitflags! {
}
}
pub(crate) struct H1Writer<T: AsyncWrite> {
pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
flags: Flags,
stream: T,
encoder: ContentEncoder,
@ -35,11 +37,14 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>,
}
impl<T: AsyncWrite> H1Writer<T> {
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(stream: T, buf: SharedBytes) -> H1Writer<T> {
pub fn new(stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>)
-> H1Writer<T, H>
{
H1Writer {
flags: Flags::empty(),
encoder: ContentEncoder::empty(buf.clone()),
@ -48,6 +53,7 @@ impl<T: AsyncWrite> H1Writer<T> {
buffer: buf,
buffer_capacity: 0,
stream,
settings,
}
}
@ -87,7 +93,7 @@ impl<T: AsyncWrite> H1Writer<T> {
}
}
impl<T: AsyncWrite> Writer for H1Writer<T> {
impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn written(&self) -> u64 {
@ -126,11 +132,14 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
let mut is_bin = if let Body::Binary(ref bytes) = body {
buffer.reserve(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
true
} else {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
false
};
// status line
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
@ -139,21 +148,28 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
match body {
Body::Empty =>
if req.method != Method::HEAD {
SharedBytes::extend_from_slice_(buffer, b"\r\ncontent-length: 0\r\n");
SharedBytes::put_slice(
buffer, b"\r\ncontent-length: 0\r\n");
} else {
SharedBytes::extend_from_slice_(buffer, b"\r\n");
SharedBytes::put_slice(buffer, b"\r\n");
},
Body::Binary(ref bytes) =>
helpers::write_content_length(bytes.len(), &mut buffer),
_ =>
SharedBytes::extend_from_slice_(buffer, b"\r\n"),
SharedBytes::put_slice(buffer, b"\r\n"),
}
// write headers
let mut pos = 0;
let mut has_date = false;
let mut remaining = buffer.remaining_mut();
let mut buf: &mut [u8] = unsafe{ mem::transmute(buffer.bytes_mut()) };
for (key, value) in msg.headers() {
if is_bin && key == CONTENT_LENGTH {
is_bin = false;
continue
}
has_date = has_date || key == DATE;
let v = value.as_ref();
let k = key.as_str().as_bytes();
let len = k.len() + v.len() + 4;
@ -182,9 +198,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
}
unsafe{buffer.advance_mut(pos)};
// using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
helpers::date(&mut buffer);
// optimized date header
if !has_date {
self.settings.set_date(&mut buffer);
} else {
// msg eof
SharedBytes::extend_from_slice_(buffer, b"\r\n");

View File

@ -43,7 +43,7 @@ struct Http2<T, H>
settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>,
state: State<IoWrapper<T>>,
tasks: VecDeque<Entry>,
tasks: VecDeque<Entry<H>>,
keepalive_timer: Option<Timeout>,
}
@ -274,20 +274,20 @@ bitflags! {
}
}
struct Entry {
struct Entry<H: 'static> {
task: Box<HttpHandlerTask>,
payload: PayloadType,
recv: RecvStream,
stream: H2Writer,
stream: H2Writer<H>,
flags: EntryFlags,
}
impl Entry {
fn new<H>(parts: Parts,
recv: RecvStream,
resp: SendResponse<Bytes>,
addr: Option<SocketAddr>,
settings: &Rc<WorkerSettings<H>>) -> Entry
impl<H: 'static> Entry<H> {
fn new(parts: Parts,
recv: RecvStream,
resp: SendResponse<Bytes>,
addr: Option<SocketAddr>,
settings: &Rc<WorkerSettings<H>>) -> Entry<H>
where H: HttpHandler + 'static
{
// Payload and Content-Encoding
@ -320,7 +320,8 @@ impl Entry {
Entry {task: task.unwrap_or_else(|| Pipeline::error(HttpNotFound)),
payload: psender,
stream: H2Writer::new(resp, settings.get_shared_bytes()),
stream: H2Writer::new(
resp, settings.get_shared_bytes(), Rc::clone(settings)),
flags: EntryFlags::empty(),
recv,
}

View File

@ -1,6 +1,7 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::{io, cmp};
use std::rc::Rc;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use http2::{Reason, SendStream};
@ -15,6 +16,7 @@ use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
use super::encoding::ContentEncoder;
use super::shared::SharedBytes;
use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384;
@ -28,7 +30,7 @@ bitflags! {
}
}
pub(crate) struct H2Writer {
pub(crate) struct H2Writer<H: 'static> {
respond: SendResponse<Bytes>,
stream: Option<SendStream<Bytes>>,
encoder: ContentEncoder,
@ -36,13 +38,17 @@ pub(crate) struct H2Writer {
written: u64,
buffer: SharedBytes,
buffer_capacity: usize,
settings: Rc<WorkerSettings<H>>,
}
impl H2Writer {
impl<H: 'static> H2Writer<H> {
pub fn new(respond: SendResponse<Bytes>, buf: SharedBytes) -> H2Writer {
pub fn new(respond: SendResponse<Bytes>,
buf: SharedBytes, settings: Rc<WorkerSettings<H>>) -> H2Writer<H>
{
H2Writer {
respond,
settings,
stream: None,
encoder: ContentEncoder::empty(buf.clone()),
flags: Flags::empty(),
@ -59,7 +65,7 @@ impl H2Writer {
}
}
impl Writer for H2Writer {
impl<H: 'static> Writer for H2Writer<H> {
fn written(&self) -> u64 {
self.written
@ -84,7 +90,7 @@ impl Writer for H2Writer {
// using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
let mut bytes = BytesMut::with_capacity(29);
helpers::date_value(&mut bytes);
self.settings.set_date(&mut bytes);
msg.headers_mut().insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap());
}
@ -95,7 +101,8 @@ impl Writer for H2Writer {
helpers::convert_usize(bytes.len(), &mut val);
let l = val.len();
msg.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
CONTENT_LENGTH,
HeaderValue::try_from(val.split_to(l-2).freeze()).unwrap());
}
Body::Empty => {
msg.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));

View File

@ -1,7 +1,10 @@
use std::{fmt, net};
use std::{fmt, mem, net};
use std::fmt::Write;
use std::rc::Rc;
use std::sync::Arc;
use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
use time;
use bytes::BytesMut;
use futures_cpupool::{Builder, CpuPool};
use helpers;
@ -95,6 +98,8 @@ impl ServerSettings {
}
}
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29;
pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>,
@ -104,6 +109,7 @@ pub(crate) struct WorkerSettings<H> {
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
node: Box<Node<()>>,
date: UnsafeCell<Date>,
}
impl<H> WorkerSettings<H> {
@ -121,6 +127,7 @@ impl<H> WorkerSettings<H> {
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
node: Box::new(Node::head()),
date: UnsafeCell::new(Date::new()),
}
}
@ -164,4 +171,63 @@ impl<H> WorkerSettings<H> {
error!("Number of removed channels is bigger than added channel. Bug in actix-web");
}
}
pub fn update_date(&self) {
unsafe{&mut *self.date.get()}.update();
}
pub fn set_date(&self, dst: &mut BytesMut) {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
buf[..6].copy_from_slice(b"date: ");
buf[6..35].copy_from_slice(&(unsafe{&*self.date.get()}.bytes));
buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf);
}
}
struct Date {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
impl Date {
fn new() -> Date {
let mut date = Date{bytes: [0; DATE_VALUE_LENGTH], pos: 0};
date.update();
date
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
}
}
impl fmt::Write for Date {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let settings = WorkerSettings::<()>::new(Vec::new(), KeepAlive::Os);
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1);
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf2);
assert_eq!(buf1, buf2);
}
}

View File

@ -18,7 +18,6 @@ use native_tls::TlsAcceptor;
#[cfg(feature="alpn")]
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use helpers;
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
use super::channel::{HttpChannel, WrapperStream};
@ -58,13 +57,8 @@ enum ServerCommand {
WorkerDied(usize, Info),
}
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler
{
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
@ -95,11 +89,6 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
/// Set number of workers to start.
///
/// By default http server uses number of available logical cpu as threads count.

View File

@ -22,7 +22,6 @@ use tokio_openssl::SslAcceptorExt;
use actix::*;
use actix::msgs::StopArbiter;
use helpers;
use server::{HttpHandler, KeepAlive};
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
@ -76,7 +75,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
}
fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date();
self.settings.update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}