diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index e98f9d2b..1f9f7b3c 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased - 2023-xx-xx +- Add support for MultiPath TCP (MPTCP) with `MpTcp` enum and `ServerBuilder::mptcp()` method. - Minimum supported Rust version (MSRV) is now 1.65. ## 2.2.0 - 2022-12-21 diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index badac77b..e1d3a2d3 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -12,6 +12,22 @@ use crate::{ Server, }; +/// Multipath TCP (MPTCP) preference. +/// +/// Also see [`ServerBuilder::mptcp()`]. +#[derive(Debug, Clone)] +pub enum MpTcp { + /// MPTCP will not be used when binding sockets. + Disabled, + + /// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be + /// attempted, too. + TcpFallback, + + /// MPTCP will be used when binding sockets (with no fallback). + NoFallback, +} + /// [Server] builder. pub struct ServerBuilder { pub(crate) threads: usize, @@ -19,6 +35,7 @@ pub struct ServerBuilder { pub(crate) backlog: u32, pub(crate) factories: Vec>, pub(crate) sockets: Vec<(usize, String, MioListener)>, + pub(crate) mptcp: MpTcp, pub(crate) exit: bool, pub(crate) listen_os_signals: bool, pub(crate) cmd_tx: UnboundedSender, @@ -43,6 +60,7 @@ impl ServerBuilder { factories: Vec::new(), sockets: Vec::new(), backlog: 2048, + mptcp: MpTcp::Disabled, exit: false, listen_os_signals: true, cmd_tx, @@ -96,6 +114,24 @@ impl ServerBuilder { self } + /// Sets MultiPath TCP (MPTCP) preference on bound sockets. + /// + /// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance + /// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev] + /// for more info about MPTCP itself. + /// + /// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to + /// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`. + /// + /// This method will have no effect if called after a `bind()`. + /// + /// [mptcp.dev]: https://www.mptcp.dev + #[cfg(target_os = "linux")] + pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self { + self.mptcp = mptcp_enabled; + self + } + /// Sets the maximum per-worker number of concurrent connections. /// /// All socket listeners will stop accepting connections when this limit is reached for @@ -144,7 +180,7 @@ impl ServerBuilder { U: ToSocketAddrs, N: AsRef, { - let sockets = bind_addr(addr, self.backlog)?; + let sockets = bind_addr(addr, self.backlog, &self.mptcp)?; trace!("binding server to: {:?}", &sockets); @@ -260,13 +296,14 @@ impl ServerBuilder { pub(super) fn bind_addr( addr: S, backlog: u32, + mptcp: &MpTcp, ) -> io::Result> { let mut opt_err = None; let mut success = false; let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { - match create_mio_tcp_listener(addr, backlog) { + match create_mio_tcp_listener(addr, backlog, mptcp) { Ok(lst) => { success = true; sockets.push(lst); diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 5e265d74..24adf8ff 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -21,7 +21,10 @@ mod worker; #[doc(hidden)] pub use self::socket::FromStream; pub use self::{ - builder::ServerBuilder, handle::ServerHandle, server::Server, service::ServerServiceFactory, + builder::{MpTcp, ServerBuilder}, + handle::ServerHandle, + server::Server, + service::ServerServiceFactory, test_server::TestServer, }; diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index f0942e38..486c0d46 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -11,6 +11,8 @@ pub(crate) use { mio::net::UnixListener as MioUnixListener, std::os::unix::net::UnixListener as StdUnixListener, }; +use crate::builder::MpTcp; + pub(crate) enum MioListener { Tcp(MioTcpListener), #[cfg(unix)] @@ -223,10 +225,30 @@ mod unix_impl { pub(crate) fn create_mio_tcp_listener( addr: StdSocketAddr, backlog: u32, + mptcp: &MpTcp, ) -> io::Result { use socket2::{Domain, Protocol, Socket, Type}; - let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; + #[cfg(not(target_os = "linux"))] + let protocol = Protocol::TCP; + #[cfg(target_os = "linux")] + let protocol = if matches!(mptcp, MpTcp::Disabled) { + Protocol::TCP + } else { + Protocol::MPTCP + }; + + let socket = match Socket::new(Domain::for_address(addr), Type::STREAM, Some(protocol)) { + Ok(sock) => sock, + + Err(err) if matches!(mptcp, MpTcp::TcpFallback) => { + tracing::warn!("binding socket as MPTCP failed: {err}"); + tracing::warn!("falling back to TCP"); + Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))? + } + + Err(err) => return Err(err), + }; socket.set_reuse_address(true)?; socket.set_nonblocking(true)?; @@ -247,7 +269,7 @@ mod tests { assert_eq!(format!("{}", addr), "127.0.0.1:8080"); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let lst = create_mio_tcp_listener(addr, 128).unwrap(); + let lst = create_mio_tcp_listener(addr, 128, &MpTcp::Disabled).unwrap(); let lst = MioListener::Tcp(lst); assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{}", lst).contains("127.0.0.1"));