1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

simplify h1 codec messages

This commit is contained in:
Nikolay Kim 2018-10-09 10:36:40 -07:00
parent cb78d9d41a
commit 1407bf4f7f
5 changed files with 90 additions and 100 deletions

View File

@ -1,4 +1,4 @@
# Actix http [![Build Status](https://travis-ci.org/fafhrd91/actix-http.svg?branch=master)](https://travis-ci.org/fafhrd91/actix-http) [![Build status](https://ci.appveyor.com/api/projects/status/kkdb4yce7qhm5w85/branch/master?svg=true)](https://ci.appveyor.com/project/fafhrd91/actix-web-hdy9d/branch/master) [![codecov](https://codecov.io/gh/fafhrd91/actix-http/branch/master/graph/badge.svg)](https://codecov.io/gh/fafhrd91/actix-http) [![crates.io](https://meritbadge.herokuapp.com/actix-web)](https://crates.io/crates/actix-web) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) # Actix http [![Build Status](https://travis-ci.org/fafhrd91/actix-http.svg?branch=master)](https://travis-ci.org/fafhrd91/actix-http) [![Build status](https://ci.appveyor.com/api/projects/status/bwq6923pblqg55gk/branch/master?svg=true)](https://ci.appveyor.com/project/fafhrd91/actix-http/branch/master) [![codecov](https://codecov.io/gh/fafhrd91/actix-http/branch/master/graph/badge.svg)](https://codecov.io/gh/fafhrd91/actix-http) [![crates.io](https://meritbadge.herokuapp.com/actix-web)](https://crates.io/crates/actix-web) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Actix http Actix http

View File

@ -32,20 +32,16 @@ pub enum OutMessage {
/// Http response message /// Http response message
Response(Response), Response(Response),
/// Payload chunk /// Payload chunk
Payload(Option<Binary>), Chunk(Option<Binary>),
} }
/// Incoming http/1 request /// Incoming http/1 request
#[derive(Debug)] #[derive(Debug)]
pub enum InMessage { pub enum InMessage {
/// Request /// Request
Message(Request), Message { req: Request, payload: bool },
/// Request with payload
MessageWithPayload(Request),
/// Payload chunk /// Payload chunk
Chunk(Bytes), Chunk(Option<Bytes>),
/// End of payload
Eof,
} }
/// HTTP/1 Codec /// HTTP/1 Codec
@ -246,8 +242,8 @@ impl Decoder for Codec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if self.payload.is_some() { if self.payload.is_some() {
Ok(match self.payload.as_mut().unwrap().decode(src)? { Ok(match self.payload.as_mut().unwrap().decode(src)? {
Some(PayloadItem::Chunk(chunk)) => Some(InMessage::Chunk(chunk)), Some(PayloadItem::Chunk(chunk)) => Some(InMessage::Chunk(Some(chunk))),
Some(PayloadItem::Eof) => Some(InMessage::Eof), Some(PayloadItem::Eof) => Some(InMessage::Chunk(None)),
None => None, None => None,
}) })
} else if let Some((req, payload)) = self.decoder.decode(src)? { } else if let Some((req, payload)) = self.decoder.decode(src)? {
@ -258,11 +254,10 @@ impl Decoder for Codec {
self.flags.set(Flags::KEEPALIVE, req.keep_alive()); self.flags.set(Flags::KEEPALIVE, req.keep_alive());
} }
self.payload = payload; self.payload = payload;
if self.payload.is_some() { Ok(Some(InMessage::Message {
Ok(Some(InMessage::MessageWithPayload(req))) req,
} else { payload: self.payload.is_some(),
Ok(Some(InMessage::Message(req))) }))
}
} else { } else {
Ok(None) Ok(None)
} }
@ -280,10 +275,10 @@ impl Encoder for Codec {
OutMessage::Response(res) => { OutMessage::Response(res) => {
self.encode_response(res, dst)?; self.encode_response(res, dst)?;
} }
OutMessage::Payload(Some(bytes)) => { OutMessage::Chunk(Some(bytes)) => {
self.te.encode(bytes.as_ref(), dst)?; self.te.encode(bytes.as_ref(), dst)?;
} }
OutMessage::Payload(None) => { OutMessage::Chunk(None) => {
self.te.encode_eof(dst)?; self.te.encode_eof(dst)?;
} }
} }

View File

