1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

fix back pressure for h1 import stream

This commit is contained in:
Nikolay Kim 2018-02-09 16:20:10 -08:00
parent 728377a447
commit 74377ef73d
3 changed files with 71 additions and 52 deletions

View File

@ -260,6 +260,7 @@ impl PayloadWriter for PayloadSender {
}
}
#[inline]
fn capacity(&self) -> usize {
if let Some(shared) = self.inner.upgrade() {
shared.borrow().capacity()
@ -327,10 +328,10 @@ impl Inner {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Ok(Async::Ready(Some(PayloadItem(data))))
} else if self.eof {
Ok(Async::Ready(None))
} else if let Some(err) = self.err.take() {
Err(err)
} else if self.eof {
Ok(Async::Ready(None))
} else {
self.task = Some(current_task());
Ok(Async::NotReady)
@ -439,6 +440,7 @@ impl Inner {
self.items.push_front(data);
}
#[inline]
fn capacity(&self) -> usize {
if self.len > self.buf_size {
0

View File

@ -94,6 +94,7 @@ impl PayloadType {
}
impl PayloadWriter for PayloadType {
#[inline]
fn set_error(&mut self, err: PayloadError) {
match *self {
PayloadType::Sender(ref mut sender) => sender.set_error(err),
@ -101,6 +102,7 @@ impl PayloadWriter for PayloadType {
}
}
#[inline]
fn feed_eof(&mut self) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_eof(),
@ -108,6 +110,7 @@ impl PayloadWriter for PayloadType {
}
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_data(data),
@ -115,6 +118,7 @@ impl PayloadWriter for PayloadType {
}
}
#[inline]
fn capacity(&self) -> usize {
match *self {
PayloadType::Sender(ref sender) => sender.capacity(),

View File

@ -16,7 +16,7 @@ use pipeline::Pipeline;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
use payload::{Payload, PayloadWriter};
use super::{utils, Writer};
use super::h1writer::H1Writer;
@ -319,7 +319,6 @@ struct Reader {
}
enum Decoding {
Paused,
Ready,
NotReady,
}
@ -343,61 +342,76 @@ impl Reader {
}
}
fn decode(&mut self, buf: &mut BytesMut) -> std::result::Result<Decoding, ReaderError> {
if let Some(ref mut payload) = self.payload {
if payload.tx.capacity() > DEFAULT_BUFFER_SIZE {
return Ok(Decoding::Paused)
}
loop {
match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(err) => {
payload.tx.set_error(err.into());
return Err(ReaderError::Payload)
}
#[inline]
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
-> std::result::Result<Decoding, ReaderError>
{
loop {
match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
},
Ok(Async::NotReady) => return Ok(Decoding::NotReady),
Err(err) => {
payload.tx.set_error(err.into());
return Err(ReaderError::Payload)
}
}
} else {
return Ok(Decoding::Ready)
}
}
pub fn parse<T, H>(&mut self, io: &mut T,
buf: &mut BytesMut,
settings: &WorkerSettings<H>) -> Poll<HttpRequest, ReaderError>
where T: IoStream
{
// read payload
if self.payload.is_some() {
match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(PayloadError::Incomplete);
}
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
},
Err(err) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(err.into());
}
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
let done = {
if let Some(ref mut payload) = self.payload {
if payload.tx.capacity() == 0 {
return Ok(Async::NotReady)
}
_ => (),
match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
payload.tx.set_error(PayloadError::Incomplete);
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
},
Err(err) => {
payload.tx.set_error(err.into());
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
}
_ => (),
}
loop {
match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes)
},
Ok(Async::Ready(None)) => {
payload.tx.feed_eof();
break true
},
Ok(Async::NotReady) =>
break false,
Err(err) => {
payload.tx.set_error(err.into());
return Err(ReaderError::Payload)
}
}
}
} else {
false
}
match self.decode(buf)? {
Decoding::Ready => self.payload = None,
Decoding::Paused | Decoding::NotReady => return Ok(Async::NotReady),
}
}
};
if done { self.payload = None }
// if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() {
@ -421,11 +435,10 @@ impl Reader {
match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? {
Async::Ready((msg, decoder)) => {
// process payload
if let Some(payload) = decoder {
self.payload = Some(payload);
match self.decode(buf)? {
Decoding::Paused | Decoding::NotReady => (),
Decoding::Ready => self.payload = None,
if let Some(mut payload) = decoder {
match self.decode(buf, &mut payload)? {
Decoding::Ready => (),
Decoding::NotReady => self.payload = Some(payload),
}
}
return Ok(Async::Ready(msg));