1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

simplify write buffer

This commit is contained in:
Nikolay Kim 2018-06-24 10:30:58 +06:00
parent 45682c04a8
commit 40ca9ba9c5
11 changed files with 130 additions and 170 deletions

View File

@ -103,6 +103,8 @@ tokio-tls = { version="0.1", optional = true }
openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.2", optional = true }
backtrace="*"
[dev-dependencies]
env_logger = "0.5"
serde_derive = "1.0"

View File

@ -22,7 +22,6 @@ use tokio_io::AsyncWrite;
use body::{Binary, Body};
use header::ContentEncoding;
use server::encoding::{ContentEncoder, Output, TransferEncoding};
use server::shared::SharedBytes;
use server::WriterState;
use client::ClientRequest;
@ -53,7 +52,7 @@ impl HttpClientWriter {
written: 0,
headers_size: 0,
buffer_capacity: 0,
buffer: Output::Buffer(SharedBytes::empty()),
buffer: Output::Buffer(BytesMut::new()),
}
}
@ -110,6 +109,7 @@ impl<'a> io::Write for Writer<'a> {
impl HttpClientWriter {
pub fn start(&mut self, msg: &mut ClientRequest) -> io::Result<()> {
// prepare task
self.buffer = content_encoder(self.buffer.take(), msg);
self.flags.insert(Flags::STARTED);
if msg.upgrade() {
self.flags.insert(Flags::UPGRADE);
@ -118,7 +118,7 @@ impl HttpClientWriter {
// render message
{
// output buffer
let buffer = self.buffer.get_mut();
let buffer = self.buffer.as_mut();
// status line
writeln!(
@ -160,8 +160,6 @@ impl HttpClientWriter {
}
self.headers_size = self.buffer.len() as u32;
self.buffer = content_encoder(self.buffer.take(), msg);
if msg.body().is_binary() {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
self.written += bytes.len() as u64;
@ -215,7 +213,7 @@ impl HttpClientWriter {
}
}
fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output {
fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output {
let version = req.version();
let mut body = req.replace_body(Body::Empty);
let mut encoding = req.content_encoding();
@ -227,7 +225,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output {
}
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
let mut tmp = SharedBytes::empty();
let mut tmp = BytesMut::new();
let mut transfer = TransferEncoding::eof(tmp);
let mut enc = match encoding {
#[cfg(feature = "flate2")]
@ -308,7 +306,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output {
}
fn streaming_encoding(
buf: SharedBytes, version: Version, req: &mut ClientRequest,
buf: BytesMut, version: Version, req: &mut ClientRequest,
) -> TransferEncoding {
if req.chunked() {
// Enable transfer encoding

View File

@ -1127,7 +1127,6 @@ mod tests {
let response = srv.execute(request.send()).unwrap();
println!("RESP: {:?}", response);
let te = response
.headers()
.get(header::TRANSFER_ENCODING)

View File

@ -1,7 +1,7 @@
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
use std::str::FromStr;
use std::{cmp, io, mem};
use std::{cmp, fmt, io, mem};
#[cfg(feature = "brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
@ -25,8 +25,6 @@ use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadStatus, PayloadWriter};
use super::shared::SharedBytes;
pub(crate) enum PayloadType {
Sender(PayloadSender),
Encoding(Box<EncodedPayload>),
@ -370,21 +368,34 @@ impl PayloadStream {
}
}
#[derive(Debug)]
pub(crate) enum Output {
Buffer(SharedBytes),
Buffer(BytesMut),
Encoder(ContentEncoder),
TE(TransferEncoding),
Empty,
}
impl Output {
pub fn take(&mut self) -> SharedBytes {
pub fn take(&mut self) -> BytesMut {
match mem::replace(self, Output::Empty) {
Output::Buffer(bytes) => bytes,
Output::Encoder(mut enc) => enc.take_buf(),
Output::TE(mut te) => te.take(),
_ => panic!(),
}
}
pub fn as_ref(&mut self) -> &SharedBytes {
pub fn take_option(&mut self) -> Option<BytesMut> {
match mem::replace(self, Output::Empty) {
Output::Buffer(bytes) => Some(bytes),
Output::Encoder(mut enc) => Some(enc.take_buf()),
Output::TE(mut te) => Some(te.take()),
_ => None,
}
}
pub fn as_ref(&mut self) -> &BytesMut {
match self {
Output::Buffer(ref mut bytes) => bytes,
Output::Encoder(ref mut enc) => enc.buf_ref(),
@ -392,9 +403,11 @@ impl Output {
Output::Empty => panic!(),
}
}
pub fn get_mut(&mut self) -> &mut BytesMut {
pub fn as_mut(&mut self) -> &mut BytesMut {
match self {
Output::Buffer(ref mut bytes) => bytes.get_mut(),
Output::Buffer(ref mut bytes) => bytes,
Output::Encoder(ref mut enc) => enc.buf_mut(),
Output::TE(ref mut te) => te.buf_mut(),
_ => panic!(),
}
}
@ -457,9 +470,23 @@ pub(crate) enum ContentEncoder {
Identity(TransferEncoding),
}
impl fmt::Debug for ContentEncoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
#[cfg(feature = "brotli")]
ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"),
#[cfg(feature = "flate2")]
ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"),
#[cfg(feature = "flate2")]
ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"),
ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"),
}
}
}
impl ContentEncoder {
pub fn for_server(
buf: SharedBytes, req: &HttpInnerMessage, resp: &mut HttpResponse,
buf: BytesMut, req: &HttpInnerMessage, resp: &mut HttpResponse,
response_encoding: ContentEncoding,
) -> Output {
let version = resp.version().unwrap_or_else(|| req.version);
@ -522,7 +549,7 @@ impl ContentEncoder {
if !(encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto)
{
let mut tmp = SharedBytes::empty();
let mut tmp = BytesMut::new();
let mut transfer = TransferEncoding::eof(tmp);
let mut enc = match encoding {
#[cfg(feature = "flate2")]
@ -613,7 +640,7 @@ impl ContentEncoder {
}
fn streaming_encoding(
buf: SharedBytes, version: Version, resp: &mut HttpResponse,
buf: BytesMut, version: Version, resp: &mut HttpResponse,
) -> TransferEncoding {
match resp.chunked() {
Some(true) => {
@ -703,6 +730,19 @@ impl ContentEncoder {
}
}
#[inline]
pub(crate) fn take_buf(&mut self) -> BytesMut {
match *self {
#[cfg(feature = "brotli")]
ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(),
#[cfg(feature = "flate2")]
ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
#[cfg(feature = "flate2")]
ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
ContentEncoder::Identity(ref mut encoder) => encoder.take(),
}
}
#[inline]
pub(crate) fn buf_mut(&mut self) -> &mut BytesMut {
match *self {
@ -717,7 +757,7 @@ impl ContentEncoder {
}
#[inline]
pub(crate) fn buf_ref(&mut self) -> &SharedBytes {
pub(crate) fn buf_ref(&mut self) -> &BytesMut {
match *self {
#[cfg(feature = "brotli")]
ContentEncoder::Br(ref mut encoder) => encoder.get_mut().buf_ref(),
@ -810,7 +850,7 @@ impl ContentEncoder {
/// Encoders to handle different Transfer-Encodings.
#[derive(Debug)]
pub(crate) struct TransferEncoding {
buf: Option<SharedBytes>,
buf: Option<BytesMut>,
kind: TransferEncodingKind,
}
@ -829,11 +869,11 @@ enum TransferEncodingKind {
}
impl TransferEncoding {
fn take(self) -> SharedBytes {
self.buf.unwrap()
fn take(&mut self) -> BytesMut {
self.buf.take().unwrap()
}
fn buf_ref(&mut self) -> &SharedBytes {
fn buf_ref(&mut self) -> &BytesMut {
self.buf.as_ref().unwrap()
}
@ -846,7 +886,7 @@ impl TransferEncoding {
}
fn buf_mut(&mut self) -> &mut BytesMut {
self.buf.as_mut().unwrap().get_mut()
self.buf.as_mut().unwrap()
}
#[inline]
@ -858,7 +898,7 @@ impl TransferEncoding {
}
#[inline]
pub fn eof(buf: SharedBytes) -> TransferEncoding {
pub fn eof(buf: BytesMut) -> TransferEncoding {
TransferEncoding {
buf: Some(buf),
kind: TransferEncodingKind::Eof,
@ -866,7 +906,7 @@ impl TransferEncoding {
}
#[inline]
pub fn chunked(buf: SharedBytes) -> TransferEncoding {
pub fn chunked(buf: BytesMut) -> TransferEncoding {
TransferEncoding {
buf: Some(buf),
kind: TransferEncodingKind::Chunked(false),
@ -874,7 +914,7 @@ impl TransferEncoding {
}
#[inline]
pub fn length(len: u64, buf: SharedBytes) -> TransferEncoding {
pub fn length(len: u64, buf: BytesMut) -> TransferEncoding {
TransferEncoding {
buf: Some(buf),
kind: TransferEncodingKind::Length(len),
@ -1034,7 +1074,7 @@ mod tests {
#[test]
fn test_chunked_te() {
let bytes = SharedBytes::empty();
let bytes = BytesMut::new();
let mut enc = TransferEncoding::chunked(bytes);
{
assert!(!enc.encode(b"test").ok().unwrap());

View File

@ -93,10 +93,9 @@ where
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut,
) -> Self {
let bytes = settings.get_shared_bytes();
Http1 {
flags: Flags::KEEPALIVE,
stream: H1Writer::new(stream, bytes, Rc::clone(&settings)),
stream: H1Writer::new(stream, Rc::clone(&settings)),
decoder: H1Decoder::new(),
payload: None,
tasks: VecDeque::new(),

View File

@ -9,7 +9,6 @@ use tokio_io::AsyncWrite;
use super::encoding::{ContentEncoder, Output};
use super::helpers;
use super::settings::WorkerSettings;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
@ -40,14 +39,12 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
}
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(
stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>,
) -> H1Writer<T, H> {
pub fn new(stream: T, settings: Rc<WorkerSettings<H>>) -> H1Writer<T, H> {
H1Writer {
flags: Flags::KEEPALIVE,
written: 0,
headers_size: 0,
buffer: Output::Buffer(buf),
buffer: Output::Buffer(settings.get_bytes()),
buffer_capacity: 0,
stream,
settings,
@ -91,6 +88,14 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
}
}
impl<T: AsyncWrite, H: 'static> Drop for H1Writer<T, H> {
fn drop(&mut self) {
if let Some(bytes) = self.buffer.take_option() {
self.settings.release_bytes(bytes);
}
}
}
impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn written(&self) -> u64 {
@ -104,8 +109,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn buffer(&mut self) -> &mut BytesMut {
//self.buffer.get_mut()
unimplemented!()
self.buffer.as_mut()
}
fn start(
@ -113,6 +117,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
encoding: ContentEncoding,
) -> io::Result<WriterState> {
// prepare task
self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding);
if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags = Flags::STARTED | Flags::KEEPALIVE;
} else {
@ -141,7 +146,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
// render message
{
// output buffer
let mut buffer = self.buffer.get_mut();
let mut buffer = self.buffer.as_mut();
let reason = msg.reason().as_bytes();
let mut is_bin = if let Body::Binary(ref bytes) = body {
@ -221,9 +226,6 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
self.headers_size = buffer.len() as u32;
}
// output encoding
self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding);
if let Body::Binary(bytes) = body {
self.written = bytes.len() as u64;
self.buffer.write(bytes.as_ref())?;
@ -255,11 +257,11 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
Ok(val) => val,
};
if n < pl.len() {
self.buffer.write(&pl[n..]);
self.buffer.write(&pl[n..])?;
return Ok(WriterState::Done);
}
} else {
self.buffer.write(payload.as_ref());
self.buffer.write(payload.as_ref())?;
}
} else {
// TODO: add warning, write after EOF
@ -267,7 +269,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
}
} else {
// could be response to EXCEPT header
self.buffer.write(payload.as_ref());
self.buffer.write(payload.as_ref())?;
}
}

View File

@ -363,11 +363,7 @@ impl<H: HttpHandler + 'static> Entry<H> {
EntryPipe::Error(Pipeline::error(HttpResponse::NotFound()))
}),
payload: psender,
stream: H2Writer::new(
resp,
settings.get_shared_bytes(),
Rc::clone(settings),
),
stream: H2Writer::new(resp, Rc::clone(settings)),
flags: EntryFlags::empty(),
recv,
}

View File

@ -14,7 +14,6 @@ use http::{HttpTryFrom, Version};
use super::encoding::{ContentEncoder, Output};
use super::helpers;
use super::settings::WorkerSettings;
use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
@ -44,16 +43,16 @@ pub(crate) struct H2Writer<H: 'static> {
impl<H: 'static> H2Writer<H> {
pub fn new(
respond: SendResponse<Bytes>, buf: SharedBytes, settings: Rc<WorkerSettings<H>>,
respond: SendResponse<Bytes>, settings: Rc<WorkerSettings<H>>,
) -> H2Writer<H> {
H2Writer {
respond,
settings,
stream: None,
flags: Flags::empty(),
written: 0,
buffer: Output::Buffer(buf),
buffer: Output::Buffer(settings.get_bytes()),
buffer_capacity: 0,
respond,
settings,
}
}
@ -64,6 +63,12 @@ impl<H: 'static> H2Writer<H> {
}
}
impl<H: 'static> Drop for H2Writer<H> {
fn drop(&mut self) {
self.settings.release_bytes(self.buffer.take());
}
}
impl<H: 'static> Writer for H2Writer<H> {
fn written(&self) -> u64 {
self.written
@ -76,7 +81,7 @@ impl<H: 'static> Writer for H2Writer<H> {
#[inline]
fn buffer(&mut self) -> &mut BytesMut {
self.buffer.get_mut()
self.buffer.as_mut()
}
fn start(

View File

@ -16,7 +16,6 @@ mod h2;
mod h2writer;
pub(crate) mod helpers;
pub(crate) mod settings;
pub(crate) mod shared;
mod srv;
mod worker;

View File

@ -1,4 +1,5 @@
use std::cell::{Cell, RefCell, RefMut, UnsafeCell};
use std::collections::VecDeque;
use std::fmt::Write;
use std::rc::Rc;
use std::{env, fmt, mem, net};
@ -11,7 +12,6 @@ use time;
use super::channel::Node;
use super::helpers;
use super::shared::{SharedBytes, SharedBytesPool};
use super::KeepAlive;
use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -201,8 +201,12 @@ impl<H> WorkerSettings<H> {
self.ka_enabled
}
pub fn get_shared_bytes(&self) -> SharedBytes {
SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
pub fn get_bytes(&self) -> BytesMut {
self.bytes.get_bytes()
}
pub fn release_bytes(&self, bytes: BytesMut) {
self.bytes.release_bytes(bytes)
}
pub fn get_http_message(&self) -> helpers::SharedHttpInnerMessage {
@ -273,6 +277,31 @@ impl fmt::Write for Date {
}
}
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<BytesMut>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> BytesMut {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
BytesMut::new()
}
}
pub fn release_bytes(&self, mut bytes: BytesMut) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
bytes.clear();
v.push_front(bytes);
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,109 +0,0 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::rc::Rc;
use bytes::BytesMut;
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<BytesMut>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> BytesMut {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
BytesMut::new()
}
}
pub fn release_bytes(&self, mut bytes: BytesMut) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
bytes.clear();
v.push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(Option<BytesMut>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(pool) = self.1.take() {
if let Some(bytes) = self.0.take() {
pool.release_bytes(bytes);
}
}
}
}
impl SharedBytes {
pub fn new(bytes: BytesMut, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
pub fn empty() -> SharedBytes {
SharedBytes(Some(BytesMut::new()), None)
}
#[inline]
pub(crate) fn get_mut(&mut self) -> &mut BytesMut {
self.0.as_mut().unwrap()
}
#[inline]
pub fn len(&self) -> usize {
self.0.as_ref().unwrap().len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0.as_ref().unwrap().is_empty()
}
#[inline]
pub fn as_ref(&self) -> &[u8] {
self.0.as_ref().unwrap().as_ref()
}
pub fn split_to(&mut self, n: usize) -> BytesMut {
self.get_mut().split_to(n)
}
pub fn take(&mut self) -> BytesMut {
self.get_mut().take()
}
#[inline]
pub fn reserve(&mut self, cap: usize) {
self.get_mut().reserve(cap);
}
#[inline]
pub fn extend_from_slice(&mut self, data: &[u8]) {
let buf = self.get_mut();
buf.extend_from_slice(data);
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(BytesMut::new()), None)
}
}
impl io::Write for SharedBytes {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}