1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

remove read buffer management api

This commit is contained in:
Nikolay Kim 2018-02-26 20:07:22 -08:00
parent 0ab8bc11f3
commit a344c3a02e
4 changed files with 134 additions and 127 deletions

View File

@ -5,12 +5,9 @@ use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
use error::PayloadError; use error::PayloadError;
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
/// Buffered stream of bytes chunks /// Buffered stream of bytes chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method.
@ -68,18 +65,6 @@ impl Payload {
self.inner.borrow_mut().unread_data(data); self.inner.borrow_mut().unread_data(data);
} }
/// Get size of payload buffer
#[inline]
pub fn buffer_size(&self) -> usize {
self.inner.borrow().buffer_size()
}
/// Set size of payload buffer
#[inline]
pub fn set_buffer_size(&self, size: usize) {
self.inner.borrow_mut().set_buffer_size(size)
}
#[cfg(test)] #[cfg(test)]
pub(crate) fn readall(&self) -> Option<Bytes> { pub(crate) fn readall(&self) -> Option<Bytes> {
self.inner.borrow_mut().readall() self.inner.borrow_mut().readall()
@ -92,7 +77,7 @@ impl Stream for Payload {
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> { fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.inner.borrow_mut().readany(false) self.inner.borrow_mut().readany()
} }
} }
@ -103,7 +88,7 @@ impl Clone for Payload {
} }
/// Payload writer interface. /// Payload writer interface.
pub trait PayloadWriter { pub(crate) trait PayloadWriter {
/// Set stream error. /// Set stream error.
fn set_error(&mut self, err: PayloadError); fn set_error(&mut self, err: PayloadError);
@ -114,8 +99,8 @@ pub trait PayloadWriter {
/// Feed bytes into a payload stream /// Feed bytes into a payload stream
fn feed_data(&mut self, data: Bytes); fn feed_data(&mut self, data: Bytes);
/// Get estimated available capacity /// Need read data
fn capacity(&self) -> usize; fn need_read(&self) -> bool;
} }
/// Sender part of the payload stream /// Sender part of the payload stream
@ -144,24 +129,22 @@ impl PayloadWriter for PayloadSender {
} }
#[inline] #[inline]
fn capacity(&self) -> usize { fn need_read(&self) -> bool {
if let Some(shared) = self.inner.upgrade() { if let Some(shared) = self.inner.upgrade() {
shared.borrow().capacity() shared.borrow().need_read
} else { } else {
0 false
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
len: usize, len: usize,
eof: bool, eof: bool,
err: Option<PayloadError>, err: Option<PayloadError>,
task: Option<Task>, need_read: bool,
items: VecDeque<Bytes>, items: VecDeque<Bytes>,
buf_size: usize,
} }
impl Inner { impl Inner {
@ -171,32 +154,23 @@ impl Inner {
eof, eof,
len: 0, len: 0,
err: None, err: None,
task: None,
items: VecDeque::new(), items: VecDeque::new(),
buf_size: DEFAULT_BUFFER_SIZE, need_read: false,
} }
} }
fn set_error(&mut self, err: PayloadError) { fn set_error(&mut self, err: PayloadError) {
self.err = Some(err); self.err = Some(err);
if let Some(task) = self.task.take() {
task.notify()
}
} }
fn feed_eof(&mut self) { fn feed_eof(&mut self) {
self.eof = true; self.eof = true;
if let Some(task) = self.task.take() {
task.notify()
}
} }
fn feed_data(&mut self, data: Bytes) { fn feed_data(&mut self, data: Bytes) {
self.len += data.len(); self.len += data.len();
self.need_read = false;
self.items.push_back(data); self.items.push_back(data);
if let Some(task) = self.task.take() {
task.notify()
}
} }
fn eof(&self) -> bool { fn eof(&self) -> bool {
@ -219,11 +193,12 @@ impl Inner {
self.len = 0; self.len = 0;
Some(buf.take().freeze()) Some(buf.take().freeze())
} else { } else {
self.need_read = true;
None None
} }
} }
fn readany(&mut self, notify: bool) -> Poll<Option<Bytes>, PayloadError> { fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
Ok(Async::Ready(Some(data))) Ok(Async::Ready(Some(data)))
@ -232,9 +207,7 @@ impl Inner {
} else if self.eof { } else if self.eof {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
if notify { self.need_read = true;
self.task = Some(current_task());
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
@ -243,23 +216,6 @@ impl Inner {
self.len += data.len(); self.len += data.len();
self.items.push_front(data); self.items.push_front(data);
} }
#[inline]
fn capacity(&self) -> usize {
if self.len > self.buf_size {
0
} else {
self.buf_size - self.len
}
}
fn buffer_size(&self) -> usize {
self.buf_size
}
fn set_buffer_size(&mut self, size: usize) {
self.buf_size = size
}
} }
pub struct PayloadHelper<S> { pub struct PayloadHelper<S> {

View File

@ -120,10 +120,10 @@ impl PayloadWriter for PayloadType {
} }
#[inline] #[inline]
fn capacity(&self) -> usize { fn need_read(&self) -> bool {
match *self { match *self {
PayloadType::Sender(ref sender) => sender.capacity(), PayloadType::Sender(ref sender) => sender.need_read(),
PayloadType::Encoding(ref enc) => enc.capacity(), PayloadType::Encoding(ref enc) => enc.need_read(),
} }
} }
} }
@ -351,8 +351,9 @@ impl PayloadWriter for EncodedPayload {
} }
} }
fn capacity(&self) -> usize { #[inline]
self.inner.capacity() fn need_read(&self) -> bool {
self.inner.need_read()
} }
} }

