diff --git a/src/payload.rs b/src/payload.rs index 401832b89..4fb80b0bc 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -5,12 +5,9 @@ use std::cell::RefCell; use std::collections::VecDeque; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; -use futures::task::{Task, current as current_task}; use error::PayloadError; -pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k - /// Buffered stream of bytes chunks /// /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. @@ -68,18 +65,6 @@ impl Payload { self.inner.borrow_mut().unread_data(data); } - /// Get size of payload buffer - #[inline] - pub fn buffer_size(&self) -> usize { - self.inner.borrow().buffer_size() - } - - /// Set size of payload buffer - #[inline] - pub fn set_buffer_size(&self, size: usize) { - self.inner.borrow_mut().set_buffer_size(size) - } - #[cfg(test)] pub(crate) fn readall(&self) -> Option { self.inner.borrow_mut().readall() @@ -92,7 +77,7 @@ impl Stream for Payload { #[inline] fn poll(&mut self) -> Poll, PayloadError> { - self.inner.borrow_mut().readany(false) + self.inner.borrow_mut().readany() } } @@ -103,7 +88,7 @@ impl Clone for Payload { } /// Payload writer interface. -pub trait PayloadWriter { +pub(crate) trait PayloadWriter { /// Set stream error. fn set_error(&mut self, err: PayloadError); @@ -114,8 +99,8 @@ pub trait PayloadWriter { /// Feed bytes into a payload stream fn feed_data(&mut self, data: Bytes); - /// Get estimated available capacity - fn capacity(&self) -> usize; + /// Need read data + fn need_read(&self) -> bool; } /// Sender part of the payload stream @@ -144,24 +129,22 @@ impl PayloadWriter for PayloadSender { } #[inline] - fn capacity(&self) -> usize { + fn need_read(&self) -> bool { if let Some(shared) = self.inner.upgrade() { - shared.borrow().capacity() + shared.borrow().need_read } else { - 0 + false } } } - #[derive(Debug)] struct Inner { len: usize, eof: bool, err: Option, - task: Option, + need_read: bool, items: VecDeque, - buf_size: usize, } impl Inner { @@ -171,32 +154,23 @@ impl Inner { eof, len: 0, err: None, - task: None, items: VecDeque::new(), - buf_size: DEFAULT_BUFFER_SIZE, + need_read: false, } } fn set_error(&mut self, err: PayloadError) { self.err = Some(err); - if let Some(task) = self.task.take() { - task.notify() - } } fn feed_eof(&mut self) { self.eof = true; - if let Some(task) = self.task.take() { - task.notify() - } } fn feed_data(&mut self, data: Bytes) { self.len += data.len(); + self.need_read = false; self.items.push_back(data); - if let Some(task) = self.task.take() { - task.notify() - } } fn eof(&self) -> bool { @@ -219,11 +193,12 @@ impl Inner { self.len = 0; Some(buf.take().freeze()) } else { + self.need_read = true; None } } - fn readany(&mut self, notify: bool) -> Poll, PayloadError> { + fn readany(&mut self) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); Ok(Async::Ready(Some(data))) @@ -232,9 +207,7 @@ impl Inner { } else if self.eof { Ok(Async::Ready(None)) } else { - if notify { - self.task = Some(current_task()); - } + self.need_read = true; Ok(Async::NotReady) } } @@ -243,23 +216,6 @@ impl Inner { self.len += data.len(); self.items.push_front(data); } - - #[inline] - fn capacity(&self) -> usize { - if self.len > self.buf_size { - 0 - } else { - self.buf_size - self.len - } - } - - fn buffer_size(&self) -> usize { - self.buf_size - } - - fn set_buffer_size(&mut self, size: usize) { - self.buf_size = size - } } pub struct PayloadHelper { diff --git a/src/server/encoding.rs b/src/server/encoding.rs index d2b2db939..c666b7232 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -120,10 +120,10 @@ impl PayloadWriter for PayloadType { } #[inline] - fn capacity(&self) -> usize { + fn need_read(&self) -> bool { match *self { - PayloadType::Sender(ref sender) => sender.capacity(), - PayloadType::Encoding(ref enc) => enc.capacity(), + PayloadType::Sender(ref sender) => sender.need_read(), + PayloadType::Encoding(ref enc) => enc.need_read(), } } } @@ -351,8 +351,9 @@ impl PayloadWriter for EncodedPayload { } } - fn capacity(&self) -> usize { - self.inner.capacity() + #[inline] + fn need_read(&self) -> bool { + self.inner.need_read() } } diff --git a/src/server/h1.rs b/src/server/h1.rs index 527eec671..cb24e6d0f 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -88,18 +88,6 @@ impl Http1 self.stream.get_mut() } - fn poll_completed(&mut self, shutdown: bool) -> Result { - // check stream state - match self.stream.poll_completed(shutdown) { - Ok(Async::Ready(_)) => Ok(true), - Ok(Async::NotReady) => Ok(false), - Err(err) => { - debug!("Error sending data: {}", err); - Err(()) - } - } - } - pub fn poll(&mut self) -> Poll<(), ()> { // keep-alive timer if let Some(ref mut timer) = self.keepalive_timer { @@ -113,11 +101,29 @@ impl Http1 } } - self.poll_io() + loop { + match self.poll_io()? { + Async::Ready(true) => (), + Async::Ready(false) => return Ok(Async::Ready(())), + Async::NotReady => return Ok(Async::NotReady), + } + } + } + + fn poll_completed(&mut self, shutdown: bool) -> Result { + // check stream state + match self.stream.poll_completed(shutdown) { + Ok(Async::Ready(_)) => Ok(true), + Ok(Async::NotReady) => Ok(false), + Err(err) => { + debug!("Error sending data: {}", err); + Err(()) + } + } } // TODO: refactor - pub fn poll_io(&mut self) -> Poll<(), ()> { + pub fn poll_io(&mut self) -> Poll { // read incoming data let need_read = if !self.flags.contains(Flags::ERROR) && self.tasks.len() < MAX_PIPELINED_MESSAGES @@ -135,9 +141,9 @@ impl Http1 // start request processing for h in self.settings.handlers().iter_mut() { req = match h.handle(req) { - Ok(t) => { + Ok(pipe) => { self.tasks.push_back( - Entry {pipe: t, flags: EntryFlags::empty()}); + Entry {pipe, flags: EntryFlags::empty()}); continue 'outer }, Err(req) => req, @@ -150,13 +156,6 @@ impl Http1 continue }, Ok(Async::NotReady) => (), - Err(ReaderError::Disconnect) => { - self.flags.insert(Flags::ERROR); - self.stream.disconnected(); - for entry in &mut self.tasks { - entry.pipe.disconnected() - } - }, Err(err) => { // notify all tasks self.stream.disconnected(); @@ -171,12 +170,16 @@ impl Http1 // on parse error, stop reading stream but tasks need to be completed self.flags.insert(Flags::ERROR); - if self.tasks.is_empty() { - if let ReaderError::Error(err) = err { - self.tasks.push_back( - Entry {pipe: Pipeline::error(err.error_response()), - flags: EntryFlags::empty()}); - } + match err { + ReaderError::Disconnect => (), + _ => + if self.tasks.is_empty() { + if let ReaderError::Error(err) = err { + self.tasks.push_back( + Entry {pipe: Pipeline::error(err.error_response()), + flags: EntryFlags::empty()}); + } + } } }, } @@ -187,6 +190,8 @@ impl Http1 true }; + let retry = self.reader.need_read(); + loop { // check in-flight messages let mut io = false; @@ -221,7 +226,12 @@ impl Http1 } }, // no more IO for this iteration - Ok(Async::NotReady) => io = true, + Ok(Async::NotReady) => { + if self.reader.need_read() && !retry { + return Ok(Async::Ready(true)); + } + io = true; + } Err(err) => { // it is not possible to recover from error // during pipe handling, so just drop connection @@ -268,14 +278,14 @@ impl Http1 if !self.poll_completed(true)? { return Ok(Async::NotReady) } - return Ok(Async::Ready(())) + return Ok(Async::Ready(false)) } // start keep-alive timer, this also is slow request timeout if self.tasks.is_empty() { // check stream state if self.flags.contains(Flags::ERROR) { - return Ok(Async::Ready(())) + return Ok(Async::Ready(false)) } if self.settings.keep_alive_enabled() { @@ -295,7 +305,7 @@ impl Http1 return Ok(Async::NotReady) } // keep-alive is disabled, drop connection - return Ok(Async::Ready(())) + return Ok(Async::Ready(false)) } } else if !self.poll_completed(false)? || self.flags.contains(Flags::KEEPALIVE) { @@ -303,7 +313,7 @@ impl Http1 // if keep-alive unset, rely on operating system return Ok(Async::NotReady) } else { - return Ok(Async::Ready(())) + return Ok(Async::Ready(false)) } } else { self.poll_completed(false)?; @@ -341,14 +351,27 @@ impl Reader { } } + #[inline] + fn need_read(&self) -> bool { + if let Some(ref info) = self.payload { + info.tx.need_read() + } else { + true + } + } + #[inline] fn decode(&mut self, buf: &mut BytesMut, payload: &mut PayloadInfo) -> Result { - loop { + while !buf.is_empty() { match payload.decoder.decode(buf) { Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes) + payload.tx.feed_data(bytes); + if payload.decoder.is_eof() { + payload.tx.feed_eof(); + return Ok(Decoding::Ready) + } }, Ok(Async::Ready(None)) => { payload.tx.feed_eof(); @@ -361,6 +384,7 @@ impl Reader { } } } + Ok(Decoding::NotReady) } pub fn parse(&mut self, io: &mut T, @@ -368,12 +392,13 @@ impl Reader { settings: &WorkerSettings) -> Poll where T: IoStream { + if !self.need_read() { + return Ok(Async::NotReady) + } + // read payload let done = { if let Some(ref mut payload) = self.payload { - if payload.tx.capacity() == 0 { - return Ok(Async::NotReady) - } match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) => { payload.tx.set_error(PayloadError::Incomplete); @@ -392,7 +417,11 @@ impl Reader { loop { match payload.decoder.decode(buf) { Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes) + payload.tx.feed_data(bytes); + if payload.decoder.is_eof() { + payload.tx.feed_eof(); + break true + } }, Ok(Async::Ready(None)) => { payload.tx.feed_eof(); @@ -628,6 +657,13 @@ enum ChunkedState { } impl Decoder { + pub fn is_eof(&self) -> bool { + match self.kind { + Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => true, + _ => false, + } + } + pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { match self.kind { Kind::Length(ref mut remaining) => { @@ -819,7 +855,7 @@ mod tests { use std::{io, cmp, time}; use std::net::Shutdown; use bytes::{Bytes, BytesMut, Buf}; - use futures::Async; + use futures::{Async, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use http::{Version, Method}; @@ -1324,6 +1360,7 @@ mod tests { assert!(!req.payload().eof()); buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); + let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(!req.payload().eof()); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); @@ -1348,6 +1385,7 @@ mod tests { "4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ POST /test2 HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n"); + let _ = req.payload_mut().poll(); let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert_eq!(*req2.method(), Method::POST); @@ -1391,10 +1429,14 @@ mod tests { //buf.feed_data("test: test\r\n"); //not_ready!(reader.parse(&mut buf, &mut readbuf)); + let _ = req.payload_mut().poll(); + not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert!(!req.payload().eof()); buf.feed_data("\r\n"); + let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(req.payload().eof()); } @@ -1413,6 +1455,7 @@ mod tests { assert!(!req.payload().eof()); buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") + let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(!req.payload().eof()); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); diff --git a/src/server/h2.rs b/src/server/h2.rs index 0a3875250..5dfcb57ad 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -34,7 +34,8 @@ bitflags! { } /// HTTP/2 Transport -pub(crate) struct Http2 +pub(crate) +struct Http2 where T: AsyncRead + AsyncWrite + 'static, H: 'static { flags: Flags, @@ -103,21 +104,29 @@ impl Http2 item.poll_payload(); if !item.flags.contains(EntryFlags::EOF) { - match item.task.poll_io(&mut item.stream) { - Ok(Async::Ready(ready)) => { - item.flags.insert(EntryFlags::EOF); - if ready { - item.flags.insert(EntryFlags::FINISHED); + let retry = item.payload.need_read(); + loop { + match item.task.poll_io(&mut item.stream) { + Ok(Async::Ready(ready)) => { + item.flags.insert(EntryFlags::EOF); + if ready { + item.flags.insert(EntryFlags::FINISHED); + } + not_ready = false; + }, + Ok(Async::NotReady) => { + if item.payload.need_read() && !retry { + continue + } + }, + Err(err) => { + error!("Unhandled error: {}", err); + item.flags.insert(EntryFlags::EOF); + item.flags.insert(EntryFlags::ERROR); + item.stream.reset(Reason::INTERNAL_ERROR); } - not_ready = false; - }, - Ok(Async::NotReady) => (), - Err(err) => { - error!("Unhandled error: {}", err); - item.flags.insert(EntryFlags::EOF); - item.flags.insert(EntryFlags::ERROR); - item.stream.reset(Reason::INTERNAL_ERROR); } + break } } else if !item.flags.contains(EntryFlags::FINISHED) { match item.task.poll() { @@ -248,7 +257,6 @@ struct Entry { payload: PayloadType, recv: RecvStream, stream: H2Writer, - capacity: usize, flags: EntryFlags, } @@ -292,13 +300,20 @@ impl Entry { payload: psender, stream: H2Writer::new(resp, settings.get_shared_bytes()), flags: EntryFlags::empty(), - capacity: 0, recv, } } fn poll_payload(&mut self) { if !self.flags.contains(EntryFlags::REOF) { + if self.payload.need_read() { + if let Err(err) = self.recv.release_capacity().release_capacity(32_768) { + self.payload.set_error(PayloadError::Http2(err)) + } + } else if let Err(err) = self.recv.release_capacity().release_capacity(0) { + self.payload.set_error(PayloadError::Http2(err)) + } + match self.recv.poll() { Ok(Async::Ready(Some(chunk))) => { self.payload.feed_data(chunk); @@ -311,14 +326,6 @@ impl Entry { self.payload.set_error(PayloadError::Http2(err)) } } - - let capacity = self.payload.capacity(); - if self.capacity != capacity { - self.capacity = capacity; - if let Err(err) = self.recv.release_capacity().release_capacity(capacity) { - self.payload.set_error(PayloadError::Http2(err)) - } - } } } }