From 2ccbd5fa181dc15ead0adb843e6354f11e097a61 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 3 Mar 2018 12:17:26 -0800 Subject: [PATCH] fix socket polling --- src/client/parser.rs | 72 +++++++++++++++++++++++--------------------- src/server/h1.rs | 41 +++++++++++-------------- 2 files changed, 55 insertions(+), 58 deletions(-) diff --git a/src/client/parser.rs b/src/client/parser.rs index 22b8f78aa..8fe399009 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -2,7 +2,7 @@ use std::mem; use httparse; use http::{Version, HttpTryFrom, HeaderMap, StatusCode}; use http::header::{self, HeaderName, HeaderValue}; -use bytes::{Bytes, BytesMut, BufMut}; +use bytes::{Bytes, BytesMut}; use futures::{Poll, Async}; use error::{ParseError, PayloadError}; @@ -37,7 +37,7 @@ impl HttpResponseParser { where T: IoStream { // if buf is empty parse_message will always return NotReady, let's avoid that - let read = if buf.is_empty() { + if buf.is_empty() { match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect), @@ -47,13 +47,12 @@ impl HttpResponseParser { Err(err) => return Err(HttpResponseParserError::Error(err.into())) } - false - } else { - true - }; + } loop { - match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? { + match HttpResponseParser::parse_message(buf) + .map_err(HttpResponseParserError::Error)? + { Async::Ready((msg, decoder)) => { self.decoder = decoder; return Ok(Async::Ready(msg)); @@ -62,17 +61,13 @@ impl HttpResponseParser { if buf.capacity() >= MAX_BUFFER_SIZE { return Err(HttpResponseParserError::Error(ParseError::TooLarge)); } - if read || buf.remaining_mut() == 0 { - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => - return Err(HttpResponseParserError::Disconnect), - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => - return Err(HttpResponseParserError::Error(err.into())), - } - } else { - return Ok(Async::NotReady) + match utils::read_from_io(io, buf) { + Ok(Async::Ready(0)) => + return Err(HttpResponseParserError::Disconnect), + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => + return Err(HttpResponseParserError::Error(err.into())), } }, } @@ -84,25 +79,34 @@ impl HttpResponseParser { where T: IoStream { if self.decoder.is_some() { - // read payload - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - if buf.is_empty() { - return Err(PayloadError::Incomplete) + loop { + // read payload + let not_ready = match utils::read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + if buf.is_empty() { + return Err(PayloadError::Incomplete) + } + true } - } - Err(err) => return Err(err.into()), - _ => (), - } + Err(err) => return Err(err.into()), + Ok(Async::NotReady) => true, + _ => false, + }; - match self.decoder.as_mut().unwrap().decode(buf) { - Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))), - Ok(Async::Ready(None)) => { - self.decoder.take(); - Ok(Async::Ready(None)) + match self.decoder.as_mut().unwrap().decode(buf) { + Ok(Async::Ready(Some(b))) => + return Ok(Async::Ready(Some(b))), + Ok(Async::Ready(None)) => { + self.decoder.take(); + return Ok(Async::Ready(None)) + } + Ok(Async::NotReady) => { + if not_ready { + return Ok(Async::NotReady) + } + } + Err(err) => return Err(err.into()), } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()), } } else { Ok(Async::Ready(None)) diff --git a/src/server/h1.rs b/src/server/h1.rs index d21aa4f48..a55ac2799 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -10,7 +10,7 @@ use actix::Arbiter; use httparse; use http::{Uri, Method, Version, HttpTryFrom, HeaderMap}; use http::header::{self, HeaderName, HeaderValue}; -use bytes::{Bytes, BytesMut, BufMut}; +use bytes::{Bytes, BytesMut}; use futures::{Future, Poll, Async}; use tokio_core::reactor::Timeout; @@ -403,22 +403,22 @@ impl Reader { let done = { if let Some(ref mut payload) = self.payload { 'buf: loop { - match utils::read_from_io(io, buf) { + let not_ready = match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) => { payload.tx.set_error(PayloadError::Incomplete); // http channel should not deal with payload errors return Err(ReaderError::Payload) }, + Ok(Async::NotReady) => true, Err(err) => { payload.tx.set_error(err.into()); // http channel should not deal with payload errors return Err(ReaderError::Payload) } - _ => (), - } - let is_full = buf.remaining_mut() == 0; + _ => false, + }; loop { match payload.decoder.decode(buf) { Ok(Async::Ready(Some(bytes))) => { @@ -435,10 +435,10 @@ impl Reader { Ok(Async::NotReady) => { // if buffer is full then // socket still can contain more data - if is_full { - continue 'buf + if not_ready { + return Ok(Async::NotReady) } - return Ok(Async::NotReady) + continue 'buf }, Err(err) => { payload.tx.set_error(err.into()); @@ -454,16 +454,13 @@ impl Reader { if done { self.payload = None } // if buf is empty parse_message will always return NotReady, let's avoid that - let read = if buf.is_empty() { + if buf.is_empty() { match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect), Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => return Err(ReaderError::Error(err.into())) } - false - } else { - true }; loop { @@ -483,18 +480,14 @@ impl Reader { error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); return Err(ReaderError::Error(ParseError::TooLarge)); } - if read || buf.remaining_mut() == 0 { - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - debug!("Ignored premature client disconnection"); - return Err(ReaderError::Disconnect); - }, - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(ReaderError::Error(err.into())), - } - } else { - return Ok(Async::NotReady) + match utils::read_from_io(io, buf) { + Ok(Async::Ready(0)) => { + debug!("Ignored premature client disconnection"); + return Err(ReaderError::Disconnect); + }, + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => return Err(ReaderError::Error(err.into())), } }, }