diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 7f7af23a..afc988d4 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,14 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +- `error::DispatcherError` enum is now marked `#[non_exhaustive]`. [#2624] + + +### Fixed +- Issue where handlers that took payload but then dropped without reading it to EOF it would cause keep-alive connections to become stuck. [#2624] + +[#2624]: https://github.com/actix/actix-web/pull/2624 ## 3.0.0-rc.1 - 2022-01-31 diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 84132286..52b95342 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -340,6 +340,7 @@ impl From for Error { /// A set of errors that can occur during dispatching HTTP requests. #[derive(Debug, Display, From)] +#[non_exhaustive] pub enum DispatchError { /// Service error. #[display(fmt = "Service Error")] @@ -373,6 +374,10 @@ pub enum DispatchError { #[display(fmt = "Connection shutdown timeout")] DisconnectTimeout, + /// Handler dropped payload before reading EOF. + #[display(fmt = "Handler dropped payload before reading EOF")] + HandlerDroppedPayload, + /// Internal error. #[display(fmt = "Internal error")] InternalError, diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index df74bcc4..80afd745 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -125,11 +125,13 @@ impl Decoder for Codec { self.flags.set(Flags::HEAD, head.method == Method::HEAD); self.version = head.version; self.conn_type = head.connection_type(); + if self.conn_type == ConnectionType::KeepAlive && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED) { self.conn_type = ConnectionType::Close } + match payload { PayloadType::None => self.payload = None, PayloadType::Payload(pl) => self.payload = Some(pl), diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index fa924f92..17b9b695 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -209,15 +209,16 @@ impl MessageType for Request { let (len, method, uri, ver, h_len) = { // SAFETY: - // Create an uninitialized array of `MaybeUninit`. The `assume_init` is - // safe because the type we are claiming to have initialized here is a - // bunch of `MaybeUninit`s, which do not require initialization. + // Create an uninitialized array of `MaybeUninit`. The `assume_init` is safe because the + // type we are claiming to have initialized here is a bunch of `MaybeUninit`s, which + // do not require initialization. let mut parsed = unsafe { MaybeUninit::<[MaybeUninit>; MAX_HEADERS]>::uninit() .assume_init() }; let mut req = httparse::Request::new(&mut []); + match req.parse_with_uninit_headers(src, &mut parsed)? { httparse::Status::Complete(len) => { let method = Method::from_bytes(req.method.unwrap().as_bytes()) @@ -232,6 +233,7 @@ impl MessageType for Request { (len, method, uri, version, req.headers.len()) } + httparse::Status::Partial => { return if src.len() >= MAX_BUFFER_SIZE { trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3f327171..fbc7e5b9 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -21,7 +21,7 @@ use crate::{ config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - Error, Extensions, OnConnectData, Request, Response, StatusCode, + ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode, }; use super::{ @@ -151,7 +151,8 @@ pin_project! { error: Option, #[pin] - state: State, + pub(super) state: State, + // when Some(_) dispatcher is in state of receiving request payload payload: Option, messages: VecDeque, @@ -174,7 +175,7 @@ enum DispatcherMessage { pin_project! { #[project = StateProj] - enum State + pub(super) enum State where S: Service, X: Service, @@ -194,7 +195,7 @@ where X: Service, B: MessageBody, { - fn is_none(&self) -> bool { + pub(super) fn is_none(&self) -> bool { matches!(self, State::None) } } @@ -686,12 +687,74 @@ where let can_not_read = !self.can_read(cx); // limit amount of non-processed requests - if pipeline_queue_full || can_not_read { + if pipeline_queue_full { return Ok(false); } let mut this = self.as_mut().project(); + if can_not_read { + log::debug!("cannot read request payload"); + + if let Some(sender) = &this.payload { + // ...maybe handler does not want to read any more payload... + if let PayloadStatus::Dropped = sender.need_read(cx) { + log::debug!("handler dropped payload early; attempt to clean connection"); + // ...in which case poll request payload a few times + loop { + match this.codec.decode(this.read_buf)? { + Some(msg) => { + match msg { + // payload decoded did not yield EOF yet + Message::Chunk(Some(_)) => { + // if non-clean connection, next loop iter will detect empty + // read buffer and close connection + } + + // connection is in clean state for next request + Message::Chunk(None) => { + log::debug!("connection successfully cleaned"); + + // reset dispatcher state + let _ = this.payload.take(); + this.state.set(State::None); + + // break out of payload decode loop + break; + } + + // Either whole payload is read and loop is broken or more data + // was expected in which case connection is closed. In both + // situations dispatcher cannot get here. + Message::Item(_) => { + unreachable!("dispatcher is in payload receive state") + } + } + } + + // not enough info to decide if connection is going to be clean or not + None => { + log::error!( + "handler did not read whole payload and dispatcher could not \ + drain read buf; return 500 and close connection" + ); + + this.flags.insert(Flags::SHUTDOWN); + let mut res = Response::internal_server_error().drop_body(); + res.head_mut().set_connection_type(ConnectionType::Close); + this.messages.push_back(DispatcherMessage::Error(res)); + *this.error = Some(DispatchError::HandlerDroppedPayload); + return Ok(true); + } + } + } + } + } else { + // can_not_read and no request payload + return Ok(false); + } + } + let mut updated = false; loop { diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 891cce69..40454d45 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,6 +1,6 @@ use std::{future::Future, str, task::Poll, time::Duration}; -use actix_rt::time::sleep; +use actix_rt::{pin, time::sleep}; use actix_service::fn_service; use actix_utils::future::{ready, Ready}; use bytes::Bytes; @@ -53,6 +53,14 @@ fn echo_path_service( }) } +fn drop_payload_service( +) -> impl Service, Error = Error> { + fn_service(|mut req: Request| async move { + let _ = req.take_payload(); + Ok::<_, Error>(Response::with_body(StatusCode::OK, "payload dropped")) + }) +} + fn echo_payload_service() -> impl Service, Error = Error> { fn_service(|mut req: Request| { Box::pin(async move { @@ -89,7 +97,7 @@ async fn late_request() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -156,7 +164,7 @@ async fn oneshot_connection() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -173,13 +181,16 @@ async fn oneshot_connection() { stabilize_date_header(&mut res); let res = &res[..]; - let exp = b"\ - HTTP/1.1 200 OK\r\n\ - content-length: 5\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - /abcd\ - "; + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 5 + connection: close + date: Thu, 01 Jan 1970 12:34:56 UTC + + /abcd + ", + ); assert_eq!( res, @@ -188,7 +199,7 @@ async fn oneshot_connection() { response: {:?}\n\ expected: {:?}", String::from_utf8_lossy(res), - String::from_utf8_lossy(exp) + String::from_utf8_lossy(&exp) ); }) .await; @@ -214,7 +225,7 @@ async fn keep_alive_timeout() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -293,7 +304,7 @@ async fn keep_alive_follow_up_req() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -413,7 +424,7 @@ async fn req_parse_err() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); match h1.as_mut().poll(cx) { Poll::Pending => panic!(), @@ -459,7 +470,7 @@ async fn pipelining_ok_then_ok() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -529,7 +540,7 @@ async fn pipelining_ok_then_bad() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -601,7 +612,7 @@ async fn expect_handling() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_pending()); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -678,7 +689,7 @@ async fn expect_eager() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -761,7 +772,7 @@ async fn upgrade_handling() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. })); @@ -771,3 +782,192 @@ async fn upgrade_handling() { }) .await; } + +#[actix_rt::test] +async fn handler_drop_payload() { + let _ = env_logger::try_init(); + + let mut buf = TestBuffer::new(http_msg( + r" + POST /drop-payload HTTP/1.1 + Content-Length: 3 + + abc + ", + )); + + let services = HttpFlow::new( + drop_payload_service(), + ExpectHandler, + None::, + ); + + let h1 = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(h1); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual + assert_eq!(h1.poll_count, 1); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + assert!(inner.state.is_none()); + } + }) + .await; + + lazy(|cx| { + // add message that claims to have payload longer than provided + buf.extend_read_buf(http_msg( + r" + POST /drop-payload HTTP/1.1 + Content-Length: 200 + + abc + ", + )); + + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual => manual + assert_eq!(h1.poll_count, 2); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + // expect response immediately even though request side has not finished reading payload + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_ready()); + + // polls: manual => manual => manual + assert_eq!(h1.poll_count, 3); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + // expect that unrequested error response is sent back since connection could not be cleaned + let exp = http_msg( + r" + HTTP/1.1 500 Internal Server Error + content-length: 0 + connection: close + date: Thu, 01 Jan 1970 12:34:56 UTC + + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; +} + +fn http_msg(msg: impl AsRef) -> BytesMut { + let mut msg = msg + .as_ref() + .trim() + .split('\n') + .into_iter() + .map(|line| [line.trim_start(), "\r"].concat()) + .collect::>() + .join("\n"); + + // remove trailing \r + msg.pop(); + + if !msg.is_empty() && !msg.contains("\r\n\r\n") { + msg.push_str("\r\n\r\n"); + } + + BytesMut::from(msg.as_bytes()) +} + +#[test] +fn http_msg_creates_msg() { + assert_eq!(http_msg(r""), ""); + + assert_eq!( + http_msg( + r" + POST / HTTP/1.1 + Content-Length: 3 + + abc + " + ), + "POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc" + ); + + assert_eq!( + http_msg( + r" + GET / HTTP/1.1 + Content-Length: 3 + + " + ), + "GET / HTTP/1.1\r\nContent-Length: 3\r\n\r\n" + ); +}