1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

cleanup h1 parse

This commit is contained in:
Nikolay Kim 2017-12-15 13:10:12 -08:00
parent 71c37bde5a
commit c3d5e4301a
2 changed files with 78 additions and 90 deletions

View File

@ -54,7 +54,7 @@ Each result is best of five runs. All measurements are req/sec.
Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline
---- | -------- | ---------- | -------- | ---------- | -------- | ---------- ---- | -------- | ---------- | -------- | ---------- | -------- | ----------
Actix | 87.200 | 813.200 | 122.100 | 1.877.000 | 107.400 | 2.390.000 Actix | 89.100 | 815.200 | 122.100 | 1.877.000 | 107.400 | 2.350.000
Gotham | 61.000 | 178.000 | | | | Gotham | 61.000 | 178.000 | | | |
Iron | | | | | 94.500 | 78.000 Iron | | | | | 94.500 | 78.000
Rocket | | | | | 95.500 | failed Rocket | | | | | 95.500 | failed
@ -65,7 +65,7 @@ Some notes on results. Iron and Rocket got tested with 8 threads,
which showed best results. Gothan and tokio-minihttp seem does not support which showed best results. Gothan and tokio-minihttp seem does not support
multithreading, or at least i couldn't figured out. I manually enabled pipelining multithreading, or at least i couldn't figured out. I manually enabled pipelining
for *Shio* and Gotham*. While shio seems support multithreading, but it showed for *Shio* and Gotham*. While shio seems support multithreading, but it showed
absolutly same results for any how number of threads (maybe macos?) absolutly same results for any how number of threads (maybe macos problem?)
Rocket completely failed in pipelined tests. Rocket completely failed in pipelined tests.
## Examples ## Examples

164
src/h1.rs
View File

@ -215,10 +215,10 @@ impl<T, H> Http1<T, H>
// read incoming data // read incoming data
while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) && while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) &&
self.tasks.len() < MAX_PIPELINED_MESSAGES { self.tasks.len() < MAX_PIPELINED_MESSAGES
match self.reader.parse(self.stream.get_mut(), {
&mut self.read_buf, &self.settings) match self.reader.parse(self.stream.get_mut(),
{ &mut self.read_buf, &self.settings) {
Ok(Async::Ready(Item::Http1(mut req))) => { Ok(Async::Ready(Item::Http1(mut req))) => {
not_ready = false; not_ready = false;
@ -405,77 +405,58 @@ impl Reader {
settings: &WorkerSettings<H>) -> Poll<Item, ReaderError> settings: &WorkerSettings<H>) -> Poll<Item, ReaderError>
where T: AsyncRead where T: AsyncRead
{ {
loop { // read payload
match self.decode(buf)? { if self.payload.is_some() {
Decoding::Paused => return Ok(Async::NotReady), match self.read_from_io(io, buf) {
Decoding::Ready => { Ok(Async::Ready(0)) => {
self.payload = None; if let Some(ref mut payload) = self.payload {
break payload.tx.set_error(PayloadError::Incomplete);
},
Decoding::NotReady => {
match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(PayloadError::Incomplete);
}
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
}
Ok(Async::Ready(_)) => {
continue
}
Ok(Async::NotReady) => break,
Err(err) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(err.into());
}
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
}
} }
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
},
Err(err) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(err.into());
}
// http channel should not deal with payload errors
return Err(ReaderError::Payload)
} }
_ => (),
}
match self.decode(buf)? {
Decoding::Ready => self.payload = None,
Decoding::Paused | Decoding::NotReady => return Ok(Async::NotReady),
} }
} }
// if buf is empty parse_message will always return NotReady, let's avoid that
let read = if buf.is_empty() {
match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
debug!("Ignored premature client disconnection");
return Err(ReaderError::Disconnect);
},
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(err) =>
return Err(ReaderError::Error(err.into()))
}
false
} else {
true
};
loop { loop {
match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? {
Message::Http1(msg, decoder) => { Message::Http1(msg, decoder) => {
// process payload
if let Some(payload) = decoder { if let Some(payload) = decoder {
self.payload = Some(payload); self.payload = Some(payload);
match self.decode(buf)? {
loop { Decoding::Paused | Decoding::NotReady => (),
match self.decode(buf)? { Decoding::Ready => self.payload = None,
Decoding::Paused =>
break,
Decoding::Ready => {
self.payload = None;
break
},
Decoding::NotReady => {
match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
trace!("parse eof");
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(
PayloadError::Incomplete);
}
// http channel should deal with payload errors
return Err(ReaderError::Payload)
}
Ok(Async::Ready(_)) => {
continue
}
Ok(Async::NotReady) => break,
Err(err) => {
if let Some(ref mut payload) = self.payload {
payload.tx.set_error(err.into());
}
// http channel should deal with payload errors
return Err(ReaderError::Payload)
}
}
}
}
} }
} }
self.h1 = true; self.h1 = true;
@ -489,42 +470,49 @@ impl Reader {
}, },
Message::NotReady => { Message::NotReady => {
if buf.capacity() >= MAX_BUFFER_SIZE { if buf.capacity() >= MAX_BUFFER_SIZE {
debug!("MAX_BUFFER_SIZE reached, closing"); error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(ReaderError::Error(ParseError::TooLarge)); return Err(ReaderError::Error(ParseError::TooLarge));
} }
if read {
match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
debug!("Ignored premature client disconnection");
return Err(ReaderError::Disconnect);
},
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(err) =>
return Err(ReaderError::Error(err.into()))
}
} else {
return Ok(Async::NotReady)
}
}, },
} }
match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => {
debug!("Ignored premature client disconnection");
return Err(ReaderError::Disconnect);
},
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(err) =>
return Err(ReaderError::Error(err.into()))
}
} }
} }
fn read_from_io<T: AsyncRead>(&mut self, io: &mut T, buf: &mut BytesMut) fn read_from_io<T: AsyncRead>(&mut self, io: &mut T, buf: &mut BytesMut)
-> Poll<usize, io::Error> { -> Poll<usize, io::Error>
if buf.remaining_mut() < LW_BUFFER_SIZE { {
buf.reserve(HW_BUFFER_SIZE);
}
unsafe { unsafe {
let n = match io.read(buf.bytes_mut()) { if buf.remaining_mut() < LW_BUFFER_SIZE {
Ok(n) => n, buf.reserve(HW_BUFFER_SIZE);
}
match io.read(buf.bytes_mut()) {
Ok(n) => {
buf.advance_mut(n);
Ok(Async::Ready(n))
},
Err(e) => { Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock { if e.kind() == io::ErrorKind::WouldBlock {
return Ok(Async::NotReady); Ok(Async::NotReady)
} else {
Err(e)
} }
return Err(e)
} }
}; }
buf.advance_mut(n);
Ok(Async::Ready(n))
} }
} }