@ -488,14 +488,13 @@ mod tests {
impl InMessage { impl InMessage {
fn message(self) -> Request { fn message(self) -> Request {
match self { match self {
InMessage::Message(msg) => msg, InMessage::Message { req, payload: _ } => req,
InMessage::MessageWithPayload(msg) => msg,
_ => panic!("error"), _ => panic!("error"),
} }
} }
fn is_payload(&self) -> bool { fn is_payload(&self) -> bool {
match *self { match *self {
InMessage::MessageWithPayload(_) => true, InMessage::Message { req: _, payload } => payload,
_ => panic!("error"), _ => panic!("error"),
} }
} }

View File

@ -211,7 +211,7 @@ where
Body::Empty => Some(State::None), Body::Empty => Some(State::None),
Body::Binary(bin) => Some(State::SendPayload( Body::Binary(bin) => Some(State::SendPayload(
None, None,
Some(OutMessage::Payload(bin.into())), Some(OutMessage::Chunk(bin.into())),
)), )),
Body::Streaming(stream) => { Body::Streaming(stream) => {
Some(State::SendPayload(Some(stream), None)) Some(State::SendPayload(Some(stream), None))
@ -248,7 +248,7 @@ where
match stream.poll() { match stream.poll() {
Ok(Async::Ready(Some(item))) => match self Ok(Async::Ready(Some(item))) => match self
.framed .framed
.start_send(OutMessage::Payload(Some(item.into()))) .start_send(OutMessage::Chunk(Some(item.into())))
{ {
Ok(AsyncSink::Ready) => { Ok(AsyncSink::Ready) => {
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
@ -262,7 +262,7 @@ where
}, },
Ok(Async::Ready(None)) => Some(State::SendPayload( Ok(Async::Ready(None)) => Some(State::SendPayload(
None, None,
Some(OutMessage::Payload(None)), Some(OutMessage::Chunk(None)),
)), )),
Ok(Async::NotReady) => return Ok(()), Ok(Async::NotReady) => return Ok(()),
// Err(err) => return Err(DispatchError::Io(err)), // Err(err) => return Err(DispatchError::Io(err)),
@ -305,32 +305,42 @@ where
} }
} }
/// Process one incoming message /// Process one incoming requests
fn one_message(&mut self, msg: InMessage) -> Result<(), DispatchError<S::Error>> { pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError<S::Error>> {
// limit a mount of non processed requests
if self.messages.len() >= MAX_PIPELINED_MESSAGES {
return Ok(false);
}
let mut updated = false;
'outer: loop {
match self.framed.poll() {
Ok(Async::Ready(Some(msg))) => {
updated = true;
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
match msg { match msg {
InMessage::Message(msg) => { InMessage::Message { req, payload } => {
if payload {
let (ps, pl) = Payload::new(false);
*req.inner.payload.borrow_mut() = Some(pl);
self.payload = Some(ps);
}
// handle request early // handle request early
if self.state.is_empty() { if self.state.is_empty() {
self.state = self.handle_request(msg)?; self.state = self.handle_request(req)?;
} else { } else {
self.messages.push_back(Message::Item(msg)); self.messages.push_back(Message::Item(req));
} }
} }
InMessage::MessageWithPayload(msg) => { InMessage::Chunk(Some(chunk)) => {
// payload
let (ps, pl) = Payload::new(false);
*msg.inner.payload.borrow_mut() = Some(pl);
self.payload = Some(ps);
self.messages.push_back(Message::Item(msg));
}
InMessage::Chunk(chunk) => {
if let Some(ref mut payload) = self.payload { if let Some(ref mut payload) = self.payload {
payload.feed_data(chunk); payload.feed_data(chunk);
} else { } else {
error!("Internal server error: unexpected payload chunk"); error!(
"Internal server error: unexpected payload chunk"
);
self.flags.insert(Flags::DISCONNECTED); self.flags.insert(Flags::DISCONNECTED);
self.messages.push_back(Message::Error( self.messages.push_back(Message::Error(
Response::InternalServerError().finish(), Response::InternalServerError().finish(),
@ -338,7 +348,7 @@ where
self.error = Some(DispatchError::InternalError); self.error = Some(DispatchError::InternalError);
} }
} }
InMessage::Eof => { InMessage::Chunk(None) => {
if let Some(mut payload) = self.payload.take() { if let Some(mut payload) = self.payload.take() {
payload.feed_eof(); payload.feed_eof();
} else { } else {
@ -351,19 +361,6 @@ where
} }
} }
} }
Ok(())
}
pub(self) fn poll_request(&mut self) -> Result<bool, DispatchError<S::Error>> {
let mut updated = false;
if self.messages.len() < MAX_PIPELINED_MESSAGES {
'outer: loop {
match self.framed.poll() {
Ok(Async::Ready(Some(msg))) => {
updated = true;
self.one_message(msg)?;
} }
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
self.client_disconnected(); self.client_disconnected();
@ -389,7 +386,6 @@ where
} }
} }
} }
}
if self.ka_timer.is_some() && updated { if self.ka_timer.is_some() && updated {
if let Some(expire) = self.config.keep_alive_expire() { if let Some(expire) = self.config.keep_alive_expire() {

View File

@ -40,7 +40,7 @@ fn test_simple() {
.and_then(TakeItem::new().map_err(|_| ())) .and_then(TakeItem::new().map_err(|_| ()))
.and_then(|(req, framed): (_, Framed<_, _>)| { .and_then(|(req, framed): (_, Framed<_, _>)| {
// validate request // validate request
if let Some(h1::InMessage::MessageWithPayload(req)) = req { if let Some(h1::InMessage::Message { req, payload: _ }) = req {
match ws::handshake(&req) { match ws::handshake(&req) {
Err(e) => { Err(e) => {
// validation failed // validation failed