From 3200de3f34b21f65bf84d7b04ba118f03d808f02 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 31 Jan 2022 17:30:34 +0000 Subject: [PATCH] fix request head timeout (#2611) --- CHANGES.md | 5 + actix-http/CHANGES.md | 26 + actix-http/Cargo.toml | 1 + actix-http/examples/bench.rs | 27 + actix-http/examples/echo.rs | 6 +- actix-http/examples/h2spec.rs | 25 + actix-http/examples/hello-world.rs | 6 +- actix-http/src/builder.rs | 93 +-- actix-http/src/config.rs | 317 ++------- actix-http/src/date.rs | 92 +++ actix-http/src/h1/client.rs | 49 +- actix-http/src/h1/codec.rs | 24 +- actix-http/src/h1/dispatcher.rs | 752 ++++++++++++++-------- actix-http/src/h1/dispatcher_tests.rs | 103 ++- actix-http/src/h1/encoder.rs | 7 +- actix-http/src/h1/mod.rs | 6 +- actix-http/src/h1/timer.rs | 80 +++ actix-http/src/h2/dispatcher.rs | 16 +- actix-http/src/h2/mod.rs | 18 +- actix-http/src/header/map.rs | 2 +- actix-http/src/header/shared/http_date.rs | 3 +- actix-http/src/keep_alive.rs | 83 +++ actix-http/src/lib.rs | 7 +- actix-http/src/notify_on_drop.rs | 49 ++ actix-http/src/requests/head.rs | 2 +- actix-http/src/responses/head.rs | 24 +- actix-http/src/service.rs | 28 +- actix-http/src/test.rs | 2 +- actix-http/tests/test_client.rs | 8 +- actix-http/tests/test_h2_timer.rs | 8 +- actix-http/tests/test_server.rs | 95 +-- actix-http/tests/test_ws.rs | 2 +- actix-test/CHANGES.md | 3 + actix-test/src/lib.rs | 30 +- awc/src/client/h1proto.rs | 6 +- src/server.rs | 51 +- tests/test_httpserver.rs | 6 +- tests/test_server.rs | 8 +- 38 files changed, 1303 insertions(+), 767 deletions(-) create mode 100644 actix-http/examples/bench.rs create mode 100644 actix-http/examples/h2spec.rs create mode 100644 actix-http/src/date.rs create mode 100644 actix-http/src/h1/timer.rs create mode 100644 actix-http/src/keep_alive.rs create mode 100644 actix-http/src/notify_on_drop.rs diff --git a/CHANGES.md b/CHANGES.md index 8c3997663..c00bc7198 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,10 +1,15 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +- Rename `HttpServer::{client_timeout => client_request_timeout}`. [#2611] +- Rename `HttpServer::{client_shutdown => client_disconnect_timeout}`. [#2611] + ### Removed - `impl Future for HttpResponse`. [#2601] [#2601]: https://github.com/actix/actix-web/pull/2601 +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 4.0.0-beta.21 - 2022-01-21 diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 6047a6bc5..a748bc43f 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,32 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +- Implement `Default` for `KeepAlive`. [#2611] +- Implement `From` for `KeepAlive`. [#2611] +- Implement `From>` for `KeepAlive`. [#2611] +- Implement `Default` for `HttpServiceBuilder`. [#2611] + +### Changed +- Rename `ServiceConfig::{client_timer_expire => client_request_deadline}`. [#2611] +- Rename `ServiceConfig::{client_disconnect_timer => client_disconnect_deadline}`. [#2611] +- Deadline methods in `ServiceConfig` now return `std::time::Instant`s instead of Tokio's wrapper type. [#2611] +- Rename `h1::Codec::{keepalive => keep_alive}`. [#2611] +- Rename `h1::Codec::{keepalive_enabled => keep_alive_enabled}`. [#2611] +- Rename `h1::ClientCodec::{keepalive => keep_alive}`. [#2611] +- Rename `h1::ClientPayloadCodec::{keepalive => keep_alive}`. [#2611] +- `ServiceConfig::keep_alive` now returns a `KeepAlive`. [#2611] + +### Fixed +- HTTP/1.1 dispatcher correctly uses client request timeout. [#2611] + +### Removed +- `ServiceConfig::{client_timer, keep_alive_timer}`. [#2611] +- `impl From for KeepAlive`; use `Duration`s instead. [#2611] +- `impl From> for KeepAlive`; use `Duration`s instead. [#2611] +- `HttpServiceBuilder::new`; use `default` instead. [#2611] + +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 3.0.0-beta.19 - 2022-01-21 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index e93d1b7af..11bfa7a1a 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -92,6 +92,7 @@ criterion = { version = "0.3", features = ["html_reports"] } env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } memchr = "2.4" +once_cell = "1.9" rcgen = "0.8" regex = "1.3" rustls-pemfile = "0.2" diff --git a/actix-http/examples/bench.rs b/actix-http/examples/bench.rs new file mode 100644 index 000000000..e41c0bb4f --- /dev/null +++ b/actix-http/examples/bench.rs @@ -0,0 +1,27 @@ +use std::{convert::Infallible, io, time::Duration}; + +use actix_http::{HttpService, Request, Response, StatusCode}; +use actix_server::Server; +use once_cell::sync::Lazy; + +static STR: Lazy = Lazy::new(|| "HELLO WORLD ".repeat(20)); + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("dispatcher-benchmark", ("127.0.0.1", 8080), || { + HttpService::build() + .client_request_timeout(Duration::from_secs(1)) + .finish(|_: Request| async move { + let mut res = Response::build(StatusCode::OK); + Ok::<_, Infallible>(res.body(&**STR)) + }) + .tcp() + })? + // limiting number of workers so that bench client is not sharing as many resources + .workers(4) + .run() + .await +} diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index f9188ed9f..58de64530 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, time::Duration}; use actix_http::{Error, HttpService, Request, Response, StatusCode}; use actix_server::Server; @@ -13,8 +13,8 @@ async fn main() -> io::Result<()> { Server::build() .bind("echo", ("127.0.0.1", 8080), || { HttpService::build() - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) // handles HTTP/1.1 and HTTP/2 .finish(|mut req: Request| async move { let mut body = BytesMut::new(); diff --git a/actix-http/examples/h2spec.rs b/actix-http/examples/h2spec.rs new file mode 100644 index 000000000..4ab426c6c --- /dev/null +++ b/actix-http/examples/h2spec.rs @@ -0,0 +1,25 @@ +use std::{convert::Infallible, io}; + +use actix_http::{HttpService, Request, Response, StatusCode}; +use actix_server::Server; +use once_cell::sync::Lazy; + +static STR: Lazy = Lazy::new(|| "HELLO WORLD ".repeat(100)); + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("h2spec", ("127.0.0.1", 8080), || { + HttpService::build() + .h2(|_: Request| async move { + let mut res = Response::build(StatusCode::OK); + Ok::<_, Infallible>(res.body(&**STR)) + }) + .tcp() + })? + .workers(4) + .run() + .await +} diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index a29903cc4..1a83d4d9c 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, io}; +use std::{convert::Infallible, io, time::Duration}; use actix_http::{ header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode, @@ -12,8 +12,8 @@ async fn main() -> io::Result<()> { Server::build() .bind("hello-world", ("127.0.0.1", 8080), || { HttpService::build() - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .on_connect_ext(|_, ext| { ext.insert(42u32); }) diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 408ee7924..9dd145ce1 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -1,25 +1,23 @@ -use std::{fmt, marker::PhantomData, net, rc::Rc}; +use std::{fmt, marker::PhantomData, net, rc::Rc, time::Duration}; use actix_codec::Framed; use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use crate::{ body::{BoxBody, MessageBody}, - config::{KeepAlive, ServiceConfig}, h1::{self, ExpectHandler, H1Service, UpgradeHandler}, h2::H2Service, service::HttpService, - ConnectCallback, Extensions, Request, Response, + ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfig, }; -/// A HTTP service builder +/// An HTTP service builder. /// -/// This type can be used to construct an instance of [`HttpService`] through a -/// builder-like pattern. +/// This type can construct an instance of [`HttpService`] through a builder-like pattern. pub struct HttpServiceBuilder { keep_alive: KeepAlive, - client_timeout: u64, - client_disconnect: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, expect: X, @@ -28,22 +26,23 @@ pub struct HttpServiceBuilder { _phantom: PhantomData, } -impl HttpServiceBuilder +impl Default for HttpServiceBuilder where S: ServiceFactory, S::Error: Into> + 'static, S::InitError: fmt::Debug, >::Future: 'static, { - /// Create instance of `ServiceConfigBuilder` - #[allow(clippy::new_without_default)] - pub fn new() -> Self { + fn default() -> Self { HttpServiceBuilder { - keep_alive: KeepAlive::Timeout(5), - client_timeout: 5000, - client_disconnect: 0, + // ServiceConfig parts (make sure defaults match) + keep_alive: KeepAlive::default(), + client_request_timeout: Duration::from_secs(5), + client_disconnect_timeout: Duration::ZERO, secure: false, local_addr: None, + + // dispatcher parts expect: ExpectHandler, upgrade: None, on_connect_ext: None, @@ -65,9 +64,11 @@ where U::Error: fmt::Display, U::InitError: fmt::Debug, { - /// Set server keep-alive setting. + /// Set connection keep-alive setting. /// - /// By default keep alive is set to a 5 seconds. + /// Applies to HTTP/1.1 keep-alive and HTTP/2 ping-pong. + /// + /// By default keep-alive is 5 seconds. pub fn keep_alive>(mut self, val: W) -> Self { self.keep_alive = val.into(); self @@ -85,33 +86,45 @@ where self } - /// Set server client timeout in milliseconds for first request. + /// Set client request timeout (for first request). /// - /// Defines a timeout for reading client request header. If a client does not transmit - /// the entire set headers within this time, the request is terminated with - /// the 408 (Request Time-out) error. + /// Defines a timeout for reading client request header. If the client does not transmit the + /// request head within this duration, the connection is terminated with a `408 Request Timeout` + /// response error. /// - /// To disable timeout set value to 0. + /// A duration of zero disables the timeout. /// - /// By default client timeout is set to 5000 milliseconds. - pub fn client_timeout(mut self, val: u64) -> Self { - self.client_timeout = val; + /// By default, the client timeout is 5 seconds. + pub fn client_request_timeout(mut self, dur: Duration) -> Self { + self.client_request_timeout = dur; self } - /// Set server connection disconnect timeout in milliseconds. + #[doc(hidden)] + #[deprecated(since = "3.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_timeout(self, dur: Duration) -> Self { + self.client_request_timeout(dur) + } + + /// Set client connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the request get dropped. This timeout affects secure connections. /// - /// To disable timeout set value to 0. + /// A duration of zero disables the timeout. /// - /// By default disconnect timeout is set to 0. - pub fn client_disconnect(mut self, val: u64) -> Self { - self.client_disconnect = val; + /// By default, the disconnect timeout is disabled. + pub fn client_disconnect_timeout(mut self, dur: Duration) -> Self { + self.client_disconnect_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "3.0.0", note = "Renamed to `client_disconnect_timeout`.")] + pub fn client_disconnect(self, dur: Duration) -> Self { + self.client_disconnect_timeout(dur) + } + /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. @@ -126,8 +139,8 @@ where { HttpServiceBuilder { keep_alive: self.keep_alive, - client_timeout: self.client_timeout, - client_disconnect: self.client_disconnect, + client_request_timeout: self.client_request_timeout, + client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, expect: expect.into_factory(), @@ -150,8 +163,8 @@ where { HttpServiceBuilder { keep_alive: self.keep_alive, - client_timeout: self.client_timeout, - client_disconnect: self.client_disconnect, + client_request_timeout: self.client_request_timeout, + client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, expect: self.expect, @@ -185,8 +198,8 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); @@ -209,8 +222,8 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); @@ -230,8 +243,8 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index b6d5a7d51..aa05d6aba 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -1,71 +1,36 @@ use std::{ - cell::Cell, - fmt::{self, Write}, net, rc::Rc, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; -use actix_rt::{ - task::JoinHandle, - time::{interval, sleep_until, Instant, Sleep}, -}; use bytes::BytesMut; -/// "Sun, 06 Nov 1994 08:49:37 GMT".len() -pub(crate) const DATE_VALUE_LENGTH: usize = 29; +use crate::{date::DateService, KeepAlive}; -#[derive(Debug, PartialEq, Clone, Copy)] -/// Server keep-alive setting -pub enum KeepAlive { - /// Keep alive in seconds - Timeout(usize), - - /// Rely on OS to shutdown tcp connection - Os, - - /// Disabled - Disabled, -} - -impl From for KeepAlive { - fn from(keepalive: usize) -> Self { - KeepAlive::Timeout(keepalive) - } -} - -impl From> for KeepAlive { - fn from(keepalive: Option) -> Self { - if let Some(keepalive) = keepalive { - KeepAlive::Timeout(keepalive) - } else { - KeepAlive::Disabled - } - } -} - -/// Http service configuration +/// HTTP service configuration. +#[derive(Debug, Clone)] pub struct ServiceConfig(Rc); +#[derive(Debug)] struct Inner { - keep_alive: Option, - client_timeout: u64, - client_disconnect: u64, - ka_enabled: bool, + keep_alive: KeepAlive, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, date_service: DateService, } -impl Clone for ServiceConfig { - fn clone(&self) -> Self { - ServiceConfig(self.0.clone()) - } -} - impl Default for ServiceConfig { fn default() -> Self { - Self::new(KeepAlive::Timeout(5), 0, 0, false, None) + Self::new( + KeepAlive::default(), + Duration::from_secs(5), + Duration::ZERO, + false, + None, + ) } } @@ -73,34 +38,22 @@ impl ServiceConfig { /// Create instance of `ServiceConfig` pub fn new( keep_alive: KeepAlive, - client_timeout: u64, - client_disconnect: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, ) -> ServiceConfig { - let (keep_alive, ka_enabled) = match keep_alive { - KeepAlive::Timeout(val) => (val as u64, true), - KeepAlive::Os => (0, true), - KeepAlive::Disabled => (0, false), - }; - let keep_alive = if ka_enabled && keep_alive > 0 { - Some(Duration::from_secs(keep_alive)) - } else { - None - }; - ServiceConfig(Rc::new(Inner { - keep_alive, - ka_enabled, - client_timeout, - client_disconnect, + keep_alive: keep_alive.normalize(), + client_request_timeout, + client_disconnect_timeout, secure, local_addr, date_service: DateService::new(), })) } - /// Returns true if connection is secure (HTTPS) + /// Returns `true` if connection is secure (i.e., using TLS / HTTPS). #[inline] pub fn secure(&self) -> bool { self.0.secure @@ -114,239 +67,91 @@ impl ServiceConfig { self.0.local_addr } - /// Keep alive duration if configured. + /// Connection keep-alive setting. #[inline] - pub fn keep_alive(&self) -> Option { + pub fn keep_alive(&self) -> KeepAlive { self.0.keep_alive } - /// Return state of connection keep-alive functionality - #[inline] - pub fn keep_alive_enabled(&self) -> bool { - self.0.ka_enabled - } - - /// Client timeout for first request. - #[inline] - pub fn client_timer(&self) -> Option { - let delay_time = self.0.client_timeout; - if delay_time != 0 { - Some(sleep_until(self.now() + Duration::from_millis(delay_time))) - } else { - None + /// Creates a time object representing the deadline for this connection's keep-alive period, if + /// enabled. + /// + /// When [`KeepAlive::Os`] or [`KeepAlive::Disabled`] is set, this will return `None`. + pub fn keep_alive_deadline(&self) -> Option { + match self.keep_alive() { + KeepAlive::Timeout(dur) => Some(self.now() + dur), + KeepAlive::Os => None, + KeepAlive::Disabled => None, } } - /// Client timeout for first request. - pub fn client_timer_expire(&self) -> Option { - let delay = self.0.client_timeout; - if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) - } else { - None - } + /// Creates a time object representing the deadline for the client to finish sending the head of + /// its first request. + /// + /// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`. + pub fn client_request_deadline(&self) -> Option { + let timeout = self.0.client_request_timeout; + (timeout != Duration::ZERO).then(|| self.now() + timeout) } - /// Client disconnect timer - pub fn client_disconnect_timer(&self) -> Option { - let delay = self.0.client_disconnect; - if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) - } else { - None - } + /// Creates a time object representing the deadline for the client to disconnect. + pub fn client_disconnect_deadline(&self) -> Option { + let timeout = self.0.client_disconnect_timeout; + (timeout != Duration::ZERO).then(|| self.now() + timeout) } - /// Return keep-alive timer delay is configured. - #[inline] - pub fn keep_alive_timer(&self) -> Option { - self.keep_alive().map(|ka| sleep_until(self.now() + ka)) - } - - /// Keep-alive expire time - pub fn keep_alive_expire(&self) -> Option { - self.keep_alive().map(|ka| self.now() + ka) - } - - #[inline] pub(crate) fn now(&self) -> Instant { self.0.date_service.now() } - #[doc(hidden)] - pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) { + pub(crate) fn write_date_header(&self, dst: &mut BytesMut, camel_case: bool) { let mut buf: [u8; 39] = [0; 39]; buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " }); self.0 .date_service - .set_date(|date| buf[6..35].copy_from_slice(&date.bytes)); + .with_date(|date| buf[6..35].copy_from_slice(&date.bytes)); buf[35..].copy_from_slice(b"\r\n\r\n"); dst.extend_from_slice(&buf); } - pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { + pub(crate) fn write_date_header_value(&self, dst: &mut BytesMut) { self.0 .date_service - .set_date(|date| dst.extend_from_slice(&date.bytes)); - } -} - -#[derive(Copy, Clone)] -struct Date { - bytes: [u8; DATE_VALUE_LENGTH], - pos: usize, -} - -impl Date { - fn new() -> Date { - let mut date = Date { - bytes: [0; DATE_VALUE_LENGTH], - pos: 0, - }; - date.update(); - date - } - - fn update(&mut self) { - self.pos = 0; - write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap(); - } -} - -impl fmt::Write for Date { - fn write_str(&mut self, s: &str) -> fmt::Result { - let len = s.len(); - self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); - self.pos += len; - Ok(()) - } -} - -/// Service for update Date and Instant periodically at 500 millis interval. -struct DateService { - current: Rc>, - handle: JoinHandle<()>, -} - -impl Drop for DateService { - fn drop(&mut self) { - // stop the timer update async task on drop. - self.handle.abort(); - } -} - -impl DateService { - fn new() -> Self { - // shared date and timer for DateService and update async task. - let current = Rc::new(Cell::new((Date::new(), Instant::now()))); - let current_clone = Rc::clone(¤t); - // spawn an async task sleep for 500 milli and update current date/timer in a loop. - // handle is used to stop the task on DateService drop. - let handle = actix_rt::spawn(async move { - #[cfg(test)] - let _notify = notify_on_drop::NotifyOnDrop::new(); - - let mut interval = interval(Duration::from_millis(500)); - loop { - let now = interval.tick().await; - let date = Date::new(); - current_clone.set((date, now)); - } - }); - - DateService { current, handle } - } - - fn now(&self) -> Instant { - self.current.get().1 - } - - fn set_date(&self, mut f: F) { - f(&self.current.get().0); - } -} - -// TODO: move to a util module for testing all spawn handle drop style tasks. -/// Test Module for checking the drop state of certain async tasks that are spawned -/// with `actix_rt::spawn` -/// -/// The target task must explicitly generate `NotifyOnDrop` when spawn the task -#[cfg(test)] -mod notify_on_drop { - use std::cell::RefCell; - - thread_local! { - static NOTIFY_DROPPED: RefCell> = RefCell::new(None); - } - - /// Check if the spawned task is dropped. - /// - /// # Panics - /// Panics when there was no `NotifyOnDrop` instance on current thread. - pub(crate) fn is_dropped() -> bool { - NOTIFY_DROPPED.with(|bool| { - bool.borrow() - .expect("No NotifyOnDrop existed on current thread") - }) - } - - pub(crate) struct NotifyOnDrop; - - impl NotifyOnDrop { - /// # Panic: - /// - /// When construct multiple instances on any given thread. - pub(crate) fn new() -> Self { - NOTIFY_DROPPED.with(|bool| { - let mut bool = bool.borrow_mut(); - if bool.is_some() { - panic!("NotifyOnDrop existed on current thread"); - } else { - *bool = Some(false); - } - }); - - NotifyOnDrop - } - } - - impl Drop for NotifyOnDrop { - fn drop(&mut self) { - NOTIFY_DROPPED.with(|bool| { - if let Some(b) = bool.borrow_mut().as_mut() { - *b = true; - } - }); - } + .with_date(|date| dst.extend_from_slice(&date.bytes)); } } #[cfg(test)] mod tests { use super::*; + use crate::{date::DATE_VALUE_LENGTH, notify_on_drop}; - use actix_rt::{task::yield_now, time::sleep}; + use actix_rt::{ + task::yield_now, + time::{sleep, sleep_until}, + }; use memchr::memmem; #[actix_rt::test] async fn test_date_service_update() { - let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None); + let settings = + ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None); yield_now().await; let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, false); + settings.write_date_header(&mut buf1, false); let now1 = settings.now(); - sleep_until(Instant::now() + Duration::from_secs(2)).await; + sleep_until((Instant::now() + Duration::from_secs(2)).into()).await; yield_now().await; let now2 = settings.now(); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, false); + settings.write_date_header(&mut buf2, false); assert_ne!(now1, now2); @@ -402,10 +207,10 @@ mod tests { let settings = ServiceConfig::default(); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, false); + settings.write_date_header(&mut buf1, false); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, false); + settings.write_date_header(&mut buf2, false); assert_eq!(buf1, buf2); } @@ -415,11 +220,11 @@ mod tests { let settings = ServiceConfig::default(); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf, false); + settings.write_date_header(&mut buf, false); assert!(memmem::find(&buf, b"date:").is_some()); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf, true); + settings.write_date_header(&mut buf, true); assert!(memmem::find(&buf, b"Date:").is_some()); } } diff --git a/actix-http/src/date.rs b/actix-http/src/date.rs new file mode 100644 index 000000000..1358bbd8c --- /dev/null +++ b/actix-http/src/date.rs @@ -0,0 +1,92 @@ +use std::{ + cell::Cell, + fmt::{self, Write}, + rc::Rc, + time::{Duration, Instant, SystemTime}, +}; + +use actix_rt::{task::JoinHandle, time::interval}; + +/// "Thu, 01 Jan 1970 00:00:00 GMT".len() +pub(crate) const DATE_VALUE_LENGTH: usize = 29; + +#[derive(Clone, Copy)] +pub(crate) struct Date { + pub(crate) bytes: [u8; DATE_VALUE_LENGTH], + pos: usize, +} + +impl Date { + fn new() -> Date { + let mut date = Date { + bytes: [0; DATE_VALUE_LENGTH], + pos: 0, + }; + date.update(); + date + } + + fn update(&mut self) { + self.pos = 0; + write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap(); + } +} + +impl fmt::Write for Date { + fn write_str(&mut self, s: &str) -> fmt::Result { + let len = s.len(); + self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); + self.pos += len; + Ok(()) + } +} + +/// Service for update Date and Instant periodically at 500 millis interval. +pub(crate) struct DateService { + current: Rc>, + handle: JoinHandle<()>, +} + +impl DateService { + pub(crate) fn new() -> Self { + // shared date and timer for DateService and update async task. + let current = Rc::new(Cell::new((Date::new(), Instant::now()))); + let current_clone = Rc::clone(¤t); + // spawn an async task sleep for 500 millis and update current date/timer in a loop. + // handle is used to stop the task on DateService drop. + let handle = actix_rt::spawn(async move { + #[cfg(test)] + let _notify = crate::notify_on_drop::NotifyOnDrop::new(); + + let mut interval = interval(Duration::from_millis(500)); + loop { + let now = interval.tick().await; + let date = Date::new(); + current_clone.set((date, now.into_std())); + } + }); + + DateService { current, handle } + } + + pub(crate) fn now(&self) -> Instant { + self.current.get().1 + } + + pub(crate) fn with_date(&self, mut f: F) { + f(&self.current.get().0); + } +} + +impl fmt::Debug for DateService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DateService").finish_non_exhaustive() + } +} + +impl Drop for DateService { + fn drop(&mut self) { + // stop the timer update async task on drop. + self.handle.abort(); + } +} diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index 9bd896ae0..4e0ae8f48 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{fmt, io}; use actix_codec::{Decoder, Encoder}; use bitflags::bitflags; @@ -17,9 +17,9 @@ use crate::{ bitflags! { struct Flags: u8 { - const HEAD = 0b0000_0001; - const KEEPALIVE_ENABLED = 0b0000_1000; - const STREAM = 0b0001_0000; + const HEAD = 0b0000_0001; + const KEEP_ALIVE_ENABLED = 0b0000_1000; + const STREAM = 0b0001_0000; } } @@ -38,7 +38,7 @@ struct ClientCodecInner { decoder: decoder::MessageDecoder, payload: Option, version: Version, - ctype: ConnectionType, + conn_type: ConnectionType, // encoder part flags: Flags, @@ -51,23 +51,32 @@ impl Default for ClientCodec { } } +impl fmt::Debug for ClientCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("h1::ClientCodec") + .field("flags", &self.inner.flags) + .finish_non_exhaustive() + } +} + impl ClientCodec { /// Create HTTP/1 codec. /// /// `keepalive_enabled` how response `connection` header get generated. pub fn new(config: ServiceConfig) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE_ENABLED + let flags = if config.keep_alive().enabled() { + Flags::KEEP_ALIVE_ENABLED } else { Flags::empty() }; + ClientCodec { inner: ClientCodecInner { config, decoder: decoder::MessageDecoder::default(), payload: None, version: Version::HTTP_11, - ctype: ConnectionType::Close, + conn_type: ConnectionType::Close, flags, encoder: encoder::MessageEncoder::default(), @@ -77,12 +86,12 @@ impl ClientCodec { /// Check if request is upgrade pub fn upgrade(&self) -> bool { - self.inner.ctype == ConnectionType::Upgrade + self.inner.conn_type == ConnectionType::Upgrade } /// Check if last response is keep-alive - pub fn keepalive(&self) -> bool { - self.inner.ctype == ConnectionType::KeepAlive + pub fn keep_alive(&self) -> bool { + self.inner.conn_type == ConnectionType::KeepAlive } /// Check last request's message type @@ -104,8 +113,8 @@ impl ClientCodec { impl ClientPayloadCodec { /// Check if last response is keep-alive - pub fn keepalive(&self) -> bool { - self.inner.ctype == ConnectionType::KeepAlive + pub fn keep_alive(&self) -> bool { + self.inner.conn_type == ConnectionType::KeepAlive } /// Transform payload codec to a message codec @@ -122,12 +131,12 @@ impl Decoder for ClientCodec { debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set"); if let Some((req, payload)) = self.inner.decoder.decode(src)? { - if let Some(ctype) = req.conn_type() { + if let Some(conn_type) = req.conn_type() { // do not use peer's keep-alive - self.inner.ctype = if ctype == ConnectionType::KeepAlive { - self.inner.ctype + self.inner.conn_type = if conn_type == ConnectionType::KeepAlive { + self.inner.conn_type } else { - ctype + conn_type }; } @@ -192,9 +201,9 @@ impl Encoder> for ClientCodec { .set(Flags::HEAD, head.as_ref().method == Method::HEAD); // connection status - inner.ctype = match head.as_ref().connection_type() { + inner.conn_type = match head.as_ref().connection_type() { ConnectionType::KeepAlive => { - if inner.flags.contains(Flags::KEEPALIVE_ENABLED) { + if inner.flags.contains(Flags::KEEP_ALIVE_ENABLED) { ConnectionType::KeepAlive } else { ConnectionType::Close @@ -211,7 +220,7 @@ impl Encoder> for ClientCodec { false, inner.version, length, - inner.ctype, + inner.conn_type, &inner.config, )?; } diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 9a8907579..df74bcc42 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -15,9 +15,9 @@ use crate::{ bitflags! { struct Flags: u8 { - const HEAD = 0b0000_0001; - const KEEPALIVE_ENABLED = 0b0000_0010; - const STREAM = 0b0000_0100; + const HEAD = 0b0000_0001; + const KEEP_ALIVE_ENABLED = 0b0000_0010; + const STREAM = 0b0000_0100; } } @@ -42,7 +42,9 @@ impl Default for Codec { impl fmt::Debug for Codec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "h1::Codec({:?})", self.flags) + f.debug_struct("h1::Codec") + .field("flags", &self.flags) + .finish_non_exhaustive() } } @@ -51,8 +53,8 @@ impl Codec { /// /// `keepalive_enabled` how response `connection` header get generated. pub fn new(config: ServiceConfig) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE_ENABLED + let flags = if config.keep_alive().enabled() { + Flags::KEEP_ALIVE_ENABLED } else { Flags::empty() }; @@ -76,14 +78,14 @@ impl Codec { /// Check if last response is keep-alive. #[inline] - pub fn keepalive(&self) -> bool { + pub fn keep_alive(&self) -> bool { self.conn_type == ConnectionType::KeepAlive } /// Check if keep-alive enabled on server level. #[inline] - pub fn keepalive_enabled(&self) -> bool { - self.flags.contains(Flags::KEEPALIVE_ENABLED) + pub fn keep_alive_enabled(&self) -> bool { + self.flags.contains(Flags::KEEP_ALIVE_ENABLED) } /// Check last request's message type. @@ -124,7 +126,7 @@ impl Decoder for Codec { self.version = head.version; self.conn_type = head.connection_type(); if self.conn_type == ConnectionType::KeepAlive - && !self.flags.contains(Flags::KEEPALIVE_ENABLED) + && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED) { self.conn_type = ConnectionType::Close } @@ -179,9 +181,11 @@ impl Encoder, BodySize)>> for Codec { &self.config, )?; } + Message::Chunk(Some(bytes)) => { self.encoder.encode_chunk(bytes.as_ref(), dst)?; } + Message::Chunk(None) => { self.encoder.encode_eof(dst)?; } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5b790469f..3f327171d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -8,13 +8,12 @@ use std::{ task::{Context, Poll}, }; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; -use actix_rt::time::{sleep_until, Instant, Sleep}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder as _, Encoder as _, Framed, FramedParts}; +use actix_rt::time::sleep_until; use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use futures_core::ready; -use log::{error, trace}; use pin_project_lite::pin_project; use crate::{ @@ -29,6 +28,7 @@ use super::{ codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, + timer::TimerState, Message, MessageType, }; @@ -38,11 +38,23 @@ const MAX_PIPELINED_MESSAGES: usize = 16; bitflags! { pub struct Flags: u8 { - const STARTED = 0b0000_0001; - const KEEPALIVE = 0b0000_0010; - const SHUTDOWN = 0b0000_0100; - const READ_DISCONNECT = 0b0000_1000; - const WRITE_DISCONNECT = 0b0001_0000; + /// Set when stream is read for first time. + const STARTED = 0b0000_0001; + + /// Set when full request-response cycle has occurred. + const FINISHED = 0b0000_0010; + + /// Set if connection is in keep-alive (inactive) state. + const KEEP_ALIVE = 0b0000_0100; + + /// Set if in shutdown procedure. + const SHUTDOWN = 0b0000_1000; + + /// Set if read-half is disconnected. + const READ_DISCONNECT = 0b0001_0000; + + /// Set if write-half is disconnected. + const WRITE_DISCONNECT = 0b0010_0000; } } @@ -135,6 +147,7 @@ pin_project! { pub(super) flags: Flags, peer_addr: Option, conn_data: Option>, + config: ServiceConfig, error: Option, #[pin] @@ -142,9 +155,9 @@ pin_project! { payload: Option, messages: VecDeque, - ka_expire: Instant, - #[pin] - ka_timer: Option, + head_timer: TimerState, + ka_timer: TimerState, + shutdown_timer: TimerState, pub(super) io: Option, read_buf: BytesMut, @@ -165,7 +178,6 @@ pin_project! { where S: Service, X: Service, - B: MessageBody, { None, @@ -179,16 +191,40 @@ pin_project! { impl State where S: Service, - X: Service, - B: MessageBody, { - fn is_empty(&self) -> bool { + fn is_none(&self) -> bool { matches!(self, State::None) } } +impl fmt::Debug for State +where + S: Service, + X: Service, + B: MessageBody, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::None => write!(f, "State::None"), + Self::ExpectCall { .. } => { + f.debug_struct("State::ExpectCall").finish_non_exhaustive() + } + Self::ServiceCall { .. } => { + f.debug_struct("State::ServiceCall").finish_non_exhaustive() + } + Self::SendPayload { .. } => { + f.debug_struct("State::SendPayload").finish_non_exhaustive() + } + Self::SendErrorPayload { .. } => f + .debug_struct("State::SendErrorPayload") + .finish_non_exhaustive(), + } + } +} + +#[derive(Debug)] enum PollResponse { Upgrade(Request), DoNothing, @@ -219,33 +255,25 @@ where peer_addr: Option, conn_data: OnConnectData, ) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE - } else { - Flags::empty() - }; - - // keep-alive timer - let (ka_expire, ka_timer) = match config.keep_alive_timer() { - Some(delay) => (delay.deadline(), Some(delay)), - None => (config.now(), None), - }; - Dispatcher { inner: DispatcherState::Normal { inner: InnerDispatcher { flow, - flags, + flags: Flags::empty(), peer_addr, conn_data: conn_data.0.map(Rc::new), + config: config.clone(), error: None, state: State::None, payload: None, messages: VecDeque::new(), - ka_expire, - ka_timer, + head_timer: TimerState::new(config.client_request_deadline().is_some()), + ka_timer: TimerState::new(config.keep_alive().enabled()), + shutdown_timer: TimerState::new( + config.client_disconnect_deadline().is_some(), + ), io: Some(io), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), @@ -286,11 +314,12 @@ where } } - // if checked is set to true, delay disconnect until all tasks have finished. fn client_disconnected(self: Pin<&mut Self>) { let this = self.project(); + this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } @@ -306,9 +335,12 @@ where while written < len { match io.as_mut().poll_write(cx, &write_buf[written..])? { Poll::Ready(0) => { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))) + log::error!("write zero; closing"); + return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))); } + Poll::Ready(n) => written += n, + Poll::Pending => { write_buf.advance(written); return Poll::Pending; @@ -316,59 +348,70 @@ where } } - // everything has written to io. clear buffer. + // everything has written to I/O; clear buffer write_buf.clear(); - // flush the io and check if get blocked. + // flush the I/O and check if get blocked io.poll_flush(cx) } fn send_response_inner( self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: &impl MessageBody, ) -> Result { - let size = body.size(); let this = self.project(); + + let size = body.size(); + this.codec - .encode(Message::Item((message, size)), this.write_buf) + .encode(Message::Item((res, size)), this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } + DispatchError::Io(err) })?; - this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); + this.flags.set(Flags::KEEP_ALIVE, this.codec.keep_alive()); Ok(size) } fn send_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: B, ) -> Result<(), DispatchError> { - let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendPayload { body }, - }; - self.project().state.set(state); + }); + Ok(()) } fn send_error_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: BoxBody, ) -> Result<(), DispatchError> { - let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendErrorPayload { body }, - }; - self.project().state.set(state); + }); + Ok(()) } @@ -385,63 +428,66 @@ where 'res: loop { let mut this = self.as_mut().project(); match this.state.as_mut().project() { - // no future is in InnerDispatcher state. pop next message. + // no future is in InnerDispatcher state; pop next message StateProj::None => match this.messages.pop_front() { - // handle request message. + // handle request message Some(DispatcherMessage::Item(req)) => { // Handle `EXPECT: 100-Continue` header if req.head().expect() { - // set InnerDispatcher state and continue loop to poll it. + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.expect.call(req); this.state.set(State::ExpectCall { fut }); } else { - // the same as expect call. + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); }; } - // handle error message. + // handle error message Some(DispatcherMessage::Error(res)) => { - // send_response would update InnerDispatcher state to SendPayload or - // None(If response body is empty). - // continue loop to poll it. + // send_response would update InnerDispatcher state to SendPayload or None + // (If response body is empty) + // continue loop to poll it self.as_mut().send_error_response(res, BoxBody::new(()))?; } - // return with upgrade request and poll it exclusively. + // return with upgrade request and poll it exclusively Some(DispatcherMessage::Upgrade(req)) => { - return Ok(PollResponse::Upgrade(req)); + return Ok(PollResponse::Upgrade(req)) } - // all messages are dealt with. + // all messages are dealt with None => return Ok(PollResponse::DoNothing), }, - StateProj::ServiceCall { fut } => match fut.poll(cx) { - // service call resolved. send response. - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - self.as_mut().send_response(res, body)?; - } - // send service call error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } - - // service call pending and could be waiting for more chunk messages. - // (pipeline message limit and/or payload can_read limit) - Poll::Pending => { - // no new message is decoded and no new payload is feed. - // nothing to do except waiting for new incoming data from client. - if !self.as_mut().poll_request(cx)? { - return Ok(PollResponse::DoNothing); + StateProj::ServiceCall { fut } => { + match fut.poll(cx) { + // service call resolved. send response. + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); + self.as_mut().send_response(res, body)?; + } + + // send service call error as response + Poll::Ready(Err(err)) => { + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // service call pending and could be waiting for more chunk messages + // (pipeline message limit and/or payload can_read limit) + Poll::Pending => { + // no new message is decoded and no new payload is fed + // nothing to do except waiting for new incoming data from client + if !self.as_mut().poll_request(cx)? { + return Ok(PollResponse::DoNothing); + } + // else loop } - // otherwise keep loop. } - }, + } StateProj::SendPayload { mut body } => { // keep populate writer buffer until buffer size limit hit, @@ -455,21 +501,26 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // payload stream finished. // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { - return Err(DispatchError::Body(err.into())) + this.flags.insert(Flags::FINISHED); + return Err(DispatchError::Body(err.into())); } Poll::Pending => return Ok(PollResponse::DoNothing), } } - // buffer is beyond max size. - // return and try to write the whole buffer to io stream. + + // buffer is beyond max size + // return and try to write the whole buffer to I/O stream. return Ok(PollResponse::DrainWriteBuf); } @@ -487,46 +538,55 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; - // payload stream finished. + + // payload stream finished // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { + this.flags.insert(Flags::FINISHED); return Err(DispatchError::Body( Error::new_body().with_cause(err).into(), - )) + )); } Poll::Pending => return Ok(PollResponse::DoNothing), } } - // buffer is beyond max size. - // return and try to write the whole buffer to io stream. + + // buffer is beyond max size + // return and try to write the whole buffer to stream return Ok(PollResponse::DrainWriteBuf); } - StateProj::ExpectCall { fut } => match fut.poll(cx) { - // expect resolved. write continue to buffer and set InnerDispatcher state - // to service call. - Poll::Ready(Ok(req)) => { - this.write_buf - .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); - } + StateProj::ExpectCall { fut } => { + log::trace!(" calling expect service"); - // send expect error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } + match fut.poll(cx) { + // expect resolved. write continue to buffer and set InnerDispatcher state + // to service call. + Poll::Ready(Ok(req)) => { + this.write_buf + .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + } - // expect must be solved before progress can be made. - Poll::Pending => return Ok(PollResponse::DoNothing), - }, + // send expect error as response + Poll::Ready(Err(err)) => { + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // expect must be solved before progress can be made. + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } } } } @@ -536,64 +596,76 @@ where req: Request, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - // Handle `EXPECT: 100-Continue` header - let mut this = self.as_mut().project(); - if req.head().expect() { - // set dispatcher state so the future is pinned. - let fut = this.flow.expect.call(req); - this.state.set(State::ExpectCall { fut }); - } else { - // the same as above. - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); + // initialize dispatcher state + { + let mut this = self.as_mut().project(); + + // Handle `EXPECT: 100-Continue` header + if req.head().expect() { + // set dispatcher state to call expect handler + let fut = this.flow.expect.call(req); + this.state.set(State::ExpectCall { fut }); + } else { + // set dispatcher state to call service handler + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + }; }; - // eagerly poll the future for once(or twice if expect is resolved immediately). + // eagerly poll the future once (or twice if expect is resolved immediately). loop { match self.as_mut().project().state.project() { StateProj::ExpectCall { fut } => { match fut.poll(cx) { - // expect is resolved. continue loop and poll the service call branch. + // expect is resolved; continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); + let mut this = self.as_mut().project(); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); + continue; } - // future is pending. return Ok(()) to notify that a new state is - // set and the outer loop should be continue. - Poll::Pending => return Ok(()), - // future is error. send response and return a result. On success - // to notify the dispatcher a new state is set and the outer loop - // should be continue. + + // future is error; send response and return a result + // on success to notify the dispatcher a new state is set and the outer loop + // should be continued Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); return self.send_error_response(res, body); } + + // future is pending; return Ok(()) to notify that a new state is + // set and the outer loop should be continue. + Poll::Pending => return Ok(()), } } + StateProj::ServiceCall { fut } => { // return no matter the service call future's result. return match fut.poll(cx) { - // future is resolved. send response and return a result. On success + // Future is resolved. Send response and return a result. On success // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); - self.send_response(res, body) + self.as_mut().send_response(res, body) } - // see the comment on ExpectCall state branch's Pending. + + // see the comment on ExpectCall state branch's Pending Poll::Pending => Ok(()), - // see the comment on ExpectCall state branch's Ready(Err(err)). + + // see the comment on ExpectCall state branch's Ready(Err(_)) Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); - self.send_error_response(res, body) + self.as_mut().send_error_response(res, body) } }; } + _ => { unreachable!( "State must be set to ServiceCall or ExceptCall in handle_request" @@ -604,72 +676,77 @@ where } /// Process one incoming request. + /// + /// Returns true if any meaningful work was done. fn poll_request( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { + let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES; + let can_not_read = !self.can_read(cx); + // limit amount of non-processed requests - if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { + if pipeline_queue_full || can_not_read { return Ok(false); } - let mut updated = false; let mut this = self.as_mut().project(); + + let mut updated = false; + loop { match this.codec.decode(this.read_buf) { Ok(Some(msg)) => { updated = true; - this.flags.insert(Flags::STARTED); match msg { Message::Item(mut req) => { + // head timer only applies to first request on connection + this.head_timer.clear(line!()); + req.head_mut().peer_addr = *this.peer_addr; req.conn_data = this.conn_data.as_ref().map(Rc::clone); match this.codec.message_type() { - // Request is upgradable. add upgrade message and break. - // everything remain in read buffer would be handed to + // request has no payload + MessageType::None => {} + + // Request is upgradable. Add upgrade message and break. + // Everything remaining in read buffer will be handed to // upgraded Request. MessageType::Stream if this.flow.upgrade.is_some() => { this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } - // Request is not upgradable. + // request is not upgradable MessageType::Payload | MessageType::Stream => { - /* - PayloadSender and Payload are smart pointers share the - same state. - PayloadSender is attached to dispatcher and used to sink - new chunked request data to state. - Payload is attached to Request and passed to Service::call - where the state can be collected and consumed. - */ + // PayloadSender and Payload are smart pointers share the + // same state. PayloadSender is attached to dispatcher and used + // to sink new chunked request data to state. Payload is + // attached to Request and passed to Service::call where the + // state can be collected and consumed. let (sender, payload) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::Payload::H1 { payload }); - req = req1; + *req.payload() = crate::Payload::H1 { payload }; *this.payload = Some(sender); } - - // Request has no payload. - MessageType::None => {} } // handle request early when no future in InnerDispatcher state. - if this.state.is_empty() { + if this.state.is_none() { self.as_mut().handle_request(req, cx)?; this = self.as_mut().project(); } else { this.messages.push_back(DispatcherMessage::Item(req)); } } + Message::Chunk(Some(chunk)) => { if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); } else { - error!("Internal server error: unexpected payload chunk"); + log::error!("Internal server error: unexpected payload chunk"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -678,11 +755,12 @@ where break; } } + Message::Chunk(None) => { if let Some(mut payload) = this.payload.take() { payload.feed_eof(); } else { - error!("Internal server error: unexpected eof"); + log::error!("Internal server error: unexpected eof"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -693,38 +771,51 @@ where } } } - // decode is partial and buffer is not full yet. - // break and wait for more read. + + // decode is partial and buffer is not full yet + // break and wait for more read Ok(None) => break, + Err(ParseError::Io(err)) => { + log::trace!("I/O error: {}", &err); self.as_mut().client_disconnected(); this = self.as_mut().project(); *this.error = Some(DispatchError::Io(err)); break; } + Err(ParseError::TooLarge) => { + log::trace!("request head was too big; returning 431 response"); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Overflow); } - // Requests overflow buffer size should be responded with 431 + + // request heads that overflow buffer size return a 431 error this.messages .push_back(DispatcherMessage::Error(Response::with_body( StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, (), ))); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(ParseError::TooLarge.into()); + break; } + Err(err) => { + log::trace!("parse error {}", &err); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } - // Malformed requests should be responded with 400 + // malformed requests should be responded with 400 this.messages.push_back(DispatcherMessage::Error( Response::bad_request().drop_body(), )); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(err.into()); break; @@ -732,92 +823,115 @@ where } } - if updated && this.ka_timer.is_some() { - if let Some(expire) = this.codec.config().keep_alive_expire() { - *this.ka_expire = expire; - } - } Ok(updated) } - /// keep-alive timer - fn poll_keepalive( + fn poll_head_timer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - let mut this = self.as_mut().project(); + let this = self.as_mut().project(); - // when a branch is not explicit return early it's meant to fall through - // and return as Ok(()) - match this.ka_timer.as_mut().as_pin_mut() { - None => { - // conditionally go into shutdown timeout - if this.flags.contains(Flags::SHUTDOWN) { - if let Some(deadline) = this.codec.config().client_disconnect_timer() { - // write client disconnect time out and poll again to - // go into Some> branch - this.ka_timer.set(Some(sleep_until(deadline))); - return self.poll_keepalive(cx); - } - } - } - Some(mut timer) => { - // only operate when keep-alive timer is resolved. - if timer.as_mut().poll(cx).is_ready() { - // got timeout during shutdown, drop connection - if this.flags.contains(Flags::SHUTDOWN) { - return Err(DispatchError::DisconnectTimeout); - // exceed deadline. check for any outstanding tasks - } else if timer.deadline() >= *this.ka_expire { - // have no task at hand. - if this.state.is_empty() && this.write_buf.is_empty() { - if this.flags.contains(Flags::STARTED) { - trace!("Keep-alive timeout, close connection"); - this.flags.insert(Flags::SHUTDOWN); + if let TimerState::Active { timer } = this.head_timer { + if timer.as_mut().poll(cx).is_ready() { + // timeout on first request (slow request) return 408 - // start shutdown timeout - if let Some(deadline) = - this.codec.config().client_disconnect_timer() - { - timer.as_mut().reset(deadline); - let _ = timer.poll(cx); - } else { - // no shutdown timeout, drop socket - this.flags.insert(Flags::WRITE_DISCONNECT); - } - } else { - // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); - let _ = self.as_mut().send_error_response( - Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), - BoxBody::new(()), - ); - this = self.project(); - this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - } - // still have unfinished task. try to reset and register keep-alive. - } else if let Some(deadline) = this.codec.config().keep_alive_expire() { - timer.as_mut().reset(deadline); - let _ = timer.poll(cx); - } - // timer resolved but still have not met the keep-alive expire deadline. - // reset and register for later wakeup. - } else { - timer.as_mut().reset(*this.ka_expire); - let _ = timer.poll(cx); - } - } + log::trace!( + "timed out on slow request; \ + replying with 408 and closing connection" + ); + + let _ = self.as_mut().send_error_response( + Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), + BoxBody::new(()), + ); + + self.project().flags.insert(Flags::SHUTDOWN); } - } + }; + Ok(()) } - /// Returns true when io stream can be disconnected after write to it. + fn poll_ka_timer( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + let this = self.as_mut().project(); + if let TimerState::Active { timer } = this.ka_timer { + debug_assert!( + this.flags.contains(Flags::KEEP_ALIVE), + "keep-alive flag should be set when timer is active", + ); + debug_assert!( + this.state.is_none(), + "dispatcher should not be in keep-alive phase if state is not none: {:?}", + this.state, + ); + debug_assert!( + this.write_buf.is_empty(), + "dispatcher should not be in keep-alive phase if write_buf is not empty", + ); + + // keep-alive timer has timed out + if timer.as_mut().poll(cx).is_ready() { + // no tasks at hand + log::trace!("timer timed out; closing connection"); + this.flags.insert(Flags::SHUTDOWN); + + if let Some(deadline) = this.config.client_disconnect_deadline() { + // start shutdown timeout if enabled + this.shutdown_timer + .set_and_init(cx, sleep_until(deadline.into()), line!()); + } else { + // no shutdown timeout, drop socket + this.flags.insert(Flags::WRITE_DISCONNECT); + } + } + } + + Ok(()) + } + + fn poll_shutdown_timer( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + let this = self.as_mut().project(); + if let TimerState::Active { timer } = this.shutdown_timer { + debug_assert!( + this.flags.contains(Flags::SHUTDOWN), + "shutdown flag should be set when timer is active", + ); + + // timed-out during shutdown; drop connection + if timer.as_mut().poll(cx).is_ready() { + log::trace!("timed-out during shutdown"); + return Err(DispatchError::DisconnectTimeout); + } + } + + Ok(()) + } + + /// Poll head, keep-alive, and disconnect timer. + fn poll_timers( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + self.as_mut().poll_head_timer(cx)?; + self.as_mut().poll_ka_timer(cx)?; + self.as_mut().poll_shutdown_timer(cx)?; + + Ok(()) + } + + /// Returns true when I/O stream can be disconnected after write to it. /// /// It covers these conditions: - /// - `std::io::ErrorKind::ConnectionReset` after partial read. + /// - `std::io::ErrorKind::ConnectionReset` after partial read; /// - all data read done. - #[inline(always)] + #[inline(always)] // TODO: bench this inline fn read_available( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -846,13 +960,12 @@ where // When read_buf is beyond max buffer size the early return could be successfully // be parsed as a new Request. This case would not generate ParseError::TooLarge and // at this point IO stream is not fully read to Pending and would result in - // dispatcher stuck until timeout (KA) + // dispatcher stuck until timeout (keep-alive). // // Note: // This is a perf choice to reduce branch on ::decode. // - // A Request head too large to parse is only checked on - // `httparse::Status::Partial` condition. + // A Request head too large to parse is only checked on `httparse::Status::Partial`. if this.payload.is_none() { // When dispatcher has a payload the responsibility of wake up it would be shift @@ -881,18 +994,29 @@ where match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { Poll::Ready(Ok(n)) => { + this.flags.remove(Flags::FINISHED); + if n == 0 { return Ok(true); } + read_some = true; } - Poll::Pending => return Ok(false), + + Poll::Pending => { + return Ok(false); + } + Poll::Ready(Err(err)) => { return match err.kind() { + // convert WouldBlock error to the same as Pending return io::ErrorKind::WouldBlock => Ok(false), + + // connection reset after partial read io::ErrorKind::ConnectionReset if read_some => Ok(true), + _ => Err(DispatchError::Io(err)), - } + }; } } } @@ -940,27 +1064,60 @@ where } match this.inner.project() { - DispatcherStateProj::Normal { mut inner } => { - inner.as_mut().poll_keepalive(cx)?; + DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { + log::error!("Upgrade handler error: {}", err); + DispatchError::Upgrade + }), - if inner.flags.contains(Flags::SHUTDOWN) { + DispatcherStateProj::Normal { mut inner } => { + log::trace!("start flags: {:?}", &inner.flags); + + trace_timer_states( + "start", + &inner.head_timer, + &inner.ka_timer, + &inner.shutdown_timer, + ); + + inner.as_mut().poll_timers(cx)?; + + let poll = if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { - // flush buffer and wait on blocked. + // flush buffer and wait on blocked ready!(inner.as_mut().poll_flush(cx))?; - Pin::new(inner.project().io.as_mut().unwrap()) + Pin::new(inner.as_mut().project().io.as_mut().unwrap()) .poll_shutdown(cx) .map_err(DispatchError::from) } } else { - // read from io stream and fill read buffer. + // read from I/O stream and fill read buffer let should_disconnect = inner.as_mut().read_available(cx)?; + // after reading something from stream, clear keep-alive timer + if !inner.read_buf.is_empty() && inner.flags.contains(Flags::KEEP_ALIVE) { + let inner = inner.as_mut().project(); + inner.flags.remove(Flags::KEEP_ALIVE); + inner.ka_timer.clear(line!()); + } + + if !inner.flags.contains(Flags::STARTED) { + inner.as_mut().project().flags.insert(Flags::STARTED); + + if let Some(deadline) = inner.config.client_request_deadline() { + inner.as_mut().project().head_timer.set_and_init( + cx, + sleep_until(deadline.into()), + line!(), + ); + } + } + inner.as_mut().poll_request(cx)?; - // io stream should to be closed. if should_disconnect { + // I/O stream should to be closed let inner = inner.as_mut().project(); inner.flags.insert(Flags::READ_DISCONNECT); if let Some(mut payload) = inner.payload.take() { @@ -969,11 +1126,27 @@ where }; loop { - // poll_response and populate write buffer. - // drain indicate if write buffer should be emptied before next run. + // poll response to populate write buffer + // drain indicates whether write buffer should be emptied before next run let drain = match inner.as_mut().poll_response(cx)? { PollResponse::DrainWriteBuf => true, - PollResponse::DoNothing => false, + + PollResponse::DoNothing => { + // KEEP_ALIVE is set in send_response_inner if client allows it + // FINISHED is set after writing last chunk of response + if inner.flags.contains(Flags::KEEP_ALIVE | Flags::FINISHED) { + if let Some(timer) = inner.config.keep_alive_deadline() { + inner.as_mut().project().ka_timer.set_and_init( + cx, + sleep_until(timer.into()), + line!(), + ); + } + } + + false + } + // upgrade request and goes Upgrade variant of DispatcherState. PollResponse::Upgrade(req) => { let upgrade = inner.upgrade(req); @@ -985,57 +1158,96 @@ where } }; - // we didn't get WouldBlock from write operation, - // so data get written to kernel completely (macOS) - // and we have to write again otherwise response can get stuck + // we didn't get WouldBlock from write operation, so data get written to + // kernel completely (macOS) and we have to write again otherwise response + // can get stuck // - // TODO: what? is WouldBlock good or bad? - // want to find a reference for this macOS behavior - if inner.as_mut().poll_flush(cx)?.is_pending() || !drain { + // TODO: want to find a reference for this behavior + // see introduced commit: 3872d3ba + let flush_was_ready = inner.as_mut().poll_flush(cx)?.is_ready(); + + // this assert seems to always be true but not willing to commit to it until + // we understand what Nikolay meant when writing the above comment + // debug_assert!(flush_was_ready); + + if !flush_was_ready || !drain { break; } } // client is gone if inner.flags.contains(Flags::WRITE_DISCONNECT) { + log::trace!("client is gone; disconnecting"); return Poll::Ready(Ok(())); } - let is_empty = inner.state.is_empty(); - let inner_p = inner.as_mut().project(); - // read half is closed and we do not processing any responses - if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { + let state_is_none = inner_p.state.is_none(); + + // read half is closed; we do not process any responses + if inner_p.flags.contains(Flags::READ_DISCONNECT) && state_is_none { + log::trace!("read half closed; start shutdown"); inner_p.flags.insert(Flags::SHUTDOWN); } // keep-alive and stream errors - if is_empty && inner_p.write_buf.is_empty() { + if state_is_none && inner_p.write_buf.is_empty() { if let Some(err) = inner_p.error.take() { - Poll::Ready(Err(err)) + log::error!("stream error: {}", &err); + return Poll::Ready(Err(err)); } + // disconnect if keep-alive is not enabled - else if inner_p.flags.contains(Flags::STARTED) - && !inner_p.flags.intersects(Flags::KEEPALIVE) + if inner_p.flags.contains(Flags::FINISHED) + && !inner_p.flags.contains(Flags::KEEP_ALIVE) { + inner_p.flags.remove(Flags::FINISHED); inner_p.flags.insert(Flags::SHUTDOWN); - self.poll(cx) + return self.poll(cx); } + // disconnect if shutdown - else if inner_p.flags.contains(Flags::SHUTDOWN) { - self.poll(cx) - } else { - Poll::Pending + if inner_p.flags.contains(Flags::SHUTDOWN) { + return self.poll(cx); } - } else { - Poll::Pending } - } + + trace_timer_states( + "end", + inner_p.head_timer, + inner_p.ka_timer, + inner_p.shutdown_timer, + ); + + Poll::Pending + }; + + log::trace!("end flags: {:?}", &inner.flags); + + poll } - DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { - error!("Upgrade handler error: {}", err); - DispatchError::Upgrade - }), } } } + +#[allow(dead_code)] +fn trace_timer_states( + label: &str, + head_timer: &TimerState, + ka_timer: &TimerState, + shutdown_timer: &TimerState, +) { + log::trace!("{} timers:", label); + + if head_timer.is_enabled() { + log::trace!(" head {}", &head_timer); + } + + if ka_timer.is_enabled() { + log::trace!(" keep-alive {}", &ka_timer); + } + + if shutdown_timer.is_enabled() { + log::trace!(" shutdown {}", &shutdown_timer); + } +} diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 379019c6f..891cce69c 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -17,7 +17,7 @@ use crate::{ h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, test::{TestBuffer, TestSeqBuffer}, - Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, + Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode, }; fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { @@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) { } fn ok_service() -> impl Service, Error = Error> { - fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) + status_service(StatusCode::OK) +} + +fn status_service( + status: StatusCode, +) -> impl Service, Error = Error> { + fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status)))) } fn echo_path_service( @@ -65,11 +71,15 @@ fn echo_payload_service() -> impl Service, E #[actix_rt::test] async fn late_request() { - let _ = env_logger::try_init(); - let mut buf = TestBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(ok_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -127,10 +137,16 @@ async fn late_request() { } #[actix_rt::test] -async fn test_basic() { +async fn oneshot_connection() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -179,10 +195,16 @@ async fn test_basic() { } #[actix_rt::test] -async fn test_keep_alive_timeout() { +async fn keep_alive_timeout() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Timeout(Duration::from_millis(200)), + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -229,7 +251,7 @@ async fn test_keep_alive_timeout() { .await; // sleep slightly longer than keep-alive timeout - sleep(Duration::from_millis(1100)).await; + sleep(Duration::from_millis(250)).await; lazy(|cx| { assert!( @@ -252,10 +274,16 @@ async fn test_keep_alive_timeout() { } #[actix_rt::test] -async fn test_keep_alive_follow_up_req() { +async fn keep_alive_follow_up_req() { let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Timeout(Duration::from_millis(500)), + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -302,7 +330,7 @@ async fn test_keep_alive_follow_up_req() { .await; // sleep for less than KA timeout - sleep(Duration::from_millis(200)).await; + sleep(Duration::from_millis(100)).await; lazy(|cx| { assert!( @@ -371,7 +399,7 @@ async fn test_keep_alive_follow_up_req() { } #[actix_rt::test] -async fn test_req_parse_err() { +async fn req_parse_err() { lazy(|cx| { let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); @@ -413,7 +441,13 @@ async fn pipelining_ok_then_ok() { ", ); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -477,7 +511,13 @@ async fn pipelining_ok_then_bad() { ", ); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -531,10 +571,16 @@ async fn pipelining_ok_then_bad() { } #[actix_rt::test] -async fn test_expect() { +async fn expect_handling() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); @@ -562,7 +608,6 @@ async fn test_expect() { // polls: manual assert_eq!(h1.poll_count, 1); - eprintln!("poll count: {}", h1.poll_count); if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); @@ -603,10 +648,16 @@ async fn test_expect() { } #[actix_rt::test] -async fn test_eager_expect() { +async fn expect_eager() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -663,7 +714,7 @@ async fn test_eager_expect() { } #[actix_rt::test] -async fn test_upgrade() { +async fn upgrade_handling() { struct TestUpgrade; impl Service<(Request, Framed)> for TestUpgrade { @@ -683,7 +734,13 @@ async fn test_upgrade() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 5fcb2f688..a24ba5911 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -212,7 +212,7 @@ pub(crate) trait MessageType: Sized { // optimized date header, set_date writes \r\n if !has_date { - config.set_date(dst, camel_case); + config.write_date_header(dst, camel_case); } else { // msg eof dst.extend_from_slice(b"\r\n"); @@ -318,16 +318,17 @@ impl MessageType for RequestHeadType { } impl MessageEncoder { - /// Encode message + /// Encode chunk. pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result { self.te.encode(msg, buf) } - /// Encode eof + /// Encode EOF. pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { self.te.encode_eof(buf) } + /// Encode message. pub fn encode( &mut self, dst: &mut BytesMut, diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 8c569165d..858cf542a 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -13,6 +13,7 @@ mod encoder; mod expect; mod payload; mod service; +mod timer; mod upgrade; mod utils; @@ -28,9 +29,10 @@ pub use self::utils::SendResponse; #[derive(Debug)] /// Codec message pub enum Message { - /// Http message + /// HTTP message. Item(T), - /// Payload chunk + + /// Payload chunk. Chunk(Option), } diff --git a/actix-http/src/h1/timer.rs b/actix-http/src/h1/timer.rs new file mode 100644 index 000000000..bb69fdb80 --- /dev/null +++ b/actix-http/src/h1/timer.rs @@ -0,0 +1,80 @@ +use std::{fmt, future::Future, pin::Pin, task::Context}; + +use actix_rt::time::{Instant, Sleep}; + +#[derive(Debug)] +pub(super) enum TimerState { + Disabled, + Inactive, + Active { timer: Pin> }, +} + +impl TimerState { + pub(super) fn new(enabled: bool) -> Self { + if enabled { + Self::Inactive + } else { + Self::Disabled + } + } + + pub(super) fn is_enabled(&self) -> bool { + matches!(self, Self::Active { .. } | Self::Inactive) + } + + pub(super) fn set(&mut self, timer: Sleep, line: u32) { + if matches!(self, Self::Disabled) { + log::trace!("setting disabled timer from line {}", line); + } + + *self = Self::Active { + timer: Box::pin(timer), + }; + } + + pub(super) fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) { + self.set(timer, line); + self.init(cx); + } + + pub(super) fn clear(&mut self, line: u32) { + if matches!(self, Self::Disabled) { + log::trace!("trying to clear a disabled timer from line {}", line); + } + + if matches!(self, Self::Inactive) { + log::trace!("trying to clear an inactive timer from line {}", line); + } + + *self = Self::Inactive; + } + + pub(super) fn init(&mut self, cx: &mut Context<'_>) { + if let TimerState::Active { timer } = self { + let _ = timer.as_mut().poll(cx); + } + } +} + +impl fmt::Display for TimerState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TimerState::Disabled => f.write_str("timer is disabled"), + TimerState::Inactive => f.write_str("timer is inactive"), + TimerState::Active { timer } => { + let deadline = timer.deadline(); + let now = Instant::now(); + + if deadline < now { + f.write_str("timer is active and has reached deadline") + } else { + write!( + f, + "timer is active and due to expire in {} milliseconds", + ((deadline - now).as_secs_f32() * 1000.0) + ) + } + } + } + } +} diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index a90eb3466..7a11f9b33 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -57,11 +57,11 @@ where conn_data: OnConnectData, timer: Option>>, ) -> Self { - let ping_pong = config.keep_alive().map(|dur| H2PingPong { + let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong { timer: timer .map(|mut timer| { - // reset timer if it's received from new function. - timer.as_mut().reset(config.now() + dur); + // reuse timer slot if it was initialized for handshake + timer.as_mut().reset((config.now() + dur).into()); timer }) .unwrap_or_else(|| Box::pin(sleep(dur))), @@ -160,8 +160,8 @@ where Poll::Ready(_) => { ping_pong.on_flight = false; - let dead_line = this.config.keep_alive_expire().unwrap(); - ping_pong.timer.as_mut().reset(dead_line); + let dead_line = this.config.keep_alive_deadline().unwrap(); + ping_pong.timer.as_mut().reset(dead_line.into()); } Poll::Pending => { return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(())) @@ -174,8 +174,8 @@ where ping_pong.ping_pong.send_ping(Ping::opaque())?; - let dead_line = this.config.keep_alive_expire().unwrap(); - ping_pong.timer.as_mut().reset(dead_line); + let dead_line = this.config.keep_alive_deadline().unwrap(); + ping_pong.timer.as_mut().reset(dead_line.into()); ping_pong.on_flight = true; } @@ -322,7 +322,7 @@ fn prepare_response( // set date header if !has_date { let mut bytes = BytesMut::with_capacity(29); - config.set_date_header(&mut bytes); + config.write_date_header_value(&mut bytes); res.headers_mut().insert( DATE, // SAFETY: serialized date-times are known ASCII strings diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index 47d51b420..c8aaaaa5f 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_rt::time::Sleep; +use actix_rt::time::{sleep_until, Sleep}; use bytes::Bytes; use futures_core::{ready, Stream}; use h2::{ @@ -15,17 +15,17 @@ use h2::{ RecvStream, }; +use crate::{ + config::ServiceConfig, + error::{DispatchError, PayloadError}, +}; + mod dispatcher; mod service; pub use self::dispatcher::Dispatcher; pub use self::service::H2Service; -use crate::{ - config::ServiceConfig, - error::{DispatchError, PayloadError}, -}; - /// HTTP/2 peer stream. pub struct Payload { stream: RecvStream, @@ -67,7 +67,9 @@ where { HandshakeWithTimeout { handshake: handshake(io), - timer: config.client_timer().map(Box::pin), + timer: config + .client_request_deadline() + .map(|deadline| Box::pin(sleep_until(deadline.into()))), } } @@ -86,7 +88,7 @@ where let this = self.get_mut(); match Pin::new(&mut this.handshake).poll(cx)? { - // return the timer on success handshake. It can be re-used for h2 ping-pong. + // return the timer on success handshake; its slot can be re-used for h2 ping-pong Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Pending => match this.timer.as_mut() { Some(timer) => { diff --git a/actix-http/src/header/map.rs b/actix-http/src/header/map.rs index 33fb262c4..8f6d1cead 100644 --- a/actix-http/src/header/map.rs +++ b/actix-http/src/header/map.rs @@ -630,7 +630,7 @@ impl Removed { /// Returns true if iterator contains no elements, without consuming it. /// /// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate - /// wether any items were actually replaced or removed, respectively. + /// whether any items were actually replaced or removed, respectively. pub fn is_empty(&self) -> bool { match self.inner { // size hint lower bound of smallvec is the correct length diff --git a/actix-http/src/header/shared/http_date.rs b/actix-http/src/header/shared/http_date.rs index 473d6cad0..21ed49f0c 100644 --- a/actix-http/src/header/shared/http_date.rs +++ b/actix-http/src/header/shared/http_date.rs @@ -4,8 +4,7 @@ use bytes::BytesMut; use http::header::{HeaderValue, InvalidHeaderValue}; use crate::{ - config::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, - helpers::MutWriter, + date::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, helpers::MutWriter, }; /// A timestamp with HTTP-style formatting and parsing. diff --git a/actix-http/src/keep_alive.rs b/actix-http/src/keep_alive.rs new file mode 100644 index 000000000..27161614d --- /dev/null +++ b/actix-http/src/keep_alive.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +/// Connection keep-alive config. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum KeepAlive { + /// Keep-alive duration. + /// + /// `KeepAlive::Timeout(Duration::ZERO)` is mapped to `KeepAlive::Disabled`. + Timeout(Duration), + + /// Rely on OS to shutdown TCP connection. + /// + /// Some defaults can be very long, check your OS documentation. + Os, + + /// Keep-alive is disabled. + /// + /// Connections will be closed immediately. + Disabled, +} + +impl KeepAlive { + pub(crate) fn enabled(&self) -> bool { + !matches!(self, Self::Disabled) + } + + pub(crate) fn duration(&self) -> Option { + match self { + KeepAlive::Timeout(dur) => Some(*dur), + _ => None, + } + } + + /// Map zero duration to disabled. + pub(crate) fn normalize(self) -> KeepAlive { + match self { + KeepAlive::Timeout(Duration::ZERO) => KeepAlive::Disabled, + ka => ka, + } + } +} + +impl Default for KeepAlive { + fn default() -> Self { + Self::Timeout(Duration::from_secs(5)) + } +} + +impl From for KeepAlive { + fn from(dur: Duration) -> Self { + KeepAlive::Timeout(dur).normalize() + } +} + +impl From> for KeepAlive { + fn from(ka_dur: Option) -> Self { + match ka_dur { + Some(dur) => KeepAlive::from(dur), + None => KeepAlive::Disabled, + } + .normalize() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_impls() { + let test: KeepAlive = Duration::from_secs(1).into(); + assert_eq!(test, KeepAlive::Timeout(Duration::from_secs(1))); + + let test: KeepAlive = Duration::from_secs(0).into(); + assert_eq!(test, KeepAlive::Disabled); + + let test: KeepAlive = Some(Duration::from_secs(0)).into(); + assert_eq!(test, KeepAlive::Disabled); + + let test: KeepAlive = None.into(); + assert_eq!(test, KeepAlive::Disabled); + } +} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index f2b415790..c8c7d55c9 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -33,6 +33,7 @@ pub use ::http::{Method, StatusCode, Version}; pub mod body; mod builder; mod config; +mod date; #[cfg(feature = "__compress")] pub mod encoding; pub mod error; @@ -42,7 +43,10 @@ pub mod h2; pub mod header; mod helpers; mod http_message; +mod keep_alive; mod message; +#[cfg(test)] +mod notify_on_drop; mod payload; mod requests; mod responses; @@ -51,11 +55,12 @@ pub mod test; pub mod ws; pub use self::builder::HttpServiceBuilder; -pub use self::config::{KeepAlive, ServiceConfig}; +pub use self::config::ServiceConfig; pub use self::error::Error; pub use self::extensions::Extensions; pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; +pub use self::keep_alive::KeepAlive; pub use self::message::ConnectionType; pub use self::message::Message; #[allow(deprecated)] diff --git a/actix-http/src/notify_on_drop.rs b/actix-http/src/notify_on_drop.rs new file mode 100644 index 000000000..98544bb5d --- /dev/null +++ b/actix-http/src/notify_on_drop.rs @@ -0,0 +1,49 @@ +/// Test Module for checking the drop state of certain async tasks that are spawned +/// with `actix_rt::spawn` +/// +/// The target task must explicitly generate `NotifyOnDrop` when spawn the task +use std::cell::RefCell; + +thread_local! { + static NOTIFY_DROPPED: RefCell> = RefCell::new(None); +} + +/// Check if the spawned task is dropped. +/// +/// # Panics +/// Panics when there was no `NotifyOnDrop` instance on current thread. +pub(crate) fn is_dropped() -> bool { + NOTIFY_DROPPED.with(|bool| { + bool.borrow() + .expect("No NotifyOnDrop existed on current thread") + }) +} + +pub(crate) struct NotifyOnDrop; + +impl NotifyOnDrop { + /// # Panics + /// Panics hen construct multiple instances on any given thread. + pub(crate) fn new() -> Self { + NOTIFY_DROPPED.with(|bool| { + let mut bool = bool.borrow_mut(); + if bool.is_some() { + panic!("NotifyOnDrop existed on current thread"); + } else { + *bool = Some(false); + } + }); + + NotifyOnDrop + } +} + +impl Drop for NotifyOnDrop { + fn drop(&mut self) { + NOTIFY_DROPPED.with(|bool| { + if let Some(b) = bool.borrow_mut().as_mut() { + *b = true; + } + }); + } +} diff --git a/actix-http/src/requests/head.rs b/actix-http/src/requests/head.rs index 06fd0429e..4558801f3 100644 --- a/actix-http/src/requests/head.rs +++ b/actix-http/src/requests/head.rs @@ -130,8 +130,8 @@ impl RequestHead { } } + /// Request contains `EXPECT` header. #[inline] - /// Request contains `EXPECT` header pub fn expect(&self) -> bool { self.flags.contains(Flags::EXPECT) } diff --git a/actix-http/src/responses/head.rs b/actix-http/src/responses/head.rs index 870073ab3..cb47c4b7a 100644 --- a/actix-http/src/responses/head.rs +++ b/actix-http/src/responses/head.rs @@ -42,7 +42,7 @@ impl ResponseHead { &mut self.headers } - /// Sets the flag that controls wether to send headers formatted as Camel-Case. + /// Sets the flag that controls whether to send headers formatted as Camel-Case. /// /// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase. #[inline] @@ -210,14 +210,15 @@ mod tests { use memchr::memmem; use crate::{ + h1::H1Service, header::{HeaderName, HeaderValue}, - Error, HttpService, Request, Response, + Error, Request, Response, ServiceConfig, }; #[actix_rt::test] async fn camel_case_headers() { let mut srv = actix_http_test::test_server(|| { - HttpService::new(|req: Request| async move { + H1Service::with_config(ServiceConfig::default(), |req: Request| async move { let mut res = Response::ok(); if req.path().contains("camel") { @@ -228,6 +229,7 @@ mod tests { HeaderName::from_static("foo-bar"), HeaderValue::from_static("baz"), ); + Ok::<_, Error>(res) }) .tcp() @@ -235,9 +237,11 @@ mod tests { .await; let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n"); - let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream + .write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n") + .unwrap(); + let mut data = vec![]; + let _ = stream.read_to_end(&mut data).unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_some()); assert!(memmem::find(&data, b"foo-bar").is_none()); @@ -247,9 +251,11 @@ mod tests { assert!(memmem::find(&data, b"content-length").is_none()); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n"); - let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream + .write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n") + .unwrap(); + let mut data = vec![]; + let _ = stream.read_to_end(&mut data).unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_none()); assert!(memmem::find(&data, b"foo-bar").is_some()); diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index cd2efe678..4fe573aa5 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -19,9 +19,8 @@ use pin_project_lite::pin_project; use crate::{ body::{BoxBody, MessageBody}, builder::HttpServiceBuilder, - config::{KeepAlive, ServiceConfig}, error::DispatchError, - h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, + h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig, }; /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. @@ -43,9 +42,9 @@ where >::Future: 'static, B: MessageBody + 'static, { - /// Create builder for `HttpService` instance. + /// Constructs builder for `HttpService` instance. pub fn build() -> HttpServiceBuilder { - HttpServiceBuilder::new() + HttpServiceBuilder::default() } } @@ -58,12 +57,10 @@ where >::Future: 'static, B: MessageBody + 'static, { - /// Create new `HttpService` instance. + /// Constructs new `HttpService` instance from service with default config. pub fn new>(service: F) -> Self { - let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None); - HttpService { - cfg, + cfg: ServiceConfig::default(), srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, @@ -72,7 +69,7 @@ where } } - /// Create new `HttpService` instance with config. + /// Constructs new `HttpService` instance from config and service. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, @@ -97,11 +94,10 @@ where >::Future: 'static, B: MessageBody, { - /// Provide service for `EXPECT: 100-Continue` support. + /// Sets service for `Expect: 100-Continue` handling. /// - /// Service get called with request that contains `EXPECT` header. - /// Service must return request in case of success, in that case - /// request will be forwarded to main service. + /// An expect service is called with requests that contain an `Expect` header. A successful + /// response type is also a request which will be forwarded to the main service. pub fn expect(self, expect: X1) -> HttpService where X1: ServiceFactory, @@ -118,10 +114,10 @@ where } } - /// Provide service for custom `Connection: UPGRADE` support. + /// Sets service for custom `Connection: Upgrade` handling. /// - /// If service is provided then normal requests handling get halted - /// and this service get called with original request and framed object. + /// If service is provided then normal requests handling get halted and this service get called + /// with original request and framed object. pub fn upgrade(self, upgrade: Option) -> HttpService where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 0d4d342ec..6212c19d1 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -242,7 +242,7 @@ impl io::Read for TestBuffer { impl io::Write for TestBuffer { fn write(&mut self, buf: &[u8]) -> io::Result { - RefCell::borrow_mut(&self.write_buf).extend(buf); + self.write_buf.borrow_mut().extend(buf); Ok(buf.len()) } diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index a3adcdfd6..5888527f1 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -31,7 +31,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; #[actix_rt::test] -async fn test_h1_v2() { +async fn h1_v2() { let srv = test_server(move || { HttpService::build() .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -59,7 +59,7 @@ async fn test_h1_v2() { } #[actix_rt::test] -async fn test_connection_close() { +async fn connection_close() { let srv = test_server(move || { HttpService::build() .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -73,7 +73,7 @@ async fn test_connection_close() { } #[actix_rt::test] -async fn test_with_query_parameter() { +async fn with_query_parameter() { let srv = test_server(move || { HttpService::build() .finish(|req: Request| async move { @@ -104,7 +104,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_h1_expect() { +async fn h1_expect() { let srv = test_server(move || { HttpService::build() .expect(|req: Request| async { diff --git a/actix-http/tests/test_h2_timer.rs b/actix-http/tests/test_h2_timer.rs index 2b9c26e4a..2e1480297 100644 --- a/actix-http/tests/test_h2_timer.rs +++ b/actix-http/tests/test_h2_timer.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, time::Duration}; use actix_http::{error::Error, HttpService, Response}; use actix_server::Server; @@ -19,7 +19,7 @@ async fn h2_ping_pong() -> io::Result<()> { .workers(1) .listen("h2_ping_pong", lst, || { HttpService::build() - .keep_alive(3) + .keep_alive(Duration::from_secs(3)) .h2(|_| async { Ok::<_, Error>(Response::ok()) }) .tcp() })? @@ -92,10 +92,10 @@ async fn h2_handshake_timeout() -> io::Result<()> { .workers(1) .listen("h2_ping_pong", lst, || { HttpService::build() - .keep_alive(30) + .keep_alive(Duration::from_secs(30)) // set first request timeout to 5 seconds. // this is the timeout used for http2 handshake. - .client_timeout(5000) + .client_request_timeout(Duration::from_secs(5)) .h2(|_| async { Ok::<_, Error>(Response::ok()) }) .tcp() })? diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 1bb574fd6..1b5de3425 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -2,7 +2,7 @@ use std::{ convert::Infallible, io::{Read, Write}, net, thread, - time::Duration, + time::{Duration, Instant}, }; use actix_http::{ @@ -22,12 +22,12 @@ use futures_util::{ use regex::Regex; #[actix_rt::test] -async fn test_h1() { +async fn h1_basic() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .h1(|req: Request| { assert!(req.peer_addr().is_some()); ok::<_, Infallible>(Response::ok()) @@ -43,12 +43,12 @@ async fn test_h1() { } #[actix_rt::test] -async fn test_h1_2() { +async fn h1_2() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .finish(|req: Request| { assert!(req.peer_addr().is_some()); assert_eq!(req.version(), http::Version::HTTP_11); @@ -75,7 +75,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_expect_continue() { +async fn expect_continue() { let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { @@ -106,7 +106,7 @@ async fn test_expect_continue() { } #[actix_rt::test] -async fn test_expect_continue_h1() { +async fn expect_continue_h1() { let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { @@ -139,7 +139,7 @@ async fn test_expect_continue_h1() { } #[actix_rt::test] -async fn test_chunked_payload() { +async fn chunked_payload() { let chunk_sizes = vec![32768, 32, 32768]; let total_size: usize = chunk_sizes.iter().sum(); @@ -197,26 +197,43 @@ async fn test_chunked_payload() { } #[actix_rt::test] -async fn test_slow_request() { +async fn slow_request_408() { let mut srv = test_server(|| { HttpService::build() - .client_timeout(100) + .client_request_timeout(Duration::from_millis(200)) + .keep_alive(Duration::from_secs(2)) .finish(|_| ok::<_, Infallible>(Response::ok())) .tcp() }) .await; + let start = Instant::now(); + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); + let _ = stream.write_all(b"GET /test HTTP/1.1\r\n"); let mut data = String::new(); let _ = stream.read_to_string(&mut data); - assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); + assert!( + data.starts_with("HTTP/1.1 408 Request Timeout"), + "response was not 408: {}", + data + ); + + let diff = start.elapsed(); + + if diff < Duration::from_secs(1) { + // test success + } else if diff < Duration::from_secs(3) { + panic!("request seems to have wrongly timed-out according to keep-alive"); + } else { + panic!("request took way too long to time out"); + } srv.stop().await; } #[actix_rt::test] -async fn test_http1_malformed_request() { +async fn http1_malformed_request() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -234,7 +251,7 @@ async fn test_http1_malformed_request() { } #[actix_rt::test] -async fn test_http1_keepalive() { +async fn http1_keepalive() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -257,23 +274,25 @@ async fn test_http1_keepalive() { } #[actix_rt::test] -async fn test_http1_keepalive_timeout() { +async fn http1_keepalive_timeout() { let mut srv = test_server(|| { HttpService::build() - .keep_alive(1) + .keep_alive(Duration::from_secs(1)) .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() }) .await; let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); - let mut data = vec![0; 1024]; + + let _ = stream.write_all(b"GET /test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 256]; let _ = stream.read(&mut data); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + thread::sleep(Duration::from_millis(1100)); - let mut data = vec![0; 1024]; + let mut data = vec![0; 256]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); @@ -281,7 +300,7 @@ async fn test_http1_keepalive_timeout() { } #[actix_rt::test] -async fn test_http1_keepalive_close() { +async fn http1_keepalive_close() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -303,7 +322,7 @@ async fn test_http1_keepalive_close() { } #[actix_rt::test] -async fn test_http10_keepalive_default_close() { +async fn http10_keepalive_default_close() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -325,7 +344,7 @@ async fn test_http10_keepalive_default_close() { } #[actix_rt::test] -async fn test_http10_keepalive() { +async fn http10_keepalive() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -354,7 +373,7 @@ async fn test_http10_keepalive() { } #[actix_rt::test] -async fn test_http1_keepalive_disabled() { +async fn http1_keepalive_disabled() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) @@ -377,7 +396,7 @@ async fn test_http1_keepalive_disabled() { } #[actix_rt::test] -async fn test_content_length() { +async fn content_length() { use actix_http::{ header::{HeaderName, HeaderValue}, StatusCode, @@ -426,7 +445,7 @@ async fn test_content_length() { } #[actix_rt::test] -async fn test_h1_headers() { +async fn h1_headers() { let data = STR.repeat(10); let data2 = data.clone(); @@ -492,7 +511,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; #[actix_rt::test] -async fn test_h1_body() { +async fn h1_body() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -511,7 +530,7 @@ async fn test_h1_body() { } #[actix_rt::test] -async fn test_h1_head_empty() { +async fn h1_head_empty() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -538,7 +557,7 @@ async fn test_h1_head_empty() { } #[actix_rt::test] -async fn test_h1_head_binary() { +async fn h1_head_binary() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -565,7 +584,7 @@ async fn test_h1_head_binary() { } #[actix_rt::test] -async fn test_h1_head_binary2() { +async fn h1_head_binary2() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -588,7 +607,7 @@ async fn test_h1_head_binary2() { } #[actix_rt::test] -async fn test_h1_body_length() { +async fn h1_body_length() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -612,7 +631,7 @@ async fn test_h1_body_length() { } #[actix_rt::test] -async fn test_h1_body_chunked_explicit() { +async fn h1_body_chunked_explicit() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -649,7 +668,7 @@ async fn test_h1_body_chunked_explicit() { } #[actix_rt::test] -async fn test_h1_body_chunked_implicit() { +async fn h1_body_chunked_implicit() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -680,7 +699,7 @@ async fn test_h1_body_chunked_implicit() { } #[actix_rt::test] -async fn test_h1_response_http_error_handling() { +async fn h1_response_http_error_handling() { let mut srv = test_server(|| { HttpService::build() .h1(fn_service(|_| { @@ -719,7 +738,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_h1_service_error() { +async fn h1_service_error() { let mut srv = test_server(|| { HttpService::build() .h1(|_| err::, _>(BadRequest)) @@ -738,7 +757,7 @@ async fn test_h1_service_error() { } #[actix_rt::test] -async fn test_h1_on_connect() { +async fn h1_on_connect() { let mut srv = test_server(|| { HttpService::build() .on_connect_ext(|_, data| { @@ -761,7 +780,7 @@ async fn test_h1_on_connect() { /// Tests compliance with 304 Not Modified spec in RFC 7232 ยง4.1. /// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 #[actix_rt::test] -async fn test_not_modified_spec_h1() { +async fn not_modified_spec_h1() { // TODO: this test needing a few seconds to complete reveals some weirdness with either the // dispatcher or the client, though similar hangs occur on other tests in this file, only // succeeding, it seems, because of the keepalive timer diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index ed8c61fd6..8b3ab8e1b 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -109,7 +109,7 @@ async fn service(msg: Frame) -> Result { } #[actix_rt::test] -async fn test_simple() { +async fn simple() { let mut srv = test_server(|| { HttpService::build() .upgrade(fn_factory(|| async { diff --git a/actix-test/CHANGES.md b/actix-test/CHANGES.md index 32ab2344f..3877f4fbf 100644 --- a/actix-test/CHANGES.md +++ b/actix-test/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +- Rename `TestServerConfig::{client_timeout => client_request_timeout}`. [#2611] + +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 0.1.0-beta.11 - 2022-01-04 diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index f86120f2f..d44bc7a45 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -149,7 +149,7 @@ where let local_addr = tcp.local_addr().unwrap(); let factory = factory.clone(); let srv_cfg = cfg.clone(); - let timeout = cfg.client_timeout; + let timeout = cfg.client_request_timeout; let builder = Server::build().workers(1).disable_signals().system_exit(); @@ -167,7 +167,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -183,7 +183,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -199,7 +199,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -218,7 +218,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -234,7 +234,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -250,7 +250,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -269,7 +269,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -285,7 +285,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -301,7 +301,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -388,7 +388,7 @@ pub fn config() -> TestServerConfig { pub struct TestServerConfig { tp: HttpVer, stream: StreamType, - client_timeout: u64, + client_request_timeout: Duration, } impl Default for TestServerConfig { @@ -403,7 +403,7 @@ impl TestServerConfig { TestServerConfig { tp: HttpVer::Both, stream: StreamType::Tcp, - client_timeout: 5000, + client_request_timeout: Duration::from_secs(5), } } @@ -433,9 +433,9 @@ impl TestServerConfig { self } - /// Set client timeout in milliseconds for first request. - pub fn client_timeout(mut self, val: u64) -> Self { - self.client_timeout = val; + /// Set client timeout for first request. + pub fn client_request_timeout(mut self, dur: Duration) -> Self { + self.client_request_timeout = dur; self } } diff --git a/awc/src/client/h1proto.rs b/awc/src/client/h1proto.rs index cf716db72..4f6a87ac5 100644 --- a/awc/src/client/h1proto.rs +++ b/awc/src/client/h1proto.rs @@ -70,7 +70,7 @@ where let is_expect = if head.as_ref().headers.contains_key(EXPECT) { match body.size() { BodySize::None | BodySize::Sized(0) => { - let keep_alive = framed.codec_ref().keepalive(); + let keep_alive = framed.codec_ref().keep_alive(); framed.io_mut().on_release(keep_alive); // TODO: use a new variant or a new type better describing error violate @@ -119,7 +119,7 @@ where match pin_framed.codec_ref().message_type() { h1::MessageType::None => { - let keep_alive = pin_framed.codec_ref().keepalive(); + let keep_alive = pin_framed.codec_ref().keep_alive(); pin_framed.io_mut().on_release(keep_alive); Ok((head, Payload::None)) @@ -223,7 +223,7 @@ impl Stream for PlStream { match ready!(this.framed.as_mut().next_item(cx)?) { Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(None) => { - let keep_alive = this.framed.codec_ref().keepalive(); + let keep_alive = this.framed.codec_ref().keep_alive(); this.framed.io_mut().on_release(keep_alive); Poll::Ready(None) } diff --git a/src/server.rs b/src/server.rs index ed0c965b3..83e025fb0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,6 +4,7 @@ use std::{ marker::PhantomData, net, sync::{Arc, Mutex}, + time::Duration, }; use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response}; @@ -27,8 +28,8 @@ struct Socket { struct Config { host: Option, keep_alive: KeepAlive, - client_timeout: u64, - client_shutdown: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, } /// An HTTP Server. @@ -88,9 +89,9 @@ where factory, config: Arc::new(Mutex::new(Config { host: None, - keep_alive: KeepAlive::Timeout(5), - client_timeout: 5000, - client_shutdown: 5000, + keep_alive: KeepAlive::default(), + client_request_timeout: Duration::from_secs(5), + client_disconnect_timeout: Duration::from_secs(1), })), backlog: 1024, sockets: Vec::new(), @@ -200,11 +201,17 @@ where /// To disable timeout set value to 0. /// /// By default client timeout is set to 5000 milliseconds. - pub fn client_timeout(self, val: u64) -> Self { - self.config.lock().unwrap().client_timeout = val; + pub fn client_request_timeout(self, dur: Duration) -> Self { + self.config.lock().unwrap().client_request_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_timeout(self, dur: Duration) -> Self { + self.client_request_timeout(dur) + } + /// Set server connection shutdown timeout in milliseconds. /// /// Defines a timeout for shutdown connection. If a shutdown procedure does not complete @@ -213,11 +220,17 @@ where /// To disable timeout set value to 0. /// /// By default client timeout is set to 5000 milliseconds. - pub fn client_shutdown(self, val: u64) -> Self { - self.config.lock().unwrap().client_shutdown = val; + pub fn client_disconnect_timeout(self, dur: Duration) -> Self { + self.config.lock().unwrap().client_disconnect_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_shutdown(self, dur: u64) -> Self { + self.client_disconnect_timeout(Duration::from_millis(dur)) + } + /// Set server host name. /// /// Host name is used by application router as a hostname for url generation. @@ -291,8 +304,8 @@ where let mut svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .local_addr(addr); if let Some(handler) = on_connect_fn.clone() { @@ -349,8 +362,8 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .local_addr(addr); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -410,8 +423,8 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown); + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext)) @@ -537,8 +550,8 @@ where fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({ let mut svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown); + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout); if let Some(handler) = on_connect_fn.clone() { svc = svc @@ -593,8 +606,8 @@ where fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then( HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .finish(map_config(fac, move |_| config.clone())), ) }, diff --git a/tests/test_httpserver.rs b/tests/test_httpserver.rs index 6ea8e520c..86e0575f3 100644 --- a/tests/test_httpserver.rs +++ b/tests/test_httpserver.rs @@ -26,9 +26,9 @@ async fn test_start() { .backlog(1) .max_connections(10) .max_connection_rate(10) - .keep_alive(10) - .client_timeout(5000) - .client_shutdown(0) + .keep_alive(Duration::from_secs(10)) + .client_request_timeout(Duration::from_secs(5)) + .client_disconnect_timeout(Duration::ZERO) .server_hostname("localhost") .system_exit() .disable_signals() diff --git a/tests/test_server.rs b/tests/test_server.rs index b8193a004..bd8934061 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -8,6 +8,7 @@ use std::{ io::{Read, Write}, pin::Pin, task::{Context, Poll}, + time::Duration, }; use actix_web::{ @@ -835,9 +836,10 @@ async fn test_server_cookies() { async fn test_slow_request() { use std::net; - let srv = actix_test::start_with(actix_test::config().client_timeout(200), || { - App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))) - }); + let srv = actix_test::start_with( + actix_test::config().client_request_timeout(Duration::from_millis(200)), + || App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))), + ); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut data = String::new();