From c37565cc4aa7c53aa39fa0a5f909945a7c5bd44d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 14 Dec 2017 19:34:31 -0800 Subject: [PATCH] various server optimizations --- src/encoding.rs | 86 ++++++++++++----------- src/h1.rs | 102 +++++++++++++++++++-------- src/h1writer.rs | 17 +++-- src/h2writer.rs | 7 +- src/helpers.rs | 170 +++++++++++++++++++++++++++++++++++++++++++++ src/httprequest.rs | 94 +++++++++++++++---------- src/param.rs | 4 ++ src/pipeline.rs | 22 +++--- src/server.rs | 10 +++ 9 files changed, 385 insertions(+), 127 deletions(-) diff --git a/src/encoding.rs b/src/encoding.rs index 653894a96..7918e20d9 100644 --- a/src/encoding.rs +++ b/src/encoding.rs @@ -14,6 +14,7 @@ use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::{Bytes, BytesMut, BufMut, Writer}; use helpers; +use helpers::SharedBytes; use body::{Body, Binary}; use error::PayloadError; use httprequest::HttpMessage; @@ -337,15 +338,15 @@ impl PayloadWriter for EncodedPayload { pub(crate) struct PayloadEncoder(ContentEncoder); -impl Default for PayloadEncoder { - fn default() -> PayloadEncoder { - PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof())) - } -} - impl PayloadEncoder { - pub fn new(req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder { + pub fn empty(bytes: SharedBytes) -> PayloadEncoder { + PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes))) + } + + pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) + -> PayloadEncoder + { let version = resp.version().unwrap_or_else(|| req.version); let mut body = resp.replace_body(Body::Empty); let has_body = match body { @@ -390,11 +391,11 @@ impl PayloadEncoder { error!("Chunked transfer is enabled but body is set to Empty"); } resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); - TransferEncoding::eof() + TransferEncoding::eof(buf) }, Body::Binary(ref mut bytes) => { if compression { - let transfer = TransferEncoding::eof(); + let transfer = TransferEncoding::eof(SharedBytes::default()); let mut enc = match encoding { ContentEncoding::Deflate => ContentEncoder::Deflate( DeflateEncoder::new(transfer, Compression::Default)), @@ -414,11 +415,11 @@ impl PayloadEncoder { CONTENT_LENGTH, helpers::convert_into_header(b.len())); *bytes = Binary::from(b); encoding = ContentEncoding::Identity; - TransferEncoding::eof() + TransferEncoding::eof(buf) } else { resp.headers_mut().insert( CONTENT_LENGTH, helpers::convert_into_header(bytes.len())); - TransferEncoding::eof() + TransferEncoding::eof(buf) } } Body::Streaming(_) | Body::StreamingContext => { @@ -429,26 +430,26 @@ impl PayloadEncoder { } if version == Version::HTTP_2 { resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof() + TransferEncoding::eof(buf) } else { resp.headers_mut().insert( TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked() + TransferEncoding::chunked(buf) } } else if let Some(len) = resp.headers().get(CONTENT_LENGTH) { // Content-Length if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { - TransferEncoding::length(len) + TransferEncoding::length(len, buf) } else { debug!("illegal Content-Length: {:?}", len); - TransferEncoding::eof() + TransferEncoding::eof(buf) } } else { - TransferEncoding::eof() + TransferEncoding::eof(buf) } } else { - TransferEncoding::eof() + TransferEncoding::eof(buf) } } Body::Upgrade(_) | Body::UpgradeContext => { @@ -462,7 +463,7 @@ impl PayloadEncoder { encoding = ContentEncoding::Identity; resp.headers_mut().remove(CONTENT_ENCODING); } - TransferEncoding::eof() + TransferEncoding::eof(buf) } }; resp.replace_body(body); @@ -540,13 +541,13 @@ impl ContentEncoder { pub fn get_ref(&self) -> &BytesMut { match *self { ContentEncoder::Br(ref encoder) => - &encoder.get_ref().buffer, + encoder.get_ref().buffer.get_ref(), ContentEncoder::Deflate(ref encoder) => - &encoder.get_ref().buffer, + encoder.get_ref().buffer.get_ref(), ContentEncoder::Gzip(ref encoder) => - &encoder.get_ref().buffer, + encoder.get_ref().buffer.get_ref(), ContentEncoder::Identity(ref encoder) => - &encoder.buffer, + encoder.buffer.get_ref(), } } @@ -554,20 +555,21 @@ impl ContentEncoder { pub fn get_mut(&mut self) -> &mut BytesMut { match *self { ContentEncoder::Br(ref mut encoder) => - &mut encoder.get_mut().buffer, + encoder.get_mut().buffer.get_mut(), ContentEncoder::Deflate(ref mut encoder) => - &mut encoder.get_mut().buffer, + encoder.get_mut().buffer.get_mut(), ContentEncoder::Gzip(ref mut encoder) => - &mut encoder.get_mut().buffer, + encoder.get_mut().buffer.get_mut(), ContentEncoder::Identity(ref mut encoder) => - &mut encoder.buffer, + encoder.buffer.get_mut(), } } #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[inline(always)] pub fn write_eof(&mut self) -> Result<(), io::Error> { - let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof())); + let encoder = mem::replace( + self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::default()))); match encoder { ContentEncoder::Br(encoder) => { @@ -639,7 +641,7 @@ impl ContentEncoder { } } ContentEncoder::Identity(ref mut encoder) => { - encoder.write_all(data)?; + encoder.encode(data); Ok(()) } } @@ -650,7 +652,7 @@ impl ContentEncoder { #[derive(Debug, Clone)] pub(crate) struct TransferEncoding { kind: TransferEncodingKind, - buffer: BytesMut, + buffer: SharedBytes, } #[derive(Debug, PartialEq, Clone)] @@ -670,26 +672,26 @@ enum TransferEncodingKind { impl TransferEncoding { #[inline] - pub fn eof() -> TransferEncoding { + pub fn eof(bytes: SharedBytes) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Eof, - buffer: BytesMut::new(), + buffer: bytes, } } #[inline] - pub fn chunked() -> TransferEncoding { + pub fn chunked(bytes: SharedBytes) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Chunked(false), - buffer: BytesMut::new(), + buffer: bytes, } } #[inline] - pub fn length(len: u64) -> TransferEncoding { + pub fn length(len: u64, bytes: SharedBytes) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Length(len), - buffer: BytesMut::new(), + buffer: bytes, } } @@ -709,7 +711,7 @@ impl TransferEncoding { pub fn encode(&mut self, msg: &[u8]) -> bool { match self.kind { TransferEncodingKind::Eof => { - self.buffer.extend_from_slice(msg); + self.buffer.get_mut().extend_from_slice(msg); msg.is_empty() }, TransferEncodingKind::Chunked(ref mut eof) => { @@ -719,11 +721,11 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); } else { - write!(self.buffer, "{:X}\r\n", msg.len()).unwrap(); - self.buffer.extend_from_slice(msg); - self.buffer.extend_from_slice(b"\r\n"); + write!(self.buffer.get_mut(), "{:X}\r\n", msg.len()).unwrap(); + self.buffer.get_mut().extend_from_slice(msg); + self.buffer.get_mut().extend_from_slice(b"\r\n"); } *eof }, @@ -733,7 +735,7 @@ impl TransferEncoding { } let max = cmp::min(*remaining, msg.len() as u64); trace!("sized write = {}", max); - self.buffer.extend_from_slice(msg[..max as usize].as_ref()); + self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref()); *remaining -= max as u64; trace!("encoded {} bytes, remaining = {}", max, remaining); @@ -750,7 +752,7 @@ impl TransferEncoding { TransferEncodingKind::Chunked(ref mut eof) => { if !*eof { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); } }, } diff --git a/src/h1.rs b/src/h1.rs index ba42584b3..8343a99d2 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -1,4 +1,4 @@ -use std::{self, io, ptr}; +use std::{self, io}; use std::rc::Rc; use std::net::SocketAddr; use std::time::Duration; @@ -16,14 +16,15 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use encoding::PayloadType; use channel::{HttpHandler, HttpHandlerTask}; -use h1writer::H1Writer; +use h1writer::{Writer, H1Writer}; use server::WorkerSettings; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use error::{ParseError, PayloadError, ResponseError}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; -const INIT_BUFFER_SIZE: usize = 8192; +const LW_BUFFER_SIZE: usize = 4096; +const HW_BUFFER_SIZE: usize = 16_384; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 100; const MAX_PIPELINED_MESSAGES: usize = 16; @@ -78,10 +79,11 @@ impl Http1 H: HttpHandler + 'static { pub fn new(h: Rc>, stream: T, addr: Option) -> Self { + let bytes = h.get_shared_bytes(); Http1{ flags: Flags::KEEPALIVE, settings: h, addr: addr, - stream: H1Writer::new(stream), + stream: H1Writer::new(stream, bytes), reader: Reader::new(), read_buf: BytesMut::new(), tasks: VecDeque::new(), @@ -92,6 +94,18 @@ impl Http1 (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) } + fn poll_completed(&mut self) -> Result { + // check stream state + match self.stream.poll_completed() { + Ok(Async::Ready(_)) => Ok(false), + Ok(Async::NotReady) => Ok(true), + Err(err) => { + debug!("Error sending data: {}", err); + Err(()) + } + } + } + pub fn poll(&mut self) -> Poll { // keep-alive timer if self.keepalive_timer.is_some() { @@ -116,6 +130,10 @@ impl Http1 if !io && !item.flags.contains(EntryFlags::EOF) { if item.flags.contains(EntryFlags::ERROR) { + // check stream state + if let Ok(Async::NotReady) = self.stream.poll_completed() { + return Ok(Async::NotReady) + } return Err(()) } @@ -146,6 +164,12 @@ impl Http1 // it is not possible to recover from error // during pipe handling, so just drop connection error!("Unhandled error: {}", err); + item.flags.insert(EntryFlags::ERROR); + + // check stream state, we still can have valid data in buffer + if let Ok(Async::NotReady) = self.stream.poll_completed() { + return Ok(Async::NotReady) + } return Err(()) } } @@ -178,6 +202,10 @@ impl Http1 // no keep-alive if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() { + // check stream state + if self.poll_completed()? { + return Ok(Async::NotReady) + } if self.flags.contains(Flags::H2) { return Ok(Async::Ready(Http1Result::Switch)) } else { @@ -188,7 +216,9 @@ impl Http1 // read incoming data while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) && self.tasks.len() < MAX_PIPELINED_MESSAGES { - match self.reader.parse(self.stream.get_mut(), &mut self.read_buf) { + match self.reader.parse(self.stream.get_mut(), + &mut self.read_buf, &self.settings) + { Ok(Async::Ready(Item::Http1(mut req))) => { not_ready = false; @@ -264,10 +294,16 @@ impl Http1 self.keepalive_timer = Some(to); } } else { + // check stream state + if self.poll_completed()? { + return Ok(Async::NotReady) + } // keep-alive disable, drop connection return Ok(Async::Ready(Http1Result::Done)) } } else { + // check stream state + self.poll_completed()?; // keep-alive unset, rely on operating system return Ok(Async::NotReady) } @@ -279,6 +315,11 @@ impl Http1 // check for parse error if self.tasks.is_empty() { + // check stream state + if self.poll_completed()? { + return Ok(Async::NotReady) + } + if self.flags.contains(Flags::H2) { return Ok(Async::Ready(Http1Result::Switch)) } @@ -288,6 +329,7 @@ impl Http1 } if not_ready { + self.poll_completed()?; return Ok(Async::NotReady) } } @@ -358,7 +400,9 @@ impl Reader { } } - pub fn parse(&mut self, io: &mut T, buf: &mut BytesMut) -> Poll + pub fn parse(&mut self, io: &mut T, + buf: &mut BytesMut, + settings: &WorkerSettings) -> Poll where T: AsyncRead { loop { @@ -394,7 +438,7 @@ impl Reader { } loop { - match Reader::parse_message(buf).map_err(ReaderError::Error)? { + match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { Message::Http1(msg, decoder) => { if let Some(payload) = decoder { self.payload = Some(payload); @@ -465,15 +509,9 @@ impl Reader { } fn read_from_io(&mut self, io: &mut T, buf: &mut BytesMut) - -> Poll - { - if buf.remaining_mut() < INIT_BUFFER_SIZE { - buf.reserve(INIT_BUFFER_SIZE); - unsafe { // Zero out unused memory - let b = buf.bytes_mut(); - let len = b.len(); - ptr::write_bytes(b.as_mut_ptr(), 0, len); - } + -> Poll { + if buf.remaining_mut() < LW_BUFFER_SIZE { + buf.reserve(HW_BUFFER_SIZE); } unsafe { let n = match io.read(buf.bytes_mut()) { @@ -490,7 +528,9 @@ impl Reader { } } - fn parse_message(buf: &mut BytesMut) -> Result { + fn parse_message(buf: &mut BytesMut, settings: &WorkerSettings) + -> Result + { if buf.is_empty() { return Ok(Message::NotReady); } @@ -537,13 +577,14 @@ impl Reader { let uri = Uri::from_shared(path).map_err(ParseError::Uri)?; // convert headers - let mut headers = HeaderMap::with_capacity(headers_len); + let msg = settings.get_http_message(); + msg.get_mut().headers.reserve(headers_len); for header in headers_indices[..headers_len].iter() { if let Ok(name) = HeaderName::try_from(slice.slice(header.name.0, header.name.1)) { if let Ok(value) = HeaderValue::try_from( slice.slice(header.value.0, header.value.1)) { - headers.append(name, value); + msg.get_mut().headers.append(name, value); } else { return Err(ParseError::Header) } @@ -552,25 +593,27 @@ impl Reader { } } - let decoder = if upgrade(&method, &headers) { + let decoder = if upgrade(&method, &msg.get_mut().headers) { Decoder::eof() } else { - let has_len = headers.contains_key(header::CONTENT_LENGTH); + let has_len = msg.get_mut().headers.contains_key(header::CONTENT_LENGTH); // Chunked encoding - if chunked(&headers)? { + if chunked(&msg.get_mut().headers)? { if has_len { return Err(ParseError::Header) } Decoder::chunked() } else { if !has_len { - let msg = HttpRequest::new(method, uri, version, headers, None); - return Ok(Message::Http1(msg, None)) + msg.get_mut().uri = uri; + msg.get_mut().method = method; + msg.get_mut().version = version; + return Ok(Message::Http1(HttpRequest::from_message(msg), None)) } // Content-Length - let len = headers.get(header::CONTENT_LENGTH).unwrap(); + let len = msg.get_mut().headers.get(header::CONTENT_LENGTH).unwrap(); if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { Decoder::length(len) @@ -587,11 +630,14 @@ impl Reader { let (psender, payload) = Payload::new(false); let info = PayloadInfo { - tx: PayloadType::new(&headers, psender), + tx: PayloadType::new(&msg.get_mut().headers, psender), decoder: decoder, }; - let msg = HttpRequest::new(method, uri, version, headers, Some(payload)); - Ok(Message::Http1(msg, Some(info))) + msg.get_mut().uri = uri; + msg.get_mut().method = method; + msg.get_mut().version = version; + msg.get_mut().payload = Some(payload); + Ok(Message::Http1(HttpRequest::from_message(msg), Some(info))) } } diff --git a/src/h1writer.rs b/src/h1writer.rs index ae4ef1644..aa1489b7d 100644 --- a/src/h1writer.rs +++ b/src/h1writer.rs @@ -6,6 +6,7 @@ use http::header::{HeaderValue, CONNECTION, DATE}; use helpers; use body::Body; +use helpers::SharedBytes; use encoding::PayloadEncoder; use httprequest::HttpMessage; use httpresponse::HttpResponse; @@ -31,7 +32,7 @@ pub trait Writer { fn write_eof(&mut self) -> Result; - fn poll_complete(&mut self) -> Poll<(), io::Error>; + fn poll_completed(&mut self) -> Poll<(), io::Error>; } bitflags! { @@ -49,17 +50,19 @@ pub(crate) struct H1Writer { encoder: PayloadEncoder, written: u64, headers_size: u32, + buffer: SharedBytes, } impl H1Writer { - pub fn new(stream: T) -> H1Writer { + pub fn new(stream: T, buf: SharedBytes) -> H1Writer { H1Writer { flags: Flags::empty(), stream: stream, - encoder: PayloadEncoder::default(), + encoder: PayloadEncoder::empty(buf.clone()), written: 0, headers_size: 0, + buffer: buf, } } @@ -125,7 +128,7 @@ impl Writer for H1Writer { // prepare task self.flags.insert(Flags::STARTED); - self.encoder = PayloadEncoder::new(req, msg); + self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { self.flags.insert(Flags::KEEPALIVE); } @@ -148,9 +151,9 @@ impl Writer for H1Writer { { let mut buffer = self.encoder.get_mut(); if let Body::Binary(ref bytes) = *msg.body() { - buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); + buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); } else { - buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE); + buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE); } match version { @@ -229,7 +232,7 @@ impl Writer for H1Writer { } } - fn poll_complete(&mut self) -> Poll<(), io::Error> { + fn poll_completed(&mut self) -> Poll<(), io::Error> { match self.write_to_stream() { Ok(WriterState::Done) => Ok(Async::Ready(())), Ok(WriterState::Pause) => Ok(Async::NotReady), diff --git a/src/h2writer.rs b/src/h2writer.rs index afcca2da4..e022432d7 100644 --- a/src/h2writer.rs +++ b/src/h2writer.rs @@ -8,6 +8,7 @@ use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE}; use helpers; use body::Body; +use helpers::SharedBytes; use encoding::PayloadEncoder; use httprequest::HttpMessage; use httpresponse::HttpResponse; @@ -38,7 +39,7 @@ impl H2Writer { H2Writer { respond: respond, stream: None, - encoder: PayloadEncoder::default(), + encoder: PayloadEncoder::empty(SharedBytes::default()), flags: Flags::empty(), written: 0, } @@ -115,7 +116,7 @@ impl Writer for H2Writer { // prepare response self.flags.insert(Flags::STARTED); - self.encoder = PayloadEncoder::new(req, msg); + self.encoder = PayloadEncoder::new(SharedBytes::default(), req, msg); if let Body::Empty = *msg.body() { self.flags.insert(Flags::EOF); } @@ -193,7 +194,7 @@ impl Writer for H2Writer { } } - fn poll_complete(&mut self) -> Poll<(), io::Error> { + fn poll_completed(&mut self) -> Poll<(), io::Error> { match self.write_to_stream() { Ok(WriterState::Done) => Ok(Async::Ready(())), Ok(WriterState::Pause) => Ok(Async::NotReady), diff --git a/src/helpers.rs b/src/helpers.rs index f49e04cda..e7733a1ec 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,10 +1,15 @@ use std::{str, mem, ptr, slice}; use std::cell::RefCell; use std::fmt::{self, Write}; +use std::rc::Rc; +use std::ops::{Deref, DerefMut}; +use std::collections::VecDeque; use time; use bytes::BytesMut; use http::header::HeaderValue; +use httprequest::HttpMessage; + // "Sun, 06 Nov 1994 08:49:37 GMT".len() pub const DATE_VALUE_LENGTH: usize = 29; @@ -51,6 +56,171 @@ impl fmt::Write for CachedDate { } } +/// Internal use only! unsafe +#[derive(Debug)] +pub(crate) struct SharedBytesPool(RefCell>>); + +impl SharedBytesPool { + pub fn new() -> SharedBytesPool { + SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) + } + + pub fn get_bytes(&self) -> Rc { + if let Some(bytes) = self.0.borrow_mut().pop_front() { + bytes + } else { + Rc::new(BytesMut::new()) + } + } + + pub fn release_bytes(&self, mut bytes: Rc) { + if self.0.borrow().len() < 128 { + Rc::get_mut(&mut bytes).unwrap().take(); + self.0.borrow_mut().push_front(bytes); + } + } +} + +#[derive(Debug)] +pub(crate) struct SharedBytes( + Option>, Option>); + +impl Drop for SharedBytes { + fn drop(&mut self) { + if let Some(ref pool) = self.1 { + if let Some(bytes) = self.0.take() { + if Rc::strong_count(&bytes) == 1 { + pool.release_bytes(bytes); + } + } + } + } +} + +impl SharedBytes { + + pub fn new(bytes: Rc, pool: Rc) -> SharedBytes { + SharedBytes(Some(bytes), Some(pool)) + } + + #[inline] + #[allow(mutable_transmutes)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] + pub fn get_mut(&self) -> &mut BytesMut { + let r: &BytesMut = self.0.as_ref().unwrap().as_ref(); + unsafe{mem::transmute(r)} + } + + #[inline] + pub fn get_ref(&self) -> &BytesMut { + self.0.as_ref().unwrap() + } +} + +impl Default for SharedBytes { + fn default() -> Self { + SharedBytes(Some(Rc::new(BytesMut::new())), None) + } +} + +impl Clone for SharedBytes { + fn clone(&self) -> SharedBytes { + SharedBytes(self.0.clone(), self.1.clone()) + } +} + +/// Internal use only! unsafe +pub(crate) struct SharedMessagePool(RefCell>>); + +impl SharedMessagePool { + pub fn new() -> SharedMessagePool { + SharedMessagePool(RefCell::new(VecDeque::with_capacity(128))) + } + + pub fn get(&self) -> Rc { + if let Some(msg) = self.0.borrow_mut().pop_front() { + msg + } else { + Rc::new(HttpMessage::default()) + } + } + + pub fn release(&self, mut msg: Rc) { + if self.0.borrow().len() < 128 { + Rc::get_mut(&mut msg).unwrap().reset(); + self.0.borrow_mut().push_front(msg); + } + } +} + +pub(crate) struct SharedHttpMessage( + Option>, Option>); + +impl Drop for SharedHttpMessage { + fn drop(&mut self) { + if let Some(ref pool) = self.1 { + if let Some(msg) = self.0.take() { + if Rc::strong_count(&msg) == 1 { + pool.release(msg); + } + } + } + } +} + +impl Deref for SharedHttpMessage { + type Target = HttpMessage; + + fn deref(&self) -> &HttpMessage { + self.get_ref() + } +} + +impl DerefMut for SharedHttpMessage { + + fn deref_mut(&mut self) -> &mut HttpMessage { + self.get_mut() + } +} + +impl Clone for SharedHttpMessage { + + fn clone(&self) -> SharedHttpMessage { + SharedHttpMessage(self.0.clone(), self.1.clone()) + } +} + +impl Default for SharedHttpMessage { + + fn default() -> SharedHttpMessage { + SharedHttpMessage(Some(Rc::new(HttpMessage::default())), None) + } +} + +impl SharedHttpMessage { + + pub fn from_message(msg: HttpMessage) -> SharedHttpMessage { + SharedHttpMessage(Some(Rc::new(msg)), None) + } + + pub fn new(msg: Rc, pool: Rc) -> SharedHttpMessage { + SharedHttpMessage(Some(msg), Some(pool)) + } + + #[inline(always)] + #[allow(mutable_transmutes)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] + pub fn get_mut(&self) -> &mut HttpMessage { + let r: &HttpMessage = self.0.as_ref().unwrap().as_ref(); + unsafe{mem::transmute(r)} + } + + #[inline] + pub fn get_ref(&self) -> &HttpMessage { + self.0.as_ref().unwrap() + } +} + const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\ 2021222324252627282930313233343536373839\ diff --git a/src/httprequest.rs b/src/httprequest.rs index 80aa24409..b2adeb03a 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -15,6 +15,7 @@ use param::Params; use router::Router; use payload::Payload; use multipart::Multipart; +use helpers::SharedHttpMessage; use error::{ParseError, PayloadError, UrlGenerationError, MultipartError, CookieParseError, HttpRangeError, UrlencodedError}; @@ -69,10 +70,20 @@ impl HttpMessage { self.version != Version::HTTP_10 } } + + pub(crate) fn reset(&mut self) { + self.headers.clear(); + self.extensions.clear(); + self.params.clear(); + self.cookies.take(); + self.addr.take(); + self.payload.take(); + self.info.take(); + } } /// An HTTP Request -pub struct HttpRequest(Rc, Option>, Option>); +pub struct HttpRequest(SharedHttpMessage, Option>, Option>); impl HttpRequest<()> { /// Construct a new Request. @@ -81,7 +92,7 @@ impl HttpRequest<()> { version: Version, headers: HeaderMap, payload: Option) -> HttpRequest { HttpRequest( - Rc::new(HttpMessage { + SharedHttpMessage::from_message(HttpMessage { method: method, uri: uri, version: version, @@ -98,6 +109,10 @@ impl HttpRequest<()> { ) } + pub(crate) fn from_message(msg: SharedHttpMessage) -> HttpRequest { + HttpRequest(msg, None, None) + } + /// Construct a new Request. #[inline] #[cfg(test)] @@ -106,7 +121,7 @@ impl HttpRequest<()> { use std::str::FromStr; HttpRequest( - Rc::new(HttpMessage { + SharedHttpMessage::from_message(HttpMessage { method: Method::GET, uri: Uri::from_str(path).unwrap(), version: Version::HTTP_11, @@ -133,7 +148,7 @@ impl HttpRequest { /// Construct new http request without state. pub fn clone_without_state(&self) -> HttpRequest { - HttpRequest(Rc::clone(&self.0), None, None) + HttpRequest(self.0.clone(), None, None) } // get mutable reference for inner message @@ -142,10 +157,15 @@ impl HttpRequest { #[allow(mutable_transmutes)] #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))] fn as_mut(&self) -> &mut HttpMessage { - let r: &HttpMessage = self.0.as_ref(); - unsafe{mem::transmute(r)} + self.0.get_mut() } + #[inline] + fn as_ref(&self) -> &HttpMessage { + self.0.get_ref() + } + + #[inline] pub(crate) fn get_inner(&mut self) -> &mut HttpMessage { self.as_mut() } @@ -173,22 +193,22 @@ impl HttpRequest { /// Read the Request Uri. #[inline] - pub fn uri(&self) -> &Uri { &self.0.uri } + pub fn uri(&self) -> &Uri { &self.as_ref().uri } /// Read the Request method. #[inline] - pub fn method(&self) -> &Method { &self.0.method } + pub fn method(&self) -> &Method { &self.as_ref().method } /// Read the Request Version. #[inline] pub fn version(&self) -> Version { - self.0.version + self.as_ref().version } /// Read the Request Headers. #[inline] pub fn headers(&self) -> &HeaderMap { - &self.0.headers + &self.as_ref().headers } #[doc(hidden)] @@ -200,17 +220,17 @@ impl HttpRequest { /// The target path of this Request. #[inline] pub fn path(&self) -> &str { - self.0.uri.path() + self.as_ref().uri.path() } /// Get *ConnectionInfo* for currect request. pub fn connection_info(&self) -> &ConnectionInfo { - if self.0.info.is_none() { + if self.as_ref().info.is_none() { let info: ConnectionInfo<'static> = unsafe{ mem::transmute(ConnectionInfo::new(self))}; self.as_mut().info = Some(info); } - self.0.info.as_ref().unwrap() + self.as_ref().info.as_ref().unwrap() } pub fn url_for(&self, name: &str, elements: U) -> Result @@ -237,7 +257,7 @@ impl HttpRequest { #[inline] pub fn peer_addr(&self) -> Option<&SocketAddr> { - self.0.addr.as_ref() + self.as_ref().addr.as_ref() } #[inline] @@ -248,7 +268,7 @@ impl HttpRequest { /// Return a new iterator that yields pairs of `Cow` for query parameters pub fn query(&self) -> HashMap { let mut q: HashMap = HashMap::new(); - if let Some(query) = self.0.uri.query().as_ref() { + if let Some(query) = self.as_ref().uri.query().as_ref() { for (key, val) in form_urlencoded::parse(query.as_ref()) { q.insert(key.to_string(), val.to_string()); } @@ -261,7 +281,7 @@ impl HttpRequest { /// E.g., id=10 #[inline] pub fn query_string(&self) -> &str { - if let Some(query) = self.0.uri.query().as_ref() { + if let Some(query) = self.as_ref().uri.query().as_ref() { query } else { "" @@ -271,7 +291,7 @@ impl HttpRequest { /// Load request cookies. #[inline] pub fn cookies(&self) -> Result<&Vec>, CookieParseError> { - if self.0.cookies.is_none() { + if self.as_ref().cookies.is_none() { let msg = self.as_mut(); let mut cookies = Vec::new(); if let Some(val) = msg.headers.get(header::COOKIE) { @@ -283,7 +303,7 @@ impl HttpRequest { } msg.cookies = Some(cookies) } - Ok(self.0.cookies.as_ref().unwrap()) + Ok(self.as_ref().cookies.as_ref().unwrap()) } /// Return request cookie. @@ -304,7 +324,7 @@ impl HttpRequest { /// for matching storing that segment of the request url in the Params object. #[inline] pub fn match_info(&self) -> &Params { - unsafe{ mem::transmute(&self.0.params) } + unsafe{ mem::transmute(&self.as_ref().params) } } /// Set request Params. @@ -315,25 +335,25 @@ impl HttpRequest { /// Checks if a connection should be kept alive. pub fn keep_alive(&self) -> bool { - if let Some(conn) = self.0.headers.get(header::CONNECTION) { + if let Some(conn) = self.headers().get(header::CONNECTION) { if let Ok(conn) = conn.to_str() { - if self.0.version == Version::HTTP_10 && conn.contains("keep-alive") { + if self.as_ref().version == Version::HTTP_10 && conn.contains("keep-alive") { true } else { - self.0.version == Version::HTTP_11 && + self.as_ref().version == Version::HTTP_11 && !(conn.contains("close") || conn.contains("upgrade")) } } else { false } } else { - self.0.version != Version::HTTP_10 + self.as_ref().version != Version::HTTP_10 } } /// Read the request content type pub fn content_type(&self) -> &str { - if let Some(content_type) = self.0.headers.get(header::CONTENT_TYPE) { + if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) { if let Ok(content_type) = content_type.to_str() { return content_type } @@ -343,17 +363,17 @@ impl HttpRequest { /// Check if request requires connection upgrade pub(crate) fn upgrade(&self) -> bool { - if let Some(conn) = self.0.headers.get(header::CONNECTION) { + if let Some(conn) = self.as_ref().headers.get(header::CONNECTION) { if let Ok(s) = conn.to_str() { return s.to_lowercase().contains("upgrade") } } - self.0.method == Method::CONNECT + self.as_ref().method == Method::CONNECT } /// Check if request has chunked transfer encoding pub fn chunked(&self) -> Result { - if let Some(encodings) = self.0.headers.get(header::TRANSFER_ENCODING) { + if let Some(encodings) = self.headers().get(header::TRANSFER_ENCODING) { if let Ok(s) = encodings.to_str() { Ok(s.to_lowercase().contains("chunked")) } else { @@ -367,7 +387,7 @@ impl HttpRequest { /// Parses Range HTTP header string as per RFC 2616. /// `size` is full size of response (file). pub fn range(&self, size: u64) -> Result, HttpRangeError> { - if let Some(range) = self.0.headers.get(header::RANGE) { + if let Some(range) = self.headers().get(header::RANGE) { HttpRange::parse(unsafe{str::from_utf8_unchecked(range.as_bytes())}, size) .map_err(|e| e.into()) } else { @@ -378,7 +398,7 @@ impl HttpRequest { /// Returns reference to the associated http payload. #[inline] pub fn payload(&self) -> Option<&Payload> { - self.0.payload.as_ref() + self.as_ref().payload.as_ref() } /// Returns mutable reference to the associated http payload. @@ -397,7 +417,7 @@ impl HttpRequest { /// /// Content-type: multipart/form-data; pub fn multipart(&mut self) -> Result { - let boundary = Multipart::boundary(&self.0.headers)?; + let boundary = Multipart::boundary(self.headers())?; if let Some(payload) = self.take_payload() { Ok(Multipart::new(boundary, payload)) } else { @@ -434,7 +454,7 @@ impl HttpRequest { } // check content type - let t = if let Some(content_type) = self.0.headers.get(header::CONTENT_TYPE) { + let t = if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) { if let Ok(content_type) = content_type.to_str() { content_type.to_lowercase() == "application/x-www-form-urlencoded" } else { @@ -460,29 +480,29 @@ impl Default for HttpRequest<()> { /// Construct default request fn default() -> HttpRequest { - HttpRequest(Rc::new(HttpMessage::default()), None, None) + HttpRequest(SharedHttpMessage::default(), None, None) } } impl Clone for HttpRequest { fn clone(&self) -> HttpRequest { - HttpRequest(Rc::clone(&self.0), self.1.clone(), None) + HttpRequest(self.0.clone(), self.1.clone(), None) } } impl fmt::Debug for HttpRequest { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let res = write!(f, "\nHttpRequest {:?} {}:{}\n", - self.0.version, self.0.method, self.0.uri); + self.as_ref().version, self.as_ref().method, self.as_ref().uri); if !self.query_string().is_empty() { let _ = write!(f, " query: ?{:?}\n", self.query_string()); } if !self.match_info().is_empty() { - let _ = write!(f, " params: {:?}\n", self.0.params); + let _ = write!(f, " params: {:?}\n", self.as_ref().params); } let _ = write!(f, " headers:\n"); - for key in self.0.headers.keys() { - let vals: Vec<_> = self.0.headers.get_all(key).iter().collect(); + for key in self.as_ref().headers.keys() { + let vals: Vec<_> = self.as_ref().headers.get_all(key).iter().collect(); if vals.len() > 1 { let _ = write!(f, " {:?}: {:?}\n", key, vals); } else { diff --git a/src/param.rs b/src/param.rs index b948ac187..5aaa4f849 100644 --- a/src/param.rs +++ b/src/param.rs @@ -30,6 +30,10 @@ impl<'a> Default for Params<'a> { impl<'a> Params<'a> { + pub(crate) fn clear(&mut self) { + self.0.clear(); + } + pub(crate) fn add(&mut self, name: &'a str, value: &'a str) { self.0.push((name, value)); } diff --git a/src/pipeline.rs b/src/pipeline.rs index 9c7aa60d4..48f80c13b 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -755,16 +755,18 @@ impl ProcessResponse { } } - // flush io - match io.poll_complete() { - Ok(Async::Ready(_)) => - self.running.resume(), - Ok(Async::NotReady) => - return Err(PipelineState::Response(self)), - Err(err) => { - debug!("Error sending data: {}", err); - info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(info, self.resp)) + // flush io but only if we need to + if self.running == RunningState::Paused || !self.drain.0.is_empty() { + match io.poll_completed() { + Ok(Async::Ready(_)) => + self.running.resume(), + Ok(Async::NotReady) => + return Err(PipelineState::Response(self)), + Err(err) => { + debug!("Error sending data: {}", err); + info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(info, self.resp)) + } } } diff --git a/src/server.rs b/src/server.rs index bb9552a94..bf85b2ac7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -413,6 +413,8 @@ pub(crate) struct WorkerSettings { h: Vec, enabled: bool, keep_alive: u64, + bytes: Rc, + messages: Rc, } impl WorkerSettings { @@ -421,6 +423,8 @@ impl WorkerSettings { h: h, enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, keep_alive: keep_alive.unwrap_or(0), + bytes: Rc::new(helpers::SharedBytesPool::new()), + messages: Rc::new(helpers::SharedMessagePool::new()), } } @@ -433,6 +437,12 @@ impl WorkerSettings { pub fn keep_alive_enabled(&self) -> bool { self.enabled } + pub fn get_shared_bytes(&self) -> helpers::SharedBytes { + helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) + } + pub fn get_http_message(&self) -> helpers::SharedHttpMessage { + helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) + } } impl Worker {