diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 21da2422f..cc86cbdfd 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,5 +1,6 @@ -use std::{future::Future, str, task::Poll}; +use std::{future::Future, str, task::Poll, time::Duration}; +use actix_rt::time::sleep; use actix_service::fn_service; use actix_utils::future::{ready, Ready}; use bytes::Bytes; @@ -63,23 +64,22 @@ fn echo_payload_service() -> impl Service, E } #[actix_rt::test] -#[ignore] -async fn test_keep_alive() { +async fn test_keep_alive_timeout() { + let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); + + let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + actix_rt::pin!(h1); + lazy(|cx| { - let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - - let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); - let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf.clone(), - services, - cfg, - None, - OnConnectData::default(), - ); - actix_rt::pin!(h1); - assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); assert!( @@ -90,7 +90,7 @@ async fn test_keep_alive() { // polls: initial assert_eq!(h1.poll_count, 1); - let mut res = buf.write_buf_slice_mut(); + let mut res = buf.take_write_buf().to_vec(); stabilize_date_header(&mut res); let res = &res[..]; @@ -105,8 +105,149 @@ async fn test_keep_alive() { res, exp, "\nexpected response not in write buffer:\n\ - response: {:?}\n\ - expected: {:?}", + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; + + // sleep slightly longer than keep-alive timeout + sleep(Duration::from_millis(1100)).await; + + lazy(|cx| { + assert!( + h1.as_mut().poll(cx).is_ready(), + "keep-alive should have resolved", + ); + + // polls: initial => keep-alive wake-up shutdown + assert_eq!(h1.poll_count, 2); + + if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { + // connection closed + assert!(inner.flags.contains(Flags::SHUTDOWN)); + assert!(inner.flags.contains(Flags::WRITE_DISCONNECT)); + // and nothing added to write buffer + assert!(buf.write_buf_slice().is_empty()); + } + }) + .await; +} + +#[actix_rt::test] +async fn test_keep_alive_follow_up_req() { + let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); + + let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None); + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + actix_rt::pin!(h1); + + lazy(|cx| { + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + assert!( + h1.as_mut().poll(cx).is_pending(), + "keep-alive should prevent poll from resolving" + ); + + // polls: initial + assert_eq!(h1.poll_count, 1); + + let mut res = buf.take_write_buf().to_vec(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /abcd\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; + + // sleep for less than KA timeout + sleep(Duration::from_millis(200)).await; + + lazy(|cx| { + assert!( + h1.as_mut().poll(cx).is_pending(), + "keep-alive should not have resolved dispatcher yet", + ); + + // polls: initial => manual + assert_eq!(h1.poll_count, 2); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + // connection not closed + assert!(!inner.flags.contains(Flags::SHUTDOWN)); + assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT)); + // and nothing added to write buffer + assert!(buf.write_buf_slice().is_empty()); + } + }) + .await; + + lazy(|cx| { + buf.extend_read_buf( + "\ + GET /efg HTTP/1.1\r\n\ + Connection: close\r\n\ + \r\n\r\n", + ); + + assert!( + h1.as_mut().poll(cx).is_ready(), + "connection close header should override keep-alive setting", + ); + + // polls: initial => manual => follow-up req => shutdown + assert_eq!(h1.poll_count, 4); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + // connection closed + assert!(inner.flags.contains(Flags::SHUTDOWN)); + assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT)); + } + + let mut res = buf.take_write_buf().to_vec(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 4\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /efg\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", String::from_utf8_lossy(res), String::from_utf8_lossy(exp) ); diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 529197736..0d4d342ec 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -212,6 +212,11 @@ impl TestBuffer { RefMut::map(self.write_buf.borrow_mut(), |b| b.as_mut()) } + #[allow(dead_code)] + pub(crate) fn take_write_buf(&self) -> Bytes { + self.write_buf.borrow_mut().split().freeze() + } + /// Add data to read buffer. pub fn extend_read_buf>(&mut self, data: T) { self.read_buf.borrow_mut().extend_from_slice(data.as_ref())