From 40ca9ba9c5a9aebd03c83a33c321b7645a00f1cc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 24 Jun 2018 10:30:58 +0600 Subject: [PATCH] simplify write buffer --- Cargo.toml | 2 + src/client/writer.rs | 14 +++--- src/fs.rs | 1 - src/server/encoding.rs | 82 +++++++++++++++++++++++-------- src/server/h1.rs | 3 +- src/server/h1writer.rs | 30 ++++++------ src/server/h2.rs | 6 +-- src/server/h2writer.rs | 17 ++++--- src/server/mod.rs | 1 - src/server/settings.rs | 35 +++++++++++-- src/server/shared.rs | 109 ----------------------------------------- 11 files changed, 130 insertions(+), 170 deletions(-) delete mode 100644 src/server/shared.rs diff --git a/Cargo.toml b/Cargo.toml index c7f8c458d..d4221cbcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,6 +103,8 @@ tokio-tls = { version="0.1", optional = true } openssl = { version="0.10", optional = true } tokio-openssl = { version="0.2", optional = true } +backtrace="*" + [dev-dependencies] env_logger = "0.5" serde_derive = "1.0" diff --git a/src/client/writer.rs b/src/client/writer.rs index bf626513b..653289794 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -22,7 +22,6 @@ use tokio_io::AsyncWrite; use body::{Binary, Body}; use header::ContentEncoding; use server::encoding::{ContentEncoder, Output, TransferEncoding}; -use server::shared::SharedBytes; use server::WriterState; use client::ClientRequest; @@ -53,7 +52,7 @@ impl HttpClientWriter { written: 0, headers_size: 0, buffer_capacity: 0, - buffer: Output::Buffer(SharedBytes::empty()), + buffer: Output::Buffer(BytesMut::new()), } } @@ -110,6 +109,7 @@ impl<'a> io::Write for Writer<'a> { impl HttpClientWriter { pub fn start(&mut self, msg: &mut ClientRequest) -> io::Result<()> { // prepare task + self.buffer = content_encoder(self.buffer.take(), msg); self.flags.insert(Flags::STARTED); if msg.upgrade() { self.flags.insert(Flags::UPGRADE); @@ -118,7 +118,7 @@ impl HttpClientWriter { // render message { // output buffer - let buffer = self.buffer.get_mut(); + let buffer = self.buffer.as_mut(); // status line writeln!( @@ -160,8 +160,6 @@ impl HttpClientWriter { } self.headers_size = self.buffer.len() as u32; - self.buffer = content_encoder(self.buffer.take(), msg); - if msg.body().is_binary() { if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { self.written += bytes.len() as u64; @@ -215,7 +213,7 @@ impl HttpClientWriter { } } -fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output { +fn content_encoder(buf: BytesMut, req: &mut ClientRequest) -> Output { let version = req.version(); let mut body = req.replace_body(Body::Empty); let mut encoding = req.content_encoding(); @@ -227,7 +225,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output { } Body::Binary(ref mut bytes) => { if encoding.is_compression() { - let mut tmp = SharedBytes::empty(); + let mut tmp = BytesMut::new(); let mut transfer = TransferEncoding::eof(tmp); let mut enc = match encoding { #[cfg(feature = "flate2")] @@ -308,7 +306,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> Output { } fn streaming_encoding( - buf: SharedBytes, version: Version, req: &mut ClientRequest, + buf: BytesMut, version: Version, req: &mut ClientRequest, ) -> TransferEncoding { if req.chunked() { // Enable transfer encoding diff --git a/src/fs.rs b/src/fs.rs index bf9079cc5..c5a7de615 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1127,7 +1127,6 @@ mod tests { let response = srv.execute(request.send()).unwrap(); - println!("RESP: {:?}", response); let te = response .headers() .get(header::TRANSFER_ENCODING) diff --git a/src/server/encoding.rs b/src/server/encoding.rs index e7dc7a1a8..5acce762b 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -1,7 +1,7 @@ use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; use std::str::FromStr; -use std::{cmp, io, mem}; +use std::{cmp, fmt, io, mem}; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; @@ -25,8 +25,6 @@ use httprequest::HttpInnerMessage; use httpresponse::HttpResponse; use payload::{PayloadSender, PayloadStatus, PayloadWriter}; -use super::shared::SharedBytes; - pub(crate) enum PayloadType { Sender(PayloadSender), Encoding(Box), @@ -370,21 +368,34 @@ impl PayloadStream { } } +#[derive(Debug)] pub(crate) enum Output { - Buffer(SharedBytes), + Buffer(BytesMut), Encoder(ContentEncoder), TE(TransferEncoding), Empty, } impl Output { - pub fn take(&mut self) -> SharedBytes { + pub fn take(&mut self) -> BytesMut { match mem::replace(self, Output::Empty) { Output::Buffer(bytes) => bytes, + Output::Encoder(mut enc) => enc.take_buf(), + Output::TE(mut te) => te.take(), _ => panic!(), } } - pub fn as_ref(&mut self) -> &SharedBytes { + + pub fn take_option(&mut self) -> Option { + match mem::replace(self, Output::Empty) { + Output::Buffer(bytes) => Some(bytes), + Output::Encoder(mut enc) => Some(enc.take_buf()), + Output::TE(mut te) => Some(te.take()), + _ => None, + } + } + + pub fn as_ref(&mut self) -> &BytesMut { match self { Output::Buffer(ref mut bytes) => bytes, Output::Encoder(ref mut enc) => enc.buf_ref(), @@ -392,9 +403,11 @@ impl Output { Output::Empty => panic!(), } } - pub fn get_mut(&mut self) -> &mut BytesMut { + pub fn as_mut(&mut self) -> &mut BytesMut { match self { - Output::Buffer(ref mut bytes) => bytes.get_mut(), + Output::Buffer(ref mut bytes) => bytes, + Output::Encoder(ref mut enc) => enc.buf_mut(), + Output::TE(ref mut te) => te.buf_mut(), _ => panic!(), } } @@ -457,9 +470,23 @@ pub(crate) enum ContentEncoder { Identity(TransferEncoding), } +impl fmt::Debug for ContentEncoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(_) => writeln!(f, "ContentEncoder(Brotli)"), + #[cfg(feature = "flate2")] + ContentEncoder::Deflate(_) => writeln!(f, "ContentEncoder(Deflate)"), + #[cfg(feature = "flate2")] + ContentEncoder::Gzip(_) => writeln!(f, "ContentEncoder(Gzip)"), + ContentEncoder::Identity(_) => writeln!(f, "ContentEncoder(Identity)"), + } + } +} + impl ContentEncoder { pub fn for_server( - buf: SharedBytes, req: &HttpInnerMessage, resp: &mut HttpResponse, + buf: BytesMut, req: &HttpInnerMessage, resp: &mut HttpResponse, response_encoding: ContentEncoding, ) -> Output { let version = resp.version().unwrap_or_else(|| req.version); @@ -522,7 +549,7 @@ impl ContentEncoder { if !(encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto) { - let mut tmp = SharedBytes::empty(); + let mut tmp = BytesMut::new(); let mut transfer = TransferEncoding::eof(tmp); let mut enc = match encoding { #[cfg(feature = "flate2")] @@ -613,7 +640,7 @@ impl ContentEncoder { } fn streaming_encoding( - buf: SharedBytes, version: Version, resp: &mut HttpResponse, + buf: BytesMut, version: Version, resp: &mut HttpResponse, ) -> TransferEncoding { match resp.chunked() { Some(true) => { @@ -703,6 +730,19 @@ impl ContentEncoder { } } + #[inline] + pub(crate) fn take_buf(&mut self) -> BytesMut { + match *self { + #[cfg(feature = "brotli")] + ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(), + #[cfg(feature = "flate2")] + ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(), + #[cfg(feature = "flate2")] + ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(), + ContentEncoder::Identity(ref mut encoder) => encoder.take(), + } + } + #[inline] pub(crate) fn buf_mut(&mut self) -> &mut BytesMut { match *self { @@ -717,7 +757,7 @@ impl ContentEncoder { } #[inline] - pub(crate) fn buf_ref(&mut self) -> &SharedBytes { + pub(crate) fn buf_ref(&mut self) -> &BytesMut { match *self { #[cfg(feature = "brotli")] ContentEncoder::Br(ref mut encoder) => encoder.get_mut().buf_ref(), @@ -810,7 +850,7 @@ impl ContentEncoder { /// Encoders to handle different Transfer-Encodings. #[derive(Debug)] pub(crate) struct TransferEncoding { - buf: Option, + buf: Option, kind: TransferEncodingKind, } @@ -829,11 +869,11 @@ enum TransferEncodingKind { } impl TransferEncoding { - fn take(self) -> SharedBytes { - self.buf.unwrap() + fn take(&mut self) -> BytesMut { + self.buf.take().unwrap() } - fn buf_ref(&mut self) -> &SharedBytes { + fn buf_ref(&mut self) -> &BytesMut { self.buf.as_ref().unwrap() } @@ -846,7 +886,7 @@ impl TransferEncoding { } fn buf_mut(&mut self) -> &mut BytesMut { - self.buf.as_mut().unwrap().get_mut() + self.buf.as_mut().unwrap() } #[inline] @@ -858,7 +898,7 @@ impl TransferEncoding { } #[inline] - pub fn eof(buf: SharedBytes) -> TransferEncoding { + pub fn eof(buf: BytesMut) -> TransferEncoding { TransferEncoding { buf: Some(buf), kind: TransferEncodingKind::Eof, @@ -866,7 +906,7 @@ impl TransferEncoding { } #[inline] - pub fn chunked(buf: SharedBytes) -> TransferEncoding { + pub fn chunked(buf: BytesMut) -> TransferEncoding { TransferEncoding { buf: Some(buf), kind: TransferEncodingKind::Chunked(false), @@ -874,7 +914,7 @@ impl TransferEncoding { } #[inline] - pub fn length(len: u64, buf: SharedBytes) -> TransferEncoding { + pub fn length(len: u64, buf: BytesMut) -> TransferEncoding { TransferEncoding { buf: Some(buf), kind: TransferEncodingKind::Length(len), @@ -1034,7 +1074,7 @@ mod tests { #[test] fn test_chunked_te() { - let bytes = SharedBytes::empty(); + let bytes = BytesMut::new(); let mut enc = TransferEncoding::chunked(bytes); { assert!(!enc.encode(b"test").ok().unwrap()); diff --git a/src/server/h1.rs b/src/server/h1.rs index 87eeccb0b..8bca504c9 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -93,10 +93,9 @@ where settings: Rc>, stream: T, addr: Option, buf: BytesMut, ) -> Self { - let bytes = settings.get_shared_bytes(); Http1 { flags: Flags::KEEPALIVE, - stream: H1Writer::new(stream, bytes, Rc::clone(&settings)), + stream: H1Writer::new(stream, Rc::clone(&settings)), decoder: H1Decoder::new(), payload: None, tasks: VecDeque::new(), diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 36571d880..502793f7a 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -9,7 +9,6 @@ use tokio_io::AsyncWrite; use super::encoding::{ContentEncoder, Output}; use super::helpers; use super::settings::WorkerSettings; -use super::shared::SharedBytes; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; use header::ContentEncoding; @@ -40,14 +39,12 @@ pub(crate) struct H1Writer { } impl H1Writer { - pub fn new( - stream: T, buf: SharedBytes, settings: Rc>, - ) -> H1Writer { + pub fn new(stream: T, settings: Rc>) -> H1Writer { H1Writer { flags: Flags::KEEPALIVE, written: 0, headers_size: 0, - buffer: Output::Buffer(buf), + buffer: Output::Buffer(settings.get_bytes()), buffer_capacity: 0, stream, settings, @@ -91,6 +88,14 @@ impl H1Writer { } } +impl Drop for H1Writer { + fn drop(&mut self) { + if let Some(bytes) = self.buffer.take_option() { + self.settings.release_bytes(bytes); + } + } +} + impl Writer for H1Writer { #[inline] fn written(&self) -> u64 { @@ -104,8 +109,7 @@ impl Writer for H1Writer { #[inline] fn buffer(&mut self) -> &mut BytesMut { - //self.buffer.get_mut() - unimplemented!() + self.buffer.as_mut() } fn start( @@ -113,6 +117,7 @@ impl Writer for H1Writer { encoding: ContentEncoding, ) -> io::Result { // prepare task + self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { self.flags = Flags::STARTED | Flags::KEEPALIVE; } else { @@ -141,7 +146,7 @@ impl Writer for H1Writer { // render message { // output buffer - let mut buffer = self.buffer.get_mut(); + let mut buffer = self.buffer.as_mut(); let reason = msg.reason().as_bytes(); let mut is_bin = if let Body::Binary(ref bytes) = body { @@ -221,9 +226,6 @@ impl Writer for H1Writer { self.headers_size = buffer.len() as u32; } - // output encoding - self.buffer = ContentEncoder::for_server(self.buffer.take(), req, msg, encoding); - if let Body::Binary(bytes) = body { self.written = bytes.len() as u64; self.buffer.write(bytes.as_ref())?; @@ -255,11 +257,11 @@ impl Writer for H1Writer { Ok(val) => val, }; if n < pl.len() { - self.buffer.write(&pl[n..]); + self.buffer.write(&pl[n..])?; return Ok(WriterState::Done); } } else { - self.buffer.write(payload.as_ref()); + self.buffer.write(payload.as_ref())?; } } else { // TODO: add warning, write after EOF @@ -267,7 +269,7 @@ impl Writer for H1Writer { } } else { // could be response to EXCEPT header - self.buffer.write(payload.as_ref()); + self.buffer.write(payload.as_ref())?; } } diff --git a/src/server/h2.rs b/src/server/h2.rs index 993376efc..1904734c6 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -363,11 +363,7 @@ impl Entry { EntryPipe::Error(Pipeline::error(HttpResponse::NotFound())) }), payload: psender, - stream: H2Writer::new( - resp, - settings.get_shared_bytes(), - Rc::clone(settings), - ), + stream: H2Writer::new(resp, Rc::clone(settings)), flags: EntryFlags::empty(), recv, } diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index e5f579b25..c44af51a7 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -14,7 +14,6 @@ use http::{HttpTryFrom, Version}; use super::encoding::{ContentEncoder, Output}; use super::helpers; use super::settings::WorkerSettings; -use super::shared::SharedBytes; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; use header::ContentEncoding; @@ -44,16 +43,16 @@ pub(crate) struct H2Writer { impl H2Writer { pub fn new( - respond: SendResponse, buf: SharedBytes, settings: Rc>, + respond: SendResponse, settings: Rc>, ) -> H2Writer { H2Writer { - respond, - settings, stream: None, flags: Flags::empty(), written: 0, - buffer: Output::Buffer(buf), + buffer: Output::Buffer(settings.get_bytes()), buffer_capacity: 0, + respond, + settings, } } @@ -64,6 +63,12 @@ impl H2Writer { } } +impl Drop for H2Writer { + fn drop(&mut self) { + self.settings.release_bytes(self.buffer.take()); + } +} + impl Writer for H2Writer { fn written(&self) -> u64 { self.written @@ -76,7 +81,7 @@ impl Writer for H2Writer { #[inline] fn buffer(&mut self) -> &mut BytesMut { - self.buffer.get_mut() + self.buffer.as_mut() } fn start( diff --git a/src/server/mod.rs b/src/server/mod.rs index bffdf427a..1bbf460b0 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -16,7 +16,6 @@ mod h2; mod h2writer; pub(crate) mod helpers; pub(crate) mod settings; -pub(crate) mod shared; mod srv; mod worker; diff --git a/src/server/settings.rs b/src/server/settings.rs index ca5acb917..0fc81fe59 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -1,4 +1,5 @@ use std::cell::{Cell, RefCell, RefMut, UnsafeCell}; +use std::collections::VecDeque; use std::fmt::Write; use std::rc::Rc; use std::{env, fmt, mem, net}; @@ -11,7 +12,6 @@ use time; use super::channel::Node; use super::helpers; -use super::shared::{SharedBytes, SharedBytesPool}; use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; @@ -201,8 +201,12 @@ impl WorkerSettings { self.ka_enabled } - pub fn get_shared_bytes(&self) -> SharedBytes { - SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) + pub fn get_bytes(&self) -> BytesMut { + self.bytes.get_bytes() + } + + pub fn release_bytes(&self, bytes: BytesMut) { + self.bytes.release_bytes(bytes) } pub fn get_http_message(&self) -> helpers::SharedHttpInnerMessage { @@ -273,6 +277,31 @@ impl fmt::Write for Date { } } +#[derive(Debug)] +pub(crate) struct SharedBytesPool(RefCell>); + +impl SharedBytesPool { + pub fn new() -> SharedBytesPool { + SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) + } + + pub fn get_bytes(&self) -> BytesMut { + if let Some(bytes) = self.0.borrow_mut().pop_front() { + bytes + } else { + BytesMut::new() + } + } + + pub fn release_bytes(&self, mut bytes: BytesMut) { + let v = &mut self.0.borrow_mut(); + if v.len() < 128 { + bytes.clear(); + v.push_front(bytes); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/server/shared.rs b/src/server/shared.rs deleted file mode 100644 index 4d6a59d8e..000000000 --- a/src/server/shared.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::cell::RefCell; -use std::collections::VecDeque; -use std::io; -use std::rc::Rc; - -use bytes::BytesMut; - -#[derive(Debug)] -pub(crate) struct SharedBytesPool(RefCell>); - -impl SharedBytesPool { - pub fn new() -> SharedBytesPool { - SharedBytesPool(RefCell::new(VecDeque::with_capacity(128))) - } - - pub fn get_bytes(&self) -> BytesMut { - if let Some(bytes) = self.0.borrow_mut().pop_front() { - bytes - } else { - BytesMut::new() - } - } - - pub fn release_bytes(&self, mut bytes: BytesMut) { - let v = &mut self.0.borrow_mut(); - if v.len() < 128 { - bytes.clear(); - v.push_front(bytes); - } - } -} - -#[derive(Debug)] -pub(crate) struct SharedBytes(Option, Option>); - -impl Drop for SharedBytes { - fn drop(&mut self) { - if let Some(pool) = self.1.take() { - if let Some(bytes) = self.0.take() { - pool.release_bytes(bytes); - } - } - } -} - -impl SharedBytes { - pub fn new(bytes: BytesMut, pool: Rc) -> SharedBytes { - SharedBytes(Some(bytes), Some(pool)) - } - - pub fn empty() -> SharedBytes { - SharedBytes(Some(BytesMut::new()), None) - } - - #[inline] - pub(crate) fn get_mut(&mut self) -> &mut BytesMut { - self.0.as_mut().unwrap() - } - - #[inline] - pub fn len(&self) -> usize { - self.0.as_ref().unwrap().len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.0.as_ref().unwrap().is_empty() - } - - #[inline] - pub fn as_ref(&self) -> &[u8] { - self.0.as_ref().unwrap().as_ref() - } - - pub fn split_to(&mut self, n: usize) -> BytesMut { - self.get_mut().split_to(n) - } - - pub fn take(&mut self) -> BytesMut { - self.get_mut().take() - } - - #[inline] - pub fn reserve(&mut self, cap: usize) { - self.get_mut().reserve(cap); - } - - #[inline] - pub fn extend_from_slice(&mut self, data: &[u8]) { - let buf = self.get_mut(); - buf.extend_from_slice(data); - } -} - -impl Default for SharedBytes { - fn default() -> Self { - SharedBytes(Some(BytesMut::new()), None) - } -} - -impl io::Write for SharedBytes { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -}