1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

fix socket polling

This commit is contained in:
Nikolay Kim 2018-03-03 12:17:26 -08:00
parent 058630d041
commit 2ccbd5fa18
2 changed files with 55 additions and 58 deletions

View File

@ -2,7 +2,7 @@ use std::mem;
use httparse; use httparse;
use http::{Version, HttpTryFrom, HeaderMap, StatusCode}; use http::{Version, HttpTryFrom, HeaderMap, StatusCode};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut};
use futures::{Poll, Async}; use futures::{Poll, Async};
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
@ -37,7 +37,7 @@ impl HttpResponseParser {
where T: IoStream where T: IoStream
{ {
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => Ok(Async::Ready(0)) =>
return Err(HttpResponseParserError::Disconnect), return Err(HttpResponseParserError::Disconnect),
@ -47,13 +47,12 @@ impl HttpResponseParser {
Err(err) => Err(err) =>
return Err(HttpResponseParserError::Error(err.into())) return Err(HttpResponseParserError::Error(err.into()))
} }
false }
} else {
true
};
loop { loop {
match HttpResponseParser::parse_message(buf).map_err(HttpResponseParserError::Error)? { match HttpResponseParser::parse_message(buf)
.map_err(HttpResponseParserError::Error)?
{
Async::Ready((msg, decoder)) => { Async::Ready((msg, decoder)) => {
self.decoder = decoder; self.decoder = decoder;
return Ok(Async::Ready(msg)); return Ok(Async::Ready(msg));
@ -62,17 +61,13 @@ impl HttpResponseParser {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.capacity() >= MAX_BUFFER_SIZE {
return Err(HttpResponseParserError::Error(ParseError::TooLarge)); return Err(HttpResponseParserError::Error(ParseError::TooLarge));
} }
if read || buf.remaining_mut() == 0 { match utils::read_from_io(io, buf) {
match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) =>
Ok(Async::Ready(0)) => return Err(HttpResponseParserError::Disconnect),
return Err(HttpResponseParserError::Disconnect), Ok(Async::Ready(_)) => (),
Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) =>
Err(err) => return Err(HttpResponseParserError::Error(err.into())),
return Err(HttpResponseParserError::Error(err.into())),
}
} else {
return Ok(Async::NotReady)
} }
}, },
} }
@ -84,25 +79,34 @@ impl HttpResponseParser {
where T: IoStream where T: IoStream
{ {
if self.decoder.is_some() { if self.decoder.is_some() {
// read payload loop {
match utils::read_from_io(io, buf) { // read payload
Ok(Async::Ready(0)) => { let not_ready = match utils::read_from_io(io, buf) {
if buf.is_empty() { Ok(Async::Ready(0)) => {
return Err(PayloadError::Incomplete) if buf.is_empty() {
return Err(PayloadError::Incomplete)
}
true
} }
} Err(err) => return Err(err.into()),
Err(err) => return Err(err.into()), Ok(Async::NotReady) => true,
_ => (), _ => false,
} };
match self.decoder.as_mut().unwrap().decode(buf) { match self.decoder.as_mut().unwrap().decode(buf) {
Ok(Async::Ready(Some(b))) => Ok(Async::Ready(Some(b))), Ok(Async::Ready(Some(b))) =>
Ok(Async::Ready(None)) => { return Ok(Async::Ready(Some(b))),
self.decoder.take(); Ok(Async::Ready(None)) => {
Ok(Async::Ready(None)) self.decoder.take();
return Ok(Async::Ready(None))
}
Ok(Async::NotReady) => {
if not_ready {
return Ok(Async::NotReady)
}
}
Err(err) => return Err(err.into()),
} }
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
} }
} else { } else {
Ok(Async::Ready(None)) Ok(Async::Ready(None))

View File

@ -10,7 +10,7 @@ use actix::Arbiter;
use httparse; use httparse;
use http::{Uri, Method, Version, HttpTryFrom, HeaderMap}; use http::{Uri, Method, Version, HttpTryFrom, HeaderMap};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_core::reactor::Timeout; use tokio_core::reactor::Timeout;
@ -403,22 +403,22 @@ impl Reader {
let done = { let done = {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = self.payload {
'buf: loop { 'buf: loop {
match utils::read_from_io(io, buf) { let not_ready = match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
payload.tx.set_error(PayloadError::Incomplete); payload.tx.set_error(PayloadError::Incomplete);
// http channel should not deal with payload errors // http channel should not deal with payload errors
return Err(ReaderError::Payload) return Err(ReaderError::Payload)
}, },
Ok(Async::NotReady) => true,
Err(err) => { Err(err) => {
payload.tx.set_error(err.into()); payload.tx.set_error(err.into());
// http channel should not deal with payload errors // http channel should not deal with payload errors
return Err(ReaderError::Payload) return Err(ReaderError::Payload)
} }
_ => (), _ => false,
} };
let is_full = buf.remaining_mut() == 0;
loop { loop {
match payload.decoder.decode(buf) { match payload.decoder.decode(buf) {
Ok(Async::Ready(Some(bytes))) => { Ok(Async::Ready(Some(bytes))) => {
@ -435,10 +435,10 @@ impl Reader {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// if buffer is full then // if buffer is full then
// socket still can contain more data // socket still can contain more data
if is_full { if not_ready {
continue 'buf return Ok(Async::NotReady)
} }
return Ok(Async::NotReady) continue 'buf
}, },
Err(err) => { Err(err) => {
payload.tx.set_error(err.into()); payload.tx.set_error(err.into());
@ -454,16 +454,13 @@ impl Reader {
if done { self.payload = None } if done { self.payload = None }
// if buf is empty parse_message will always return NotReady, let's avoid that // if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() { if buf.is_empty() {
match utils::read_from_io(io, buf) { match utils::read_from_io(io, buf) {
Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect), Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect),
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(ReaderError::Error(err.into())) Err(err) => return Err(ReaderError::Error(err.into()))
} }
false
} else {
true
}; };
loop { loop {
@ -483,18 +480,14 @@ impl Reader {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ReaderError::Error(ParseError::TooLarge)); return Err(ReaderError::Error(ParseError::TooLarge));
} }
if read || buf.remaining_mut() == 0 { match utils::read_from_io(io, buf) {
match utils::read_from_io(io, buf) { Ok(Async::Ready(0)) => {
Ok(Async::Ready(0)) => { debug!("Ignored premature client disconnection");
debug!("Ignored premature client disconnection"); return Err(ReaderError::Disconnect);
return Err(ReaderError::Disconnect); },
}, Ok(Async::Ready(_)) => (),
Ok(Async::Ready(_)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => return Err(ReaderError::Error(err.into())),
Err(err) => return Err(ReaderError::Error(err.into())),
}
} else {
return Ok(Async::NotReady)
} }
}, },
} }