mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-24 00:21:08 +01:00
fix request head timeout (#2611)
This commit is contained in:
parent
b3e84b5c4b
commit
3200de3f34
@ -1,10 +1,15 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Changed
|
||||||
|
- Rename `HttpServer::{client_timeout => client_request_timeout}`. [#2611]
|
||||||
|
- Rename `HttpServer::{client_shutdown => client_disconnect_timeout}`. [#2611]
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- `impl Future for HttpResponse`. [#2601]
|
- `impl Future for HttpResponse`. [#2601]
|
||||||
|
|
||||||
[#2601]: https://github.com/actix/actix-web/pull/2601
|
[#2601]: https://github.com/actix/actix-web/pull/2601
|
||||||
|
[#2611]: https://github.com/actix/actix-web/pull/2611
|
||||||
|
|
||||||
|
|
||||||
## 4.0.0-beta.21 - 2022-01-21
|
## 4.0.0-beta.21 - 2022-01-21
|
||||||
|
@ -1,6 +1,32 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Added
|
||||||
|
- Implement `Default` for `KeepAlive`. [#2611]
|
||||||
|
- Implement `From<Duration>` for `KeepAlive`. [#2611]
|
||||||
|
- Implement `From<Option<Duration>>` for `KeepAlive`. [#2611]
|
||||||
|
- Implement `Default` for `HttpServiceBuilder`. [#2611]
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
- Rename `ServiceConfig::{client_timer_expire => client_request_deadline}`. [#2611]
|
||||||
|
- Rename `ServiceConfig::{client_disconnect_timer => client_disconnect_deadline}`. [#2611]
|
||||||
|
- Deadline methods in `ServiceConfig` now return `std::time::Instant`s instead of Tokio's wrapper type. [#2611]
|
||||||
|
- Rename `h1::Codec::{keepalive => keep_alive}`. [#2611]
|
||||||
|
- Rename `h1::Codec::{keepalive_enabled => keep_alive_enabled}`. [#2611]
|
||||||
|
- Rename `h1::ClientCodec::{keepalive => keep_alive}`. [#2611]
|
||||||
|
- Rename `h1::ClientPayloadCodec::{keepalive => keep_alive}`. [#2611]
|
||||||
|
- `ServiceConfig::keep_alive` now returns a `KeepAlive`. [#2611]
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- HTTP/1.1 dispatcher correctly uses client request timeout. [#2611]
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
- `ServiceConfig::{client_timer, keep_alive_timer}`. [#2611]
|
||||||
|
- `impl From<usize> for KeepAlive`; use `Duration`s instead. [#2611]
|
||||||
|
- `impl From<Option<usize>> for KeepAlive`; use `Duration`s instead. [#2611]
|
||||||
|
- `HttpServiceBuilder::new`; use `default` instead. [#2611]
|
||||||
|
|
||||||
|
[#2611]: https://github.com/actix/actix-web/pull/2611
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.19 - 2022-01-21
|
## 3.0.0-beta.19 - 2022-01-21
|
||||||
|
@ -92,6 +92,7 @@ criterion = { version = "0.3", features = ["html_reports"] }
|
|||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
memchr = "2.4"
|
memchr = "2.4"
|
||||||
|
once_cell = "1.9"
|
||||||
rcgen = "0.8"
|
rcgen = "0.8"
|
||||||
regex = "1.3"
|
regex = "1.3"
|
||||||
rustls-pemfile = "0.2"
|
rustls-pemfile = "0.2"
|
||||||
|
27
actix-http/examples/bench.rs
Normal file
27
actix-http/examples/bench.rs
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
use std::{convert::Infallible, io, time::Duration};
|
||||||
|
|
||||||
|
use actix_http::{HttpService, Request, Response, StatusCode};
|
||||||
|
use actix_server::Server;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
|
static STR: Lazy<String> = Lazy::new(|| "HELLO WORLD ".repeat(20));
|
||||||
|
|
||||||
|
#[actix_rt::main]
|
||||||
|
async fn main() -> io::Result<()> {
|
||||||
|
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||||
|
|
||||||
|
Server::build()
|
||||||
|
.bind("dispatcher-benchmark", ("127.0.0.1", 8080), || {
|
||||||
|
HttpService::build()
|
||||||
|
.client_request_timeout(Duration::from_secs(1))
|
||||||
|
.finish(|_: Request| async move {
|
||||||
|
let mut res = Response::build(StatusCode::OK);
|
||||||
|
Ok::<_, Infallible>(res.body(&**STR))
|
||||||
|
})
|
||||||
|
.tcp()
|
||||||
|
})?
|
||||||
|
// limiting number of workers so that bench client is not sharing as many resources
|
||||||
|
.workers(4)
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
use std::io;
|
use std::{io, time::Duration};
|
||||||
|
|
||||||
use actix_http::{Error, HttpService, Request, Response, StatusCode};
|
use actix_http::{Error, HttpService, Request, Response, StatusCode};
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
@ -13,8 +13,8 @@ async fn main() -> io::Result<()> {
|
|||||||
Server::build()
|
Server::build()
|
||||||
.bind("echo", ("127.0.0.1", 8080), || {
|
.bind("echo", ("127.0.0.1", 8080), || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(1000)
|
.client_request_timeout(Duration::from_secs(1))
|
||||||
.client_disconnect(1000)
|
.client_disconnect_timeout(Duration::from_secs(1))
|
||||||
// handles HTTP/1.1 and HTTP/2
|
// handles HTTP/1.1 and HTTP/2
|
||||||
.finish(|mut req: Request| async move {
|
.finish(|mut req: Request| async move {
|
||||||
let mut body = BytesMut::new();
|
let mut body = BytesMut::new();
|
||||||
|
25
actix-http/examples/h2spec.rs
Normal file
25
actix-http/examples/h2spec.rs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
use std::{convert::Infallible, io};
|
||||||
|
|
||||||
|
use actix_http::{HttpService, Request, Response, StatusCode};
|
||||||
|
use actix_server::Server;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
|
static STR: Lazy<String> = Lazy::new(|| "HELLO WORLD ".repeat(100));
|
||||||
|
|
||||||
|
#[actix_rt::main]
|
||||||
|
async fn main() -> io::Result<()> {
|
||||||
|
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||||
|
|
||||||
|
Server::build()
|
||||||
|
.bind("h2spec", ("127.0.0.1", 8080), || {
|
||||||
|
HttpService::build()
|
||||||
|
.h2(|_: Request| async move {
|
||||||
|
let mut res = Response::build(StatusCode::OK);
|
||||||
|
Ok::<_, Infallible>(res.body(&**STR))
|
||||||
|
})
|
||||||
|
.tcp()
|
||||||
|
})?
|
||||||
|
.workers(4)
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
use std::{convert::Infallible, io};
|
use std::{convert::Infallible, io, time::Duration};
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode,
|
header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode,
|
||||||
@ -12,8 +12,8 @@ async fn main() -> io::Result<()> {
|
|||||||
Server::build()
|
Server::build()
|
||||||
.bind("hello-world", ("127.0.0.1", 8080), || {
|
.bind("hello-world", ("127.0.0.1", 8080), || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(1000)
|
.client_request_timeout(Duration::from_secs(1))
|
||||||
.client_disconnect(1000)
|
.client_disconnect_timeout(Duration::from_secs(1))
|
||||||
.on_connect_ext(|_, ext| {
|
.on_connect_ext(|_, ext| {
|
||||||
ext.insert(42u32);
|
ext.insert(42u32);
|
||||||
})
|
})
|
||||||
|
@ -1,25 +1,23 @@
|
|||||||
use std::{fmt, marker::PhantomData, net, rc::Rc};
|
use std::{fmt, marker::PhantomData, net, rc::Rc, time::Duration};
|
||||||
|
|
||||||
use actix_codec::Framed;
|
use actix_codec::Framed;
|
||||||
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
|
use actix_service::{IntoServiceFactory, Service, ServiceFactory};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
body::{BoxBody, MessageBody},
|
body::{BoxBody, MessageBody},
|
||||||
config::{KeepAlive, ServiceConfig},
|
|
||||||
h1::{self, ExpectHandler, H1Service, UpgradeHandler},
|
h1::{self, ExpectHandler, H1Service, UpgradeHandler},
|
||||||
h2::H2Service,
|
h2::H2Service,
|
||||||
service::HttpService,
|
service::HttpService,
|
||||||
ConnectCallback, Extensions, Request, Response,
|
ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A HTTP service builder
|
/// An HTTP service builder.
|
||||||
///
|
///
|
||||||
/// This type can be used to construct an instance of [`HttpService`] through a
|
/// This type can construct an instance of [`HttpService`] through a builder-like pattern.
|
||||||
/// builder-like pattern.
|
|
||||||
pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
|
pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_timeout: u64,
|
client_request_timeout: Duration,
|
||||||
client_disconnect: u64,
|
client_disconnect_timeout: Duration,
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<net::SocketAddr>,
|
local_addr: Option<net::SocketAddr>,
|
||||||
expect: X,
|
expect: X,
|
||||||
@ -28,22 +26,23 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
|
|||||||
_phantom: PhantomData<S>,
|
_phantom: PhantomData<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, S> HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler>
|
impl<T, S> Default for HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler>
|
||||||
where
|
where
|
||||||
S: ServiceFactory<Request, Config = ()>,
|
S: ServiceFactory<Request, Config = ()>,
|
||||||
S::Error: Into<Response<BoxBody>> + 'static,
|
S::Error: Into<Response<BoxBody>> + 'static,
|
||||||
S::InitError: fmt::Debug,
|
S::InitError: fmt::Debug,
|
||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
{
|
{
|
||||||
/// Create instance of `ServiceConfigBuilder`
|
fn default() -> Self {
|
||||||
#[allow(clippy::new_without_default)]
|
|
||||||
pub fn new() -> Self {
|
|
||||||
HttpServiceBuilder {
|
HttpServiceBuilder {
|
||||||
keep_alive: KeepAlive::Timeout(5),
|
// ServiceConfig parts (make sure defaults match)
|
||||||
client_timeout: 5000,
|
keep_alive: KeepAlive::default(),
|
||||||
client_disconnect: 0,
|
client_request_timeout: Duration::from_secs(5),
|
||||||
|
client_disconnect_timeout: Duration::ZERO,
|
||||||
secure: false,
|
secure: false,
|
||||||
local_addr: None,
|
local_addr: None,
|
||||||
|
|
||||||
|
// dispatcher parts
|
||||||
expect: ExpectHandler,
|
expect: ExpectHandler,
|
||||||
upgrade: None,
|
upgrade: None,
|
||||||
on_connect_ext: None,
|
on_connect_ext: None,
|
||||||
@ -65,9 +64,11 @@ where
|
|||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
{
|
{
|
||||||
/// Set server keep-alive setting.
|
/// Set connection keep-alive setting.
|
||||||
///
|
///
|
||||||
/// By default keep alive is set to a 5 seconds.
|
/// Applies to HTTP/1.1 keep-alive and HTTP/2 ping-pong.
|
||||||
|
///
|
||||||
|
/// By default keep-alive is 5 seconds.
|
||||||
pub fn keep_alive<W: Into<KeepAlive>>(mut self, val: W) -> Self {
|
pub fn keep_alive<W: Into<KeepAlive>>(mut self, val: W) -> Self {
|
||||||
self.keep_alive = val.into();
|
self.keep_alive = val.into();
|
||||||
self
|
self
|
||||||
@ -85,33 +86,45 @@ where
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set server client timeout in milliseconds for first request.
|
/// Set client request timeout (for first request).
|
||||||
///
|
///
|
||||||
/// Defines a timeout for reading client request header. If a client does not transmit
|
/// Defines a timeout for reading client request header. If the client does not transmit the
|
||||||
/// the entire set headers within this time, the request is terminated with
|
/// request head within this duration, the connection is terminated with a `408 Request Timeout`
|
||||||
/// the 408 (Request Time-out) error.
|
/// response error.
|
||||||
///
|
///
|
||||||
/// To disable timeout set value to 0.
|
/// A duration of zero disables the timeout.
|
||||||
///
|
///
|
||||||
/// By default client timeout is set to 5000 milliseconds.
|
/// By default, the client timeout is 5 seconds.
|
||||||
pub fn client_timeout(mut self, val: u64) -> Self {
|
pub fn client_request_timeout(mut self, dur: Duration) -> Self {
|
||||||
self.client_timeout = val;
|
self.client_request_timeout = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set server connection disconnect timeout in milliseconds.
|
#[doc(hidden)]
|
||||||
|
#[deprecated(since = "3.0.0", note = "Renamed to `client_request_timeout`.")]
|
||||||
|
pub fn client_timeout(self, dur: Duration) -> Self {
|
||||||
|
self.client_request_timeout(dur)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set client connection disconnect timeout.
|
||||||
///
|
///
|
||||||
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
|
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
|
||||||
/// within this time, the request get dropped. This timeout affects secure connections.
|
/// within this time, the request get dropped. This timeout affects secure connections.
|
||||||
///
|
///
|
||||||
/// To disable timeout set value to 0.
|
/// A duration of zero disables the timeout.
|
||||||
///
|
///
|
||||||
/// By default disconnect timeout is set to 0.
|
/// By default, the disconnect timeout is disabled.
|
||||||
pub fn client_disconnect(mut self, val: u64) -> Self {
|
pub fn client_disconnect_timeout(mut self, dur: Duration) -> Self {
|
||||||
self.client_disconnect = val;
|
self.client_disconnect_timeout = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[deprecated(since = "3.0.0", note = "Renamed to `client_disconnect_timeout`.")]
|
||||||
|
pub fn client_disconnect(self, dur: Duration) -> Self {
|
||||||
|
self.client_disconnect_timeout(dur)
|
||||||
|
}
|
||||||
|
|
||||||
/// Provide service for `EXPECT: 100-Continue` support.
|
/// Provide service for `EXPECT: 100-Continue` support.
|
||||||
///
|
///
|
||||||
/// Service get called with request that contains `EXPECT` header.
|
/// Service get called with request that contains `EXPECT` header.
|
||||||
@ -126,8 +139,8 @@ where
|
|||||||
{
|
{
|
||||||
HttpServiceBuilder {
|
HttpServiceBuilder {
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
client_timeout: self.client_timeout,
|
client_request_timeout: self.client_request_timeout,
|
||||||
client_disconnect: self.client_disconnect,
|
client_disconnect_timeout: self.client_disconnect_timeout,
|
||||||
secure: self.secure,
|
secure: self.secure,
|
||||||
local_addr: self.local_addr,
|
local_addr: self.local_addr,
|
||||||
expect: expect.into_factory(),
|
expect: expect.into_factory(),
|
||||||
@ -150,8 +163,8 @@ where
|
|||||||
{
|
{
|
||||||
HttpServiceBuilder {
|
HttpServiceBuilder {
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
client_timeout: self.client_timeout,
|
client_request_timeout: self.client_request_timeout,
|
||||||
client_disconnect: self.client_disconnect,
|
client_disconnect_timeout: self.client_disconnect_timeout,
|
||||||
secure: self.secure,
|
secure: self.secure,
|
||||||
local_addr: self.local_addr,
|
local_addr: self.local_addr,
|
||||||
expect: self.expect,
|
expect: self.expect,
|
||||||
@ -185,8 +198,8 @@ where
|
|||||||
{
|
{
|
||||||
let cfg = ServiceConfig::new(
|
let cfg = ServiceConfig::new(
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect,
|
self.client_disconnect_timeout,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
@ -209,8 +222,8 @@ where
|
|||||||
{
|
{
|
||||||
let cfg = ServiceConfig::new(
|
let cfg = ServiceConfig::new(
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect,
|
self.client_disconnect_timeout,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
@ -230,8 +243,8 @@ where
|
|||||||
{
|
{
|
||||||
let cfg = ServiceConfig::new(
|
let cfg = ServiceConfig::new(
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect,
|
self.client_disconnect_timeout,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
|
@ -1,71 +1,36 @@
|
|||||||
use std::{
|
use std::{
|
||||||
cell::Cell,
|
|
||||||
fmt::{self, Write},
|
|
||||||
net,
|
net,
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
time::{Duration, SystemTime},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::{
|
|
||||||
task::JoinHandle,
|
|
||||||
time::{interval, sleep_until, Instant, Sleep},
|
|
||||||
};
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
||||||
/// "Sun, 06 Nov 1994 08:49:37 GMT".len()
|
use crate::{date::DateService, KeepAlive};
|
||||||
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
|
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
/// HTTP service configuration.
|
||||||
/// Server keep-alive setting
|
#[derive(Debug, Clone)]
|
||||||
pub enum KeepAlive {
|
|
||||||
/// Keep alive in seconds
|
|
||||||
Timeout(usize),
|
|
||||||
|
|
||||||
/// Rely on OS to shutdown tcp connection
|
|
||||||
Os,
|
|
||||||
|
|
||||||
/// Disabled
|
|
||||||
Disabled,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<usize> for KeepAlive {
|
|
||||||
fn from(keepalive: usize) -> Self {
|
|
||||||
KeepAlive::Timeout(keepalive)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Option<usize>> for KeepAlive {
|
|
||||||
fn from(keepalive: Option<usize>) -> Self {
|
|
||||||
if let Some(keepalive) = keepalive {
|
|
||||||
KeepAlive::Timeout(keepalive)
|
|
||||||
} else {
|
|
||||||
KeepAlive::Disabled
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Http service configuration
|
|
||||||
pub struct ServiceConfig(Rc<Inner>);
|
pub struct ServiceConfig(Rc<Inner>);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct Inner {
|
struct Inner {
|
||||||
keep_alive: Option<Duration>,
|
keep_alive: KeepAlive,
|
||||||
client_timeout: u64,
|
client_request_timeout: Duration,
|
||||||
client_disconnect: u64,
|
client_disconnect_timeout: Duration,
|
||||||
ka_enabled: bool,
|
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<std::net::SocketAddr>,
|
local_addr: Option<std::net::SocketAddr>,
|
||||||
date_service: DateService,
|
date_service: DateService,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for ServiceConfig {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
ServiceConfig(self.0.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ServiceConfig {
|
impl Default for ServiceConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::new(KeepAlive::Timeout(5), 0, 0, false, None)
|
Self::new(
|
||||||
|
KeepAlive::default(),
|
||||||
|
Duration::from_secs(5),
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,34 +38,22 @@ impl ServiceConfig {
|
|||||||
/// Create instance of `ServiceConfig`
|
/// Create instance of `ServiceConfig`
|
||||||
pub fn new(
|
pub fn new(
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_timeout: u64,
|
client_request_timeout: Duration,
|
||||||
client_disconnect: u64,
|
client_disconnect_timeout: Duration,
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<net::SocketAddr>,
|
local_addr: Option<net::SocketAddr>,
|
||||||
) -> ServiceConfig {
|
) -> ServiceConfig {
|
||||||
let (keep_alive, ka_enabled) = match keep_alive {
|
|
||||||
KeepAlive::Timeout(val) => (val as u64, true),
|
|
||||||
KeepAlive::Os => (0, true),
|
|
||||||
KeepAlive::Disabled => (0, false),
|
|
||||||
};
|
|
||||||
let keep_alive = if ka_enabled && keep_alive > 0 {
|
|
||||||
Some(Duration::from_secs(keep_alive))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
ServiceConfig(Rc::new(Inner {
|
ServiceConfig(Rc::new(Inner {
|
||||||
keep_alive,
|
keep_alive: keep_alive.normalize(),
|
||||||
ka_enabled,
|
client_request_timeout,
|
||||||
client_timeout,
|
client_disconnect_timeout,
|
||||||
client_disconnect,
|
|
||||||
secure,
|
secure,
|
||||||
local_addr,
|
local_addr,
|
||||||
date_service: DateService::new(),
|
date_service: DateService::new(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns true if connection is secure (HTTPS)
|
/// Returns `true` if connection is secure (i.e., using TLS / HTTPS).
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn secure(&self) -> bool {
|
pub fn secure(&self) -> bool {
|
||||||
self.0.secure
|
self.0.secure
|
||||||
@ -114,239 +67,91 @@ impl ServiceConfig {
|
|||||||
self.0.local_addr
|
self.0.local_addr
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Keep alive duration if configured.
|
/// Connection keep-alive setting.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn keep_alive(&self) -> Option<Duration> {
|
pub fn keep_alive(&self) -> KeepAlive {
|
||||||
self.0.keep_alive
|
self.0.keep_alive
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return state of connection keep-alive functionality
|
/// Creates a time object representing the deadline for this connection's keep-alive period, if
|
||||||
#[inline]
|
/// enabled.
|
||||||
pub fn keep_alive_enabled(&self) -> bool {
|
///
|
||||||
self.0.ka_enabled
|
/// When [`KeepAlive::Os`] or [`KeepAlive::Disabled`] is set, this will return `None`.
|
||||||
}
|
pub fn keep_alive_deadline(&self) -> Option<Instant> {
|
||||||
|
match self.keep_alive() {
|
||||||
/// Client timeout for first request.
|
KeepAlive::Timeout(dur) => Some(self.now() + dur),
|
||||||
#[inline]
|
KeepAlive::Os => None,
|
||||||
pub fn client_timer(&self) -> Option<Sleep> {
|
KeepAlive::Disabled => None,
|
||||||
let delay_time = self.0.client_timeout;
|
|
||||||
if delay_time != 0 {
|
|
||||||
Some(sleep_until(self.now() + Duration::from_millis(delay_time)))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Client timeout for first request.
|
/// Creates a time object representing the deadline for the client to finish sending the head of
|
||||||
pub fn client_timer_expire(&self) -> Option<Instant> {
|
/// its first request.
|
||||||
let delay = self.0.client_timeout;
|
///
|
||||||
if delay != 0 {
|
/// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`.
|
||||||
Some(self.now() + Duration::from_millis(delay))
|
pub fn client_request_deadline(&self) -> Option<Instant> {
|
||||||
} else {
|
let timeout = self.0.client_request_timeout;
|
||||||
None
|
(timeout != Duration::ZERO).then(|| self.now() + timeout)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Client disconnect timer
|
/// Creates a time object representing the deadline for the client to disconnect.
|
||||||
pub fn client_disconnect_timer(&self) -> Option<Instant> {
|
pub fn client_disconnect_deadline(&self) -> Option<Instant> {
|
||||||
let delay = self.0.client_disconnect;
|
let timeout = self.0.client_disconnect_timeout;
|
||||||
if delay != 0 {
|
(timeout != Duration::ZERO).then(|| self.now() + timeout)
|
||||||
Some(self.now() + Duration::from_millis(delay))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return keep-alive timer delay is configured.
|
|
||||||
#[inline]
|
|
||||||
pub fn keep_alive_timer(&self) -> Option<Sleep> {
|
|
||||||
self.keep_alive().map(|ka| sleep_until(self.now() + ka))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Keep-alive expire time
|
|
||||||
pub fn keep_alive_expire(&self) -> Option<Instant> {
|
|
||||||
self.keep_alive().map(|ka| self.now() + ka)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
pub(crate) fn now(&self) -> Instant {
|
pub(crate) fn now(&self) -> Instant {
|
||||||
self.0.date_service.now()
|
self.0.date_service.now()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
pub(crate) fn write_date_header(&self, dst: &mut BytesMut, camel_case: bool) {
|
||||||
pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) {
|
|
||||||
let mut buf: [u8; 39] = [0; 39];
|
let mut buf: [u8; 39] = [0; 39];
|
||||||
|
|
||||||
buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " });
|
buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " });
|
||||||
|
|
||||||
self.0
|
self.0
|
||||||
.date_service
|
.date_service
|
||||||
.set_date(|date| buf[6..35].copy_from_slice(&date.bytes));
|
.with_date(|date| buf[6..35].copy_from_slice(&date.bytes));
|
||||||
|
|
||||||
buf[35..].copy_from_slice(b"\r\n\r\n");
|
buf[35..].copy_from_slice(b"\r\n\r\n");
|
||||||
dst.extend_from_slice(&buf);
|
dst.extend_from_slice(&buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn set_date_header(&self, dst: &mut BytesMut) {
|
pub(crate) fn write_date_header_value(&self, dst: &mut BytesMut) {
|
||||||
self.0
|
self.0
|
||||||
.date_service
|
.date_service
|
||||||
.set_date(|date| dst.extend_from_slice(&date.bytes));
|
.with_date(|date| dst.extend_from_slice(&date.bytes));
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
|
||||||
struct Date {
|
|
||||||
bytes: [u8; DATE_VALUE_LENGTH],
|
|
||||||
pos: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Date {
|
|
||||||
fn new() -> Date {
|
|
||||||
let mut date = Date {
|
|
||||||
bytes: [0; DATE_VALUE_LENGTH],
|
|
||||||
pos: 0,
|
|
||||||
};
|
|
||||||
date.update();
|
|
||||||
date
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update(&mut self) {
|
|
||||||
self.pos = 0;
|
|
||||||
write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Write for Date {
|
|
||||||
fn write_str(&mut self, s: &str) -> fmt::Result {
|
|
||||||
let len = s.len();
|
|
||||||
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
|
|
||||||
self.pos += len;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Service for update Date and Instant periodically at 500 millis interval.
|
|
||||||
struct DateService {
|
|
||||||
current: Rc<Cell<(Date, Instant)>>,
|
|
||||||
handle: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for DateService {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
// stop the timer update async task on drop.
|
|
||||||
self.handle.abort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DateService {
|
|
||||||
fn new() -> Self {
|
|
||||||
// shared date and timer for DateService and update async task.
|
|
||||||
let current = Rc::new(Cell::new((Date::new(), Instant::now())));
|
|
||||||
let current_clone = Rc::clone(¤t);
|
|
||||||
// spawn an async task sleep for 500 milli and update current date/timer in a loop.
|
|
||||||
// handle is used to stop the task on DateService drop.
|
|
||||||
let handle = actix_rt::spawn(async move {
|
|
||||||
#[cfg(test)]
|
|
||||||
let _notify = notify_on_drop::NotifyOnDrop::new();
|
|
||||||
|
|
||||||
let mut interval = interval(Duration::from_millis(500));
|
|
||||||
loop {
|
|
||||||
let now = interval.tick().await;
|
|
||||||
let date = Date::new();
|
|
||||||
current_clone.set((date, now));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
DateService { current, handle }
|
|
||||||
}
|
|
||||||
|
|
||||||
fn now(&self) -> Instant {
|
|
||||||
self.current.get().1
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_date<F: FnMut(&Date)>(&self, mut f: F) {
|
|
||||||
f(&self.current.get().0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: move to a util module for testing all spawn handle drop style tasks.
|
|
||||||
/// Test Module for checking the drop state of certain async tasks that are spawned
|
|
||||||
/// with `actix_rt::spawn`
|
|
||||||
///
|
|
||||||
/// The target task must explicitly generate `NotifyOnDrop` when spawn the task
|
|
||||||
#[cfg(test)]
|
|
||||||
mod notify_on_drop {
|
|
||||||
use std::cell::RefCell;
|
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
static NOTIFY_DROPPED: RefCell<Option<bool>> = RefCell::new(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if the spawned task is dropped.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
/// Panics when there was no `NotifyOnDrop` instance on current thread.
|
|
||||||
pub(crate) fn is_dropped() -> bool {
|
|
||||||
NOTIFY_DROPPED.with(|bool| {
|
|
||||||
bool.borrow()
|
|
||||||
.expect("No NotifyOnDrop existed on current thread")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct NotifyOnDrop;
|
|
||||||
|
|
||||||
impl NotifyOnDrop {
|
|
||||||
/// # Panic:
|
|
||||||
///
|
|
||||||
/// When construct multiple instances on any given thread.
|
|
||||||
pub(crate) fn new() -> Self {
|
|
||||||
NOTIFY_DROPPED.with(|bool| {
|
|
||||||
let mut bool = bool.borrow_mut();
|
|
||||||
if bool.is_some() {
|
|
||||||
panic!("NotifyOnDrop existed on current thread");
|
|
||||||
} else {
|
|
||||||
*bool = Some(false);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
NotifyOnDrop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for NotifyOnDrop {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
NOTIFY_DROPPED.with(|bool| {
|
|
||||||
if let Some(b) = bool.borrow_mut().as_mut() {
|
|
||||||
*b = true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::{date::DATE_VALUE_LENGTH, notify_on_drop};
|
||||||
|
|
||||||
use actix_rt::{task::yield_now, time::sleep};
|
use actix_rt::{
|
||||||
|
task::yield_now,
|
||||||
|
time::{sleep, sleep_until},
|
||||||
|
};
|
||||||
use memchr::memmem;
|
use memchr::memmem;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_date_service_update() {
|
async fn test_date_service_update() {
|
||||||
let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None);
|
let settings =
|
||||||
|
ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None);
|
||||||
|
|
||||||
yield_now().await;
|
yield_now().await;
|
||||||
|
|
||||||
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf1, false);
|
settings.write_date_header(&mut buf1, false);
|
||||||
let now1 = settings.now();
|
let now1 = settings.now();
|
||||||
|
|
||||||
sleep_until(Instant::now() + Duration::from_secs(2)).await;
|
sleep_until((Instant::now() + Duration::from_secs(2)).into()).await;
|
||||||
yield_now().await;
|
yield_now().await;
|
||||||
|
|
||||||
let now2 = settings.now();
|
let now2 = settings.now();
|
||||||
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf2, false);
|
settings.write_date_header(&mut buf2, false);
|
||||||
|
|
||||||
assert_ne!(now1, now2);
|
assert_ne!(now1, now2);
|
||||||
|
|
||||||
@ -402,10 +207,10 @@ mod tests {
|
|||||||
let settings = ServiceConfig::default();
|
let settings = ServiceConfig::default();
|
||||||
|
|
||||||
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf1, false);
|
settings.write_date_header(&mut buf1, false);
|
||||||
|
|
||||||
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf2, false);
|
settings.write_date_header(&mut buf2, false);
|
||||||
|
|
||||||
assert_eq!(buf1, buf2);
|
assert_eq!(buf1, buf2);
|
||||||
}
|
}
|
||||||
@ -415,11 +220,11 @@ mod tests {
|
|||||||
let settings = ServiceConfig::default();
|
let settings = ServiceConfig::default();
|
||||||
|
|
||||||
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf, false);
|
settings.write_date_header(&mut buf, false);
|
||||||
assert!(memmem::find(&buf, b"date:").is_some());
|
assert!(memmem::find(&buf, b"date:").is_some());
|
||||||
|
|
||||||
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
|
||||||
settings.set_date(&mut buf, true);
|
settings.write_date_header(&mut buf, true);
|
||||||
assert!(memmem::find(&buf, b"Date:").is_some());
|
assert!(memmem::find(&buf, b"Date:").is_some());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
92
actix-http/src/date.rs
Normal file
92
actix-http/src/date.rs
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
use std::{
|
||||||
|
cell::Cell,
|
||||||
|
fmt::{self, Write},
|
||||||
|
rc::Rc,
|
||||||
|
time::{Duration, Instant, SystemTime},
|
||||||
|
};
|
||||||
|
|
||||||
|
use actix_rt::{task::JoinHandle, time::interval};
|
||||||
|
|
||||||
|
/// "Thu, 01 Jan 1970 00:00:00 GMT".len()
|
||||||
|
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
pub(crate) struct Date {
|
||||||
|
pub(crate) bytes: [u8; DATE_VALUE_LENGTH],
|
||||||
|
pos: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Date {
|
||||||
|
fn new() -> Date {
|
||||||
|
let mut date = Date {
|
||||||
|
bytes: [0; DATE_VALUE_LENGTH],
|
||||||
|
pos: 0,
|
||||||
|
};
|
||||||
|
date.update();
|
||||||
|
date
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self) {
|
||||||
|
self.pos = 0;
|
||||||
|
write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Write for Date {
|
||||||
|
fn write_str(&mut self, s: &str) -> fmt::Result {
|
||||||
|
let len = s.len();
|
||||||
|
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
|
||||||
|
self.pos += len;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Service for update Date and Instant periodically at 500 millis interval.
|
||||||
|
pub(crate) struct DateService {
|
||||||
|
current: Rc<Cell<(Date, Instant)>>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DateService {
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
// shared date and timer for DateService and update async task.
|
||||||
|
let current = Rc::new(Cell::new((Date::new(), Instant::now())));
|
||||||
|
let current_clone = Rc::clone(¤t);
|
||||||
|
// spawn an async task sleep for 500 millis and update current date/timer in a loop.
|
||||||
|
// handle is used to stop the task on DateService drop.
|
||||||
|
let handle = actix_rt::spawn(async move {
|
||||||
|
#[cfg(test)]
|
||||||
|
let _notify = crate::notify_on_drop::NotifyOnDrop::new();
|
||||||
|
|
||||||
|
let mut interval = interval(Duration::from_millis(500));
|
||||||
|
loop {
|
||||||
|
let now = interval.tick().await;
|
||||||
|
let date = Date::new();
|
||||||
|
current_clone.set((date, now.into_std()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
DateService { current, handle }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn now(&self) -> Instant {
|
||||||
|
self.current.get().1
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn with_date<F: FnMut(&Date)>(&self, mut f: F) {
|
||||||
|
f(&self.current.get().0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for DateService {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("DateService").finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for DateService {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// stop the timer update async task on drop.
|
||||||
|
self.handle.abort();
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
use std::io;
|
use std::{fmt, io};
|
||||||
|
|
||||||
use actix_codec::{Decoder, Encoder};
|
use actix_codec::{Decoder, Encoder};
|
||||||
use bitflags::bitflags;
|
use bitflags::bitflags;
|
||||||
@ -18,7 +18,7 @@ use crate::{
|
|||||||
bitflags! {
|
bitflags! {
|
||||||
struct Flags: u8 {
|
struct Flags: u8 {
|
||||||
const HEAD = 0b0000_0001;
|
const HEAD = 0b0000_0001;
|
||||||
const KEEPALIVE_ENABLED = 0b0000_1000;
|
const KEEP_ALIVE_ENABLED = 0b0000_1000;
|
||||||
const STREAM = 0b0001_0000;
|
const STREAM = 0b0001_0000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -38,7 +38,7 @@ struct ClientCodecInner {
|
|||||||
decoder: decoder::MessageDecoder<ResponseHead>,
|
decoder: decoder::MessageDecoder<ResponseHead>,
|
||||||
payload: Option<PayloadDecoder>,
|
payload: Option<PayloadDecoder>,
|
||||||
version: Version,
|
version: Version,
|
||||||
ctype: ConnectionType,
|
conn_type: ConnectionType,
|
||||||
|
|
||||||
// encoder part
|
// encoder part
|
||||||
flags: Flags,
|
flags: Flags,
|
||||||
@ -51,23 +51,32 @@ impl Default for ClientCodec {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for ClientCodec {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("h1::ClientCodec")
|
||||||
|
.field("flags", &self.inner.flags)
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ClientCodec {
|
impl ClientCodec {
|
||||||
/// Create HTTP/1 codec.
|
/// Create HTTP/1 codec.
|
||||||
///
|
///
|
||||||
/// `keepalive_enabled` how response `connection` header get generated.
|
/// `keepalive_enabled` how response `connection` header get generated.
|
||||||
pub fn new(config: ServiceConfig) -> Self {
|
pub fn new(config: ServiceConfig) -> Self {
|
||||||
let flags = if config.keep_alive_enabled() {
|
let flags = if config.keep_alive().enabled() {
|
||||||
Flags::KEEPALIVE_ENABLED
|
Flags::KEEP_ALIVE_ENABLED
|
||||||
} else {
|
} else {
|
||||||
Flags::empty()
|
Flags::empty()
|
||||||
};
|
};
|
||||||
|
|
||||||
ClientCodec {
|
ClientCodec {
|
||||||
inner: ClientCodecInner {
|
inner: ClientCodecInner {
|
||||||
config,
|
config,
|
||||||
decoder: decoder::MessageDecoder::default(),
|
decoder: decoder::MessageDecoder::default(),
|
||||||
payload: None,
|
payload: None,
|
||||||
version: Version::HTTP_11,
|
version: Version::HTTP_11,
|
||||||
ctype: ConnectionType::Close,
|
conn_type: ConnectionType::Close,
|
||||||
|
|
||||||
flags,
|
flags,
|
||||||
encoder: encoder::MessageEncoder::default(),
|
encoder: encoder::MessageEncoder::default(),
|
||||||
@ -77,12 +86,12 @@ impl ClientCodec {
|
|||||||
|
|
||||||
/// Check if request is upgrade
|
/// Check if request is upgrade
|
||||||
pub fn upgrade(&self) -> bool {
|
pub fn upgrade(&self) -> bool {
|
||||||
self.inner.ctype == ConnectionType::Upgrade
|
self.inner.conn_type == ConnectionType::Upgrade
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if last response is keep-alive
|
/// Check if last response is keep-alive
|
||||||
pub fn keepalive(&self) -> bool {
|
pub fn keep_alive(&self) -> bool {
|
||||||
self.inner.ctype == ConnectionType::KeepAlive
|
self.inner.conn_type == ConnectionType::KeepAlive
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check last request's message type
|
/// Check last request's message type
|
||||||
@ -104,8 +113,8 @@ impl ClientCodec {
|
|||||||
|
|
||||||
impl ClientPayloadCodec {
|
impl ClientPayloadCodec {
|
||||||
/// Check if last response is keep-alive
|
/// Check if last response is keep-alive
|
||||||
pub fn keepalive(&self) -> bool {
|
pub fn keep_alive(&self) -> bool {
|
||||||
self.inner.ctype == ConnectionType::KeepAlive
|
self.inner.conn_type == ConnectionType::KeepAlive
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Transform payload codec to a message codec
|
/// Transform payload codec to a message codec
|
||||||
@ -122,12 +131,12 @@ impl Decoder for ClientCodec {
|
|||||||
debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set");
|
debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set");
|
||||||
|
|
||||||
if let Some((req, payload)) = self.inner.decoder.decode(src)? {
|
if let Some((req, payload)) = self.inner.decoder.decode(src)? {
|
||||||
if let Some(ctype) = req.conn_type() {
|
if let Some(conn_type) = req.conn_type() {
|
||||||
// do not use peer's keep-alive
|
// do not use peer's keep-alive
|
||||||
self.inner.ctype = if ctype == ConnectionType::KeepAlive {
|
self.inner.conn_type = if conn_type == ConnectionType::KeepAlive {
|
||||||
self.inner.ctype
|
self.inner.conn_type
|
||||||
} else {
|
} else {
|
||||||
ctype
|
conn_type
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,9 +201,9 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
|
|||||||
.set(Flags::HEAD, head.as_ref().method == Method::HEAD);
|
.set(Flags::HEAD, head.as_ref().method == Method::HEAD);
|
||||||
|
|
||||||
// connection status
|
// connection status
|
||||||
inner.ctype = match head.as_ref().connection_type() {
|
inner.conn_type = match head.as_ref().connection_type() {
|
||||||
ConnectionType::KeepAlive => {
|
ConnectionType::KeepAlive => {
|
||||||
if inner.flags.contains(Flags::KEEPALIVE_ENABLED) {
|
if inner.flags.contains(Flags::KEEP_ALIVE_ENABLED) {
|
||||||
ConnectionType::KeepAlive
|
ConnectionType::KeepAlive
|
||||||
} else {
|
} else {
|
||||||
ConnectionType::Close
|
ConnectionType::Close
|
||||||
@ -211,7 +220,7 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
|
|||||||
false,
|
false,
|
||||||
inner.version,
|
inner.version,
|
||||||
length,
|
length,
|
||||||
inner.ctype,
|
inner.conn_type,
|
||||||
&inner.config,
|
&inner.config,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ use crate::{
|
|||||||
bitflags! {
|
bitflags! {
|
||||||
struct Flags: u8 {
|
struct Flags: u8 {
|
||||||
const HEAD = 0b0000_0001;
|
const HEAD = 0b0000_0001;
|
||||||
const KEEPALIVE_ENABLED = 0b0000_0010;
|
const KEEP_ALIVE_ENABLED = 0b0000_0010;
|
||||||
const STREAM = 0b0000_0100;
|
const STREAM = 0b0000_0100;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -42,7 +42,9 @@ impl Default for Codec {
|
|||||||
|
|
||||||
impl fmt::Debug for Codec {
|
impl fmt::Debug for Codec {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
write!(f, "h1::Codec({:?})", self.flags)
|
f.debug_struct("h1::Codec")
|
||||||
|
.field("flags", &self.flags)
|
||||||
|
.finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,8 +53,8 @@ impl Codec {
|
|||||||
///
|
///
|
||||||
/// `keepalive_enabled` how response `connection` header get generated.
|
/// `keepalive_enabled` how response `connection` header get generated.
|
||||||
pub fn new(config: ServiceConfig) -> Self {
|
pub fn new(config: ServiceConfig) -> Self {
|
||||||
let flags = if config.keep_alive_enabled() {
|
let flags = if config.keep_alive().enabled() {
|
||||||
Flags::KEEPALIVE_ENABLED
|
Flags::KEEP_ALIVE_ENABLED
|
||||||
} else {
|
} else {
|
||||||
Flags::empty()
|
Flags::empty()
|
||||||
};
|
};
|
||||||
@ -76,14 +78,14 @@ impl Codec {
|
|||||||
|
|
||||||
/// Check if last response is keep-alive.
|
/// Check if last response is keep-alive.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn keepalive(&self) -> bool {
|
pub fn keep_alive(&self) -> bool {
|
||||||
self.conn_type == ConnectionType::KeepAlive
|
self.conn_type == ConnectionType::KeepAlive
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if keep-alive enabled on server level.
|
/// Check if keep-alive enabled on server level.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn keepalive_enabled(&self) -> bool {
|
pub fn keep_alive_enabled(&self) -> bool {
|
||||||
self.flags.contains(Flags::KEEPALIVE_ENABLED)
|
self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check last request's message type.
|
/// Check last request's message type.
|
||||||
@ -124,7 +126,7 @@ impl Decoder for Codec {
|
|||||||
self.version = head.version;
|
self.version = head.version;
|
||||||
self.conn_type = head.connection_type();
|
self.conn_type = head.connection_type();
|
||||||
if self.conn_type == ConnectionType::KeepAlive
|
if self.conn_type == ConnectionType::KeepAlive
|
||||||
&& !self.flags.contains(Flags::KEEPALIVE_ENABLED)
|
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
|
||||||
{
|
{
|
||||||
self.conn_type = ConnectionType::Close
|
self.conn_type = ConnectionType::Close
|
||||||
}
|
}
|
||||||
@ -179,9 +181,11 @@ impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
|
|||||||
&self.config,
|
&self.config,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Chunk(Some(bytes)) => {
|
Message::Chunk(Some(bytes)) => {
|
||||||
self.encoder.encode_chunk(bytes.as_ref(), dst)?;
|
self.encoder.encode_chunk(bytes.as_ref(), dst)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Message::Chunk(None) => {
|
Message::Chunk(None) => {
|
||||||
self.encoder.encode_eof(dst)?;
|
self.encoder.encode_eof(dst)?;
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -17,7 +17,7 @@ use crate::{
|
|||||||
h1::{Codec, ExpectHandler, UpgradeHandler},
|
h1::{Codec, ExpectHandler, UpgradeHandler},
|
||||||
service::HttpFlow,
|
service::HttpFlow,
|
||||||
test::{TestBuffer, TestSeqBuffer},
|
test::{TestBuffer, TestSeqBuffer},
|
||||||
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response,
|
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode,
|
||||||
};
|
};
|
||||||
|
|
||||||
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
||||||
@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
||||||
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok())))
|
status_service(StatusCode::OK)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_service(
|
||||||
|
status: StatusCode,
|
||||||
|
) -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
||||||
|
fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status))))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn echo_path_service(
|
fn echo_path_service(
|
||||||
@ -65,11 +71,15 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn late_request() {
|
async fn late_request() {
|
||||||
let _ = env_logger::try_init();
|
|
||||||
|
|
||||||
let mut buf = TestBuffer::empty();
|
let mut buf = TestBuffer::empty();
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::from_millis(100),
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
|
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
|
||||||
|
|
||||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||||
@ -127,10 +137,16 @@ async fn late_request() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_basic() {
|
async fn oneshot_connection() {
|
||||||
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::from_millis(100),
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||||
@ -179,10 +195,16 @@ async fn test_basic() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_keep_alive_timeout() {
|
async fn keep_alive_timeout() {
|
||||||
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Timeout(Duration::from_millis(200)),
|
||||||
|
Duration::from_millis(100),
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||||
@ -229,7 +251,7 @@ async fn test_keep_alive_timeout() {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
// sleep slightly longer than keep-alive timeout
|
// sleep slightly longer than keep-alive timeout
|
||||||
sleep(Duration::from_millis(1100)).await;
|
sleep(Duration::from_millis(250)).await;
|
||||||
|
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
assert!(
|
assert!(
|
||||||
@ -252,10 +274,16 @@ async fn test_keep_alive_timeout() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_keep_alive_follow_up_req() {
|
async fn keep_alive_follow_up_req() {
|
||||||
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Timeout(Duration::from_millis(500)),
|
||||||
|
Duration::from_millis(100),
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||||
@ -302,7 +330,7 @@ async fn test_keep_alive_follow_up_req() {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
// sleep for less than KA timeout
|
// sleep for less than KA timeout
|
||||||
sleep(Duration::from_millis(200)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
assert!(
|
assert!(
|
||||||
@ -371,7 +399,7 @@ async fn test_keep_alive_follow_up_req() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_req_parse_err() {
|
async fn req_parse_err() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
|
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
|
||||||
|
|
||||||
@ -413,7 +441,13 @@ async fn pipelining_ok_then_ok() {
|
|||||||
",
|
",
|
||||||
);
|
);
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::from_millis(1),
|
||||||
|
Duration::from_millis(1),
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
@ -477,7 +511,13 @@ async fn pipelining_ok_then_bad() {
|
|||||||
",
|
",
|
||||||
);
|
);
|
||||||
|
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::from_millis(1),
|
||||||
|
Duration::from_millis(1),
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
@ -531,10 +571,16 @@ async fn pipelining_ok_then_bad() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_expect() {
|
async fn expect_handling() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let mut buf = TestSeqBuffer::empty();
|
let mut buf = TestSeqBuffer::empty();
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::ZERO,
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
|
||||||
|
|
||||||
@ -562,7 +608,6 @@ async fn test_expect() {
|
|||||||
|
|
||||||
// polls: manual
|
// polls: manual
|
||||||
assert_eq!(h1.poll_count, 1);
|
assert_eq!(h1.poll_count, 1);
|
||||||
eprintln!("poll count: {}", h1.poll_count);
|
|
||||||
|
|
||||||
if let DispatcherState::Normal { ref inner } = h1.inner {
|
if let DispatcherState::Normal { ref inner } = h1.inner {
|
||||||
let io = inner.io.as_ref().unwrap();
|
let io = inner.io.as_ref().unwrap();
|
||||||
@ -603,10 +648,16 @@ async fn test_expect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_eager_expect() {
|
async fn expect_eager() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let mut buf = TestSeqBuffer::empty();
|
let mut buf = TestSeqBuffer::empty();
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::ZERO,
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
||||||
|
|
||||||
@ -663,7 +714,7 @@ async fn test_eager_expect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_upgrade() {
|
async fn upgrade_handling() {
|
||||||
struct TestUpgrade;
|
struct TestUpgrade;
|
||||||
|
|
||||||
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
|
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
|
||||||
@ -683,7 +734,13 @@ async fn test_upgrade() {
|
|||||||
|
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
let mut buf = TestSeqBuffer::empty();
|
let mut buf = TestSeqBuffer::empty();
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
let cfg = ServiceConfig::new(
|
||||||
|
KeepAlive::Disabled,
|
||||||
|
Duration::ZERO,
|
||||||
|
Duration::ZERO,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));
|
let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));
|
||||||
|
|
||||||
|
@ -212,7 +212,7 @@ pub(crate) trait MessageType: Sized {
|
|||||||
|
|
||||||
// optimized date header, set_date writes \r\n
|
// optimized date header, set_date writes \r\n
|
||||||
if !has_date {
|
if !has_date {
|
||||||
config.set_date(dst, camel_case);
|
config.write_date_header(dst, camel_case);
|
||||||
} else {
|
} else {
|
||||||
// msg eof
|
// msg eof
|
||||||
dst.extend_from_slice(b"\r\n");
|
dst.extend_from_slice(b"\r\n");
|
||||||
@ -318,16 +318,17 @@ impl MessageType for RequestHeadType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: MessageType> MessageEncoder<T> {
|
impl<T: MessageType> MessageEncoder<T> {
|
||||||
/// Encode message
|
/// Encode chunk.
|
||||||
pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> {
|
pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> {
|
||||||
self.te.encode(msg, buf)
|
self.te.encode(msg, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Encode eof
|
/// Encode EOF.
|
||||||
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
|
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
|
||||||
self.te.encode_eof(buf)
|
self.te.encode_eof(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Encode message.
|
||||||
pub fn encode(
|
pub fn encode(
|
||||||
&mut self,
|
&mut self,
|
||||||
dst: &mut BytesMut,
|
dst: &mut BytesMut,
|
||||||
|
@ -13,6 +13,7 @@ mod encoder;
|
|||||||
mod expect;
|
mod expect;
|
||||||
mod payload;
|
mod payload;
|
||||||
mod service;
|
mod service;
|
||||||
|
mod timer;
|
||||||
mod upgrade;
|
mod upgrade;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
|
||||||
@ -28,9 +29,10 @@ pub use self::utils::SendResponse;
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Codec message
|
/// Codec message
|
||||||
pub enum Message<T> {
|
pub enum Message<T> {
|
||||||
/// Http message
|
/// HTTP message.
|
||||||
Item(T),
|
Item(T),
|
||||||
/// Payload chunk
|
|
||||||
|
/// Payload chunk.
|
||||||
Chunk(Option<Bytes>),
|
Chunk(Option<Bytes>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
80
actix-http/src/h1/timer.rs
Normal file
80
actix-http/src/h1/timer.rs
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
use std::{fmt, future::Future, pin::Pin, task::Context};
|
||||||
|
|
||||||
|
use actix_rt::time::{Instant, Sleep};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) enum TimerState {
|
||||||
|
Disabled,
|
||||||
|
Inactive,
|
||||||
|
Active { timer: Pin<Box<Sleep>> },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TimerState {
|
||||||
|
pub(super) fn new(enabled: bool) -> Self {
|
||||||
|
if enabled {
|
||||||
|
Self::Inactive
|
||||||
|
} else {
|
||||||
|
Self::Disabled
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn is_enabled(&self) -> bool {
|
||||||
|
matches!(self, Self::Active { .. } | Self::Inactive)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set(&mut self, timer: Sleep, line: u32) {
|
||||||
|
if matches!(self, Self::Disabled) {
|
||||||
|
log::trace!("setting disabled timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
*self = Self::Active {
|
||||||
|
timer: Box::pin(timer),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) {
|
||||||
|
self.set(timer, line);
|
||||||
|
self.init(cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn clear(&mut self, line: u32) {
|
||||||
|
if matches!(self, Self::Disabled) {
|
||||||
|
log::trace!("trying to clear a disabled timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
if matches!(self, Self::Inactive) {
|
||||||
|
log::trace!("trying to clear an inactive timer from line {}", line);
|
||||||
|
}
|
||||||
|
|
||||||
|
*self = Self::Inactive;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn init(&mut self, cx: &mut Context<'_>) {
|
||||||
|
if let TimerState::Active { timer } = self {
|
||||||
|
let _ = timer.as_mut().poll(cx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for TimerState {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
TimerState::Disabled => f.write_str("timer is disabled"),
|
||||||
|
TimerState::Inactive => f.write_str("timer is inactive"),
|
||||||
|
TimerState::Active { timer } => {
|
||||||
|
let deadline = timer.deadline();
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
if deadline < now {
|
||||||
|
f.write_str("timer is active and has reached deadline")
|
||||||
|
} else {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"timer is active and due to expire in {} milliseconds",
|
||||||
|
((deadline - now).as_secs_f32() * 1000.0)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -57,11 +57,11 @@ where
|
|||||||
conn_data: OnConnectData,
|
conn_data: OnConnectData,
|
||||||
timer: Option<Pin<Box<Sleep>>>,
|
timer: Option<Pin<Box<Sleep>>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let ping_pong = config.keep_alive().map(|dur| H2PingPong {
|
let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong {
|
||||||
timer: timer
|
timer: timer
|
||||||
.map(|mut timer| {
|
.map(|mut timer| {
|
||||||
// reset timer if it's received from new function.
|
// reuse timer slot if it was initialized for handshake
|
||||||
timer.as_mut().reset(config.now() + dur);
|
timer.as_mut().reset((config.now() + dur).into());
|
||||||
timer
|
timer
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|| Box::pin(sleep(dur))),
|
.unwrap_or_else(|| Box::pin(sleep(dur))),
|
||||||
@ -160,8 +160,8 @@ where
|
|||||||
Poll::Ready(_) => {
|
Poll::Ready(_) => {
|
||||||
ping_pong.on_flight = false;
|
ping_pong.on_flight = false;
|
||||||
|
|
||||||
let dead_line = this.config.keep_alive_expire().unwrap();
|
let dead_line = this.config.keep_alive_deadline().unwrap();
|
||||||
ping_pong.timer.as_mut().reset(dead_line);
|
ping_pong.timer.as_mut().reset(dead_line.into());
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(()))
|
return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(()))
|
||||||
@ -174,8 +174,8 @@ where
|
|||||||
|
|
||||||
ping_pong.ping_pong.send_ping(Ping::opaque())?;
|
ping_pong.ping_pong.send_ping(Ping::opaque())?;
|
||||||
|
|
||||||
let dead_line = this.config.keep_alive_expire().unwrap();
|
let dead_line = this.config.keep_alive_deadline().unwrap();
|
||||||
ping_pong.timer.as_mut().reset(dead_line);
|
ping_pong.timer.as_mut().reset(dead_line.into());
|
||||||
|
|
||||||
ping_pong.on_flight = true;
|
ping_pong.on_flight = true;
|
||||||
}
|
}
|
||||||
@ -322,7 +322,7 @@ fn prepare_response(
|
|||||||
// set date header
|
// set date header
|
||||||
if !has_date {
|
if !has_date {
|
||||||
let mut bytes = BytesMut::with_capacity(29);
|
let mut bytes = BytesMut::with_capacity(29);
|
||||||
config.set_date_header(&mut bytes);
|
config.write_date_header_value(&mut bytes);
|
||||||
res.headers_mut().insert(
|
res.headers_mut().insert(
|
||||||
DATE,
|
DATE,
|
||||||
// SAFETY: serialized date-times are known ASCII strings
|
// SAFETY: serialized date-times are known ASCII strings
|
||||||
|
@ -7,7 +7,7 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use actix_rt::time::Sleep;
|
use actix_rt::time::{sleep_until, Sleep};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_core::{ready, Stream};
|
use futures_core::{ready, Stream};
|
||||||
use h2::{
|
use h2::{
|
||||||
@ -15,17 +15,17 @@ use h2::{
|
|||||||
RecvStream,
|
RecvStream,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::ServiceConfig,
|
||||||
|
error::{DispatchError, PayloadError},
|
||||||
|
};
|
||||||
|
|
||||||
mod dispatcher;
|
mod dispatcher;
|
||||||
mod service;
|
mod service;
|
||||||
|
|
||||||
pub use self::dispatcher::Dispatcher;
|
pub use self::dispatcher::Dispatcher;
|
||||||
pub use self::service::H2Service;
|
pub use self::service::H2Service;
|
||||||
|
|
||||||
use crate::{
|
|
||||||
config::ServiceConfig,
|
|
||||||
error::{DispatchError, PayloadError},
|
|
||||||
};
|
|
||||||
|
|
||||||
/// HTTP/2 peer stream.
|
/// HTTP/2 peer stream.
|
||||||
pub struct Payload {
|
pub struct Payload {
|
||||||
stream: RecvStream,
|
stream: RecvStream,
|
||||||
@ -67,7 +67,9 @@ where
|
|||||||
{
|
{
|
||||||
HandshakeWithTimeout {
|
HandshakeWithTimeout {
|
||||||
handshake: handshake(io),
|
handshake: handshake(io),
|
||||||
timer: config.client_timer().map(Box::pin),
|
timer: config
|
||||||
|
.client_request_deadline()
|
||||||
|
.map(|deadline| Box::pin(sleep_until(deadline.into()))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,7 +88,7 @@ where
|
|||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
|
|
||||||
match Pin::new(&mut this.handshake).poll(cx)? {
|
match Pin::new(&mut this.handshake).poll(cx)? {
|
||||||
// return the timer on success handshake. It can be re-used for h2 ping-pong.
|
// return the timer on success handshake; its slot can be re-used for h2 ping-pong
|
||||||
Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
|
Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
|
||||||
Poll::Pending => match this.timer.as_mut() {
|
Poll::Pending => match this.timer.as_mut() {
|
||||||
Some(timer) => {
|
Some(timer) => {
|
||||||
|
@ -630,7 +630,7 @@ impl Removed {
|
|||||||
/// Returns true if iterator contains no elements, without consuming it.
|
/// Returns true if iterator contains no elements, without consuming it.
|
||||||
///
|
///
|
||||||
/// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate
|
/// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate
|
||||||
/// wether any items were actually replaced or removed, respectively.
|
/// whether any items were actually replaced or removed, respectively.
|
||||||
pub fn is_empty(&self) -> bool {
|
pub fn is_empty(&self) -> bool {
|
||||||
match self.inner {
|
match self.inner {
|
||||||
// size hint lower bound of smallvec is the correct length
|
// size hint lower bound of smallvec is the correct length
|
||||||
|
@ -4,8 +4,7 @@ use bytes::BytesMut;
|
|||||||
use http::header::{HeaderValue, InvalidHeaderValue};
|
use http::header::{HeaderValue, InvalidHeaderValue};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue,
|
date::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, helpers::MutWriter,
|
||||||
helpers::MutWriter,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A timestamp with HTTP-style formatting and parsing.
|
/// A timestamp with HTTP-style formatting and parsing.
|
||||||
|
83
actix-http/src/keep_alive.rs
Normal file
83
actix-http/src/keep_alive.rs
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// Connection keep-alive config.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum KeepAlive {
|
||||||
|
/// Keep-alive duration.
|
||||||
|
///
|
||||||
|
/// `KeepAlive::Timeout(Duration::ZERO)` is mapped to `KeepAlive::Disabled`.
|
||||||
|
Timeout(Duration),
|
||||||
|
|
||||||
|
/// Rely on OS to shutdown TCP connection.
|
||||||
|
///
|
||||||
|
/// Some defaults can be very long, check your OS documentation.
|
||||||
|
Os,
|
||||||
|
|
||||||
|
/// Keep-alive is disabled.
|
||||||
|
///
|
||||||
|
/// Connections will be closed immediately.
|
||||||
|
Disabled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KeepAlive {
|
||||||
|
pub(crate) fn enabled(&self) -> bool {
|
||||||
|
!matches!(self, Self::Disabled)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn duration(&self) -> Option<Duration> {
|
||||||
|
match self {
|
||||||
|
KeepAlive::Timeout(dur) => Some(*dur),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Map zero duration to disabled.
|
||||||
|
pub(crate) fn normalize(self) -> KeepAlive {
|
||||||
|
match self {
|
||||||
|
KeepAlive::Timeout(Duration::ZERO) => KeepAlive::Disabled,
|
||||||
|
ka => ka,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for KeepAlive {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Timeout(Duration::from_secs(5))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Duration> for KeepAlive {
|
||||||
|
fn from(dur: Duration) -> Self {
|
||||||
|
KeepAlive::Timeout(dur).normalize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Option<Duration>> for KeepAlive {
|
||||||
|
fn from(ka_dur: Option<Duration>) -> Self {
|
||||||
|
match ka_dur {
|
||||||
|
Some(dur) => KeepAlive::from(dur),
|
||||||
|
None => KeepAlive::Disabled,
|
||||||
|
}
|
||||||
|
.normalize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn from_impls() {
|
||||||
|
let test: KeepAlive = Duration::from_secs(1).into();
|
||||||
|
assert_eq!(test, KeepAlive::Timeout(Duration::from_secs(1)));
|
||||||
|
|
||||||
|
let test: KeepAlive = Duration::from_secs(0).into();
|
||||||
|
assert_eq!(test, KeepAlive::Disabled);
|
||||||
|
|
||||||
|
let test: KeepAlive = Some(Duration::from_secs(0)).into();
|
||||||
|
assert_eq!(test, KeepAlive::Disabled);
|
||||||
|
|
||||||
|
let test: KeepAlive = None.into();
|
||||||
|
assert_eq!(test, KeepAlive::Disabled);
|
||||||
|
}
|
||||||
|
}
|
@ -33,6 +33,7 @@ pub use ::http::{Method, StatusCode, Version};
|
|||||||
pub mod body;
|
pub mod body;
|
||||||
mod builder;
|
mod builder;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod date;
|
||||||
#[cfg(feature = "__compress")]
|
#[cfg(feature = "__compress")]
|
||||||
pub mod encoding;
|
pub mod encoding;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
@ -42,7 +43,10 @@ pub mod h2;
|
|||||||
pub mod header;
|
pub mod header;
|
||||||
mod helpers;
|
mod helpers;
|
||||||
mod http_message;
|
mod http_message;
|
||||||
|
mod keep_alive;
|
||||||
mod message;
|
mod message;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod notify_on_drop;
|
||||||
mod payload;
|
mod payload;
|
||||||
mod requests;
|
mod requests;
|
||||||
mod responses;
|
mod responses;
|
||||||
@ -51,11 +55,12 @@ pub mod test;
|
|||||||
pub mod ws;
|
pub mod ws;
|
||||||
|
|
||||||
pub use self::builder::HttpServiceBuilder;
|
pub use self::builder::HttpServiceBuilder;
|
||||||
pub use self::config::{KeepAlive, ServiceConfig};
|
pub use self::config::ServiceConfig;
|
||||||
pub use self::error::Error;
|
pub use self::error::Error;
|
||||||
pub use self::extensions::Extensions;
|
pub use self::extensions::Extensions;
|
||||||
pub use self::header::ContentEncoding;
|
pub use self::header::ContentEncoding;
|
||||||
pub use self::http_message::HttpMessage;
|
pub use self::http_message::HttpMessage;
|
||||||
|
pub use self::keep_alive::KeepAlive;
|
||||||
pub use self::message::ConnectionType;
|
pub use self::message::ConnectionType;
|
||||||
pub use self::message::Message;
|
pub use self::message::Message;
|
||||||
#[allow(deprecated)]
|
#[allow(deprecated)]
|
||||||
|
49
actix-http/src/notify_on_drop.rs
Normal file
49
actix-http/src/notify_on_drop.rs
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
/// Test Module for checking the drop state of certain async tasks that are spawned
|
||||||
|
/// with `actix_rt::spawn`
|
||||||
|
///
|
||||||
|
/// The target task must explicitly generate `NotifyOnDrop` when spawn the task
|
||||||
|
use std::cell::RefCell;
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static NOTIFY_DROPPED: RefCell<Option<bool>> = RefCell::new(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if the spawned task is dropped.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics when there was no `NotifyOnDrop` instance on current thread.
|
||||||
|
pub(crate) fn is_dropped() -> bool {
|
||||||
|
NOTIFY_DROPPED.with(|bool| {
|
||||||
|
bool.borrow()
|
||||||
|
.expect("No NotifyOnDrop existed on current thread")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct NotifyOnDrop;
|
||||||
|
|
||||||
|
impl NotifyOnDrop {
|
||||||
|
/// # Panics
|
||||||
|
/// Panics hen construct multiple instances on any given thread.
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
NOTIFY_DROPPED.with(|bool| {
|
||||||
|
let mut bool = bool.borrow_mut();
|
||||||
|
if bool.is_some() {
|
||||||
|
panic!("NotifyOnDrop existed on current thread");
|
||||||
|
} else {
|
||||||
|
*bool = Some(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
NotifyOnDrop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for NotifyOnDrop {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
NOTIFY_DROPPED.with(|bool| {
|
||||||
|
if let Some(b) = bool.borrow_mut().as_mut() {
|
||||||
|
*b = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -130,8 +130,8 @@ impl RequestHead {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Request contains `EXPECT` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Request contains `EXPECT` header
|
|
||||||
pub fn expect(&self) -> bool {
|
pub fn expect(&self) -> bool {
|
||||||
self.flags.contains(Flags::EXPECT)
|
self.flags.contains(Flags::EXPECT)
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ impl ResponseHead {
|
|||||||
&mut self.headers
|
&mut self.headers
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the flag that controls wether to send headers formatted as Camel-Case.
|
/// Sets the flag that controls whether to send headers formatted as Camel-Case.
|
||||||
///
|
///
|
||||||
/// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase.
|
/// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -210,14 +210,15 @@ mod tests {
|
|||||||
use memchr::memmem;
|
use memchr::memmem;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
h1::H1Service,
|
||||||
header::{HeaderName, HeaderValue},
|
header::{HeaderName, HeaderValue},
|
||||||
Error, HttpService, Request, Response,
|
Error, Request, Response, ServiceConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn camel_case_headers() {
|
async fn camel_case_headers() {
|
||||||
let mut srv = actix_http_test::test_server(|| {
|
let mut srv = actix_http_test::test_server(|| {
|
||||||
HttpService::new(|req: Request| async move {
|
H1Service::with_config(ServiceConfig::default(), |req: Request| async move {
|
||||||
let mut res = Response::ok();
|
let mut res = Response::ok();
|
||||||
|
|
||||||
if req.path().contains("camel") {
|
if req.path().contains("camel") {
|
||||||
@ -228,6 +229,7 @@ mod tests {
|
|||||||
HeaderName::from_static("foo-bar"),
|
HeaderName::from_static("foo-bar"),
|
||||||
HeaderValue::from_static("baz"),
|
HeaderValue::from_static("baz"),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok::<_, Error>(res)
|
Ok::<_, Error>(res)
|
||||||
})
|
})
|
||||||
.tcp()
|
.tcp()
|
||||||
@ -235,9 +237,11 @@ mod tests {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n");
|
let _ = stream
|
||||||
let mut data = vec![0; 1024];
|
.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n")
|
||||||
let _ = stream.read(&mut data);
|
.unwrap();
|
||||||
|
let mut data = vec![];
|
||||||
|
let _ = stream.read_to_end(&mut data).unwrap();
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
assert!(memmem::find(&data, b"Foo-Bar").is_some());
|
assert!(memmem::find(&data, b"Foo-Bar").is_some());
|
||||||
assert!(memmem::find(&data, b"foo-bar").is_none());
|
assert!(memmem::find(&data, b"foo-bar").is_none());
|
||||||
@ -247,9 +251,11 @@ mod tests {
|
|||||||
assert!(memmem::find(&data, b"content-length").is_none());
|
assert!(memmem::find(&data, b"content-length").is_none());
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n");
|
let _ = stream
|
||||||
let mut data = vec![0; 1024];
|
.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n")
|
||||||
let _ = stream.read(&mut data);
|
.unwrap();
|
||||||
|
let mut data = vec![];
|
||||||
|
let _ = stream.read_to_end(&mut data).unwrap();
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
assert!(memmem::find(&data, b"Foo-Bar").is_none());
|
assert!(memmem::find(&data, b"Foo-Bar").is_none());
|
||||||
assert!(memmem::find(&data, b"foo-bar").is_some());
|
assert!(memmem::find(&data, b"foo-bar").is_some());
|
||||||
|
@ -19,9 +19,8 @@ use pin_project_lite::pin_project;
|
|||||||
use crate::{
|
use crate::{
|
||||||
body::{BoxBody, MessageBody},
|
body::{BoxBody, MessageBody},
|
||||||
builder::HttpServiceBuilder,
|
builder::HttpServiceBuilder,
|
||||||
config::{KeepAlive, ServiceConfig},
|
|
||||||
error::DispatchError,
|
error::DispatchError,
|
||||||
h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response,
|
h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol.
|
/// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol.
|
||||||
@ -43,9 +42,9 @@ where
|
|||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
B: MessageBody + 'static,
|
B: MessageBody + 'static,
|
||||||
{
|
{
|
||||||
/// Create builder for `HttpService` instance.
|
/// Constructs builder for `HttpService` instance.
|
||||||
pub fn build() -> HttpServiceBuilder<T, S> {
|
pub fn build() -> HttpServiceBuilder<T, S> {
|
||||||
HttpServiceBuilder::new()
|
HttpServiceBuilder::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,12 +57,10 @@ where
|
|||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
B: MessageBody + 'static,
|
B: MessageBody + 'static,
|
||||||
{
|
{
|
||||||
/// Create new `HttpService` instance.
|
/// Constructs new `HttpService` instance from service with default config.
|
||||||
pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
|
pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
|
||||||
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None);
|
|
||||||
|
|
||||||
HttpService {
|
HttpService {
|
||||||
cfg,
|
cfg: ServiceConfig::default(),
|
||||||
srv: service.into_factory(),
|
srv: service.into_factory(),
|
||||||
expect: h1::ExpectHandler,
|
expect: h1::ExpectHandler,
|
||||||
upgrade: None,
|
upgrade: None,
|
||||||
@ -72,7 +69,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create new `HttpService` instance with config.
|
/// Constructs new `HttpService` instance from config and service.
|
||||||
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
|
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
|
||||||
cfg: ServiceConfig,
|
cfg: ServiceConfig,
|
||||||
service: F,
|
service: F,
|
||||||
@ -97,11 +94,10 @@ where
|
|||||||
<S::Service as Service<Request>>::Future: 'static,
|
<S::Service as Service<Request>>::Future: 'static,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
{
|
{
|
||||||
/// Provide service for `EXPECT: 100-Continue` support.
|
/// Sets service for `Expect: 100-Continue` handling.
|
||||||
///
|
///
|
||||||
/// Service get called with request that contains `EXPECT` header.
|
/// An expect service is called with requests that contain an `Expect` header. A successful
|
||||||
/// Service must return request in case of success, in that case
|
/// response type is also a request which will be forwarded to the main service.
|
||||||
/// request will be forwarded to main service.
|
|
||||||
pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
|
pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
|
||||||
where
|
where
|
||||||
X1: ServiceFactory<Request, Config = (), Response = Request>,
|
X1: ServiceFactory<Request, Config = (), Response = Request>,
|
||||||
@ -118,10 +114,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Provide service for custom `Connection: UPGRADE` support.
|
/// Sets service for custom `Connection: Upgrade` handling.
|
||||||
///
|
///
|
||||||
/// If service is provided then normal requests handling get halted
|
/// If service is provided then normal requests handling get halted and this service get called
|
||||||
/// and this service get called with original request and framed object.
|
/// with original request and framed object.
|
||||||
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
|
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
|
||||||
where
|
where
|
||||||
U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
|
U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
|
||||||
|
@ -242,7 +242,7 @@ impl io::Read for TestBuffer {
|
|||||||
|
|
||||||
impl io::Write for TestBuffer {
|
impl io::Write for TestBuffer {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
RefCell::borrow_mut(&self.write_buf).extend(buf);
|
self.write_buf.borrow_mut().extend(buf);
|
||||||
Ok(buf.len())
|
Ok(buf.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
|||||||
Hello World Hello World Hello World Hello World Hello World";
|
Hello World Hello World Hello World Hello World Hello World";
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_v2() {
|
async fn h1_v2() {
|
||||||
let srv = test_server(move || {
|
let srv = test_server(move || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -59,7 +59,7 @@ async fn test_h1_v2() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_connection_close() {
|
async fn connection_close() {
|
||||||
let srv = test_server(move || {
|
let srv = test_server(move || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -73,7 +73,7 @@ async fn test_connection_close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_with_query_parameter() {
|
async fn with_query_parameter() {
|
||||||
let srv = test_server(move || {
|
let srv = test_server(move || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.finish(|req: Request| async move {
|
.finish(|req: Request| async move {
|
||||||
@ -104,7 +104,7 @@ impl From<ExpectFailed> for Response<BoxBody> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_expect() {
|
async fn h1_expect() {
|
||||||
let srv = test_server(move || {
|
let srv = test_server(move || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.expect(|req: Request| async {
|
.expect(|req: Request| async {
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::io;
|
use std::{io, time::Duration};
|
||||||
|
|
||||||
use actix_http::{error::Error, HttpService, Response};
|
use actix_http::{error::Error, HttpService, Response};
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
@ -19,7 +19,7 @@ async fn h2_ping_pong() -> io::Result<()> {
|
|||||||
.workers(1)
|
.workers(1)
|
||||||
.listen("h2_ping_pong", lst, || {
|
.listen("h2_ping_pong", lst, || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(3)
|
.keep_alive(Duration::from_secs(3))
|
||||||
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
|
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
|
||||||
.tcp()
|
.tcp()
|
||||||
})?
|
})?
|
||||||
@ -92,10 +92,10 @@ async fn h2_handshake_timeout() -> io::Result<()> {
|
|||||||
.workers(1)
|
.workers(1)
|
||||||
.listen("h2_ping_pong", lst, || {
|
.listen("h2_ping_pong", lst, || {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(30)
|
.keep_alive(Duration::from_secs(30))
|
||||||
// set first request timeout to 5 seconds.
|
// set first request timeout to 5 seconds.
|
||||||
// this is the timeout used for http2 handshake.
|
// this is the timeout used for http2 handshake.
|
||||||
.client_timeout(5000)
|
.client_request_timeout(Duration::from_secs(5))
|
||||||
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
|
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
|
||||||
.tcp()
|
.tcp()
|
||||||
})?
|
})?
|
||||||
|
@ -2,7 +2,7 @@ use std::{
|
|||||||
convert::Infallible,
|
convert::Infallible,
|
||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
net, thread,
|
net, thread,
|
||||||
time::Duration,
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
@ -22,12 +22,12 @@ use futures_util::{
|
|||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1() {
|
async fn h1_basic() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(KeepAlive::Disabled)
|
.keep_alive(KeepAlive::Disabled)
|
||||||
.client_timeout(1000)
|
.client_request_timeout(Duration::from_secs(1))
|
||||||
.client_disconnect(1000)
|
.client_disconnect_timeout(Duration::from_secs(1))
|
||||||
.h1(|req: Request| {
|
.h1(|req: Request| {
|
||||||
assert!(req.peer_addr().is_some());
|
assert!(req.peer_addr().is_some());
|
||||||
ok::<_, Infallible>(Response::ok())
|
ok::<_, Infallible>(Response::ok())
|
||||||
@ -43,12 +43,12 @@ async fn test_h1() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_2() {
|
async fn h1_2() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(KeepAlive::Disabled)
|
.keep_alive(KeepAlive::Disabled)
|
||||||
.client_timeout(1000)
|
.client_request_timeout(Duration::from_secs(1))
|
||||||
.client_disconnect(1000)
|
.client_disconnect_timeout(Duration::from_secs(1))
|
||||||
.finish(|req: Request| {
|
.finish(|req: Request| {
|
||||||
assert!(req.peer_addr().is_some());
|
assert!(req.peer_addr().is_some());
|
||||||
assert_eq!(req.version(), http::Version::HTTP_11);
|
assert_eq!(req.version(), http::Version::HTTP_11);
|
||||||
@ -75,7 +75,7 @@ impl From<ExpectFailed> for Response<BoxBody> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_expect_continue() {
|
async fn expect_continue() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.expect(fn_service(|req: Request| {
|
.expect(fn_service(|req: Request| {
|
||||||
@ -106,7 +106,7 @@ async fn test_expect_continue() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_expect_continue_h1() {
|
async fn expect_continue_h1() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.expect(fn_service(|req: Request| {
|
.expect(fn_service(|req: Request| {
|
||||||
@ -139,7 +139,7 @@ async fn test_expect_continue_h1() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_chunked_payload() {
|
async fn chunked_payload() {
|
||||||
let chunk_sizes = vec![32768, 32, 32768];
|
let chunk_sizes = vec![32768, 32, 32768];
|
||||||
let total_size: usize = chunk_sizes.iter().sum();
|
let total_size: usize = chunk_sizes.iter().sum();
|
||||||
|
|
||||||
@ -197,26 +197,43 @@ async fn test_chunked_payload() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_slow_request() {
|
async fn slow_request_408() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(100)
|
.client_request_timeout(Duration::from_millis(200))
|
||||||
|
.keep_alive(Duration::from_secs(2))
|
||||||
.finish(|_| ok::<_, Infallible>(Response::ok()))
|
.finish(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
.tcp()
|
.tcp()
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n");
|
let _ = stream.write_all(b"GET /test HTTP/1.1\r\n");
|
||||||
let mut data = String::new();
|
let mut data = String::new();
|
||||||
let _ = stream.read_to_string(&mut data);
|
let _ = stream.read_to_string(&mut data);
|
||||||
assert!(data.starts_with("HTTP/1.1 408 Request Timeout"));
|
assert!(
|
||||||
|
data.starts_with("HTTP/1.1 408 Request Timeout"),
|
||||||
|
"response was not 408: {}",
|
||||||
|
data
|
||||||
|
);
|
||||||
|
|
||||||
|
let diff = start.elapsed();
|
||||||
|
|
||||||
|
if diff < Duration::from_secs(1) {
|
||||||
|
// test success
|
||||||
|
} else if diff < Duration::from_secs(3) {
|
||||||
|
panic!("request seems to have wrongly timed-out according to keep-alive");
|
||||||
|
} else {
|
||||||
|
panic!("request took way too long to time out");
|
||||||
|
}
|
||||||
|
|
||||||
srv.stop().await;
|
srv.stop().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http1_malformed_request() {
|
async fn http1_malformed_request() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
@ -234,7 +251,7 @@ async fn test_http1_malformed_request() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http1_keepalive() {
|
async fn http1_keepalive() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
@ -257,23 +274,25 @@ async fn test_http1_keepalive() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http1_keepalive_timeout() {
|
async fn http1_keepalive_timeout() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(1)
|
.keep_alive(Duration::from_secs(1))
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
.tcp()
|
.tcp()
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
|
|
||||||
let mut data = vec![0; 1024];
|
let _ = stream.write_all(b"GET /test HTTP/1.1\r\n\r\n");
|
||||||
|
let mut data = vec![0; 256];
|
||||||
let _ = stream.read(&mut data);
|
let _ = stream.read(&mut data);
|
||||||
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
|
||||||
|
|
||||||
thread::sleep(Duration::from_millis(1100));
|
thread::sleep(Duration::from_millis(1100));
|
||||||
|
|
||||||
let mut data = vec![0; 1024];
|
let mut data = vec![0; 256];
|
||||||
let res = stream.read(&mut data).unwrap();
|
let res = stream.read(&mut data).unwrap();
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
|
|
||||||
@ -281,7 +300,7 @@ async fn test_http1_keepalive_timeout() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http1_keepalive_close() {
|
async fn http1_keepalive_close() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
@ -303,7 +322,7 @@ async fn test_http1_keepalive_close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http10_keepalive_default_close() {
|
async fn http10_keepalive_default_close() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
@ -325,7 +344,7 @@ async fn test_http10_keepalive_default_close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http10_keepalive() {
|
async fn http10_keepalive() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
.h1(|_| ok::<_, Infallible>(Response::ok()))
|
||||||
@ -354,7 +373,7 @@ async fn test_http10_keepalive() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_http1_keepalive_disabled() {
|
async fn http1_keepalive_disabled() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(KeepAlive::Disabled)
|
.keep_alive(KeepAlive::Disabled)
|
||||||
@ -377,7 +396,7 @@ async fn test_http1_keepalive_disabled() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_content_length() {
|
async fn content_length() {
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
header::{HeaderName, HeaderValue},
|
header::{HeaderName, HeaderValue},
|
||||||
StatusCode,
|
StatusCode,
|
||||||
@ -426,7 +445,7 @@ async fn test_content_length() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_headers() {
|
async fn h1_headers() {
|
||||||
let data = STR.repeat(10);
|
let data = STR.repeat(10);
|
||||||
let data2 = data.clone();
|
let data2 = data.clone();
|
||||||
|
|
||||||
@ -492,7 +511,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
|||||||
Hello World Hello World Hello World Hello World Hello World";
|
Hello World Hello World Hello World Hello World Hello World";
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_body() {
|
async fn h1_body() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -511,7 +530,7 @@ async fn test_h1_body() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_head_empty() {
|
async fn h1_head_empty() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -538,7 +557,7 @@ async fn test_h1_head_empty() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_head_binary() {
|
async fn h1_head_binary() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -565,7 +584,7 @@ async fn test_h1_head_binary() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_head_binary2() {
|
async fn h1_head_binary2() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
|
||||||
@ -588,7 +607,7 @@ async fn test_h1_head_binary2() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_body_length() {
|
async fn h1_body_length() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| {
|
.h1(|_| {
|
||||||
@ -612,7 +631,7 @@ async fn test_h1_body_length() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_body_chunked_explicit() {
|
async fn h1_body_chunked_explicit() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| {
|
.h1(|_| {
|
||||||
@ -649,7 +668,7 @@ async fn test_h1_body_chunked_explicit() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_body_chunked_implicit() {
|
async fn h1_body_chunked_implicit() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| {
|
.h1(|_| {
|
||||||
@ -680,7 +699,7 @@ async fn test_h1_body_chunked_implicit() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_response_http_error_handling() {
|
async fn h1_response_http_error_handling() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(fn_service(|_| {
|
.h1(fn_service(|_| {
|
||||||
@ -719,7 +738,7 @@ impl From<BadRequest> for Response<BoxBody> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_service_error() {
|
async fn h1_service_error() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.h1(|_| err::<Response<()>, _>(BadRequest))
|
.h1(|_| err::<Response<()>, _>(BadRequest))
|
||||||
@ -738,7 +757,7 @@ async fn test_h1_service_error() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_h1_on_connect() {
|
async fn h1_on_connect() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.on_connect_ext(|_, data| {
|
.on_connect_ext(|_, data| {
|
||||||
@ -761,7 +780,7 @@ async fn test_h1_on_connect() {
|
|||||||
/// Tests compliance with 304 Not Modified spec in RFC 7232 §4.1.
|
/// Tests compliance with 304 Not Modified spec in RFC 7232 §4.1.
|
||||||
/// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1
|
/// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_not_modified_spec_h1() {
|
async fn not_modified_spec_h1() {
|
||||||
// TODO: this test needing a few seconds to complete reveals some weirdness with either the
|
// TODO: this test needing a few seconds to complete reveals some weirdness with either the
|
||||||
// dispatcher or the client, though similar hangs occur on other tests in this file, only
|
// dispatcher or the client, though similar hangs occur on other tests in this file, only
|
||||||
// succeeding, it seems, because of the keepalive timer
|
// succeeding, it seems, because of the keepalive timer
|
||||||
|
@ -109,7 +109,7 @@ async fn service(msg: Frame) -> Result<Message, Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_simple() {
|
async fn simple() {
|
||||||
let mut srv = test_server(|| {
|
let mut srv = test_server(|| {
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.upgrade(fn_factory(|| async {
|
.upgrade(fn_factory(|| async {
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
- Rename `TestServerConfig::{client_timeout => client_request_timeout}`. [#2611]
|
||||||
|
|
||||||
|
[#2611]: https://github.com/actix/actix-web/pull/2611
|
||||||
|
|
||||||
|
|
||||||
## 0.1.0-beta.11 - 2022-01-04
|
## 0.1.0-beta.11 - 2022-01-04
|
||||||
|
@ -149,7 +149,7 @@ where
|
|||||||
let local_addr = tcp.local_addr().unwrap();
|
let local_addr = tcp.local_addr().unwrap();
|
||||||
let factory = factory.clone();
|
let factory = factory.clone();
|
||||||
let srv_cfg = cfg.clone();
|
let srv_cfg = cfg.clone();
|
||||||
let timeout = cfg.client_timeout;
|
let timeout = cfg.client_request_timeout;
|
||||||
|
|
||||||
let builder = Server::build().workers(1).disable_signals().system_exit();
|
let builder = Server::build().workers(1).disable_signals().system_exit();
|
||||||
|
|
||||||
@ -167,7 +167,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h1(map_config(fac, move |_| app_cfg.clone()))
|
.h1(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.tcp()
|
.tcp()
|
||||||
}),
|
}),
|
||||||
@ -183,7 +183,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h2(map_config(fac, move |_| app_cfg.clone()))
|
.h2(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.tcp()
|
.tcp()
|
||||||
}),
|
}),
|
||||||
@ -199,7 +199,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.finish(map_config(fac, move |_| app_cfg.clone()))
|
.finish(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.tcp()
|
.tcp()
|
||||||
}),
|
}),
|
||||||
@ -218,7 +218,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h1(map_config(fac, move |_| app_cfg.clone()))
|
.h1(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.openssl(acceptor.clone())
|
.openssl(acceptor.clone())
|
||||||
}),
|
}),
|
||||||
@ -234,7 +234,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h2(map_config(fac, move |_| app_cfg.clone()))
|
.h2(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.openssl(acceptor.clone())
|
.openssl(acceptor.clone())
|
||||||
}),
|
}),
|
||||||
@ -250,7 +250,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.finish(map_config(fac, move |_| app_cfg.clone()))
|
.finish(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.openssl(acceptor.clone())
|
.openssl(acceptor.clone())
|
||||||
}),
|
}),
|
||||||
@ -269,7 +269,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h1(map_config(fac, move |_| app_cfg.clone()))
|
.h1(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.rustls(config.clone())
|
.rustls(config.clone())
|
||||||
}),
|
}),
|
||||||
@ -285,7 +285,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.h2(map_config(fac, move |_| app_cfg.clone()))
|
.h2(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.rustls(config.clone())
|
.rustls(config.clone())
|
||||||
}),
|
}),
|
||||||
@ -301,7 +301,7 @@ where
|
|||||||
.map_err(|err| err.into().error_response());
|
.map_err(|err| err.into().error_response());
|
||||||
|
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.client_timeout(timeout)
|
.client_request_timeout(timeout)
|
||||||
.finish(map_config(fac, move |_| app_cfg.clone()))
|
.finish(map_config(fac, move |_| app_cfg.clone()))
|
||||||
.rustls(config.clone())
|
.rustls(config.clone())
|
||||||
}),
|
}),
|
||||||
@ -388,7 +388,7 @@ pub fn config() -> TestServerConfig {
|
|||||||
pub struct TestServerConfig {
|
pub struct TestServerConfig {
|
||||||
tp: HttpVer,
|
tp: HttpVer,
|
||||||
stream: StreamType,
|
stream: StreamType,
|
||||||
client_timeout: u64,
|
client_request_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for TestServerConfig {
|
impl Default for TestServerConfig {
|
||||||
@ -403,7 +403,7 @@ impl TestServerConfig {
|
|||||||
TestServerConfig {
|
TestServerConfig {
|
||||||
tp: HttpVer::Both,
|
tp: HttpVer::Both,
|
||||||
stream: StreamType::Tcp,
|
stream: StreamType::Tcp,
|
||||||
client_timeout: 5000,
|
client_request_timeout: Duration::from_secs(5),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -433,9 +433,9 @@ impl TestServerConfig {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set client timeout in milliseconds for first request.
|
/// Set client timeout for first request.
|
||||||
pub fn client_timeout(mut self, val: u64) -> Self {
|
pub fn client_request_timeout(mut self, dur: Duration) -> Self {
|
||||||
self.client_timeout = val;
|
self.client_request_timeout = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,7 @@ where
|
|||||||
let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
|
let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
|
||||||
match body.size() {
|
match body.size() {
|
||||||
BodySize::None | BodySize::Sized(0) => {
|
BodySize::None | BodySize::Sized(0) => {
|
||||||
let keep_alive = framed.codec_ref().keepalive();
|
let keep_alive = framed.codec_ref().keep_alive();
|
||||||
framed.io_mut().on_release(keep_alive);
|
framed.io_mut().on_release(keep_alive);
|
||||||
|
|
||||||
// TODO: use a new variant or a new type better describing error violate
|
// TODO: use a new variant or a new type better describing error violate
|
||||||
@ -119,7 +119,7 @@ where
|
|||||||
|
|
||||||
match pin_framed.codec_ref().message_type() {
|
match pin_framed.codec_ref().message_type() {
|
||||||
h1::MessageType::None => {
|
h1::MessageType::None => {
|
||||||
let keep_alive = pin_framed.codec_ref().keepalive();
|
let keep_alive = pin_framed.codec_ref().keep_alive();
|
||||||
pin_framed.io_mut().on_release(keep_alive);
|
pin_framed.io_mut().on_release(keep_alive);
|
||||||
|
|
||||||
Ok((head, Payload::None))
|
Ok((head, Payload::None))
|
||||||
@ -223,7 +223,7 @@ impl<Io: ConnectionIo> Stream for PlStream<Io> {
|
|||||||
match ready!(this.framed.as_mut().next_item(cx)?) {
|
match ready!(this.framed.as_mut().next_item(cx)?) {
|
||||||
Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
|
Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
|
||||||
Some(None) => {
|
Some(None) => {
|
||||||
let keep_alive = this.framed.codec_ref().keepalive();
|
let keep_alive = this.framed.codec_ref().keep_alive();
|
||||||
this.framed.io_mut().on_release(keep_alive);
|
this.framed.io_mut().on_release(keep_alive);
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ use std::{
|
|||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
net,
|
net,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response};
|
use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response};
|
||||||
@ -27,8 +28,8 @@ struct Socket {
|
|||||||
struct Config {
|
struct Config {
|
||||||
host: Option<String>,
|
host: Option<String>,
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_timeout: u64,
|
client_request_timeout: Duration,
|
||||||
client_shutdown: u64,
|
client_disconnect_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An HTTP Server.
|
/// An HTTP Server.
|
||||||
@ -88,9 +89,9 @@ where
|
|||||||
factory,
|
factory,
|
||||||
config: Arc::new(Mutex::new(Config {
|
config: Arc::new(Mutex::new(Config {
|
||||||
host: None,
|
host: None,
|
||||||
keep_alive: KeepAlive::Timeout(5),
|
keep_alive: KeepAlive::default(),
|
||||||
client_timeout: 5000,
|
client_request_timeout: Duration::from_secs(5),
|
||||||
client_shutdown: 5000,
|
client_disconnect_timeout: Duration::from_secs(1),
|
||||||
})),
|
})),
|
||||||
backlog: 1024,
|
backlog: 1024,
|
||||||
sockets: Vec::new(),
|
sockets: Vec::new(),
|
||||||
@ -200,11 +201,17 @@ where
|
|||||||
/// To disable timeout set value to 0.
|
/// To disable timeout set value to 0.
|
||||||
///
|
///
|
||||||
/// By default client timeout is set to 5000 milliseconds.
|
/// By default client timeout is set to 5000 milliseconds.
|
||||||
pub fn client_timeout(self, val: u64) -> Self {
|
pub fn client_request_timeout(self, dur: Duration) -> Self {
|
||||||
self.config.lock().unwrap().client_timeout = val;
|
self.config.lock().unwrap().client_request_timeout = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")]
|
||||||
|
pub fn client_timeout(self, dur: Duration) -> Self {
|
||||||
|
self.client_request_timeout(dur)
|
||||||
|
}
|
||||||
|
|
||||||
/// Set server connection shutdown timeout in milliseconds.
|
/// Set server connection shutdown timeout in milliseconds.
|
||||||
///
|
///
|
||||||
/// Defines a timeout for shutdown connection. If a shutdown procedure does not complete
|
/// Defines a timeout for shutdown connection. If a shutdown procedure does not complete
|
||||||
@ -213,11 +220,17 @@ where
|
|||||||
/// To disable timeout set value to 0.
|
/// To disable timeout set value to 0.
|
||||||
///
|
///
|
||||||
/// By default client timeout is set to 5000 milliseconds.
|
/// By default client timeout is set to 5000 milliseconds.
|
||||||
pub fn client_shutdown(self, val: u64) -> Self {
|
pub fn client_disconnect_timeout(self, dur: Duration) -> Self {
|
||||||
self.config.lock().unwrap().client_shutdown = val;
|
self.config.lock().unwrap().client_disconnect_timeout = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")]
|
||||||
|
pub fn client_shutdown(self, dur: u64) -> Self {
|
||||||
|
self.client_disconnect_timeout(Duration::from_millis(dur))
|
||||||
|
}
|
||||||
|
|
||||||
/// Set server host name.
|
/// Set server host name.
|
||||||
///
|
///
|
||||||
/// Host name is used by application router as a hostname for url generation.
|
/// Host name is used by application router as a hostname for url generation.
|
||||||
@ -291,8 +304,8 @@ where
|
|||||||
|
|
||||||
let mut svc = HttpService::build()
|
let mut svc = HttpService::build()
|
||||||
.keep_alive(c.keep_alive)
|
.keep_alive(c.keep_alive)
|
||||||
.client_timeout(c.client_timeout)
|
.client_request_timeout(c.client_request_timeout)
|
||||||
.client_disconnect(c.client_shutdown)
|
.client_disconnect_timeout(c.client_disconnect_timeout)
|
||||||
.local_addr(addr);
|
.local_addr(addr);
|
||||||
|
|
||||||
if let Some(handler) = on_connect_fn.clone() {
|
if let Some(handler) = on_connect_fn.clone() {
|
||||||
@ -349,8 +362,8 @@ where
|
|||||||
|
|
||||||
let svc = HttpService::build()
|
let svc = HttpService::build()
|
||||||
.keep_alive(c.keep_alive)
|
.keep_alive(c.keep_alive)
|
||||||
.client_timeout(c.client_timeout)
|
.client_request_timeout(c.client_request_timeout)
|
||||||
.client_disconnect(c.client_shutdown)
|
.client_disconnect_timeout(c.client_disconnect_timeout)
|
||||||
.local_addr(addr);
|
.local_addr(addr);
|
||||||
|
|
||||||
let svc = if let Some(handler) = on_connect_fn.clone() {
|
let svc = if let Some(handler) = on_connect_fn.clone() {
|
||||||
@ -410,8 +423,8 @@ where
|
|||||||
|
|
||||||
let svc = HttpService::build()
|
let svc = HttpService::build()
|
||||||
.keep_alive(c.keep_alive)
|
.keep_alive(c.keep_alive)
|
||||||
.client_timeout(c.client_timeout)
|
.client_request_timeout(c.client_request_timeout)
|
||||||
.client_disconnect(c.client_shutdown);
|
.client_disconnect_timeout(c.client_disconnect_timeout);
|
||||||
|
|
||||||
let svc = if let Some(handler) = on_connect_fn.clone() {
|
let svc = if let Some(handler) = on_connect_fn.clone() {
|
||||||
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))
|
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))
|
||||||
@ -537,8 +550,8 @@ where
|
|||||||
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
|
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
|
||||||
let mut svc = HttpService::build()
|
let mut svc = HttpService::build()
|
||||||
.keep_alive(c.keep_alive)
|
.keep_alive(c.keep_alive)
|
||||||
.client_timeout(c.client_timeout)
|
.client_request_timeout(c.client_request_timeout)
|
||||||
.client_disconnect(c.client_shutdown);
|
.client_disconnect_timeout(c.client_disconnect_timeout);
|
||||||
|
|
||||||
if let Some(handler) = on_connect_fn.clone() {
|
if let Some(handler) = on_connect_fn.clone() {
|
||||||
svc = svc
|
svc = svc
|
||||||
@ -593,8 +606,8 @@ where
|
|||||||
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
|
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
|
||||||
HttpService::build()
|
HttpService::build()
|
||||||
.keep_alive(c.keep_alive)
|
.keep_alive(c.keep_alive)
|
||||||
.client_timeout(c.client_timeout)
|
.client_request_timeout(c.client_request_timeout)
|
||||||
.client_disconnect(c.client_shutdown)
|
.client_disconnect_timeout(c.client_disconnect_timeout)
|
||||||
.finish(map_config(fac, move |_| config.clone())),
|
.finish(map_config(fac, move |_| config.clone())),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -26,9 +26,9 @@ async fn test_start() {
|
|||||||
.backlog(1)
|
.backlog(1)
|
||||||
.max_connections(10)
|
.max_connections(10)
|
||||||
.max_connection_rate(10)
|
.max_connection_rate(10)
|
||||||
.keep_alive(10)
|
.keep_alive(Duration::from_secs(10))
|
||||||
.client_timeout(5000)
|
.client_request_timeout(Duration::from_secs(5))
|
||||||
.client_shutdown(0)
|
.client_disconnect_timeout(Duration::ZERO)
|
||||||
.server_hostname("localhost")
|
.server_hostname("localhost")
|
||||||
.system_exit()
|
.system_exit()
|
||||||
.disable_signals()
|
.disable_signals()
|
||||||
|
@ -8,6 +8,7 @@ use std::{
|
|||||||
io::{Read, Write},
|
io::{Read, Write},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
@ -835,9 +836,10 @@ async fn test_server_cookies() {
|
|||||||
async fn test_slow_request() {
|
async fn test_slow_request() {
|
||||||
use std::net;
|
use std::net;
|
||||||
|
|
||||||
let srv = actix_test::start_with(actix_test::config().client_timeout(200), || {
|
let srv = actix_test::start_with(
|
||||||
App::new().service(web::resource("/").route(web::to(HttpResponse::Ok)))
|
actix_test::config().client_request_timeout(Duration::from_millis(200)),
|
||||||
});
|
|| App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))),
|
||||||
|
);
|
||||||
|
|
||||||
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
|
||||||
let mut data = String::new();
|
let mut data = String::new();
|
||||||
|
Loading…
Reference in New Issue
Block a user