1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

refactor shared bytes api

This commit is contained in:
Nikolay Kim 2018-01-14 17:00:28 -08:00
parent 3425f7be40
commit 89a89e7b18
7 changed files with 159 additions and 157 deletions

View File

@ -66,84 +66,6 @@ impl fmt::Write for CachedDate {
} }
} }
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
self.0.as_ref().unwrap()
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
/// Internal use only! unsafe /// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>); pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);

View File

@ -16,11 +16,13 @@ use bytes::{Bytes, BytesMut, BufMut, Writer};
use headers::ContentEncoding; use headers::ContentEncoding;
use body::{Body, Binary}; use body::{Body, Binary};
use error::PayloadError; use error::PayloadError;
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter}; use payload::{PayloadSender, PayloadWriter};
use super::shared::SharedBytes;
impl ContentEncoding { impl ContentEncoding {
#[inline] #[inline]
@ -399,7 +401,7 @@ impl PayloadEncoder {
let _ = enc.write(bytes.clone()); let _ = enc.write(bytes.clone());
let _ = enc.write_eof(); let _ = enc.write_eof();
*bytes = Binary::from(tmp.get_mut().take()); *bytes = Binary::from(tmp.take());
encoding = ContentEncoding::Identity; encoding = ContentEncoding::Identity;
} }
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
@ -503,16 +505,6 @@ impl PayloadEncoder {
impl PayloadEncoder { impl PayloadEncoder {
#[inline]
pub fn len(&self) -> usize {
self.0.get_ref().len()
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
self.0.get_mut()
}
#[inline] #[inline]
pub fn is_eof(&self) -> bool { pub fn is_eof(&self) -> bool {
self.0.is_eof() self.0.is_eof()
@ -554,34 +546,6 @@ impl ContentEncoder {
} }
} }
#[inline]
pub fn get_ref(&self) -> &BytesMut {
match *self {
ContentEncoder::Br(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Deflate(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Gzip(ref encoder) =>
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Identity(ref encoder) =>
encoder.buffer.get_ref(),
}
}
#[inline]
pub fn get_mut(&mut self) -> &mut BytesMut {
match *self {
ContentEncoder::Br(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Deflate(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Gzip(ref mut encoder) =>
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Identity(ref mut encoder) =>
encoder.buffer.get_mut(),
}
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)] #[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> { pub fn write_eof(&mut self) -> Result<(), io::Error> {
@ -727,11 +691,12 @@ impl TransferEncoding {
/// Encode message. Return `EOF` state of encoder /// Encode message. Return `EOF` state of encoder
#[inline] #[inline]
pub fn encode(&mut self, msg: Binary) -> io::Result<bool> { pub fn encode(&mut self, mut msg: Binary) -> io::Result<bool> {
match self.kind { match self.kind {
TransferEncodingKind::Eof => { TransferEncodingKind::Eof => {
self.buffer.get_mut().extend_from_slice(msg.as_ref()); let eof = msg.is_empty();
Ok(msg.is_empty()) self.buffer.extend(msg);
Ok(eof)
}, },
TransferEncodingKind::Chunked(ref mut eof) => { TransferEncodingKind::Chunked(ref mut eof) => {
if *eof { if *eof {
@ -740,12 +705,14 @@ impl TransferEncoding {
if msg.is_empty() { if msg.is_empty() {
*eof = true; *eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); self.buffer.extend_from_slice(b"0\r\n\r\n");
} else { } else {
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len()) let mut buf = BytesMut::new();
write!(&mut buf, "{:X}\r\n", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.get_mut().extend_from_slice(msg.as_ref()); self.buffer.extend(buf.into());
self.buffer.get_mut().extend_from_slice(b"\r\n"); self.buffer.extend(msg);
self.buffer.extend_from_slice(b"\r\n");
} }
Ok(*eof) Ok(*eof)
}, },
@ -754,7 +721,7 @@ impl TransferEncoding {
return Ok(*remaining == 0) return Ok(*remaining == 0)
} }
let max = cmp::min(*remaining, msg.len() as u64); let max = cmp::min(*remaining, msg.len() as u64);
self.buffer.get_mut().extend_from_slice(msg.as_ref()[..max as usize].as_ref()); self.buffer.extend(msg.take().split_to(max as usize).into());
*remaining -= max as u64; *remaining -= max as u64;
Ok(*remaining == 0) Ok(*remaining == 0)
@ -770,7 +737,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => { TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof { if !*eof {
*eof = true; *eof = true;
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n"); self.buffer.extend_from_slice(b"0\r\n\r\n");
} }
}, },
} }

View File

@ -7,10 +7,10 @@ use http::header::{HeaderValue, CONNECTION, DATE};
use helpers; use helpers;
use body::{Body, Binary}; use body::{Body, Binary};
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::encoding::PayloadEncoder; use super::encoding::PayloadEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -56,7 +56,7 @@ impl<T: AsyncWrite> H1Writer<T> {
} }
pub fn disconnected(&mut self) { pub fn disconnected(&mut self) {
self.encoder.get_mut().take(); self.buffer.take();
} }
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
@ -64,15 +64,13 @@ impl<T: AsyncWrite> H1Writer<T> {
} }
fn write_to_stream(&mut self) -> io::Result<WriterState> { fn write_to_stream(&mut self) -> io::Result<WriterState> {
let buffer = self.encoder.get_mut(); while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
while !buffer.is_empty() {
match self.stream.write(buffer.as_ref()) {
Ok(n) => { Ok(n) => {
let _ = buffer.split_to(n); let _ = self.buffer.split_to(n);
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Done)
@ -131,7 +129,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message // render message
{ {
let mut buffer = self.encoder.get_mut(); let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body { if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else { } else {
@ -190,11 +188,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} else { } else {
// might be response to EXCEPT // might be response to EXCEPT
self.encoder.get_mut().extend_from_slice(payload.as_ref()) self.buffer.extend_from_slice(payload.as_ref())
} }
} }
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)
@ -207,7 +205,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)

View File

@ -8,10 +8,10 @@ use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LEN
use helpers; use helpers;
use body::{Body, Binary}; use body::{Body, Binary};
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder; use super::encoding::PayloadEncoder;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
@ -58,9 +58,7 @@ impl H2Writer {
} }
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
let buffer = self.encoder.get_mut(); if self.buffer.is_empty() {
if buffer.is_empty() {
if self.flags.contains(Flags::EOF) { if self.flags.contains(Flags::EOF) {
let _ = stream.send_data(Bytes::new(), true); let _ = stream.send_data(Bytes::new(), true);
} }
@ -70,7 +68,7 @@ impl H2Writer {
loop { loop {
match stream.poll_capacity() { match stream.poll_capacity() {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
if buffer.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Done)
@ -80,15 +78,15 @@ impl H2Writer {
return Ok(WriterState::Done) return Ok(WriterState::Done)
} }
Ok(Async::Ready(Some(cap))) => { Ok(Async::Ready(Some(cap))) => {
let len = buffer.len(); let len = self.buffer.len();
let bytes = buffer.split_to(cmp::min(cap, len)); let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = buffer.is_empty() && self.flags.contains(Flags::EOF); let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64; self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) { if let Err(err) = stream.send_data(bytes.freeze(), eof) {
return Err(io::Error::new(io::ErrorKind::Other, err)) return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !buffer.is_empty() { } else if !self.buffer.is_empty() {
let cap = cmp::min(buffer.len(), CHUNK_SIZE); let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
stream.reserve_capacity(cap); stream.reserve_capacity(cap);
} else { } else {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
@ -170,7 +168,7 @@ impl Writer for H2Writer {
self.written = bytes.len() as u64; self.written = bytes.len() as u64;
self.encoder.write(bytes)?; self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream { if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.encoder.len(), CHUNK_SIZE)); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
} }
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
@ -188,11 +186,11 @@ impl Writer for H2Writer {
self.encoder.write(payload)?; self.encoder.write(payload)?;
} else { } else {
// might be response for EXCEPT // might be response for EXCEPT
self.encoder.get_mut().extend_from_slice(payload.as_ref()) self.buffer.extend_from_slice(payload.as_ref())
} }
} }
if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)
@ -206,7 +204,7 @@ impl Writer for H2Writer {
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)

View File

@ -15,6 +15,7 @@ mod h2;
mod h1writer; mod h1writer;
mod h2writer; mod h2writer;
mod settings; mod settings;
mod shared;
mod utils; mod utils;
pub use self::srv::HttpServer; pub use self::srv::HttpServer;

View File

@ -4,6 +4,7 @@ use std::cell::{Cell, RefCell, RefMut};
use helpers; use helpers;
use super::channel::Node; use super::channel::Node;
use super::shared::{SharedBytes, SharedBytesPool};
/// Various server settings /// Various server settings
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -63,7 +64,7 @@ pub(crate) struct WorkerSettings<H> {
h: RefCell<Vec<H>>, h: RefCell<Vec<H>>,
enabled: bool, enabled: bool,
keep_alive: u64, keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>, bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>, messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>, channels: Cell<usize>,
node: Node<()>, node: Node<()>,
@ -75,7 +76,7 @@ impl<H> WorkerSettings<H> {
h: RefCell::new(h), h: RefCell::new(h),
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0), keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()), bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()), messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0), channels: Cell::new(0),
node: Node::head(), node: Node::head(),
@ -102,8 +103,8 @@ impl<H> WorkerSettings<H> {
self.enabled self.enabled
} }
pub fn get_shared_bytes(&self) -> helpers::SharedBytes { pub fn get_shared_bytes(&self) -> SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
} }
pub fn get_http_message(&self) -> helpers::SharedHttpMessage { pub fn get_http_message(&self) -> helpers::SharedHttpMessage {

115
src/server/shared.rs Normal file
View File

@ -0,0 +1,115 @@
use std::mem;
use std::cell::RefCell;
use std::rc::Rc;
use std::collections::VecDeque;
use bytes::BytesMut;
use body::Binary;
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn empty() -> Self {
SharedBytes(None, None)
}
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn len(&self) -> usize {
self.0.as_ref().unwrap().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.as_ref().unwrap().is_empty()
}
#[inline]
pub fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
pub fn split_to(&self, n: usize) -> BytesMut {
self.get_mut().split_to(n)
}
pub fn take(&self) -> BytesMut {
self.get_mut().take()
}
#[inline]
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
pub fn extend(&self, data: Binary) {
self.get_mut().extend_from_slice(data.as_ref());
}
#[inline]
pub fn extend_from_slice(&self, data: &[u8]) {
self.get_mut().extend_from_slice(data);
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}