1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

write buffer capacity for client

This commit is contained in:
Nikolay Kim 2018-03-09 10:09:13 -08:00
parent 2853086463
commit b56be8e571
3 changed files with 22 additions and 25 deletions

View File

@ -28,7 +28,7 @@ pub struct ClientRequest {
upgrade: bool, upgrade: bool,
encoding: ContentEncoding, encoding: ContentEncoding,
response_decompress: bool, response_decompress: bool,
buffer_capacity: Option<(usize, usize)>, buffer_capacity: usize,
conn: ConnectionType, conn: ConnectionType,
} }
@ -51,7 +51,7 @@ impl Default for ClientRequest {
upgrade: false, upgrade: false,
encoding: ContentEncoding::Auto, encoding: ContentEncoding::Auto,
response_decompress: true, response_decompress: true,
buffer_capacity: None, buffer_capacity: 32_768,
conn: ConnectionType::Default, conn: ConnectionType::Default,
} }
} }
@ -179,7 +179,8 @@ impl ClientRequest {
self.response_decompress self.response_decompress
} }
pub fn buffer_capacity(&self) -> Option<(usize, usize)> { /// Requested write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.buffer_capacity self.buffer_capacity
} }
@ -466,12 +467,11 @@ impl ClientRequestBuilder {
} }
/// Set write buffer capacity /// Set write buffer capacity
pub fn buffer_capacity(&mut self, ///
low_watermark: usize, /// Default buffer capacity is 32kb
high_watermark: usize) -> &mut Self pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
{
if let Some(parts) = parts(&mut self.request, &self.err) { if let Some(parts) = parts(&mut self.request, &self.err) {
parts.buffer_capacity = Some((low_watermark, high_watermark)); parts.buffer_capacity = cap;
} }
self self
} }

View File

@ -24,8 +24,6 @@ use server::encoding::{ContentEncoder, TransferEncoding};
use client::ClientRequest; use client::ClientRequest;
const LOW_WATERMARK: usize = 1024;
const HIGH_WATERMARK: usize = 8 * LOW_WATERMARK;
const AVERAGE_HEADER_SIZE: usize = 30; const AVERAGE_HEADER_SIZE: usize = 30;
bitflags! { bitflags! {
@ -42,9 +40,8 @@ pub(crate) struct HttpClientWriter {
written: u64, written: u64,
headers_size: u32, headers_size: u32,
buffer: SharedBytes, buffer: SharedBytes,
buffer_capacity: usize,
encoder: ContentEncoder, encoder: ContentEncoder,
low: usize,
high: usize,
} }
impl HttpClientWriter { impl HttpClientWriter {
@ -55,10 +52,9 @@ impl HttpClientWriter {
flags: Flags::empty(), flags: Flags::empty(),
written: 0, written: 0,
headers_size: 0, headers_size: 0,
buffer_capacity: 0,
buffer, buffer,
encoder, encoder,
low: LOW_WATERMARK,
high: HIGH_WATERMARK,
} }
} }
@ -70,12 +66,6 @@ impl HttpClientWriter {
// self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) // self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
// } // }
/// Set write buffer capacity
pub fn set_buffer_capacity(&mut self, low_watermark: usize, high_watermark: usize) {
self.low = low_watermark;
self.high = high_watermark;
}
fn write_to_stream<T: AsyncWrite>(&mut self, stream: &mut T) -> io::Result<WriterState> { fn write_to_stream<T: AsyncWrite>(&mut self, stream: &mut T) -> io::Result<WriterState> {
while !self.buffer.is_empty() { while !self.buffer.is_empty() {
match stream.write(self.buffer.as_ref()) { match stream.write(self.buffer.as_ref()) {
@ -87,7 +77,7 @@ impl HttpClientWriter {
let _ = self.buffer.split_to(n); let _ = self.buffer.split_to(n);
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > self.high { if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause) return Ok(WriterState::Pause)
} else { } else {
return Ok(WriterState::Done) return Ok(WriterState::Done)
@ -106,9 +96,6 @@ impl HttpClientWriter {
// prepare task // prepare task
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg); self.encoder = content_encoder(self.buffer.clone(), msg);
if let Some(capacity) = msg.buffer_capacity() {
self.set_buffer_capacity(capacity.0, capacity.1);
}
// render message // render message
{ {
@ -153,6 +140,8 @@ impl HttpClientWriter {
self.written += bytes.len() as u64; self.written += bytes.len() as u64;
self.encoder.write(bytes)?; self.encoder.write(bytes)?;
} }
} else {
self.buffer_capacity = msg.write_buffer_capacity();
} }
} }
Ok(()) Ok(())
@ -168,7 +157,7 @@ impl HttpClientWriter {
} }
} }
if self.buffer.len() > self.high { if self.buffer.len() > self.buffer_capacity {
Ok(WriterState::Pause) Ok(WriterState::Pause)
} else { } else {
Ok(WriterState::Done) Ok(WriterState::Done)

View File

@ -192,6 +192,14 @@ impl Client {
self self
} }
/// Set write buffer capacity
///
/// Default buffer capacity is 32kb
pub fn write_buffer_capacity(mut self, cap: usize) -> Self {
self.request.write_buffer_capacity(cap);
self
}
/// Set request header /// Set request header
pub fn header<K, V>(mut self, key: K, value: V) -> Self pub fn header<K, V>(mut self, key: K, value: V) -> Self
where HeaderName: HttpTryFrom<K>, V: IntoHeaderValue where HeaderName: HttpTryFrom<K>, V: IntoHeaderValue