From 1407bf4f7f2a0e0ec6302b792eef45acbc4656bf Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 9 Oct 2018 10:36:40 -0700 Subject: [PATCH] simplify h1 codec messages --- README.md | 2 +- src/h1/codec.rs | 27 ++++---- src/h1/decoder.rs | 5 +- src/h1/dispatcher.rs | 154 +++++++++++++++++++++---------------------- tests/test_ws.rs | 2 +- 5 files changed, 90 insertions(+), 100 deletions(-) diff --git a/README.md b/README.md index b273ea8c..59e6fc37 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Actix http [![Build Status](https://travis-ci.org/fafhrd91/actix-http.svg?branch=master)](https://travis-ci.org/fafhrd91/actix-http) [![Build status](https://ci.appveyor.com/api/projects/status/kkdb4yce7qhm5w85/branch/master?svg=true)](https://ci.appveyor.com/project/fafhrd91/actix-web-hdy9d/branch/master) [![codecov](https://codecov.io/gh/fafhrd91/actix-http/branch/master/graph/badge.svg)](https://codecov.io/gh/fafhrd91/actix-http) [![crates.io](https://meritbadge.herokuapp.com/actix-web)](https://crates.io/crates/actix-web) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +# Actix http [![Build Status](https://travis-ci.org/fafhrd91/actix-http.svg?branch=master)](https://travis-ci.org/fafhrd91/actix-http) [![Build status](https://ci.appveyor.com/api/projects/status/bwq6923pblqg55gk/branch/master?svg=true)](https://ci.appveyor.com/project/fafhrd91/actix-http/branch/master) [![codecov](https://codecov.io/gh/fafhrd91/actix-http/branch/master/graph/badge.svg)](https://codecov.io/gh/fafhrd91/actix-http) [![crates.io](https://meritbadge.herokuapp.com/actix-web)](https://crates.io/crates/actix-web) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) Actix http diff --git a/src/h1/codec.rs b/src/h1/codec.rs index 8f97d677..04cf395b 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -32,20 +32,16 @@ pub enum OutMessage { /// Http response message Response(Response), /// Payload chunk - Payload(Option), + Chunk(Option), } /// Incoming http/1 request #[derive(Debug)] pub enum InMessage { /// Request - Message(Request), - /// Request with payload - MessageWithPayload(Request), + Message { req: Request, payload: bool }, /// Payload chunk - Chunk(Bytes), - /// End of payload - Eof, + Chunk(Option), } /// HTTP/1 Codec @@ -246,8 +242,8 @@ impl Decoder for Codec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if self.payload.is_some() { Ok(match self.payload.as_mut().unwrap().decode(src)? { - Some(PayloadItem::Chunk(chunk)) => Some(InMessage::Chunk(chunk)), - Some(PayloadItem::Eof) => Some(InMessage::Eof), + Some(PayloadItem::Chunk(chunk)) => Some(InMessage::Chunk(Some(chunk))), + Some(PayloadItem::Eof) => Some(InMessage::Chunk(None)), None => None, }) } else if let Some((req, payload)) = self.decoder.decode(src)? { @@ -258,11 +254,10 @@ impl Decoder for Codec { self.flags.set(Flags::KEEPALIVE, req.keep_alive()); } self.payload = payload; - if self.payload.is_some() { - Ok(Some(InMessage::MessageWithPayload(req))) - } else { - Ok(Some(InMessage::Message(req))) - } + Ok(Some(InMessage::Message { + req, + payload: self.payload.is_some(), + })) } else { Ok(None) } @@ -280,10 +275,10 @@ impl Encoder for Codec { OutMessage::Response(res) => { self.encode_response(res, dst)?; } - OutMessage::Payload(Some(bytes)) => { + OutMessage::Chunk(Some(bytes)) => { self.te.encode(bytes.as_ref(), dst)?; } - OutMessage::Payload(None) => { + OutMessage::Chunk(None) => { self.te.encode_eof(dst)?; } } diff --git a/src/h1/decoder.rs b/src/h1/decoder.rs index d0c3fa04..5fe8b19c 100644 --- a/src/h1/decoder.rs +++ b/src/h1/decoder.rs @@ -488,14 +488,13 @@ mod tests { impl InMessage { fn message(self) -> Request { match self { - InMessage::Message(msg) => msg, - InMessage::MessageWithPayload(msg) => msg, + InMessage::Message { req, payload: _ } => req, _ => panic!("error"), } } fn is_payload(&self) -> bool { match *self { - InMessage::MessageWithPayload(_) => true, + InMessage::Message { req: _, payload } => payload, _ => panic!("error"), } } diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index c2ce1203..8b7c2933 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -211,7 +211,7 @@ where Body::Empty => Some(State::None), Body::Binary(bin) => Some(State::SendPayload( None, - Some(OutMessage::Payload(bin.into())), + Some(OutMessage::Chunk(bin.into())), )), Body::Streaming(stream) => { Some(State::SendPayload(Some(stream), None)) @@ -248,7 +248,7 @@ where match stream.poll() { Ok(Async::Ready(Some(item))) => match self .framed - .start_send(OutMessage::Payload(Some(item.into()))) + .start_send(OutMessage::Chunk(Some(item.into()))) { Ok(AsyncSink::Ready) => { self.flags.remove(Flags::FLUSHED); @@ -262,7 +262,7 @@ where }, Ok(Async::Ready(None)) => Some(State::SendPayload( None, - Some(OutMessage::Payload(None)), + Some(OutMessage::Chunk(None)), )), Ok(Async::NotReady) => return Ok(()), // Err(err) => return Err(DispatchError::Io(err)), @@ -305,89 +305,85 @@ where } } - /// Process one incoming message - fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError> { - self.flags.insert(Flags::STARTED); - - match msg { - InMessage::Message(msg) => { - // handle request early - if self.state.is_empty() { - self.state = self.handle_request(msg)?; - } else { - self.messages.push_back(Message::Item(msg)); - } - } - InMessage::MessageWithPayload(msg) => { - // payload - let (ps, pl) = Payload::new(false); - *msg.inner.payload.borrow_mut() = Some(pl); - self.payload = Some(ps); - - self.messages.push_back(Message::Item(msg)); - } - InMessage::Chunk(chunk) => { - if let Some(ref mut payload) = self.payload { - payload.feed_data(chunk); - } else { - error!("Internal server error: unexpected payload chunk"); - self.flags.insert(Flags::DISCONNECTED); - self.messages.push_back(Message::Error( - Response::InternalServerError().finish(), - )); - self.error = Some(DispatchError::InternalError); - } - } - InMessage::Eof => { - if let Some(mut payload) = self.payload.take() { - payload.feed_eof(); - } else { - error!("Internal server error: unexpected eof"); - self.flags.insert(Flags::DISCONNECTED); - self.messages.push_back(Message::Error( - Response::InternalServerError().finish(), - )); - self.error = Some(DispatchError::InternalError); - } - } + /// Process one incoming requests + pub(self) fn poll_request(&mut self) -> Result> { + // limit a mount of non processed requests + if self.messages.len() >= MAX_PIPELINED_MESSAGES { + return Ok(false); } - Ok(()) - } - - pub(self) fn poll_request(&mut self) -> Result> { let mut updated = false; + 'outer: loop { + match self.framed.poll() { + Ok(Async::Ready(Some(msg))) => { + updated = true; + self.flags.insert(Flags::STARTED); - if self.messages.len() < MAX_PIPELINED_MESSAGES { - 'outer: loop { - match self.framed.poll() { - Ok(Async::Ready(Some(msg))) => { - updated = true; - self.one_message(msg)?; - } - Ok(Async::Ready(None)) => { - self.client_disconnected(); - break; - } - Ok(Async::NotReady) => break, - Err(ParseError::Io(e)) => { - self.client_disconnected(); - self.error = Some(DispatchError::Io(e)); - break; - } - Err(e) => { - if let Some(mut payload) = self.payload.take() { - payload.set_error(PayloadError::EncodingCorrupted); + match msg { + InMessage::Message { req, payload } => { + if payload { + let (ps, pl) = Payload::new(false); + *req.inner.payload.borrow_mut() = Some(pl); + self.payload = Some(ps); + } + + // handle request early + if self.state.is_empty() { + self.state = self.handle_request(req)?; + } else { + self.messages.push_back(Message::Item(req)); + } + } + InMessage::Chunk(Some(chunk)) => { + if let Some(ref mut payload) = self.payload { + payload.feed_data(chunk); + } else { + error!( + "Internal server error: unexpected payload chunk" + ); + self.flags.insert(Flags::DISCONNECTED); + self.messages.push_back(Message::Error( + Response::InternalServerError().finish(), + )); + self.error = Some(DispatchError::InternalError); + } + } + InMessage::Chunk(None) => { + if let Some(mut payload) = self.payload.take() { + payload.feed_eof(); + } else { + error!("Internal server error: unexpected eof"); + self.flags.insert(Flags::DISCONNECTED); + self.messages.push_back(Message::Error( + Response::InternalServerError().finish(), + )); + self.error = Some(DispatchError::InternalError); + } } - - // Malformed requests should be responded with 400 - self.messages - .push_back(Message::Error(Response::BadRequest().finish())); - self.flags.insert(Flags::DISCONNECTED); - self.error = Some(e.into()); - break; } } + Ok(Async::Ready(None)) => { + self.client_disconnected(); + break; + } + Ok(Async::NotReady) => break, + Err(ParseError::Io(e)) => { + self.client_disconnected(); + self.error = Some(DispatchError::Io(e)); + break; + } + Err(e) => { + if let Some(mut payload) = self.payload.take() { + payload.set_error(PayloadError::EncodingCorrupted); + } + + // Malformed requests should be responded with 400 + self.messages + .push_back(Message::Error(Response::BadRequest().finish())); + self.flags.insert(Flags::DISCONNECTED); + self.error = Some(e.into()); + break; + } } } diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 00ccd155..73590990 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -40,7 +40,7 @@ fn test_simple() { .and_then(TakeItem::new().map_err(|_| ())) .and_then(|(req, framed): (_, Framed<_, _>)| { // validate request - if let Some(h1::InMessage::MessageWithPayload(req)) = req { + if let Some(h1::InMessage::Message { req, payload: _ }) = req { match ws::handshake(&req) { Err(e) => { // validation failed