From 89a89e7b1827f7b9732659cb9bbb6717114c622a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 14 Jan 2018 17:00:28 -0800 Subject: [PATCH] refactor shared bytes api --- src/helpers.rs | 78 ---------------------------- src/server/encoding.rs | 65 ++++++----------------- src/server/h1writer.rs | 22 ++++---- src/server/h2writer.rs | 26 +++++----- src/server/mod.rs | 1 + src/server/settings.rs | 9 ++-- src/server/shared.rs | 115 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 159 insertions(+), 157 deletions(-) create mode 100644 src/server/shared.rs diff --git a/src/helpers.rs b/src/helpers.rs index 1b4bd0e11..947851ea9 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -66,84 +66,6 @@ impl fmt::Write for CachedDate { } } -/// Internal use only! unsafe -#[derive(Debug)] -pub(crate) struct SharedBytesPool(RefCell>>); - -impl SharedBytesPool { - pub fn new() -> SharedBytesPool { - SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) - } - - pub fn get_bytes(&self) -> Rc { - if let Some(bytes) = self.0.borrow_mut().pop_front() { - bytes - } else { - Rc::new(BytesMut::new()) - } - } - - pub fn release_bytes(&self, mut bytes: Rc) { - let v = &mut self.0.borrow_mut(); - if v.len() < 128 { - Rc::get_mut(&mut bytes).unwrap().take(); - v.push_front(bytes); - } - } -} - -#[derive(Debug)] -pub(crate) struct SharedBytes( - Option>, Option>); - -impl Drop for SharedBytes { - fn drop(&mut self) { - if let Some(ref pool) = self.1 { - if let Some(bytes) = self.0.take() { - if Rc::strong_count(&bytes) == 1 { - pool.release_bytes(bytes); - } - } - } - } -} - -impl SharedBytes { - - pub fn empty() -> Self { - SharedBytes(None, None) - } - - pub fn new(bytes: Rc, pool: Rc) -> SharedBytes { - SharedBytes(Some(bytes), Some(pool)) - } - - #[inline(always)] - #[allow(mutable_transmutes)] - #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] - pub fn get_mut(&self) -> &mut BytesMut { - let r: &BytesMut = self.0.as_ref().unwrap().as_ref(); - unsafe{mem::transmute(r)} - } - - #[inline] - pub fn get_ref(&self) -> &BytesMut { - self.0.as_ref().unwrap() - } -} - -impl Default for SharedBytes { - fn default() -> Self { - SharedBytes(Some(Rc::new(BytesMut::new())), None) - } -} - -impl Clone for SharedBytes { - fn clone(&self) -> SharedBytes { - SharedBytes(self.0.clone(), self.1.clone()) - } -} - /// Internal use only! unsafe pub(crate) struct SharedMessagePool(RefCell>>); diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 0b1363eeb..1c6ed7d76 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer}; use headers::ContentEncoding; use body::{Body, Binary}; use error::PayloadError; -use helpers::SharedBytes; use httprequest::HttpMessage; use httpresponse::HttpResponse; use payload::{PayloadSender, PayloadWriter}; +use super::shared::SharedBytes; + + impl ContentEncoding { #[inline] @@ -399,7 +401,7 @@ impl PayloadEncoder { let _ = enc.write(bytes.clone()); let _ = enc.write_eof(); - *bytes = Binary::from(tmp.get_mut().take()); + *bytes = Binary::from(tmp.take()); encoding = ContentEncoding::Identity; } resp.headers_mut().remove(CONTENT_LENGTH); @@ -503,16 +505,6 @@ impl PayloadEncoder { impl PayloadEncoder { - #[inline] - pub fn len(&self) -> usize { - self.0.get_ref().len() - } - - #[inline] - pub fn get_mut(&mut self) -> &mut BytesMut { - self.0.get_mut() - } - #[inline] pub fn is_eof(&self) -> bool { self.0.is_eof() @@ -554,34 +546,6 @@ impl ContentEncoder { } } - #[inline] - pub fn get_ref(&self) -> &BytesMut { - match *self { - ContentEncoder::Br(ref encoder) => - encoder.get_ref().buffer.get_ref(), - ContentEncoder::Deflate(ref encoder) => - encoder.get_ref().buffer.get_ref(), - ContentEncoder::Gzip(ref encoder) => - encoder.get_ref().buffer.get_ref(), - ContentEncoder::Identity(ref encoder) => - encoder.buffer.get_ref(), - } - } - - #[inline] - pub fn get_mut(&mut self) -> &mut BytesMut { - match *self { - ContentEncoder::Br(ref mut encoder) => - encoder.get_mut().buffer.get_mut(), - ContentEncoder::Deflate(ref mut encoder) => - encoder.get_mut().buffer.get_mut(), - ContentEncoder::Gzip(ref mut encoder) => - encoder.get_mut().buffer.get_mut(), - ContentEncoder::Identity(ref mut encoder) => - encoder.buffer.get_mut(), - } - } - #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[inline(always)] pub fn write_eof(&mut self) -> Result<(), io::Error> { @@ -727,11 +691,12 @@ impl TransferEncoding { /// Encode message. Return `EOF` state of encoder #[inline] - pub fn encode(&mut self, msg: Binary) -> io::Result { + pub fn encode(&mut self, mut msg: Binary) -> io::Result { match self.kind { TransferEncodingKind::Eof => { - self.buffer.get_mut().extend_from_slice(msg.as_ref()); - Ok(msg.is_empty()) + let eof = msg.is_empty(); + self.buffer.extend(msg); + Ok(eof) }, TransferEncodingKind::Chunked(ref mut eof) => { if *eof { @@ -740,12 +705,14 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); + self.buffer.extend_from_slice(b"0\r\n\r\n"); } else { - write!(self.buffer.get_mut(), "{:X}\r\n", msg.len()) + let mut buf = BytesMut::new(); + write!(&mut buf, "{:X}\r\n", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - self.buffer.get_mut().extend_from_slice(msg.as_ref()); - self.buffer.get_mut().extend_from_slice(b"\r\n"); + self.buffer.extend(buf.into()); + self.buffer.extend(msg); + self.buffer.extend_from_slice(b"\r\n"); } Ok(*eof) }, @@ -754,7 +721,7 @@ impl TransferEncoding { return Ok(*remaining == 0) } let max = cmp::min(*remaining, msg.len() as u64); - self.buffer.get_mut().extend_from_slice(msg.as_ref()[..max as usize].as_ref()); + self.buffer.extend(msg.take().split_to(max as usize).into()); *remaining -= max as u64; Ok(*remaining == 0) @@ -770,7 +737,7 @@ impl TransferEncoding { TransferEncodingKind::Chunked(ref mut eof) => { if !*eof { *eof = true; - self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); + self.buffer.extend_from_slice(b"0\r\n\r\n"); } }, } diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index c448313bc..7f18170fe 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -7,10 +7,10 @@ use http::header::{HeaderValue, CONNECTION, DATE}; use helpers; use body::{Body, Binary}; -use helpers::SharedBytes; use httprequest::HttpMessage; use httpresponse::HttpResponse; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; +use super::shared::SharedBytes; use super::encoding::PayloadEncoder; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -56,7 +56,7 @@ impl H1Writer { } pub fn disconnected(&mut self) { - self.encoder.get_mut().take(); + self.buffer.take(); } pub fn keepalive(&self) -> bool { @@ -64,15 +64,13 @@ impl H1Writer { } fn write_to_stream(&mut self) -> io::Result { - let buffer = self.encoder.get_mut(); - - while !buffer.is_empty() { - match self.stream.write(buffer.as_ref()) { + while !self.buffer.is_empty() { + match self.stream.write(self.buffer.as_ref()) { Ok(n) => { - let _ = buffer.split_to(n); + let _ = self.buffer.split_to(n); }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) @@ -131,7 +129,7 @@ impl Writer for H1Writer { // render message { - let mut buffer = self.encoder.get_mut(); + let mut buffer = self.buffer.get_mut(); if let Body::Binary(ref bytes) = body { buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); } else { @@ -190,11 +188,11 @@ impl Writer for H1Writer { return Ok(WriterState::Done) } else { // might be response to EXCEPT - self.encoder.get_mut().extend_from_slice(payload.as_ref()) + self.buffer.extend_from_slice(payload.as_ref()) } } - if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -207,7 +205,7 @@ impl Writer for H1Writer { if !self.encoder.is_eof() { Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) - } else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { + } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 9b522b38d..0701d028e 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -8,10 +8,10 @@ use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LEN use helpers; use body::{Body, Binary}; -use helpers::SharedBytes; use httprequest::HttpMessage; use httpresponse::HttpResponse; use super::encoding::PayloadEncoder; +use super::shared::SharedBytes; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; const CHUNK_SIZE: usize = 16_384; @@ -58,9 +58,7 @@ impl H2Writer { } if let Some(ref mut stream) = self.stream { - let buffer = self.encoder.get_mut(); - - if buffer.is_empty() { + if self.buffer.is_empty() { if self.flags.contains(Flags::EOF) { let _ = stream.send_data(Bytes::new(), true); } @@ -70,7 +68,7 @@ impl H2Writer { loop { match stream.poll_capacity() { Ok(Async::NotReady) => { - if buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) @@ -80,15 +78,15 @@ impl H2Writer { return Ok(WriterState::Done) } Ok(Async::Ready(Some(cap))) => { - let len = buffer.len(); - let bytes = buffer.split_to(cmp::min(cap, len)); - let eof = buffer.is_empty() && self.flags.contains(Flags::EOF); + let len = self.buffer.len(); + let bytes = self.buffer.split_to(cmp::min(cap, len)); + let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); self.written += bytes.len() as u64; if let Err(err) = stream.send_data(bytes.freeze(), eof) { return Err(io::Error::new(io::ErrorKind::Other, err)) - } else if !buffer.is_empty() { - let cap = cmp::min(buffer.len(), CHUNK_SIZE); + } else if !self.buffer.is_empty() { + let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); stream.reserve_capacity(cap); } else { return Ok(WriterState::Pause) @@ -170,7 +168,7 @@ impl Writer for H2Writer { self.written = bytes.len() as u64; self.encoder.write(bytes)?; if let Some(ref mut stream) = self.stream { - stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); + stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); } Ok(WriterState::Pause) } else { @@ -188,11 +186,11 @@ impl Writer for H2Writer { self.encoder.write(payload)?; } else { // might be response for EXCEPT - self.encoder.get_mut().extend_from_slice(payload.as_ref()) + self.buffer.extend_from_slice(payload.as_ref()) } } - if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -206,7 +204,7 @@ impl Writer for H2Writer { if !self.encoder.is_eof() { Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) - } else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { + } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) diff --git a/src/server/mod.rs b/src/server/mod.rs index 44e2c7676..a44d2835a 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -15,6 +15,7 @@ mod h2; mod h1writer; mod h2writer; mod settings; +mod shared; mod utils; pub use self::srv::HttpServer; diff --git a/src/server/settings.rs b/src/server/settings.rs index b6cc634ed..0ca4b4371 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut}; use helpers; use super::channel::Node; +use super::shared::{SharedBytes, SharedBytesPool}; /// Various server settings #[derive(Debug, Clone)] @@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings { h: RefCell>, enabled: bool, keep_alive: u64, - bytes: Rc, + bytes: Rc, messages: Rc, channels: Cell, node: Node<()>, @@ -75,7 +76,7 @@ impl WorkerSettings { h: RefCell::new(h), enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, keep_alive: keep_alive.unwrap_or(0), - bytes: Rc::new(helpers::SharedBytesPool::new()), + bytes: Rc::new(SharedBytesPool::new()), messages: Rc::new(helpers::SharedMessagePool::new()), channels: Cell::new(0), node: Node::head(), @@ -102,8 +103,8 @@ impl WorkerSettings { self.enabled } - pub fn get_shared_bytes(&self) -> helpers::SharedBytes { - helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) + pub fn get_shared_bytes(&self) -> SharedBytes { + SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) } pub fn get_http_message(&self) -> helpers::SharedHttpMessage { diff --git a/src/server/shared.rs b/src/server/shared.rs new file mode 100644 index 000000000..15307e0fe --- /dev/null +++ b/src/server/shared.rs @@ -0,0 +1,115 @@ +use std::mem; +use std::cell::RefCell; +use std::rc::Rc; +use std::collections::VecDeque; +use bytes::BytesMut; + +use body::Binary; + + +/// Internal use only! unsafe +#[derive(Debug)] +pub(crate) struct SharedBytesPool(RefCell>>); + +impl SharedBytesPool { + pub fn new() -> SharedBytesPool { + SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) + } + + pub fn get_bytes(&self) -> Rc { + if let Some(bytes) = self.0.borrow_mut().pop_front() { + bytes + } else { + Rc::new(BytesMut::new()) + } + } + + pub fn release_bytes(&self, mut bytes: Rc) { + let v = &mut self.0.borrow_mut(); + if v.len() < 128 { + Rc::get_mut(&mut bytes).unwrap().take(); + v.push_front(bytes); + } + } +} + +#[derive(Debug)] +pub(crate) struct SharedBytes( + Option>, Option>); + +impl Drop for SharedBytes { + fn drop(&mut self) { + if let Some(ref pool) = self.1 { + if let Some(bytes) = self.0.take() { + if Rc::strong_count(&bytes) == 1 { + pool.release_bytes(bytes); + } + } + } + } +} + +impl SharedBytes { + + pub fn empty() -> Self { + SharedBytes(None, None) + } + + pub fn new(bytes: Rc, pool: Rc) -> SharedBytes { + SharedBytes(Some(bytes), Some(pool)) + } + + #[inline(always)] + #[allow(mutable_transmutes)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] + pub fn get_mut(&self) -> &mut BytesMut { + let r: &BytesMut = self.0.as_ref().unwrap().as_ref(); + unsafe{mem::transmute(r)} + } + + #[inline] + pub fn len(&self) -> usize { + self.0.as_ref().unwrap().len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.0.as_ref().unwrap().is_empty() + } + + #[inline] + pub fn as_ref(&self) -> &[u8] { + self.0.as_ref().unwrap().as_ref() + } + + pub fn split_to(&self, n: usize) -> BytesMut { + self.get_mut().split_to(n) + } + + pub fn take(&self) -> BytesMut { + self.get_mut().take() + } + + #[inline] + #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] + pub fn extend(&self, data: Binary) { + self.get_mut().extend_from_slice(data.as_ref()); + } + + #[inline] + pub fn extend_from_slice(&self, data: &[u8]) { + self.get_mut().extend_from_slice(data); + } +} + +impl Default for SharedBytes { + fn default() -> Self { + SharedBytes(Some(Rc::new(BytesMut::new())), None) + } +} + +impl Clone for SharedBytes { + fn clone(&self) -> SharedBytes { + SharedBytes(self.0.clone(), self.1.clone()) + } +}