diff --git a/src/httprequest.rs b/src/httprequest.rs index 33926b280..692fc32ce 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -97,7 +97,8 @@ impl HttpRequest<()> { /// Construct a new Request. #[inline] pub fn new(method: Method, uri: Uri, - version: Version, headers: HeaderMap, payload: Option) -> HttpRequest + version: Version, headers: HeaderMap, payload: Option) + -> HttpRequest { HttpRequest( SharedHttpInnerMessage::from_message(HttpInnerMessage { diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 9c99d4d68..ad2f0016b 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -18,6 +18,10 @@ use handler::Responder; use header::{Header, IntoHeaderValue, ContentEncoding}; use httprequest::HttpRequest; +/// max write buffer size 64k +pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; + + /// Represents various types of connection #[derive(Copy, Clone, PartialEq, Debug)] pub enum ConnectionType { @@ -198,6 +202,16 @@ impl HttpResponse { pub(crate) fn set_response_size(&mut self, size: u64) { self.get_mut().response_size = size; } + + /// Set write buffer capacity + pub fn write_buffer_capacity(&self) -> usize { + self.get_ref().write_capacity + } + + /// Set write buffer capacity + pub fn set_write_buffer_capacity(&mut self, cap: usize) { + self.get_mut().write_capacity = cap; + } } impl fmt::Debug for HttpResponse { @@ -462,6 +476,20 @@ impl HttpResponseBuilder { self } + /// Set write buffer capacity + /// + /// This parameter makes sense only for streaming response + /// or actor. If write buffer reaches specified capacity, stream or actor get + /// paused. + /// + /// Default write buffer capacity is 64kb + pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self { + if let Some(parts) = parts(&mut self.response, &self.err) { + parts.write_capacity = cap; + } + self + } + /// Set a body and generate `HttpResponse`. /// /// `HttpResponseBuilder` can not be used after this call. @@ -692,6 +720,7 @@ struct InnerHttpResponse { chunked: Option, encoding: Option, connection_type: Option, + write_capacity: usize, response_size: u64, error: Option, } @@ -710,6 +739,7 @@ impl InnerHttpResponse { encoding: None, connection_type: None, response_size: 0, + write_capacity: MAX_WRITE_BUFFER_SIZE, error: None, } } @@ -763,6 +793,7 @@ impl Pool { inner.connection_type = None; inner.response_size = 0; inner.error = None; + inner.write_capacity = MAX_WRITE_BUFFER_SIZE; v.push_front(inner); } }) diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 80d02f292..e77e60ca7 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -34,6 +34,7 @@ pub(crate) struct H1Writer { written: u64, headers_size: u32, buffer: SharedBytes, + buffer_capacity: usize, } impl H1Writer { @@ -45,6 +46,7 @@ impl H1Writer { written: 0, headers_size: 0, buffer: buf, + buffer_capacity: 0, stream, } } @@ -77,7 +79,7 @@ impl H1Writer { let _ = self.buffer.split_to(n); }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > self.buffer_capacity { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) @@ -199,6 +201,9 @@ impl Writer for H1Writer { self.written = bytes.len() as u64; self.encoder.write(bytes)?; } else { + // capacity, makes sense only for streaming or actor + self.buffer_capacity = msg.write_buffer_capacity(); + msg.replace_body(body); } Ok(WriterState::Done) diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 095cd78f2..d57d92db5 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -34,6 +34,7 @@ pub(crate) struct H2Writer { flags: Flags, written: u64, buffer: SharedBytes, + buffer_capacity: usize, } impl H2Writer { @@ -46,6 +47,7 @@ impl H2Writer { flags: Flags::empty(), written: 0, buffer: buf, + buffer_capacity: 0, } } @@ -71,7 +73,7 @@ impl H2Writer { loop { match stream.poll_capacity() { Ok(Async::NotReady) => { - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > self.buffer_capacity { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) @@ -111,8 +113,11 @@ impl Writer for H2Writer { self.written } - fn start(&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, encoding: ContentEncoding) - -> io::Result { + fn start(&mut self, + req: &mut HttpInnerMessage, + msg: &mut HttpResponse, + encoding: ContentEncoding) -> io::Result + { // prepare response self.flags.insert(Flags::STARTED); self.encoder = ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding); @@ -172,6 +177,7 @@ impl Writer for H2Writer { Ok(WriterState::Pause) } else { msg.replace_body(body); + self.buffer_capacity = msg.write_buffer_capacity(); Ok(WriterState::Done) } }