1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

Add support for Proxy Protocol v1 & v2 for HTTP/1.x

This commit is contained in:
veeshi 2024-02-04 18:11:30 +00:00
parent e518170a30
commit f4254c5a10
14 changed files with 424 additions and 28 deletions

View File

@ -62,7 +62,7 @@ jobs:
set -e
cargo test --lib --tests -p=actix-router --all-features
cargo test --lib --tests -p=actix-http --all-features
cargo test --lib --tests -p=actix-web --features=rustls-0_20,rustls-0_21,rustls-0_22,openssl -- --skip=test_reading_deflate_encoding_large_random_rustls
cargo test --lib --tests -p=actix-web --features=rustls-0_20,rustls-0_21,rustls-0_22,openssl,proxy-protocol-- --skip=test_reading_deflate_encoding_large_random_rustls
cargo test --lib --tests -p=actix-web-codegen --all-features
cargo test --lib --tests -p=awc --all-features
cargo test --lib --tests -p=actix-http-test --all-features

View File

@ -76,7 +76,7 @@ jobs:
set -e
cargo test --lib --tests -p=actix-router --all-features
cargo test --lib --tests -p=actix-http --all-features
cargo test --lib --tests -p=actix-web --features=rustls-0_20,rustls-0_21,rustls-0_22,openssl -- --skip=test_reading_deflate_encoding_large_random_rustls
cargo test --lib --tests -p=actix-web --features=rustls-0_20,rustls-0_21,rustls-0_22,openssl,proxy-protocol -- --skip=test_reading_deflate_encoding_large_random_rustls
cargo test --lib --tests -p=actix-web-codegen --all-features
cargo test --lib --tests -p=awc --all-features
cargo test --lib --tests -p=actix-http-test --all-features

View File

@ -66,6 +66,9 @@ rustls-0_21 = ["actix-tls/accept", "actix-tls/rustls-0_21"]
# TLS via Rustls v0.22
rustls-0_22 = ["actix-tls/accept", "actix-tls/rustls-0_22"]
# Proxy protocol support
proxy-protocol = ["ppp"]
# Compression codecs
compress-brotli = ["__compress", "brotli"]
compress-gzip = ["__compress", "flate2"]
@ -111,13 +114,16 @@ rand = { version = "0.8", optional = true }
sha1 = { version = "0.10", optional = true }
# openssl/rustls
actix-tls = { version = "3.3", default-features = false, optional = true }
actix-tls = { version = "3.1", default-features = false, optional = true }
# compress-*
brotli = { version = "3.3.3", optional = true }
flate2 = { version = "1.0.13", optional = true }
zstd = { version = "0.13", optional = true }
# Proxy protocol support
ppp = { version = "0.2", optional = true }
[dev-dependencies]
actix-http-test = { version = "3", features = ["openssl"] }
actix-server = "2"

View File

@ -23,6 +23,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_phantom: PhantomData<S>,
proxy_protocol: bool,
}
impl<T, S> Default for HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler>
@ -46,6 +47,8 @@ where
upgrade: None,
on_connect_ext: None,
_phantom: PhantomData,
proxy_protocol: false,
}
}
}
@ -124,6 +127,12 @@ where
self.client_disconnect_timeout(dur)
}
/// Enable `PROXY` protocol support.
pub fn proxy_protocol(mut self, enabled: bool) -> Self {
self.proxy_protocol = enabled;
self
}
/// Provide service for `EXPECT: 100-Continue` support.
///
/// Service get called with request that contains `EXPECT` header.
@ -146,6 +155,7 @@ where
upgrade: self.upgrade,
on_connect_ext: self.on_connect_ext,
_phantom: PhantomData,
proxy_protocol: self.proxy_protocol,
}
}
@ -170,6 +180,7 @@ where
upgrade: Some(upgrade.into_factory()),
on_connect_ext: self.on_connect_ext,
_phantom: PhantomData,
proxy_protocol: self.proxy_protocol,
}
}
@ -201,6 +212,7 @@ where
self.client_disconnect_timeout,
self.secure,
self.local_addr,
self.proxy_protocol,
);
H1Service::with_config(cfg, service.into_factory())
@ -226,6 +238,7 @@ where
self.client_disconnect_timeout,
self.secure,
self.local_addr,
self.proxy_protocol,
);
crate::h2::H2Service::with_config(cfg, service.into_factory())
@ -248,6 +261,7 @@ where
self.client_disconnect_timeout,
self.secure,
self.local_addr,
self.proxy_protocol,
);
HttpService::with_config(cfg, service.into_factory())

