1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

add write buffer capacity config

This commit is contained in:
Nikolay Kim 2018-03-09 10:00:15 -08:00
parent e2107ec6f4
commit 2853086463
4 changed files with 48 additions and 5 deletions

View File

@ -97,7 +97,8 @@ impl HttpRequest<()> {
/// Construct a new Request.
#[inline]
pub fn new(method: Method, uri: Uri,
version: Version, headers: HeaderMap, payload: Option<Payload>) -> HttpRequest
version: Version, headers: HeaderMap, payload: Option<Payload>)
-> HttpRequest
{
HttpRequest(
SharedHttpInnerMessage::from_message(HttpInnerMessage {

View File

@ -18,6 +18,10 @@ use handler::Responder;
use header::{Header, IntoHeaderValue, ContentEncoding};
use httprequest::HttpRequest;
/// max write buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
/// Represents various types of connection
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum ConnectionType {
@ -198,6 +202,16 @@ impl HttpResponse {
pub(crate) fn set_response_size(&mut self, size: u64) {
self.get_mut().response_size = size;
}
/// Set write buffer capacity
pub fn write_buffer_capacity(&self) -> usize {
self.get_ref().write_capacity
}
/// Set write buffer capacity
pub fn set_write_buffer_capacity(&mut self, cap: usize) {
self.get_mut().write_capacity = cap;
}
}
impl fmt::Debug for HttpResponse {
@ -462,6 +476,20 @@ impl HttpResponseBuilder {
self
}
/// Set write buffer capacity
///
/// This parameter makes sense only for streaming response
/// or actor. If write buffer reaches specified capacity, stream or actor get
/// paused.
///
/// Default write buffer capacity is 64kb
pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.write_capacity = cap;
}
self
}
/// Set a body and generate `HttpResponse`.
///
/// `HttpResponseBuilder` can not be used after this call.
@ -692,6 +720,7 @@ struct InnerHttpResponse {
chunked: Option<bool>,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>,
write_capacity: usize,
response_size: u64,
error: Option<Error>,
}
@ -710,6 +739,7 @@ impl InnerHttpResponse {
encoding: None,
connection_type: None,
response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE,
error: None,
}
}
@ -763,6 +793,7 @@ impl Pool {
inner.connection_type = None;
inner.response_size = 0;
inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
v.push_front(inner);
}
})

View File

@ -34,6 +34,7 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl<T: AsyncWrite> H1Writer<T> {
@ -45,6 +46,7 @@ impl<T: AsyncWrite> H1Writer<T> {
written: 0,
headers_size: 0,
buffer: buf,
buffer_capacity: 0,
stream,
}
}
@ -77,7 +79,7 @@ impl<T: AsyncWrite> H1Writer<T> {
let _ = self.buffer.split_to(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -199,6 +201,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
} else {
// capacity, makes sense only for streaming or actor
self.buffer_capacity = msg.write_buffer_capacity();
msg.replace_body(body);
}
Ok(WriterState::Done)

View File

@ -34,6 +34,7 @@ pub(crate) struct H2Writer {
flags: Flags,
written: u64,
buffer: SharedBytes,
buffer_capacity: usize,
}
impl H2Writer {
@ -46,6 +47,7 @@ impl H2Writer {
flags: Flags::empty(),
written: 0,
buffer: buf,
buffer_capacity: 0,
}
}
@ -71,7 +73,7 @@ impl H2Writer {
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.len() > self.buffer_capacity {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -111,8 +113,11 @@ impl Writer for H2Writer {
self.written
}
fn start(&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, encoding: ContentEncoding)
-> io::Result<WriterState> {
fn start(&mut self,
req: &mut HttpInnerMessage,
msg: &mut HttpResponse,
encoding: ContentEncoding) -> io::Result<WriterState>
{
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding);
@ -172,6 +177,7 @@ impl Writer for H2Writer {
Ok(WriterState::Pause)
} else {
msg.replace_body(body);
self.buffer_capacity = msg.write_buffer_capacity();
Ok(WriterState::Done)
}
}