View File

@ -88,18 +88,6 @@ impl<T, H> Http1<T, H>
self.stream.get_mut() self.stream.get_mut()
} }
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(false),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
}
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
// keep-alive timer // keep-alive timer
if let Some(ref mut timer) = self.keepalive_timer { if let Some(ref mut timer) = self.keepalive_timer {
@ -113,11 +101,29 @@ impl<T, H> Http1<T, H>
} }
} }
self.poll_io() loop {
match self.poll_io()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(())),
Async::NotReady => return Ok(Async::NotReady),
}
}
}
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed(shutdown) {
Ok(Async::Ready(_)) => Ok(true),
Ok(Async::NotReady) => Ok(false),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
} }
// TODO: refactor // TODO: refactor
pub fn poll_io(&mut self) -> Poll<(), ()> { pub fn poll_io(&mut self) -> Poll<bool, ()> {
// read incoming data // read incoming data
let need_read = let need_read =
if !self.flags.contains(Flags::ERROR) && self.tasks.len() < MAX_PIPELINED_MESSAGES if !self.flags.contains(Flags::ERROR) && self.tasks.len() < MAX_PIPELINED_MESSAGES
@ -135,9 +141,9 @@ impl<T, H> Http1<T, H>
// start request processing // start request processing
for h in self.settings.handlers().iter_mut() { for h in self.settings.handlers().iter_mut() {
req = match h.handle(req) { req = match h.handle(req) {
Ok(t) => { Ok(pipe) => {
self.tasks.push_back( self.tasks.push_back(
Entry {pipe: t, flags: EntryFlags::empty()}); Entry {pipe, flags: EntryFlags::empty()});
continue 'outer continue 'outer
}, },
Err(req) => req, Err(req) => req,
@ -150,13 +156,6 @@ impl<T, H> Http1<T, H>
continue continue
}, },
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),
Err(ReaderError::Disconnect) => {
self.flags.insert(Flags::ERROR);
self.stream.disconnected();
for entry in &mut self.tasks {
entry.pipe.disconnected()
}
},
Err(err) => { Err(err) => {
// notify all tasks // notify all tasks
self.stream.disconnected(); self.stream.disconnected();
@ -171,12 +170,16 @@ impl<T, H> Http1<T, H>
// on parse error, stop reading stream but tasks need to be completed // on parse error, stop reading stream but tasks need to be completed
self.flags.insert(Flags::ERROR); self.flags.insert(Flags::ERROR);
if self.tasks.is_empty() { match err {
if let ReaderError::Error(err) = err { ReaderError::Disconnect => (),
self.tasks.push_back( _ =>
Entry {pipe: Pipeline::error(err.error_response()), if self.tasks.is_empty() {
flags: EntryFlags::empty()}); if let ReaderError::Error(err) = err {
} self.tasks.push_back(
Entry {pipe: Pipeline::error(err.error_response()),
flags: EntryFlags::empty()});
}
}
} }
}, },
} }
@ -187,6 +190,8 @@ impl<T, H> Http1<T, H>
true true
}; };
let retry = self.reader.need_read();
loop { loop {
// check in-flight messages // check in-flight messages
let mut io = false; let mut io = false;
@ -221,7 +226,12 @@ impl<T, H> Http1<T, H>
} }
}, },
// no more IO for this iteration // no more IO for this iteration
Ok(Async::NotReady) => io = true, Ok(Async::NotReady) => {
if self.reader.need_read() && !retry {
return Ok(Async::Ready(true));
}
io = true;
}
Err(err) => { Err(err) => {
// it is not possible to recover from error // it is not possible to recover from error
// during pipe handling, so just drop connection // during pipe handling, so just drop connection
@ -268,14 +278,14 @@ impl<T, H> Http1<T, H>
if !self.poll_completed(true)? { if !self.poll_completed(true)? {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
return Ok(Async::Ready(())) return Ok(Async::Ready(false))
} }
// start keep-alive timer, this also is slow request timeout // start keep-alive timer, this also is slow request timeout
if self.tasks.is_empty() { if self.tasks.is_empty() {
// check stream state // check stream state
if self.flags.contains(Flags::ERROR) { if self.flags.contains(Flags::ERROR) {
return Ok(Async::Ready(())) return Ok(Async::Ready(false))
} }
if self.settings.keep_alive_enabled() { if self.settings.keep_alive_enabled() {
@ -295,7 +305,7 @@ impl<T, H> Http1<T, H>
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
// keep-alive is disabled, drop connection // keep-alive is disabled, drop connection
return Ok(Async::Ready(())) return Ok(Async::Ready(false))
} }
} else if !self.poll_completed(false)? || } else if !self.poll_completed(false)? ||
self.flags.contains(Flags::KEEPALIVE) { self.flags.contains(Flags::KEEPALIVE) {
@ -303,7 +313,7 @@ impl<T, H> Http1<T, H>
// if keep-alive unset, rely on operating system // if keep-alive unset, rely on operating system
return Ok(Async::NotReady) return Ok(Async::NotReady)
} else { } else {
return Ok(Async::Ready(())) return Ok(Async::Ready(false))
} }
} else { } else {
self.poll_completed(false)?; self.poll_completed(false)?;
@ -341,14 +351,27 @@ impl Reader {
} }
} }
#[inline]
fn need_read(&self) -> bool {
if let Some(ref info) = self.payload {
info.tx.need_read()
} else {
true
}
}
#[inline] #[inline]
fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo) fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo)
-> Result<Decoding, ReaderError> -> Result<Decoding, ReaderError>
{ {
loop { while !buf.is_empty() {
match payload.decoder.decode(buf) { match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes) payload.tx.feed_data(bytes);
if payload.decoder.is_eof() {
payload.tx.feed_eof();
return Ok(Decoding::Ready)
}
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
payload.tx.feed_eof(); payload.tx.feed_eof();
@ -361,6 +384,7 @@ impl Reader {
} }
} }
} }
Ok(Decoding::NotReady)
} }
pub fn parse<T, H>(&mut self, io: &mut T, pub fn parse<T, H>(&mut self, io: &mut T,
@ -368,12 +392,13 @@ impl Reader {
settings: &WorkerSettings<H>) -> Poll<HttpRequest, ReaderError> settings: &WorkerSettings<H>) -> Poll<HttpRequest, ReaderError>
where T: IoStream where T: IoStream
{ {
if !self.need_read() {
return Ok(Async::NotReady)
}
// read payload // read payload
let done = { let done = {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = self.payload {
if payload.tx.capacity() == 0 {
return Ok(Async::NotReady)
}
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
payload.tx.set_error(PayloadError::Incomplete); payload.tx.set_error(PayloadError::Incomplete);
@ -392,7 +417,11 @@ impl Reader {
loop { loop {
match payload.decoder.decode(buf) { match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(Some(bytes))) => {
payload.tx.feed_data(bytes) payload.tx.feed_data(bytes);
if payload.decoder.is_eof() {
payload.tx.feed_eof();
break true
}
}, },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
payload.tx.feed_eof(); payload.tx.feed_eof();
@ -628,6 +657,13 @@ enum ChunkedState {
} }
impl Decoder { impl Decoder {
pub fn is_eof(&self) -> bool {
match self.kind {
Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => true,
_ => false,
}
}
pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> { pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> {
match self.kind { match self.kind {
Kind::Length(ref mut remaining) => { Kind::Length(ref mut remaining) => {
@ -819,7 +855,7 @@ mod tests {
use std::{io, cmp, time}; use std::{io, cmp, time};
use std::net::Shutdown; use std::net::Shutdown;
use bytes::{Bytes, BytesMut, Buf}; use bytes::{Bytes, BytesMut, Buf};
use futures::Async; use futures::{Async, Stream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use http::{Version, Method}; use http::{Version, Method};
@ -1324,6 +1360,7 @@ mod tests {
assert!(!req.payload().eof()); assert!(!req.payload().eof());
buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n");
let _ = req.payload_mut().poll();
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(!req.payload().eof()); assert!(!req.payload().eof());
assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
@ -1348,6 +1385,7 @@ mod tests {
"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ "4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\
POST /test2 HTTP/1.1\r\n\ POST /test2 HTTP/1.1\r\n\
transfer-encoding: chunked\r\n\r\n"); transfer-encoding: chunked\r\n\r\n");
let _ = req.payload_mut().poll();
let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert_eq!(*req2.method(), Method::POST); assert_eq!(*req2.method(), Method::POST);
@ -1391,10 +1429,14 @@ mod tests {
//buf.feed_data("test: test\r\n"); //buf.feed_data("test: test\r\n");
//not_ready!(reader.parse(&mut buf, &mut readbuf)); //not_ready!(reader.parse(&mut buf, &mut readbuf));
let _ = req.payload_mut().poll();
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
assert!(!req.payload().eof()); assert!(!req.payload().eof());
buf.feed_data("\r\n"); buf.feed_data("\r\n");
let _ = req.payload_mut().poll();
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.payload().eof()); assert!(req.payload().eof());
} }
@ -1413,6 +1455,7 @@ mod tests {
assert!(!req.payload().eof()); assert!(!req.payload().eof());
buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n")
let _ = req.payload_mut().poll();
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(!req.payload().eof()); assert!(!req.payload().eof());
assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");

View File

@ -34,7 +34,8 @@ bitflags! {
} }
/// HTTP/2 Transport /// HTTP/2 Transport
pub(crate) struct Http2<T, H> pub(crate)
struct Http2<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: 'static where T: AsyncRead + AsyncWrite + 'static, H: 'static
{ {
flags: Flags, flags: Flags,
@ -103,21 +104,29 @@ impl<T, H> Http2<T, H>
item.poll_payload(); item.poll_payload();
if !item.flags.contains(EntryFlags::EOF) { if !item.flags.contains(EntryFlags::EOF) {
match item.task.poll_io(&mut item.stream) { let retry = item.payload.need_read();
Ok(Async::Ready(ready)) => { loop {
item.flags.insert(EntryFlags::EOF); match item.task.poll_io(&mut item.stream) {
if ready { Ok(Async::Ready(ready)) => {
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(EntryFlags::EOF);
if ready {
item.flags.insert(EntryFlags::FINISHED);
}
not_ready = false;
},
Ok(Async::NotReady) => {
if item.payload.need_read() && !retry {
continue
}
},
Err(err) => {
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::EOF);
item.flags.insert(EntryFlags::ERROR);
item.stream.reset(Reason::INTERNAL_ERROR);
} }
not_ready = false;
},
Ok(Async::NotReady) => (),
Err(err) => {
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::EOF);
item.flags.insert(EntryFlags::ERROR);
item.stream.reset(Reason::INTERNAL_ERROR);
} }
break
} }
} else if !item.flags.contains(EntryFlags::FINISHED) { } else if !item.flags.contains(EntryFlags::FINISHED) {
match item.task.poll() { match item.task.poll() {
@ -248,7 +257,6 @@ struct Entry {
payload: PayloadType, payload: PayloadType,
recv: RecvStream, recv: RecvStream,
stream: H2Writer, stream: H2Writer,
capacity: usize,
flags: EntryFlags, flags: EntryFlags,
} }
@ -292,13 +300,20 @@ impl Entry {
payload: psender, payload: psender,
stream: H2Writer::new(resp, settings.get_shared_bytes()), stream: H2Writer::new(resp, settings.get_shared_bytes()),
flags: EntryFlags::empty(), flags: EntryFlags::empty(),
capacity: 0,
recv, recv,
} }
} }
fn poll_payload(&mut self) { fn poll_payload(&mut self) {
if !self.flags.contains(EntryFlags::REOF) { if !self.flags.contains(EntryFlags::REOF) {
if self.payload.need_read() {
if let Err(err) = self.recv.release_capacity().release_capacity(32_768) {
self.payload.set_error(PayloadError::Http2(err))
}
} else if let Err(err) = self.recv.release_capacity().release_capacity(0) {
self.payload.set_error(PayloadError::Http2(err))
}
match self.recv.poll() { match self.recv.poll() {
Ok(Async::Ready(Some(chunk))) => { Ok(Async::Ready(Some(chunk))) => {
self.payload.feed_data(chunk); self.payload.feed_data(chunk);
@ -311,14 +326,6 @@ impl Entry {
self.payload.set_error(PayloadError::Http2(err)) self.payload.set_error(PayloadError::Http2(err))
} }
} }
let capacity = self.payload.capacity();
if self.capacity != capacity {
self.capacity = capacity;
if let Err(err) = self.recv.release_capacity().release_capacity(capacity) {
self.payload.set_error(PayloadError::Http2(err))
}
}
} }
} }
} }