View File

@ -20,6 +20,7 @@ struct Inner {
secure: bool,
local_addr: Option<std::net::SocketAddr>,
date_service: DateService,
proxy_protocol: bool,
}
impl Default for ServiceConfig {
@ -30,6 +31,7 @@ impl Default for ServiceConfig {
Duration::ZERO,
false,
None,
false,
)
}
}
@ -42,6 +44,7 @@ impl ServiceConfig {
client_disconnect_timeout: Duration,
secure: bool,
local_addr: Option<net::SocketAddr>,
proxy_protocol: bool,
) -> ServiceConfig {
ServiceConfig(Rc::new(Inner {
keep_alive: keep_alive.normalize(),
@ -50,6 +53,7 @@ impl ServiceConfig {
secure,
local_addr,
date_service: DateService::new(),
proxy_protocol,
}))
}
@ -73,6 +77,12 @@ impl ServiceConfig {
self.0.keep_alive
}
/// Proxy protocol setting.
#[inline]
pub fn proxy_protocol(&self) -> bool {
self.0.proxy_protocol
}
/// Creates a time object representing the deadline for this connection's keep-alive period, if
/// enabled.
///
@ -143,8 +153,14 @@ mod tests {
#[actix_rt::test]
async fn test_date_service_update() {
let settings =
ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None);
let settings = ServiceConfig::new(
KeepAlive::Os,
Duration::ZERO,
Duration::ZERO,
false,
None,
false,
);
yield_now().await;

View File

@ -10,6 +10,8 @@ use super::{
encoder, Message, MessageType,
};
use crate::{body::BodySize, error::ParseError, ConnectionType, Request, Response, ServiceConfig};
#[cfg(feature = "proxy-protocol")]
use crate::{http_message::HttpMessage, proxy_protocol::ProxyProtocol};
bitflags! {
#[derive(Debug, Clone, Copy)]
@ -110,6 +112,7 @@ impl Decoder for Codec {
type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
#[allow(clippy::collapsible_else_if)]
if let Some(ref mut payload) = self.payload {
Ok(match payload.decode(src)? {
Some(PayloadItem::Chunk(chunk)) => Some(Message::Chunk(Some(chunk))),
@ -119,29 +122,49 @@ impl Decoder for Codec {
}
None => None,
})
} else if let Some((req, payload)) = self.decoder.decode(src)? {
let head = req.head();
self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version;
self.conn_type = head.connection_type();
if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{
self.conn_type = ConnectionType::Close
}
match payload {
PayloadType::None => self.payload = None,
PayloadType::Payload(pl) => self.payload = Some(pl),
PayloadType::Stream(pl) => {
self.payload = Some(pl);
self.flags.insert(Flags::STREAM);
}
}
Ok(Some(Message::Item(req)))
} else {
Ok(None)
#[cfg(feature = "proxy-protocol")]
let proxy_protocol = if self.config.proxy_protocol() {
let p = ProxyProtocol::decode(src)?;
if p.is_none() {
return Ok(None);
}
p
} else {
None
};
if let Some((req, payload)) = self.decoder.decode(src)? {
let head = req.head();
self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version;
self.conn_type = head.connection_type();
if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{
self.conn_type = ConnectionType::Close
}
#[cfg(feature = "proxy-protocol")]
// set proxy protocol
if let Some(proxy_protocol) = proxy_protocol {
let mut extensions = req.extensions_mut();
extensions.insert(proxy_protocol);
}
match payload {
PayloadType::None => self.payload = None,
PayloadType::Payload(pl) => self.payload = Some(pl),
PayloadType::Stream(pl) => {
self.payload = Some(pl);
self.flags.insert(Flags::STREAM);
}
}
Ok(Some(Message::Item(req)))
} else {
Ok(None)
}
}
}
}

View File

@ -84,6 +84,7 @@ async fn late_request() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
@ -151,6 +152,7 @@ async fn oneshot_connection() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -212,6 +214,7 @@ async fn keep_alive_timeout() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -291,6 +294,7 @@ async fn keep_alive_follow_up_req() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -455,6 +459,7 @@ async fn pipelining_ok_then_ok() {
Duration::from_millis(1),
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -525,6 +530,7 @@ async fn pipelining_ok_then_bad() {
Duration::from_millis(1),
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -588,6 +594,7 @@ async fn expect_handling() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
@ -665,6 +672,7 @@ async fn expect_eager() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -748,6 +756,7 @@ async fn upgrade_handling() {
Duration::ZERO,
false,
None,
false,
);
let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));

View File

@ -49,6 +49,8 @@ mod message;
#[cfg(test)]
mod notify_on_drop;
mod payload;
#[cfg(feature = "proxy-protocol")]
pub mod proxy_protocol;
mod requests;
mod responses;
mod service;

View File

@ -0,0 +1,177 @@
use bytes::{Bytes, BytesMut};
pub use ppp::{
v1::Addresses as V1Addresses,
v2::{Addresses as V2Addresses, Command, Protocol, Type as TlvType},
};
use tracing::trace;
use crate::error::ParseError;
const V1_PREFIX_LEN: usize = 5;
const V1_MAX_LEN: usize = 107;
const V2_PREFIX_LEN: usize = 12;
const V2_MIN_LEN: usize = 16;
const V2_LEN_INDEX_1: usize = 14;
const V2_LEN_INDEX_2: usize = 15;
#[derive(Clone, Debug, PartialEq)]
pub enum ProxyProtocol {
V1(ProxyProtocolV1),
V2(ProxyProtocolV2),
}
impl ProxyProtocol {
pub(crate) fn decode(src: &mut BytesMut) -> Result<Option<Self>, ParseError> {
if src.len() >= V1_PREFIX_LEN
&& &src[..V1_PREFIX_LEN] == ppp::v1::PROTOCOL_PREFIX.as_bytes()
{
if let Some(line_end) = src.iter().position(|b| *b == b'\r') {
if let Some(delimiter) = src.get(line_end + 1) {
if delimiter == &b'\n' {
let proxy_line = src.split_to(line_end + 2).freeze();
if proxy_line.len() > V1_MAX_LEN {
trace!("proxy protocol header too long");
return Err(ParseError::Header);
}
match ppp::v1::Header::try_from(&proxy_line[..]) {
Ok(header) => Ok(Some(
ProxyProtocolV1 {
addresses: header.addresses,
}
.into(),
)),
Err(e) => {
trace!("error parsing proxy protocol v1 header: {:?}", e);
Err(ParseError::Header)
}
}
} else {
trace!("invalid line ending found");
Err(ParseError::Header)
}
} else {
trace!("no line ending found, might be a partial request");
Ok(None)
}
} else if src.len() > V1_MAX_LEN {
trace!("proxy protocol header too long");
Err(ParseError::Header)
} else {
trace!("no line ending found, might be a partial request");
Ok(None)
}
} else if src.len() >= V2_PREFIX_LEN && &src[..V2_PREFIX_LEN] == ppp::v2::PROTOCOL_PREFIX {
if src.len() < V2_MIN_LEN {
return Ok(None);
}
let total_length = V2_MIN_LEN
+ u16::from_be_bytes([src[V2_LEN_INDEX_1], src[V2_LEN_INDEX_2]]) as usize;
if src.len() < total_length {
return Ok(None);
}
let proxy_line = src.split_to(total_length).freeze();
match ppp::v2::Header::try_from(&proxy_line[..]) {
Ok(header) => {
let size_hint = header.tlvs().size_hint();
let mut converted_tlvs: Vec<Tlv> =
Vec::with_capacity(size_hint.1.unwrap_or(size_hint.0));
for tlv in header.tlvs() {
match tlv {
Ok(tlv) => {
converted_tlvs.push(tlv.into());
}
Err(e) => {
trace!(
"error parsing proxy protocol v2 type length value: {:?}",
e
);
return Err(ParseError::Header);
}
}
}
Ok(Some(
ProxyProtocolV2 {
addresses: header.addresses,
command: header.command,
protocol: header.protocol,
tlvs: converted_tlvs,
}
.into(),
))
}
Err(e) => {
trace!("error parsing proxy protocol v1 header: {:?}", e);
return Err(ParseError::Header);
}
}
} else if src.len() < V1_PREFIX_LEN || src.len() < V2_PREFIX_LEN {
trace!("not enough data to parse proxy protocol header");
Ok(None)
} else {
trace!("invalid proxy protocol header");
Err(ParseError::Header)
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct ProxyProtocolV1 {
pub addresses: V1Addresses,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ProxyProtocolV2 {
pub addresses: V2Addresses,
pub command: Command,
pub protocol: Protocol,
pub tlvs: Vec<Tlv>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct Tlv {
pub kind: TlvType,
pub value: bytes::Bytes,
}
impl From<ProxyProtocolV1> for ProxyProtocol {
fn from(v1: ProxyProtocolV1) -> Self {
ProxyProtocol::V1(v1)
}
}
impl From<ProxyProtocolV2> for ProxyProtocol {
fn from(v2: ProxyProtocolV2) -> Self {
ProxyProtocol::V2(v2)
}
}
impl From<ppp::v2::TypeLengthValue<'_>> for Tlv {
fn from(tlv: ppp::v2::TypeLengthValue<'_>) -> Self {
Tlv {
kind: match tlv.kind {
0x01 => TlvType::ALPN,
0x02 => TlvType::Authority,
0x03 => TlvType::CRC32C,
0x04 => TlvType::NoOp,
0x05 => TlvType::UniqueId,
0x20 => TlvType::SSL,
0x21 => TlvType::SSLVersion,
0x22 => TlvType::SSLCommonName,
0x23 => TlvType::SSLCipher,
0x24 => TlvType::SSLSignatureAlgorithm,
0x25 => TlvType::SSLKeyAlgorithm,
0x30 => TlvType::NetworkNamespace,
_ => unreachable!(),
},
value: Bytes::copy_from_slice(&tlv.value),
}
}
}

View File

@ -173,6 +173,7 @@ where
HttpService::build()
.client_request_timeout(timeout)
.proxy_protocol(srv_cfg.proxy_protocol)
.h1(map_config(fac, move |_| app_cfg.clone()))
.tcp()
}),
@ -461,6 +462,7 @@ pub struct TestServerConfig {
client_request_timeout: Duration,
port: u16,
workers: usize,
proxy_protocol: bool,
}
impl Default for TestServerConfig {
@ -478,6 +480,7 @@ impl TestServerConfig {
client_request_timeout: Duration::from_secs(5),
port: 0,
workers: 1,
proxy_protocol: false,
}
}
@ -558,6 +561,14 @@ impl TestServerConfig {
self.workers = workers;
self
}
/// Enable proxy protocol support.
///
/// By default, the server does not use proxy protocol.
pub fn proxy_protocol(mut self) -> Self {
self.proxy_protocol = true;
self
}
}
/// A basic HTTP server controller that simplifies the process of writing integration tests for

View File

@ -72,6 +72,9 @@ rustls-0_21 = ["http2", "actix-http/rustls-0_21", "actix-tls/accept", "actix-tls
# TLS via Rustls v0.22
rustls-0_22 = ["http2", "actix-http/rustls-0_22", "actix-tls/accept", "actix-tls/rustls-0_22"]
# Proxy protocol support
proxy-protocol = ["actix-http/proxy-protocol"]
# Internal (PRIVATE!) features used to aid testing and checking feature status.
# Don't rely on these whatsoever. They may disappear at anytime.
__compress = []
@ -145,6 +148,10 @@ required-features = ["compress-brotli", "compress-gzip", "compress-zstd", "cooki
name = "compression"
required-features = ["compress-brotli", "compress-gzip", "compress-zstd"]
[[test]]
name = "proxy_protocol"
required-features = ["proxy-protocol"]
[[example]]
name = "basic"
required-features = ["compress-gzip"]

View File

@ -36,6 +36,7 @@ struct Config {
client_disconnect_timeout: Duration,
#[allow(dead_code)] // only dead when no TLS features are enabled
tls_handshake_timeout: Option<Duration>,
proxy_protocol: bool,
}
/// An HTTP Server.
@ -119,6 +120,7 @@ where
client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::from_secs(1),
tls_handshake_timeout: None,
proxy_protocol: false,
})),
backlog: 1024,
sockets: Vec::new(),
@ -155,6 +157,21 @@ where
self
}
/// Sets if the server should use the PROXY protocol.
pub fn proxy_protocol(self, enabled: bool) -> Self {
#[cfg(not(feature = "proxy-protocol"))]
{
if enabled {
panic!(
"Proxy protocol support is not enabled. Enable the `proxy-protocol` feature."
)
}
}
self.config.lock().unwrap().proxy_protocol = enabled;
self
}
/// Sets the maximum number of pending connections.
///
/// This refers to the number of clients that can be waiting to be served. Exceeding this number
@ -513,6 +530,7 @@ where
.keep_alive(cfg.keep_alive)
.client_request_timeout(cfg.client_request_timeout)
.client_disconnect_timeout(cfg.client_disconnect_timeout)
.proxy_protocol(cfg.proxy_protocol)
.local_addr(addr);
if let Some(handler) = on_connect_fn.clone() {

View File

@ -0,0 +1,113 @@
use std::{
io::{Read, Write},
net::{Ipv4Addr, Shutdown, TcpStream},
};
use actix_http::proxy_protocol::{
Command, Protocol, ProxyProtocol, ProxyProtocolV1, ProxyProtocolV2, TlvType, V1Addresses,
V2Addresses,
};
use actix_test::TestServerConfig;
use actix_web::{get, App, HttpMessage, HttpRequest, HttpResponse, Responder};
#[get("/v1")]
async fn proxy_protocol_v1(req: HttpRequest) -> impl Responder {
let extensions = req.extensions();
let proxy_protocol = extensions.get::<ProxyProtocol>().unwrap();
if let ProxyProtocol::V1(ProxyProtocolV1 {
addresses: V1Addresses::Tcp4(addr),
}) = proxy_protocol
{
if addr.source_address == Ipv4Addr::new(127, 0, 1, 2)
&& addr.destination_address == Ipv4Addr::new(192, 168, 1, 101)
&& addr.source_port == 80
&& addr.destination_port == 443
{
return HttpResponse::Ok().body(format!("{:?}", proxy_protocol));
}
}
HttpResponse::NotFound().finish()
}
#[get("/v2")]
async fn proxy_protocol_v2(req: HttpRequest) -> impl Responder {
let extensions = req.extensions();
let proxy_protocol = extensions.get::<ProxyProtocol>().unwrap();
if let ProxyProtocol::V2(ProxyProtocolV2 {
addresses: V2Addresses::IPv4(addr),
command,
protocol,
tlvs,
}) = proxy_protocol
{
if addr.source_address == Ipv4Addr::new(127, 0, 1, 2)
&& addr.destination_address == Ipv4Addr::new(192, 168, 1, 101)
&& addr.source_port == 80
&& addr.destination_port == 443
&& matches!(command, Command::Proxy)
&& matches!(protocol, Protocol::Datagram)
&& tlvs.len() == 1
&& tlvs[0].kind == TlvType::NoOp
&& tlvs[0].value[..] == [42]
{
return HttpResponse::Ok().body(format!("{:?}", proxy_protocol));
}
}
HttpResponse::NotFound().finish()
}
#[actix_rt::test]
async fn test_parse_proxy_protocol_v1() {
let srv = actix_test::start_with(TestServerConfig::default().h1().proxy_protocol(), || {
App::new().service(proxy_protocol_v1)
});
let mut stream = TcpStream::connect(srv.addr()).unwrap();
stream
.write_all(
b"PROXY TCP4 127.0.1.2 192.168.1.101 80 443\r\nGET /v1 HTTP/1.1\r\n\r\n".as_ref(),
)
.unwrap();
let mut buf = [0; 1024];
let n = stream.read(&mut buf).unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
println!("{}", response);
assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
stream.shutdown(Shutdown::Both).unwrap();
srv.stop().await;
}
#[actix_rt::test]
async fn test_parse_proxy_protocol_v2() {
let srv = actix_test::start_with(TestServerConfig::default().h1().proxy_protocol(), || {
App::new().service(proxy_protocol_v2)
});
let mut stream = TcpStream::connect(srv.addr()).unwrap();
stream.write_all(b"\r\n\r\n\0\r\nQUIT\n".as_ref()).unwrap();
stream
.write_all(&[
0x21, 0x12, 0, 16, 127, 0, 1, 2, 192, 168, 1, 101, 0, 80, 1, 187, 4, 0, 1, 42,
])
.unwrap();
stream.write_all(b"GET /v2 HTTP/1.1\r\n\r\n").unwrap();
let mut buf = [0; 1024];
let n = stream.read(&mut buf).unwrap();
let response = String::from_utf8_lossy(&buf[..n]);
println!("{}", response);
assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
stream.shutdown(Shutdown::Both).unwrap();
srv.stop().await;
}

View File

@ -14,7 +14,7 @@ save_exit_code() {
save_exit_code cargo test --lib --tests -p=actix-router --all-features -- --nocapture
save_exit_code cargo test --lib --tests -p=actix-http --all-features -- --nocapture
save_exit_code cargo test --lib --tests -p=actix-web --features=rustls,openssl -- --nocapture
save_exit_code cargo test --lib --tests -p=actix-web --features=rustls,openssl,proxy-protocol -- --nocapture
save_exit_code cargo test --lib --tests -p=actix-web-codegen --all-features -- --nocapture
save_exit_code cargo test --lib --tests -p=awc --all-features -- --nocapture
save_exit_code cargo test --lib --tests -p=actix-http-test --all-features -- --nocapture