diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 13055f08..5b790469 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -89,16 +89,16 @@ pin_project! { U::Error: fmt::Display, { #[pin] - inner: DispatcherState, + pub(super) inner: DispatcherState, // used in tests - poll_count: u64, + pub(super) poll_count: u64, } } pin_project! { #[project = DispatcherStateProj] - enum DispatcherState + pub(super) enum DispatcherState where S: Service, S::Error: Into>, @@ -118,7 +118,7 @@ pin_project! { pin_project! { #[project = InnerDispatcherProj] - struct InnerDispatcher + pub(super) struct InnerDispatcher where S: Service, S::Error: Into>, @@ -132,7 +132,7 @@ pin_project! { U::Error: fmt::Display, { flow: Rc>, - flags: Flags, + pub(super) flags: Flags, peer_addr: Option, conn_data: Option>, error: Option, @@ -146,7 +146,7 @@ pin_project! { #[pin] ka_timer: Option, - io: Option, + pub(super) io: Option, read_buf: BytesMut, write_buf: BytesMut, codec: Codec, @@ -1039,397 +1039,3 @@ where } } } - -#[cfg(test)] -mod tests { - use std::str; - - use actix_service::fn_service; - use actix_utils::future::{ready, Ready}; - use bytes::Bytes; - use futures_util::future::lazy; - - use super::*; - use crate::{ - error::Error, - h1::{ExpectHandler, UpgradeHandler}, - test::{TestBuffer, TestSeqBuffer}, - HttpMessage, KeepAlive, Method, - }; - - fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { - haystack[from..] - .windows(needle.len()) - .position(|window| window == needle) - } - - fn stabilize_date_header(payload: &mut [u8]) { - let mut from = 0; - - while let Some(pos) = find_slice(payload, b"date", from) { - payload[(from + pos)..(from + pos + 35)] - .copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC"); - from += 35; - } - } - - fn ok_service( - ) -> impl Service, Error = Error> { - fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) - } - - fn echo_path_service( - ) -> impl Service, Error = Error> { - fn_service(|req: Request| { - let path = req.path().as_bytes(); - ready(Ok::<_, Error>( - Response::ok().set_body(Bytes::copy_from_slice(path)), - )) - }) - } - - fn echo_payload_service() -> impl Service, Error = Error> - { - fn_service(|mut req: Request| { - Box::pin(async move { - use futures_util::stream::StreamExt as _; - - let mut pl = req.take_payload(); - let mut body = BytesMut::new(); - while let Some(chunk) = pl.next().await { - body.extend_from_slice(chunk.unwrap().chunk()) - } - - Ok::<_, Error>(Response::ok().set_body(body.freeze())) - }) - }) - } - - #[actix_rt::test] - async fn test_req_parse_err() { - lazy(|cx| { - let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); - - let services = HttpFlow::new(ok_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf, - services, - ServiceConfig::default(), - None, - OnConnectData::default(), - ); - - actix_rt::pin!(h1); - - match h1.as_mut().poll(cx) { - Poll::Pending => panic!(), - Poll::Ready(res) => assert!(res.is_err()), - } - - if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { - assert!(inner.flags.contains(Flags::READ_DISCONNECT)); - assert_eq!( - &inner.project().io.take().unwrap().write_buf[..26], - b"HTTP/1.1 400 Bad Request\r\n" - ); - } - }) - .await; - } - - #[actix_rt::test] - async fn test_pipelining() { - lazy(|cx| { - let buf = TestBuffer::new( - "\ - GET /abcd HTTP/1.1\r\n\r\n\ - GET /def HTTP/1.1\r\n\r\n\ - ", - ); - - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); - - let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf, - services, - cfg, - None, - OnConnectData::default(), - ); - - actix_rt::pin!(h1); - - assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); - - match h1.as_mut().poll(cx) { - Poll::Pending => panic!("first poll should not be pending"), - Poll::Ready(res) => assert!(res.is_ok()), - } - - // polls: initial => shutdown - assert_eq!(h1.poll_count, 2); - - if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { - let res = &mut inner.project().io.take().unwrap().write_buf[..]; - stabilize_date_header(res); - - let exp = b"\ - HTTP/1.1 200 OK\r\n\ - content-length: 5\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - /abcd\ - HTTP/1.1 200 OK\r\n\ - content-length: 4\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - /def\ - "; - - assert_eq!(res.to_vec(), exp.to_vec()); - } - }) - .await; - - lazy(|cx| { - let buf = TestBuffer::new( - "\ - GET /abcd HTTP/1.1\r\n\r\n\ - GET /def HTTP/1\r\n\r\n\ - ", - ); - - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); - - let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf, - services, - cfg, - None, - OnConnectData::default(), - ); - - actix_rt::pin!(h1); - - assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); - - match h1.as_mut().poll(cx) { - Poll::Pending => panic!("first poll should not be pending"), - Poll::Ready(res) => assert!(res.is_err()), - } - - // polls: initial => shutdown - assert_eq!(h1.poll_count, 1); - - if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { - let res = &mut inner.project().io.take().unwrap().write_buf[..]; - stabilize_date_header(res); - - let exp = b"\ - HTTP/1.1 200 OK\r\n\ - content-length: 5\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - /abcd\ - HTTP/1.1 400 Bad Request\r\n\ - content-length: 0\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - "; - - assert_eq!(res.to_vec(), exp.to_vec()); - } - }) - .await; - } - - #[actix_rt::test] - async fn test_expect() { - lazy(|cx| { - let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); - - let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf.clone(), - services, - cfg, - None, - OnConnectData::default(), - ); - - buf.extend_read_buf( - "\ - POST /upload HTTP/1.1\r\n\ - Content-Length: 5\r\n\ - Expect: 100-continue\r\n\ - \r\n\ - ", - ); - - actix_rt::pin!(h1); - - assert!(h1.as_mut().poll(cx).is_pending()); - assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); - - // polls: manual - assert_eq!(h1.poll_count, 1); - eprintln!("poll count: {}", h1.poll_count); - - if let DispatcherState::Normal { ref inner } = h1.inner { - let io = inner.io.as_ref().unwrap(); - let res = &io.write_buf()[..]; - assert_eq!( - str::from_utf8(res).unwrap(), - "HTTP/1.1 100 Continue\r\n\r\n" - ); - } - - buf.extend_read_buf("12345"); - assert!(h1.as_mut().poll(cx).is_ready()); - - // polls: manual manual shutdown - assert_eq!(h1.poll_count, 3); - - if let DispatcherState::Normal { ref inner } = h1.inner { - let io = inner.io.as_ref().unwrap(); - let mut res = (&io.write_buf()[..]).to_owned(); - stabilize_date_header(&mut res); - - assert_eq!( - str::from_utf8(&res).unwrap(), - "\ - HTTP/1.1 100 Continue\r\n\ - \r\n\ - HTTP/1.1 200 OK\r\n\ - content-length: 5\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ - \r\n\ - 12345\ - " - ); - } - }) - .await; - } - - #[actix_rt::test] - async fn test_eager_expect() { - lazy(|cx| { - let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); - - let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); - - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( - buf.clone(), - services, - cfg, - None, - OnConnectData::default(), - ); - - buf.extend_read_buf( - "\ - POST /upload HTTP/1.1\r\n\ - Content-Length: 5\r\n\ - Expect: 100-continue\r\n\ - \r\n\ - ", - ); - - actix_rt::pin!(h1); - - assert!(h1.as_mut().poll(cx).is_ready()); - assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); - - // polls: manual shutdown - assert_eq!(h1.poll_count, 2); - - if let DispatcherState::Normal { ref inner } = h1.inner { - let io = inner.io.as_ref().unwrap(); - let mut res = (&io.write_buf()[..]).to_owned(); - stabilize_date_header(&mut res); - - // Despite the content-length header and even though the request payload has not - // been sent, this test expects a complete service response since the payload - // is not used at all. The service passed to dispatcher is path echo and doesn't - // consume payload bytes. - assert_eq!( - str::from_utf8(&res).unwrap(), - "\ - HTTP/1.1 100 Continue\r\n\ - \r\n\ - HTTP/1.1 200 OK\r\n\ - content-length: 7\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ - \r\n\ - /upload\ - " - ); - } - }) - .await; - } - - #[actix_rt::test] - async fn test_upgrade() { - struct TestUpgrade; - - impl Service<(Request, Framed)> for TestUpgrade { - type Response = (); - type Error = Error; - type Future = Ready>; - - actix_service::always_ready!(); - - fn call(&self, (req, _framed): (Request, Framed)) -> Self::Future { - assert_eq!(req.method(), Method::GET); - assert!(req.upgrade()); - assert_eq!(req.headers().get("upgrade").unwrap(), "websocket"); - ready(Ok(())) - } - } - - lazy(|cx| { - let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); - - let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); - - let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new( - buf.clone(), - services, - cfg, - None, - OnConnectData::default(), - ); - - buf.extend_read_buf( - "\ - GET /ws HTTP/1.1\r\n\ - Connection: Upgrade\r\n\ - Upgrade: websocket\r\n\ - \r\n\ - ", - ); - - actix_rt::pin!(h1); - - assert!(h1.as_mut().poll(cx).is_ready()); - assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. })); - - // polls: manual shutdown - assert_eq!(h1.poll_count, 2); - }) - .await; - } -} diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs new file mode 100644 index 00000000..21da2422 --- /dev/null +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -0,0 +1,460 @@ +use std::{future::Future, str, task::Poll}; + +use actix_service::fn_service; +use actix_utils::future::{ready, Ready}; +use bytes::Bytes; +use futures_util::future::lazy; + +use actix_codec::Framed; +use actix_service::Service; +use bytes::{Buf, BytesMut}; + +use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags}; +use crate::{ + body::MessageBody, + config::ServiceConfig, + h1::{Codec, ExpectHandler, UpgradeHandler}, + service::HttpFlow, + test::{TestBuffer, TestSeqBuffer}, + Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, +}; + +fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { + memchr::memmem::find(&haystack[from..], needle) +} + +fn stabilize_date_header(payload: &mut [u8]) { + let mut from = 0; + while let Some(pos) = find_slice(payload, b"date", from) { + payload[(from + pos)..(from + pos + 35)] + .copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC"); + from += 35; + } +} + +fn ok_service() -> impl Service, Error = Error> { + fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) +} + +fn echo_path_service( +) -> impl Service, Error = Error> { + fn_service(|req: Request| { + let path = req.path().as_bytes(); + ready(Ok::<_, Error>( + Response::ok().set_body(Bytes::copy_from_slice(path)), + )) + }) +} + +fn echo_payload_service() -> impl Service, Error = Error> { + fn_service(|mut req: Request| { + Box::pin(async move { + use futures_util::stream::StreamExt as _; + + let mut pl = req.take_payload(); + let mut body = BytesMut::new(); + while let Some(chunk) = pl.next().await { + body.extend_from_slice(chunk.unwrap().chunk()) + } + + Ok::<_, Error>(Response::ok().set_body(body.freeze())) + }) + }) +} + +#[actix_rt::test] +#[ignore] +async fn test_keep_alive() { + lazy(|cx| { + let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); + + let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + actix_rt::pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + assert!( + h1.as_mut().poll(cx).is_pending(), + "keep-alive should prevent poll from resolving" + ); + + // polls: initial + assert_eq!(h1.poll_count, 1); + + let mut res = buf.write_buf_slice_mut(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /abcd\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; +} + +#[actix_rt::test] +async fn test_req_parse_err() { + lazy(|cx| { + let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); + + let services = HttpFlow::new(ok_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + + actix_rt::pin!(h1); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!(), + Poll::Ready(res) => assert!(res.is_err()), + } + + if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { + assert!(inner.flags.contains(Flags::READ_DISCONNECT)); + assert_eq!( + &buf.write_buf_slice()[..26], + b"HTTP/1.1 400 Bad Request\r\n" + ); + } + }) + .await; +} + +#[actix_rt::test] +async fn pipelining_ok_then_ok() { + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /abcd HTTP/1.1\r\n\r\n\ + GET /def HTTP/1.1\r\n\r\n\ + ", + ); + + let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + actix_rt::pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!("first poll should not be pending"), + Poll::Ready(res) => assert!(res.is_ok()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 2); + + let mut res = buf.write_buf_slice_mut(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /abcd\ + HTTP/1.1 200 OK\r\n\ + content-length: 4\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /def\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; +} + +#[actix_rt::test] +async fn pipelining_ok_then_bad() { + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /abcd HTTP/1.1\r\n\r\n\ + GET /def HTTP/1\r\n\r\n\ + ", + ); + + let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + actix_rt::pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!("first poll should not be pending"), + Poll::Ready(res) => assert!(res.is_err()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 1); + + let mut res = buf.write_buf_slice_mut(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /abcd\ + HTTP/1.1 400 Bad Request\r\n\ + content-length: 0\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; +} + +#[actix_rt::test] +async fn test_expect() { + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + buf.extend_read_buf( + "\ + POST /upload HTTP/1.1\r\n\ + Content-Length: 5\r\n\ + Expect: 100-continue\r\n\ + \r\n\ + ", + ); + + actix_rt::pin!(h1); + + assert!(h1.as_mut().poll(cx).is_pending()); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + // polls: manual + assert_eq!(h1.poll_count, 1); + eprintln!("poll count: {}", h1.poll_count); + + if let DispatcherState::Normal { ref inner } = h1.inner { + let io = inner.io.as_ref().unwrap(); + let res = &io.write_buf()[..]; + assert_eq!( + str::from_utf8(res).unwrap(), + "HTTP/1.1 100 Continue\r\n\r\n" + ); + } + + buf.extend_read_buf("12345"); + assert!(h1.as_mut().poll(cx).is_ready()); + + // polls: manual manual shutdown + assert_eq!(h1.poll_count, 3); + + if let DispatcherState::Normal { ref inner } = h1.inner { + let io = inner.io.as_ref().unwrap(); + let mut res = (&io.write_buf()[..]).to_owned(); + stabilize_date_header(&mut res); + + assert_eq!( + str::from_utf8(&res).unwrap(), + "\ + HTTP/1.1 100 Continue\r\n\ + \r\n\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ + \r\n\ + 12345\ + " + ); + } + }) + .await; +} + +#[actix_rt::test] +async fn test_eager_expect() { + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + buf.extend_read_buf( + "\ + POST /upload HTTP/1.1\r\n\ + Content-Length: 5\r\n\ + Expect: 100-continue\r\n\ + \r\n\ + ", + ); + + actix_rt::pin!(h1); + + assert!(h1.as_mut().poll(cx).is_ready()); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + // polls: manual shutdown + assert_eq!(h1.poll_count, 2); + + if let DispatcherState::Normal { ref inner } = h1.inner { + let io = inner.io.as_ref().unwrap(); + let mut res = (&io.write_buf()[..]).to_owned(); + stabilize_date_header(&mut res); + + // Despite the content-length header and even though the request payload has not + // been sent, this test expects a complete service response since the payload + // is not used at all. The service passed to dispatcher is path echo and doesn't + // consume payload bytes. + assert_eq!( + str::from_utf8(&res).unwrap(), + "\ + HTTP/1.1 100 Continue\r\n\ + \r\n\ + HTTP/1.1 200 OK\r\n\ + content-length: 7\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ + \r\n\ + /upload\ + " + ); + } + }) + .await; +} + +#[actix_rt::test] +async fn test_upgrade() { + struct TestUpgrade; + + impl Service<(Request, Framed)> for TestUpgrade { + type Response = (); + type Error = Error; + type Future = Ready>; + + actix_service::always_ready!(); + + fn call(&self, (req, _framed): (Request, Framed)) -> Self::Future { + assert_eq!(req.method(), Method::GET); + assert!(req.upgrade()); + assert_eq!(req.headers().get("upgrade").unwrap(), "websocket"); + ready(Ok(())) + } + } + + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); + + let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + buf.extend_read_buf( + "\ + GET /ws HTTP/1.1\r\n\ + Connection: Upgrade\r\n\ + Upgrade: websocket\r\n\ + \r\n\ + ", + ); + + actix_rt::pin!(h1); + + assert!(h1.as_mut().poll(cx).is_ready()); + assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. })); + + // polls: manual shutdown + assert_eq!(h1.poll_count, 2); + }) + .await; +} diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 64586a2d..8c569165 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -7,6 +7,8 @@ mod client; mod codec; mod decoder; mod dispatcher; +#[cfg(test)] +mod dispatcher_tests; mod encoder; mod expect; mod payload; diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 1f76498e..52919773 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -1,7 +1,7 @@ //! Various testing helpers for use in internal and app tests. use std::{ - cell::{Ref, RefCell}, + cell::{Ref, RefCell, RefMut}, io::{self, Read, Write}, pin::Pin, rc::Rc, @@ -157,10 +157,11 @@ fn parts(parts: &mut Option) -> &mut Inner { } /// Async I/O test buffer. +#[derive(Debug)] pub struct TestBuffer { - pub read_buf: BytesMut, - pub write_buf: BytesMut, - pub err: Option, + pub read_buf: Rc>, + pub write_buf: Rc>, + pub err: Option>, } impl TestBuffer { @@ -170,34 +171,64 @@ impl TestBuffer { T: Into, { Self { - read_buf: data.into(), - write_buf: BytesMut::new(), + read_buf: Rc::new(RefCell::new(data.into())), + write_buf: Rc::new(RefCell::new(BytesMut::new())), err: None, } } + // intentionally not using Clone trait + #[allow(dead_code)] + pub(crate) fn clone(&self) -> Self { + Self { + read_buf: self.read_buf.clone(), + write_buf: self.write_buf.clone(), + err: self.err.clone(), + } + } + /// Create new empty `TestBuffer` instance. pub fn empty() -> Self { Self::new("") } + #[allow(dead_code)] + pub(crate) fn read_buf_slice(&self) -> Ref<'_, [u8]> { + Ref::map(self.read_buf.borrow(), |b| b.as_ref()) + } + + #[allow(dead_code)] + pub(crate) fn read_buf_slice_mut(&self) -> RefMut<'_, [u8]> { + RefMut::map(self.read_buf.borrow_mut(), |b| b.as_mut()) + } + + #[allow(dead_code)] + pub(crate) fn write_buf_slice(&self) -> Ref<'_, [u8]> { + Ref::map(self.write_buf.borrow(), |b| b.as_ref()) + } + + #[allow(dead_code)] + pub(crate) fn write_buf_slice_mut(&self) -> RefMut<'_, [u8]> { + RefMut::map(self.write_buf.borrow_mut(), |b| b.as_mut()) + } + /// Add data to read buffer. pub fn extend_read_buf>(&mut self, data: T) { - self.read_buf.extend_from_slice(data.as_ref()) + self.read_buf.borrow_mut().extend_from_slice(data.as_ref()) } } impl io::Read for TestBuffer { fn read(&mut self, dst: &mut [u8]) -> Result { - if self.read_buf.is_empty() { + if self.read_buf.borrow().is_empty() { if self.err.is_some() { - Err(self.err.take().unwrap()) + Err(Rc::try_unwrap(self.err.take().unwrap()).unwrap()) } else { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } } else { - let size = std::cmp::min(self.read_buf.len(), dst.len()); - let b = self.read_buf.split_to(size); + let size = std::cmp::min(self.read_buf.borrow().len(), dst.len()); + let b = self.read_buf.borrow_mut().split_to(size); dst[..size].copy_from_slice(&b); Ok(size) } @@ -206,7 +237,7 @@ impl io::Read for TestBuffer { impl io::Write for TestBuffer { fn write(&mut self, buf: &[u8]) -> io::Result { - self.write_buf.extend(buf); + RefCell::borrow_mut(&self.write_buf).extend(buf); Ok(buf.len()) }