From dd8692e00026ad7648dc71db02098feb33e353f5 Mon Sep 17 00:00:00 2001
From: Rob Ede
Date: Sun, 24 Jul 2022 15:27:54 +0100
Subject: [PATCH] add send error response state

add changelog

add send error response state

remove comment
---
 actix-http/CHANGES.md                 |   4 +
 actix-http/src/h1/dispatcher.rs       | 181 +++++++++++++++++++++-----
 actix-http/src/h1/dispatcher_tests.rs |  44 +------
 3 files changed, 157 insertions(+), 72 deletions(-)

diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md
index 7e6604046..4cd925533 100644
--- a/actix-http/CHANGES.md
+++ b/actix-http/CHANGES.md
@@ -1,6 +1,10 @@
 # Changes
 
 ## Unreleased - 2022-xx-xx
+### Fixed
+- Dropping the payload early and causing unclean connections no longer causes erroneous 500 responses. [#2745]
+
+[#2745]: https://github.com/actix/actix-web/issues/2745
 
 
 ## 3.2.1 - 2022-07-02
diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs
index c2ddc06ba..227ee8350 100644
--- a/actix-http/src/h1/dispatcher.rs
+++ b/actix-http/src/h1/dispatcher.rs
@@ -22,7 +22,7 @@ use crate::{
     config::ServiceConfig,
     error::{DispatchError, ParseError, PayloadError},
     service::HttpFlow,
-    Error, Extensions, OnConnectData, Request, Response, StatusCode,
+    ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
 };
 
 use super::{
@@ -185,7 +185,9 @@ pin_project! {
         None,
         ExpectCall { #[pin] fut: X::Future },
         ServiceCall { #[pin] fut: S::Future },
+        SendResponse { res: Option<Response<B>> },
         SendPayload { #[pin] body: B },
+        SendErrorResponse { res: Option<Response<BoxBody>> },
         SendErrorPayload { #[pin] body: BoxBody },
     }
 }
@@ -216,9 +218,15 @@ where
             Self::ServiceCall { .. } => {
                 f.debug_struct("State::ServiceCall").finish_non_exhaustive()
             }
+            Self::SendResponse { .. } => f
+                .debug_struct("State::SendResponse")
+                .finish_non_exhaustive(),
             Self::SendPayload { .. } => {
                 f.debug_struct("State::SendPayload").finish_non_exhaustive()
             }
+            Self::SendErrorResponse { .. } => f
+                .debug_struct("State::SendErrorResponse")
+                .finish_non_exhaustive(),
             Self::SendErrorPayload { .. } => f
                 .debug_struct("State::SendErrorPayload")
                 .finish_non_exhaustive(),
@@ -379,11 +387,8 @@ where
         Ok(size)
     }
 
-    fn send_response(
-        mut self: Pin<&mut Self>,
-        res: Response<()>,
-        body: B,
-    ) -> Result<(), DispatchError> {
+    fn send_response(mut self: Pin<&mut Self>, res: Response<B>) -> Result<(), DispatchError> {
+        let (res, body) = res.replace_body(());
         let size = self.as_mut().send_response_inner(res, &body)?;
         let mut this = self.project();
         this.state.set(match size {
@@ -397,11 +402,17 @@
         Ok(())
     }
 
+    fn queue_response(self: Pin<&mut Self>, res: Response<B>) {
+        self.project()
+            .state
+            .set(State::SendResponse { res: Some(res) });
+    }
+
     fn send_error_response(
         mut self: Pin<&mut Self>,
-        res: Response<()>,
-        body: BoxBody,
+        res: Response<BoxBody>,
     ) -> Result<(), DispatchError> {
+        let (res, body) = res.replace_body(());
         let size = self.as_mut().send_response_inner(res, &body)?;
         let mut this = self.project();
         this.state.set(match size {
@@ -415,6 +426,12 @@ where
         Ok(())
     }
 
+    fn queue_error_response(self: Pin<&mut Self>, res: Response<BoxBody>) {
+        self.project()
+            .state
+            .set(State::SendErrorResponse { res: Some(res) });
+    }
+
     fn send_continue(self: Pin<&mut Self>) {
         self.project()
             .write_buf
@@ -449,7 +466,8 @@ where
                         // send_response would update InnerDispatcher state to SendPayload or None
                         // (If response body is empty)
                         // continue loop to poll it
-                        self.as_mut().send_error_response(res, BoxBody::new(()))?;
+                        self.as_mut()
+                            .queue_error_response(res.set_body(BoxBody::new(())));
                     }
 
                     // return with upgrade request and poll it exclusively
@@ -470,15 +488,12 @@ where
                     match fut.poll(cx) {
                         // service call resolved. send response.
                         Poll::Ready(Ok(res)) => {
-                            let (res, body) = res.into().replace_body(());
-                            self.as_mut().send_response(res, body)?;
+                            self.as_mut().queue_response(res.into());
                         }
 
                         // send service call error as response
                         Poll::Ready(Err(err)) => {
-                            let res: Response<BoxBody> = err.into();
-                            let (res, body) = res.replace_body(());
-                            self.as_mut().send_error_response(res, body)?;
+                            self.as_mut().queue_error_response(err.into());
                         }
 
                         // service call pending and could be waiting for more chunk messages
@@ -486,14 +501,32 @@
                         Poll::Pending => {
                            // no new message is decoded and no new payload is fed
                            // nothing to do except waiting for new incoming data from client
-                            if !self.as_mut().poll_request(cx)? {
-                                return Ok(PollResponse::DoNothing);
-                            }
+
+                            // optimisation disabled so that poll_request is called from only one place
+                            // if !self.as_mut().poll_request(cx)? {
+                            return Ok(PollResponse::DoNothing);
+                            // }
+                            // else loop
                        }
                    }
                }
 
+                StateProj::SendResponse { res } => {
+                    let mut res = res.take().expect("response should be take-able");
+
+                    if this.flags.contains(Flags::SHUTDOWN) {
+                        trace!("shutdown flag set; assuming dirty read I/O");
+                        // shutdown flags occur when read I/O is not clean so connections should be
+                        // closed to avoid stuck or erroneous errors on next request
+                        res.head_mut().set_connection_type(ConnectionType::Close);
+                    }
+
+                    self.send_response(res)?;
+
+                    return Ok(PollResponse::DrainWriteBuf);
+                }
+
                StateProj::SendPayload { mut body } => {
                    // keep populate writer buffer until buffer size limit hit,
                    // get blocked or finished.
@@ -529,6 +562,23 @@ where
                     return Ok(PollResponse::DrainWriteBuf);
                 }
 
+                StateProj::SendErrorResponse { res } => {
+                    // TODO: de-dupe impl with SendResponse
+
+                    let mut res = res.take().expect("response should be take-able");
+
+                    if this.flags.contains(Flags::SHUTDOWN) {
+                        trace!("shutdown flag set; assuming dirty read I/O");
+                        // shutdown flags occur when read I/O is not clean so connections should be
+                        // closed to avoid stuck or erroneous errors on next request
+                        res.head_mut().set_connection_type(ConnectionType::Close);
+                    }
+
+                    self.send_error_response(res)?;
+
+                    return Ok(PollResponse::DrainWriteBuf);
+                }
+
                 StateProj::SendErrorPayload { mut body } => {
                     // TODO: de-dupe impl with SendPayload
 
@@ -583,9 +633,7 @@ where
 
                         // send expect error as response
                         Poll::Ready(Err(err)) => {
-                            let res: Response<BoxBody> = err.into();
-                            let (res, body) = res.replace_body(());
-                            self.as_mut().send_error_response(res, body)?;
+                            self.as_mut().queue_error_response(err.into());
                         }
 
                         // expect must be solved before progress can be made.
@@ -637,9 +685,8 @@ where
                 // on success to notify the dispatcher a new state is set and the outer loop
                 // should be continued
                 Poll::Ready(Err(err)) => {
-                    let res: Response<BoxBody> = err.into();
-                    let (res, body) = res.replace_body(());
-                    return self.send_error_response(res, body);
+                    self.queue_error_response(err.into());
+                    return Ok(());
                 }
 
                 // future is pending; return Ok(()) to notify that a new state is
@@ -655,8 +702,8 @@ where
                 // to notify the dispatcher a new state is set and the outer loop
                 // should be continue.
                 Poll::Ready(Ok(res)) => {
-                    let (res, body) = res.into().replace_body(());
-                    self.as_mut().send_response(res, body)
+                    self.as_mut().queue_response(res.into());
+                    Ok(())
                 }
 
                 // see the comment on ExpectCall state branch's Pending
@@ -664,9 +711,8 @@
 
                 // see the comment on ExpectCall state branch's Ready(Err(_))
                 Poll::Ready(Err(err)) => {
-                    let res: Response<BoxBody> = err.into();
-                    let (res, body) = res.replace_body(());
-                    self.as_mut().send_error_response(res, body)
+                    self.as_mut().queue_error_response(err.into());
+                    Ok(())
                 }
             };
         }
@@ -688,15 +734,13 @@ where
         cx: &mut Context<'_>,
     ) -> Result<bool, DispatchError> {
         let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
-        let can_not_read = !self.can_read(cx);
 
         // limit amount of non-processed requests
-        if pipeline_queue_full || can_not_read {
+        if pipeline_queue_full {
             return Ok(false);
         }
 
         let mut this = self.as_mut().project();
-
         let mut updated = false;
 
         // decode from read buf as many full requests as possible
@@ -829,6 +873,72 @@ where
             }
         }
 
+        let can_read = self.can_read(cx);
+        let mut this = self.as_mut().project();
+
+        if !can_read {
+            // request payload is not readable...
+            tracing::debug!("cannot read request payload");
+
+            if let Some(sender) = &this.payload {
+                // ...maybe handler does not want to read any more payload...
+                if let PayloadStatus::Dropped = sender.need_read(cx) {
+                    tracing::debug!(
+                        "handler dropped payload early; attempt to clean connection"
+                    );
+
+                    // ...in which case poll request payload a few times
+                    loop {
+                        match this.codec.decode(this.read_buf)? {
+                            Some(msg) => {
+                                match msg {
+                                    // payload decoded did not yield EOF yet
+                                    Message::Chunk(Some(_)) => {
+                                        // if non-clean connection, next loop iter will detect empty
+                                        // read buffer and close connection
+                                    }
+
+                                    // connection is in clean state for next request
+                                    Message::Chunk(None) => {
+                                        tracing::debug!("connection successfully cleaned");
+
+                                        // reset dispatcher state
+                                        let _ = this.payload.take();
+                                        this.state.set(State::None);
+
+                                        // break out of payload decode loop
+                                        break;
+                                    }
+
+                                    // Either whole payload is read and loop is broken or more data
+                                    // was expected in which case connection is closed. In both
+                                    // situations dispatcher cannot get here.
+                                    Message::Item(_) => {
+                                        unreachable!("dispatcher is in payload receive state")
+                                    }
+                                }
+                            }
+
+                            // not enough info to decide if connection is going to be clean or not
+                            None => {
+                                tracing::debug!(
+                                    "handler did not read whole payload and dispatcher could not \
+                                    drain read buf; close connection"
+                                );
+
+                                this.flags.insert(Flags::SHUTDOWN);
+
+                                return Ok(updated);
+                            }
+                        }
+                    }
+                }
+            } else {
+                // can_not_read and no request payload
+                return Ok(false);
+            }
+        }
+
         Ok(updated)
     }
 
@@ -844,10 +954,10 @@ where
 
                 trace!("timed out on slow request; replying with 408 and closing connection");
 
-                let _ = self.as_mut().send_error_response(
-                    Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
-                    BoxBody::new(()),
-                );
+                let mut res =
+                    Response::with_body(StatusCode::REQUEST_TIMEOUT, BoxBody::new(()));
+                res.head_mut().set_connection_type(ConnectionType::Close);
+                self.as_mut().send_error_response(res)?;
 
                 self.project().flags.insert(Flags::SHUTDOWN);
             }
@@ -1123,6 +1233,7 @@ where
                         }
                     }
 
+                    // process request(s) and queue response
                     inner.as_mut().poll_request(cx)?;
 
                     if should_disconnect {
diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs
index b3ee3d2bb..aae50a371 100644
--- a/actix-http/src/h1/dispatcher_tests.rs
+++ b/actix-http/src/h1/dispatcher_tests.rs
@@ -861,56 +861,26 @@ async fn handler_drop_payload() {
             ",
         ));
 
-        assert!(h1.as_mut().poll(cx).is_pending());
-
-        // polls: manual => manual
-        assert_eq!(h1.poll_count, 2);
-
-        let mut res = BytesMut::from(buf.take_write_buf().as_ref());
-        stabilize_date_header(&mut res);
-        let res = &res[..];
-
-        // expect response immediately even though request side has not finished reading payload
-        let exp = http_msg(
-            r"
-            HTTP/1.1 200 OK
-            content-length: 15
-            date: Thu, 01 Jan 1970 12:34:56 UTC
-
-            payload dropped
-            ",
-        );
-
-        assert_eq!(
-            res,
-            exp,
-            "\nexpected response not in write buffer:\n\
-             response: {:?}\n\
-             expected: {:?}",
-            String::from_utf8_lossy(res),
-            String::from_utf8_lossy(&exp)
-        );
-    })
-    .await;
-
-    lazy(|cx| {
         assert!(h1.as_mut().poll(cx).is_ready());
 
-        // polls: manual => manual => manual
+        // polls: manual => manual => shutdown
        assert_eq!(h1.poll_count, 3);
 
        let mut res = BytesMut::from(buf.take_write_buf().as_ref());
        stabilize_date_header(&mut res);
        let res = &res[..];
 
-        // expect that unrequested error response is sent back since connection could not be cleaned
+        // expect response immediately even though request side has not finished reading payload
+        // since write buffer was "too short" we should expect a closed connection hint
        let exp = http_msg(
+            // connection: close
            r"
-            HTTP/1.1 500 Internal Server Error
-            content-length: 0
+            HTTP/1.1 200 OK
+            content-length: 15
            connection: close
            date: Thu, 01 Jan 1970 12:34:56 UTC
 
+            payload dropped
            ",
        );
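The behaviour the patch targets is easiest to see from the handler side: a service resolves with a response while the request body is still unread, leaving payload bytes in the connection's read buffer. A minimal sketch using the public actix-web API (the handler, route, and bind address are illustrative and not part of the patch):

```rust
use actix_web::{web, App, HttpResponse, HttpServer};

// Illustrative handler: it accepts the request payload but never polls it, so
// body bytes may still sit in the read buffer when the response is produced.
// Per the changelog above, this used to provoke an erroneous 500 on the next
// request; with the patch the connection is either drained clean or closed
// with a `connection: close` hint instead.
async fn ignore_body(_body: web::Payload) -> HttpResponse {
    HttpResponse::Ok().body("payload dropped")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/", web::post().to(ignore_body)))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
```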
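The core change in `poll_response` is that service output is no longer written the moment the future resolves; it is parked in the new `SendResponse`/`SendErrorResponse` states and only encoded from the poll loop, by which point the dispatcher knows whether read I/O was clean and can downgrade the connection header. A standalone sketch of that queue-then-send idea, with simplified stand-in types rather than the real pinned state machine and encoder:

```rust
// Simplified model: `queue_response` only records the response; `poll_response`
// writes it later and can still force `connection: close` when a shutdown was
// flagged in between. Names and types here are illustrative, not actix-http's.
#[derive(Clone, Copy, PartialEq)]
enum ConnectionType {
    KeepAlive,
    Close,
}

struct Response {
    status: u16,
    connection: ConnectionType,
    body: Vec<u8>,
}

enum State {
    None,
    // response produced by the service but not yet written to the socket
    SendResponse(Option<Response>),
}

struct Dispatcher {
    state: State,
    shutdown: bool, // set when read I/O turned out to be unclean
    write_buf: Vec<u8>,
}

impl Dispatcher {
    // called where the service future resolves: only queue, do not write yet
    fn queue_response(&mut self, res: Response) {
        self.state = State::SendResponse(Some(res));
    }

    // called from the poll loop: the shutdown flag is known by now, so the
    // connection header can still be downgraded before the head is encoded
    fn poll_response(&mut self) {
        if let State::SendResponse(res) = &mut self.state {
            let mut res = res.take().expect("response should be take-able");

            if self.shutdown {
                res.connection = ConnectionType::Close;
            }

            let conn = if res.connection == ConnectionType::Close {
                "close"
            } else {
                "keep-alive"
            };
            let head = format!("HTTP/1.1 {} OK\r\nconnection: {conn}\r\n\r\n", res.status);
            self.write_buf.extend_from_slice(head.as_bytes());
            self.write_buf.extend_from_slice(&res.body);
            self.state = State::None;
        }
    }
}

fn main() {
    let mut dispatcher = Dispatcher {
        state: State::None,
        shutdown: true, // pretend read I/O turned out to be unclean
        write_buf: Vec::new(),
    };

    dispatcher.queue_response(Response {
        status: 200,
        connection: ConnectionType::KeepAlive,
        body: b"payload dropped".to_vec(),
    });

    dispatcher.poll_response();
    let written = String::from_utf8_lossy(&dispatcher.write_buf);
    assert!(written.contains("connection: close"));
}
```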
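The other half of the fix is the drain loop added to `poll_request`: when the handler has dropped the payload sender, the dispatcher keeps decoding buffered payload chunks; reaching the payload EOF means the connection can be reused for the next request, while running out of buffered data before EOF forces a shutdown. A free-standing model of that decision under assumed types (the real code drives the h1 `Codec` over the shared read buffer and mutates the pinned dispatcher state):

```rust
// Stand-ins for the codec's decode output in this sketch.
enum Chunk {
    Data,
    Eof,
}

enum DrainOutcome {
    /// Payload ended cleanly; the connection can serve the next request.
    CleanedForReuse,
    /// More payload was expected but is not buffered; close the connection
    /// rather than risk a bogus error on the next request.
    MustClose,
}

// `decode_next` plays the role of "decode one payload message from the read
// buffer": Some(chunk) while data is buffered, None once the buffer is empty.
fn drain_dropped_payload(mut decode_next: impl FnMut() -> Option<Chunk>) -> DrainOutcome {
    loop {
        match decode_next() {
            // payload bytes were already buffered: discard and keep draining
            Some(Chunk::Data) => continue,
            // end-of-payload marker reached: connection state is clean
            Some(Chunk::Eof) => return DrainOutcome::CleanedForReuse,
            // buffer exhausted before EOF: cannot prove the connection is clean
            None => return DrainOutcome::MustClose,
        }
    }
}

fn main() {
    // usage sketch: the whole payload, including its EOF, is already buffered
    let mut chunks = vec![Some(Chunk::Data), Some(Chunk::Eof)].into_iter();
    let outcome = drain_dropped_payload(|| chunks.next().flatten());
    assert!(matches!(outcome, DrainOutcome::CleanedForReuse));
}
```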