From 4af115a19c63dbcfa7b538e96d9907497e742502 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 11 Mar 2018 16:37:44 -0700 Subject: [PATCH] Fix steraming response handling for http/2 --- CHANGES.md | 5 ++- src/server/h2.rs | 38 +++++++++++++----- src/server/h2writer.rs | 91 +++++++++++++++++------------------------- 3 files changed, 70 insertions(+), 64 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 24f01b75c..75a29d5d3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,11 +1,14 @@ # Changes -## 0.4.7 (2018-03-xx) +## 0.4.7 (2018-03-11) * Fix panic on unknown content encoding * Fix connection get closed too early +* Fix steraming response handling for http/2 + + ## 0.4.6 (2018-03-10) * Fix client cookie handling diff --git a/src/server/h2.rs b/src/server/h2.rs index 02951593e..6cc682a11 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -26,7 +26,7 @@ use payload::{Payload, PayloadWriter, PayloadStatus}; use super::h2writer::H2Writer; use super::encoding::PayloadType; use super::settings::WorkerSettings; -use super::{HttpHandler, HttpHandlerTask}; +use super::{HttpHandler, HttpHandlerTask, Writer}; bitflags! { struct Flags: u8 { @@ -109,22 +109,27 @@ impl Http2 loop { match item.task.poll_io(&mut item.stream) { Ok(Async::Ready(ready)) => { - item.flags.insert(EntryFlags::EOF); if ready { - item.flags.insert(EntryFlags::FINISHED); + item.flags.insert( + EntryFlags::EOF | EntryFlags::FINISHED); + } else { + item.flags.insert(EntryFlags::EOF); } not_ready = false; }, Ok(Async::NotReady) => { - if item.payload.need_read() == PayloadStatus::Read && !retry + if item.payload.need_read() == PayloadStatus::Read + && !retry { continue } }, Err(err) => { error!("Unhandled error: {}", err); - item.flags.insert(EntryFlags::EOF); - item.flags.insert(EntryFlags::ERROR); + item.flags.insert( + EntryFlags::EOF | + EntryFlags::ERROR | + EntryFlags::WRITE_DONE); item.stream.reset(Reason::INTERNAL_ERROR); } } @@ -138,18 +143,32 @@ impl Http2 item.flags.insert(EntryFlags::FINISHED); }, Err(err) => { - item.flags.insert(EntryFlags::ERROR); - item.flags.insert(EntryFlags::FINISHED); + item.flags.insert( + EntryFlags::ERROR | EntryFlags::WRITE_DONE | + EntryFlags::FINISHED); error!("Unhandled error: {}", err); } } } + + if !item.flags.contains(EntryFlags::WRITE_DONE) { + match item.stream.poll_completed(false) { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + not_ready = false; + item.flags.insert(EntryFlags::WRITE_DONE); + } + Err(_err) => { + item.flags.insert(EntryFlags::ERROR); + } + } + } } // cleanup finished tasks while !self.tasks.is_empty() { if self.tasks[0].flags.contains(EntryFlags::EOF) && - self.tasks[0].flags.contains(EntryFlags::FINISHED) || + self.tasks[0].flags.contains(EntryFlags::WRITE_DONE) || self.tasks[0].flags.contains(EntryFlags::ERROR) { self.tasks.pop_front(); @@ -251,6 +270,7 @@ bitflags! { const REOF = 0b0000_0010; const ERROR = 0b0000_0100; const FINISHED = 0b0000_1000; + const WRITE_DONE = 0b0001_0000; } } diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index d57d92db5..7fccd4ecf 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -24,6 +24,7 @@ bitflags! { const STARTED = 0b0000_0001; const DISCONNECTED = 0b0000_0010; const EOF = 0b0000_0100; + const RESERVED = 0b0000_1000; } } @@ -56,55 +57,6 @@ impl H2Writer { stream.send_reset(reason) } } - - fn write_to_stream(&mut self) -> io::Result { - if !self.flags.contains(Flags::STARTED) { - return Ok(WriterState::Done) - } - - if let Some(ref mut stream) = self.stream { - if self.buffer.is_empty() { - if self.flags.contains(Flags::EOF) { - let _ = stream.send_data(Bytes::new(), true); - } - return Ok(WriterState::Done) - } - - loop { - match stream.poll_capacity() { - Ok(Async::NotReady) => { - if self.buffer.len() > self.buffer_capacity { - return Ok(WriterState::Pause) - } else { - return Ok(WriterState::Done) - } - } - Ok(Async::Ready(None)) => { - return Ok(WriterState::Done) - } - Ok(Async::Ready(Some(cap))) => { - let len = self.buffer.len(); - let bytes = self.buffer.split_to(cmp::min(cap, len)); - let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); - self.written += bytes.len() as u64; - - if let Err(err) = stream.send_data(bytes.freeze(), eof) { - return Err(io::Error::new(io::ErrorKind::Other, err)) - } else if !self.buffer.is_empty() { - let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); - stream.reserve_capacity(cap); - } else { - return Ok(WriterState::Pause) - } - } - Err(_) => { - return Err(io::Error::new(io::ErrorKind::Other, "")) - } - } - } - } - Ok(WriterState::Done) - } } impl Writer for H2Writer { @@ -172,6 +124,7 @@ impl Writer for H2Writer { self.written = bytes.len() as u64; self.encoder.write(bytes)?; if let Some(ref mut stream) = self.stream { + self.flags.insert(Flags::RESERVED); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); } Ok(WriterState::Pause) @@ -195,7 +148,7 @@ impl Writer for H2Writer { } } - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.len() > self.buffer_capacity { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -217,10 +170,40 @@ impl Writer for H2Writer { } fn poll_completed(&mut self, _shutdown: bool) -> Poll<(), io::Error> { - match self.write_to_stream() { - Ok(WriterState::Done) => Ok(Async::Ready(())), - Ok(WriterState::Pause) => Ok(Async::NotReady), - Err(err) => Err(err) + if !self.flags.contains(Flags::STARTED) { + return Ok(Async::NotReady); } + + if let Some(ref mut stream) = self.stream { + // reserve capacity + if !self.flags.contains(Flags::RESERVED) && !self.buffer.is_empty() { + self.flags.insert(Flags::RESERVED); + stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); + } + + loop { + match stream.poll_capacity() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(cap))) => { + let len = self.buffer.len(); + let bytes = self.buffer.split_to(cmp::min(cap, len)); + let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); + self.written += bytes.len() as u64; + + if let Err(e) = stream.send_data(bytes.freeze(), eof) { + return Err(io::Error::new(io::ErrorKind::Other, e)) + } else if !self.buffer.is_empty() { + let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + return Ok(Async::NotReady) + } + } + Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)), + } + } + } + return Ok(Async::NotReady) } }