1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

various optimizations

This commit is contained in:
Nikolay Kim 2017-12-13 16:44:35 -08:00
parent 81f8da03ae
commit 96f598f2c4
13 changed files with 328 additions and 232 deletions

View File

@ -48,6 +48,7 @@ pub enum Binary {
impl Body {
/// Does this body streaming.
#[inline]
pub fn is_streaming(&self) -> bool {
match *self {
Body::Streaming(_) | Body::StreamingContext
@ -57,6 +58,7 @@ impl Body {
}
/// Is this binary body.
#[inline]
pub fn is_binary(&self) -> bool {
match *self {
Body::Binary(_) => true,
@ -114,10 +116,12 @@ impl<T> From<T> for Body where T: Into<Binary>{
}
impl Binary {
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn len(&self) -> usize {
match *self {
Binary::Bytes(ref bytes) => bytes.len(),

View File

@ -1,68 +0,0 @@
use std::cell::RefCell;
use std::fmt::{self, Write};
use std::str;
use time::{self, Duration};
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub const DATE_VALUE_LENGTH: usize = 29;
pub fn extend(dst: &mut [u8]) {
CACHED.with(|cache| {
let mut cache = cache.borrow_mut();
let now = time::get_time();
if now > cache.next_update {
cache.update(now);
}
dst.copy_from_slice(cache.buffer());
})
}
struct CachedDate {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
next_update: time::Timespec,
}
thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
next_update: time::Timespec::new(0, 0),
}));
impl CachedDate {
fn buffer(&self) -> &[u8] {
&self.bytes[..]
}
fn update(&mut self, now: time::Timespec) {
self.pos = 0;
write!(self, "{}", time::at_utc(now).rfc822()).unwrap();
assert_eq!(self.pos, DATE_VALUE_LENGTH);
self.next_update = now + Duration::seconds(1);
self.next_update.nsec = 0;
}
}
impl fmt::Write for CachedDate {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let mut buf1 = [0u8; 29];
extend(&mut buf1);
let mut buf2 = [0u8; 29];
extend(&mut buf2);
assert_eq!(buf1, buf2);
}

View File

@ -13,6 +13,7 @@ use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut, Writer};
use utils;
use body::{Body, Binary};
use error::PayloadError;
use httprequest::HttpMessage;
@ -35,6 +36,14 @@ pub enum ContentEncoding {
}
impl ContentEncoding {
fn is_compression(&self) -> bool {
match *self {
ContentEncoding::Identity | ContentEncoding::Auto => false,
_ => true
}
}
fn as_str(&self) -> &'static str {
match *self {
ContentEncoding::Br => "br",
@ -270,10 +279,10 @@ impl PayloadWriter for EncodedPayload {
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
let mut buf = BytesMut::new();
buf.extend(data);
buf.extend_from_slice(&data);
*(decoder.as_mut()) = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
} else {
decoder.as_mut().as_mut().unwrap().get_mut().buf.extend(data);
decoder.as_mut().as_mut().unwrap().get_mut().buf.extend_from_slice(&data);
}
loop {
@ -362,8 +371,10 @@ impl PayloadEncoder {
}
encoding => encoding,
};
if encoding.is_compression() {
resp.headers_mut().insert(
CONTENT_ENCODING, HeaderValue::from_static(encoding.as_str()));
}
encoding
} else {
ContentEncoding::Identity
@ -400,15 +411,13 @@ impl PayloadEncoder {
let b = enc.get_mut().take();
resp.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::from_str(&b.len().to_string()).unwrap());
CONTENT_LENGTH, utils::convert_into_header(b.len()));
*bytes = Binary::from(b);
encoding = ContentEncoding::Identity;
TransferEncoding::eof()
} else {
resp.headers_mut().insert(
CONTENT_LENGTH,
HeaderValue::from_str(&bytes.len().to_string()).unwrap());
CONTENT_LENGTH, utils::convert_into_header(bytes.len()));
TransferEncoding::eof()
}
}
@ -491,12 +500,14 @@ impl PayloadEncoder {
self.0.is_eof()
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, payload: &[u8]) -> Result<(), io::Error> {
self.0.write(payload)
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> {
self.0.write_eof()
}
@ -553,6 +564,7 @@ impl ContentEncoder {
}
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> {
let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof()));
@ -592,6 +604,7 @@ impl ContentEncoder {
}
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
match *self {
@ -692,11 +705,11 @@ impl TransferEncoding {
}
/// Encode message. Return `EOF` state of encoder
#[inline(always)]
#[inline]
pub fn encode(&mut self, msg: &[u8]) -> bool {
match self.kind {
TransferEncodingKind::Eof => {
self.buffer.extend(msg);
self.buffer.extend_from_slice(msg);
msg.is_empty()
},
TransferEncodingKind::Chunked(ref mut eof) => {
@ -706,11 +719,11 @@ impl TransferEncoding {
if msg.is_empty() {
*eof = true;
self.buffer.extend(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
} else {
write!(self.buffer, "{:X}\r\n", msg.len()).unwrap();
self.buffer.extend(msg);
self.buffer.extend(b"\r\n");
self.buffer.extend_from_slice(msg);
self.buffer.extend_from_slice(b"\r\n");
}
*eof
},
@ -720,7 +733,7 @@ impl TransferEncoding {
}
let max = cmp::min(*remaining, msg.len() as u64);
trace!("sized write = {}", max);
self.buffer.extend(msg[..max as usize].as_ref());
self.buffer.extend_from_slice(msg[..max as usize].as_ref());
*remaining -= max as u64;
trace!("encoded {} bytes, remaining = {}", max, remaining);
@ -730,14 +743,14 @@ impl TransferEncoding {
}
/// Encode eof. Return `EOF` state of encoder
#[inline(always)]
#[inline]
pub fn encode_eof(&mut self) {
match self.kind {
TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (),
TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof {
*eof = true;
self.buffer.extend(b"0\r\n\r\n");
self.buffer.extend_from_slice(b"0\r\n\r\n");
}
},
}

View File

@ -88,8 +88,8 @@ impl<T, H> Http1<T, H>
keepalive_timer: None }
}
pub fn into_inner(mut self) -> (Rc<Vec<H>>, T, Option<SocketAddr>, Bytes) {
(self.handlers, self.stream.unwrap(), self.addr, self.read_buf.freeze())
pub fn into_inner(self) -> (Rc<Vec<H>>, T, Option<SocketAddr>, Bytes) {
(self.handlers, self.stream.into_inner(), self.addr, self.read_buf.freeze())
}
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
@ -129,7 +129,7 @@ impl<T, H> Http1<T, H>
} else {
self.flags.remove(Flags::KEEPALIVE);
}
self.stream = H1Writer::new(self.stream.unwrap());
self.stream.reset();
item.flags.insert(EntryFlags::EOF);
if ready {
@ -252,12 +252,12 @@ impl<T, H> Http1<T, H>
if self.flags.contains(Flags::KEEPALIVE) {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut timeout = Timeout::new(
let mut to = Timeout::new(
Duration::new(KEEPALIVE_PERIOD, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = timeout.poll();
self.keepalive_timer = Some(timeout);
let _ = to.poll();
self.keepalive_timer = Some(to);
}
} else {
// keep-alive disable, drop connection
@ -482,8 +482,7 @@ impl Reader {
}
}
fn parse_message(buf: &mut BytesMut) -> Result<Message, ParseError>
{
fn parse_message(buf: &mut BytesMut) -> Result<Message, ParseError> {
if buf.is_empty() {
return Ok(Message::NotReady);
}

View File

@ -4,7 +4,7 @@ use tokio_io::AsyncWrite;
use http::Version;
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, DATE};
use date;
use utils;
use body::Body;
use encoding::PayloadEncoder;
use httprequest::HttpMessage;
@ -45,7 +45,7 @@ bitflags! {
pub(crate) struct H1Writer<T: AsyncWrite> {
flags: Flags,
stream: Option<T>,
stream: T,
encoder: PayloadEncoder,
written: u64,
headers_size: u32,
@ -56,7 +56,7 @@ impl<T: AsyncWrite> H1Writer<T> {
pub fn new(stream: T) -> H1Writer<T> {
H1Writer {
flags: Flags::empty(),
stream: Some(stream),
stream: stream,
encoder: PayloadEncoder::default(),
written: 0,
headers_size: 0,
@ -64,11 +64,16 @@ impl<T: AsyncWrite> H1Writer<T> {
}
pub fn get_mut(&mut self) -> &mut T {
self.stream.as_mut().unwrap()
&mut self.stream
}
pub fn unwrap(&mut self) -> T {
self.stream.take().unwrap()
pub fn reset(&mut self) {
self.written = 0;
self.flags = Flags::empty();
}
pub fn into_inner(self) -> T {
self.stream
}
pub fn disconnected(&mut self) {
@ -82,9 +87,8 @@ impl<T: AsyncWrite> H1Writer<T> {
fn write_to_stream(&mut self) -> Result<WriterState, io::Error> {
let buffer = self.encoder.get_mut();
if let Some(ref mut stream) = self.stream {
while !buffer.is_empty() {
match stream.write(buffer.as_ref()) {
match self.stream.write(buffer.as_ref()) {
Ok(n) => {
buffer.split_to(n);
self.written += n as u64;
@ -99,7 +103,6 @@ impl<T: AsyncWrite> H1Writer<T> {
Err(err) => return Err(err),
}
}
}
Ok(WriterState::Done)
}
}
@ -143,50 +146,47 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let buffer = self.encoder.get_mut();
let mut buffer = self.encoder.get_mut();
if let Body::Binary(ref bytes) = *msg.body() {
buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
buffer.reserve(130 + msg.headers().len() * AVERAGE_HEADER_SIZE);
buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
match version {
Version::HTTP_11 => buffer.extend(b"HTTP/1.1 "),
Version::HTTP_2 => buffer.extend(b"HTTP/2.0 "),
Version::HTTP_10 => buffer.extend(b"HTTP/1.0 "),
Version::HTTP_09 => buffer.extend(b"HTTP/0.9 "),
Version::HTTP_11 => buffer.extend_from_slice(b"HTTP/1.1 "),
Version::HTTP_2 => buffer.extend_from_slice(b"HTTP/2.0 "),
Version::HTTP_10 => buffer.extend_from_slice(b"HTTP/1.0 "),
Version::HTTP_09 => buffer.extend_from_slice(b"HTTP/0.9 "),
}
buffer.extend(msg.status().as_u16().to_string().as_bytes());
buffer.extend(b" ");
buffer.extend(msg.reason().as_bytes());
buffer.extend(b"\r\n");
utils::convert_u16(msg.status().as_u16(), &mut buffer);
buffer.extend_from_slice(b" ");
buffer.extend_from_slice(msg.reason().as_bytes());
buffer.extend_from_slice(b"\r\n");
for (key, value) in msg.headers() {
let t: &[u8] = key.as_ref();
buffer.extend(t);
buffer.extend(b": ");
buffer.extend(value.as_ref());
buffer.extend(b"\r\n");
buffer.extend_from_slice(t);
buffer.extend_from_slice(b": ");
buffer.extend_from_slice(value.as_ref());
buffer.extend_from_slice(b"\r\n");
}
// using http::h1::date is quite a lot faster than generating
// a unique Date header each time like req/s goes up about 10%
// using utils::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
buffer.reserve(date::DATE_VALUE_LENGTH + 8);
buffer.extend(b"Date: ");
let mut bytes = [0u8; 29];
date::extend(&mut bytes[..]);
buffer.extend(&bytes);
buffer.extend(b"\r\n");
buffer.reserve(utils::DATE_VALUE_LENGTH + 8);
buffer.extend_from_slice(b"Date: ");
utils::extend(&mut buffer);
buffer.extend_from_slice(b"\r\n");
}
// default content-type
if !msg.headers().contains_key(CONTENT_TYPE) {
buffer.extend(b"ContentType: application/octet-stream\r\n".as_ref());
buffer.extend_from_slice(b"ContentType: application/octet-stream\r\n");
}
// msg eof
buffer.extend(b"\r\n");
buffer.extend_from_slice(b"\r\n");
self.headers_size = buffer.len() as u32;
}

View File

@ -1,12 +1,12 @@
use std::{io, cmp};
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use http2::{Reason, SendStream};
use http2::server::Respond;
use http::{Version, HttpTryFrom, Response};
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, TRANSFER_ENCODING, DATE};
use date;
use utils;
use body::Body;
use encoding::PayloadEncoder;
use httprequest::HttpMessage;
@ -124,11 +124,10 @@ impl Writer for H2Writer {
msg.headers_mut().remove(CONNECTION);
msg.headers_mut().remove(TRANSFER_ENCODING);
// using http::h1::date is quite a lot faster than generating
// a unique Date header each time like req/s goes up about 10%
// using utils::date is quite a lot faster
if !msg.headers().contains_key(DATE) {
let mut bytes = [0u8; 29];
date::extend(&mut bytes[..]);
let mut bytes = BytesMut::with_capacity(29);
utils::extend(&mut bytes);
msg.headers_mut().insert(DATE, HeaderValue::try_from(&bytes[..]).unwrap());
}

View File

@ -515,7 +515,7 @@ impl Future for UrlEncoded {
Ok(Async::Ready(m))
},
Ok(Async::Ready(Some(item))) => {
self.body.extend(item.0);
self.body.extend_from_slice(&item.0);
continue
},
Err(err) => Err(err),

View File

@ -47,8 +47,9 @@ impl HttpResponse {
#[inline]
pub fn build(status: StatusCode) -> HttpResponseBuilder {
HttpResponseBuilder {
parts: Some(Parts::new(status)),
response: Some(HttpResponse::new(status, Body::Empty)),
err: None,
cookies: None,
}
}
@ -57,7 +58,7 @@ impl HttpResponse {
pub fn new(status: StatusCode, body: Body) -> HttpResponse {
HttpResponse {
version: None,
headers: Default::default(),
headers: HeaderMap::with_capacity(8),
status: status,
reason: None,
body: body,
@ -213,49 +214,22 @@ impl fmt::Debug for HttpResponse {
}
}
#[derive(Debug)]
struct Parts {
version: Option<Version>,
headers: HeaderMap,
status: StatusCode,
reason: Option<&'static str>,
chunked: bool,
encoding: ContentEncoding,
connection_type: Option<ConnectionType>,
cookies: Option<CookieJar>,
}
impl Parts {
fn new(status: StatusCode) -> Self {
Parts {
version: None,
headers: HeaderMap::with_capacity(8),
status: status,
reason: None,
chunked: false,
encoding: ContentEncoding::Auto,
connection_type: None,
cookies: None,
}
}
}
/// An HTTP response builder
///
/// This type can be used to construct an instance of `HttpResponse` through a
/// builder-like pattern.
#[derive(Debug)]
pub struct HttpResponseBuilder {
parts: Option<Parts>,
response: Option<HttpResponse>,
err: Option<HttpError>,
cookies: Option<CookieJar>,
}
impl HttpResponseBuilder {
/// Set the HTTP version of this response.
#[inline]
pub fn version(&mut self, version: Version) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.version = Some(version);
}
self
@ -264,7 +238,7 @@ impl HttpResponseBuilder {
/// Set the `StatusCode` for this response.
#[inline]
pub fn status(&mut self, status: StatusCode) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.status = status;
}
self
@ -276,7 +250,7 @@ impl HttpResponseBuilder {
where HeaderName: HttpTryFrom<K>,
HeaderValue: HttpTryFrom<V>
{
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => {
match HeaderValue::try_from(value) {
@ -293,7 +267,7 @@ impl HttpResponseBuilder {
/// Set the custom reason for the response.
#[inline]
pub fn reason(&mut self, reason: &'static str) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.reason = Some(reason);
}
self
@ -306,7 +280,7 @@ impl HttpResponseBuilder {
/// To enforce specific encoding, use specific ContentEncoding` value.
#[inline]
pub fn content_encoding(&mut self, enc: ContentEncoding) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.encoding = enc;
}
self
@ -315,7 +289,7 @@ impl HttpResponseBuilder {
/// Set connection type
#[inline]
pub fn connection_type(&mut self, conn: ConnectionType) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.connection_type = Some(conn);
}
self
@ -336,7 +310,7 @@ impl HttpResponseBuilder {
/// Enables automatic chunked transfer encoding
#[inline]
pub fn enable_chunked(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.chunked = true;
}
self
@ -347,7 +321,7 @@ impl HttpResponseBuilder {
pub fn content_type<V>(&mut self, value: V) -> &mut Self
where HeaderValue: HttpTryFrom<V>
{
if let Some(parts) = parts(&mut self.parts, &self.err) {
if let Some(parts) = parts(&mut self.response, &self.err) {
match HeaderValue::try_from(value) {
Ok(value) => { parts.headers.insert(header::CONTENT_TYPE, value); },
Err(e) => self.err = Some(e.into()),
@ -358,25 +332,23 @@ impl HttpResponseBuilder {
/// Set a cookie
pub fn cookie<'c>(&mut self, cookie: Cookie<'c>) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if parts.cookies.is_none() {
if self.cookies.is_none() {
let mut jar = CookieJar::new();
jar.add(cookie.into_owned());
parts.cookies = Some(jar)
self.cookies = Some(jar)
} else {
parts.cookies.as_mut().unwrap().add(cookie.into_owned());
}
self.cookies.as_mut().unwrap().add(cookie.into_owned());
}
self
}
/// Remote cookie, cookie has to be cookie from `HttpRequest::cookies()` method.
pub fn del_cookie<'a>(&mut self, cookie: &Cookie<'a>) -> &mut Self {
if let Some(parts) = parts(&mut self.parts, &self.err) {
if parts.cookies.is_none() {
parts.cookies = Some(CookieJar::new())
{
if self.cookies.is_none() {
self.cookies = Some(CookieJar::new())
}
let mut jar = parts.cookies.as_mut().unwrap();
let jar = self.cookies.as_mut().unwrap();
let cookie = cookie.clone().into_owned();
jar.add_original(cookie.clone());
jar.remove(cookie);
@ -397,36 +369,26 @@ impl HttpResponseBuilder {
/// Set a body and generate `HttpResponse`.
/// `HttpResponseBuilder` can not be used after this call.
pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<HttpResponse, HttpError> {
let mut parts = self.parts.take().expect("cannot reuse response builder");
if let Some(e) = self.err.take() {
return Err(e)
}
if let Some(jar) = parts.cookies {
let mut response = self.response.take().expect("cannot reuse response builder");
if let Some(ref jar) = self.cookies {
for cookie in jar.delta() {
parts.headers.append(
response.headers.append(
header::SET_COOKIE,
HeaderValue::from_str(&cookie.to_string())?);
}
}
Ok(HttpResponse {
version: parts.version,
headers: parts.headers,
status: parts.status,
reason: parts.reason,
body: body.into(),
chunked: parts.chunked,
encoding: parts.encoding,
connection_type: parts.connection_type,
response_size: 0,
error: None,
})
response.body = body.into();
Ok(response)
}
/// Set a json body and generate `HttpResponse`
pub fn json<T: Serialize>(&mut self, value: T) -> Result<HttpResponse, Error> {
let body = serde_json::to_string(&value)?;
let contains = if let Some(parts) = parts(&mut self.parts, &self.err) {
let contains = if let Some(parts) = parts(&mut self.response, &self.err) {
parts.headers.contains_key(header::CONTENT_TYPE)
} else {
true
@ -444,7 +406,8 @@ impl HttpResponseBuilder {
}
}
fn parts<'a>(parts: &'a mut Option<Parts>, err: &Option<HttpError>) -> Option<&'a mut Parts>
fn parts<'a>(parts: &'a mut Option<HttpResponse>, err: &Option<HttpError>)
-> Option<&'a mut HttpResponse>
{
if err.is_some() {
return None

View File

@ -55,7 +55,7 @@ extern crate tokio_openssl;
mod application;
mod body;
mod context;
mod date;
mod utils;
mod encoding;
mod httprequest;
mod httpresponse;

View File

@ -250,7 +250,7 @@ impl Inner {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
buf.extend(&chunk.split_to(rem));
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
return Ok(Async::Ready(buf.freeze()))
@ -299,12 +299,12 @@ impl Inner {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend(self.items.pop_front().unwrap());
buf.extend_from_slice(&self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend(chunk.split_to(offset));
buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
@ -330,7 +330,7 @@ impl Inner {
if len > 0 {
let mut buf = BytesMut::with_capacity(len);
for item in &self.items {
buf.extend(item);
buf.extend_from_slice(item);
}
self.items = VecDeque::new();
self.len = 0;

View File

@ -1,7 +1,7 @@
use std::{io, net, thread};
use std::rc::Rc;
use std::sync::Arc;
//use std::time::Duration;
use std::time::Duration;
use std::marker::PhantomData;
use actix::dev::*;
@ -28,6 +28,7 @@ use openssl::pkcs12::ParsedPkcs12;
#[cfg(feature="alpn")]
use tokio_openssl::{SslStream, SslAcceptorExt};
use utils;
use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
/// Various server settings
@ -99,10 +100,23 @@ pub struct HttpServer<T, A, H, U>
impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<T: 'static, A: 'static, H, U: 'static> HttpServer<T, A, H, U> {
fn update_time(&self, ctx: &mut Context<Self>) {
utils::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<T, A, H, U, V> HttpServer<T, A, H, U>
where H: HttpHandler,
where A: 'static,
T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
@ -126,15 +140,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self.threads = num;
self
}
}
impl<T, A, H, U, V> HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static,
A: 'static,
H: HttpHandler,
U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>,
{
/// Start listening for incomming connections from a stream.
///
/// This method uses only one thread for handling incoming connections.
@ -387,11 +393,18 @@ struct Worker<H> {
handler: StreamHandlerType,
}
impl<H: 'static> Worker<H> {
fn update_time(&self, ctx: &mut Context<Self>) {
utils::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
}
impl<H: 'static> Actor for Worker<H> {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}

173
src/utils.rs Normal file
View File

@ -0,0 +1,173 @@
use std::{str, mem, ptr, slice};
use std::cell::RefCell;
use std::fmt::{self, Write};
use time;
use bytes::BytesMut;
use http::header::HeaderValue;
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub const DATE_VALUE_LENGTH: usize = 29;
pub fn extend(dst: &mut BytesMut) {
CACHED.with(|cache| {
dst.extend_from_slice(cache.borrow().buffer());
})
}
pub fn update_date() {
CACHED.with(|cache| {
cache.borrow_mut().update();
});
}
struct CachedDate {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
thread_local!(static CACHED: RefCell<CachedDate> = RefCell::new(CachedDate {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
}));
impl CachedDate {
fn buffer(&self) -> &[u8] {
&self.bytes[..]
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", time::at_utc(time::get_time()).rfc822()).unwrap();
assert_eq!(self.pos, DATE_VALUE_LENGTH);
}
}
impl fmt::Write for CachedDate {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
const DEC_DIGITS_LUT: &[u8] =
b"0001020304050607080910111213141516171819\
2021222324252627282930313233343536373839\
4041424344454647484950515253545556575859\
6061626364656667686970717273747576777879\
8081828384858687888990919293949596979899";
pub(crate) fn convert_u16(mut n: u16, bytes: &mut BytesMut) {
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
let mut curr = buf.len() as isize;
let buf_ptr = buf.as_mut_ptr();
let lut_ptr = DEC_DIGITS_LUT.as_ptr();
unsafe {
// need at least 16 bits for the 4-characters-at-a-time to work.
if mem::size_of::<u16>() >= 2 {
// eagerly decode 4 characters at a time
while n >= 10_000 {
let rem = (n % 10_000) as isize;
n /= 10_000;
let d1 = (rem / 100) << 1;
let d2 = (rem % 100) << 1;
curr -= 4;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
ptr::copy_nonoverlapping(lut_ptr.offset(d2), buf_ptr.offset(curr + 2), 2);
}
}
// if we reach here numbers are <= 9999, so at most 4 chars long
let mut n = n as isize; // possibly reduce 64bit math
// decode 2 more chars, if > 2 chars
if n >= 100 {
let d1 = (n % 100) << 1;
n /= 100;
curr -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
}
// decode last 1 or 2 chars
if n < 10 {
curr -= 1;
*buf_ptr.offset(curr) = (n as u8) + b'0';
} else {
let d1 = n << 1;
curr -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
}
}
unsafe {
bytes.extend_from_slice(
slice::from_raw_parts(buf_ptr.offset(curr), buf.len() - curr as usize));
}
}
pub(crate) fn convert_into_header(mut n: usize) -> HeaderValue {
let mut curr: isize = 39;
let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
let buf_ptr = buf.as_mut_ptr();
let lut_ptr = DEC_DIGITS_LUT.as_ptr();
unsafe {
// need at least 16 bits for the 4-characters-at-a-time to work.
if mem::size_of::<usize>() >= 2 {
// eagerly decode 4 characters at a time
while n >= 10_000 {
let rem = (n % 10_000) as isize;
n /= 10_000;
let d1 = (rem / 100) << 1;
let d2 = (rem % 100) << 1;
curr -= 4;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
ptr::copy_nonoverlapping(lut_ptr.offset(d2), buf_ptr.offset(curr + 2), 2);
}
}
// if we reach here numbers are <= 9999, so at most 4 chars long
let mut n = n as isize; // possibly reduce 64bit math
// decode 2 more chars, if > 2 chars
if n >= 100 {
let d1 = (n % 100) << 1;
n /= 100;
curr -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
}
// decode last 1 or 2 chars
if n < 10 {
curr -= 1;
*buf_ptr.offset(curr) = (n as u8) + b'0';
} else {
let d1 = n << 1;
curr -= 2;
ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2);
}
}
unsafe {
HeaderValue::from_bytes(
slice::from_raw_parts(buf_ptr.offset(curr), buf.len() - curr as usize)).unwrap()
}
}
#[test]
fn test_date_len() {
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]
fn test_date() {
let mut buf1 = BytesMut::new();
extend(&mut buf1);
let mut buf2 = BytesMut::new();
extend(&mut buf2);
assert_eq!(buf1, buf2);
}

View File

@ -201,7 +201,7 @@ impl Stream for WsStream {
loop {
match self.rx.readany() {
Ok(Async::Ready(Some(chunk))) => {
self.buf.extend(chunk.0)
self.buf.extend_from_slice(&chunk.0)
}
Ok(Async::Ready(None)) => {
done = true;