1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 21:51:06 +01:00

add MPTCP socket protocol (optional) (#466)

Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
Martin André 2023-07-17 05:10:36 +02:00 committed by GitHub
parent 8d5d1dbf6f
commit 755b231e00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 5 deletions

View File

@ -2,6 +2,7 @@
## Unreleased - 2023-xx-xx ## Unreleased - 2023-xx-xx
- Add support for MultiPath TCP (MPTCP) with `MpTcp` enum and `ServerBuilder::mptcp()` method.
- Minimum supported Rust version (MSRV) is now 1.65. - Minimum supported Rust version (MSRV) is now 1.65.
## 2.2.0 - 2022-12-21 ## 2.2.0 - 2022-12-21

View File

@ -12,6 +12,22 @@ use crate::{
Server, Server,
}; };
/// Multipath TCP (MPTCP) preference.
///
/// Also see [`ServerBuilder::mptcp()`].
#[derive(Debug, Clone)]
pub enum MpTcp {
/// MPTCP will not be used when binding sockets.
Disabled,
/// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
/// attempted, too.
TcpFallback,
/// MPTCP will be used when binding sockets (with no fallback).
NoFallback,
}
/// [Server] builder. /// [Server] builder.
pub struct ServerBuilder { pub struct ServerBuilder {
pub(crate) threads: usize, pub(crate) threads: usize,
@ -19,6 +35,7 @@ pub struct ServerBuilder {
pub(crate) backlog: u32, pub(crate) backlog: u32,
pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>, pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(crate) sockets: Vec<(usize, String, MioListener)>, pub(crate) sockets: Vec<(usize, String, MioListener)>,
pub(crate) mptcp: MpTcp,
pub(crate) exit: bool, pub(crate) exit: bool,
pub(crate) listen_os_signals: bool, pub(crate) listen_os_signals: bool,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>, pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
@ -43,6 +60,7 @@ impl ServerBuilder {
factories: Vec::new(), factories: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
backlog: 2048, backlog: 2048,
mptcp: MpTcp::Disabled,
exit: false, exit: false,
listen_os_signals: true, listen_os_signals: true,
cmd_tx, cmd_tx,
@ -96,6 +114,24 @@ impl ServerBuilder {
self self
} }
/// Sets MultiPath TCP (MPTCP) preference on bound sockets.
///
/// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
/// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
/// for more info about MPTCP itself.
///
/// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
/// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
///
/// This method will have no effect if called after a `bind()`.
///
/// [mptcp.dev]: https://www.mptcp.dev
#[cfg(target_os = "linux")]
pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
self.mptcp = mptcp_enabled;
self
}
/// Sets the maximum per-worker number of concurrent connections. /// Sets the maximum per-worker number of concurrent connections.
/// ///
/// All socket listeners will stop accepting connections when this limit is reached for /// All socket listeners will stop accepting connections when this limit is reached for
@ -144,7 +180,7 @@ impl ServerBuilder {
U: ToSocketAddrs, U: ToSocketAddrs,
N: AsRef<str>, N: AsRef<str>,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog, &self.mptcp)?;
trace!("binding server to: {:?}", &sockets); trace!("binding server to: {:?}", &sockets);
@ -260,13 +296,14 @@ impl ServerBuilder {
pub(super) fn bind_addr<S: ToSocketAddrs>( pub(super) fn bind_addr<S: ToSocketAddrs>(
addr: S, addr: S,
backlog: u32, backlog: u32,
mptcp: &MpTcp,
) -> io::Result<Vec<MioTcpListener>> { ) -> io::Result<Vec<MioTcpListener>> {
let mut opt_err = None; let mut opt_err = None;
let mut success = false; let mut success = false;
let mut sockets = Vec::new(); let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? { for addr in addr.to_socket_addrs()? {
match create_mio_tcp_listener(addr, backlog) { match create_mio_tcp_listener(addr, backlog, mptcp) {
Ok(lst) => { Ok(lst) => {
success = true; success = true;
sockets.push(lst); sockets.push(lst);

View File

@ -21,7 +21,10 @@ mod worker;
#[doc(hidden)] #[doc(hidden)]
pub use self::socket::FromStream; pub use self::socket::FromStream;
pub use self::{ pub use self::{
builder::ServerBuilder, handle::ServerHandle, server::Server, service::ServerServiceFactory, builder::{MpTcp, ServerBuilder},
handle::ServerHandle,
server::Server,
service::ServerServiceFactory,
test_server::TestServer, test_server::TestServer,
}; };

View File

@ -11,6 +11,8 @@ pub(crate) use {
mio::net::UnixListener as MioUnixListener, std::os::unix::net::UnixListener as StdUnixListener, mio::net::UnixListener as MioUnixListener, std::os::unix::net::UnixListener as StdUnixListener,
}; };
use crate::builder::MpTcp;
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
#[cfg(unix)] #[cfg(unix)]
@ -223,10 +225,30 @@ mod unix_impl {
pub(crate) fn create_mio_tcp_listener( pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr, addr: StdSocketAddr,
backlog: u32, backlog: u32,
mptcp: &MpTcp,
) -> io::Result<MioTcpListener> { ) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; #[cfg(not(target_os = "linux"))]
let protocol = Protocol::TCP;
#[cfg(target_os = "linux")]
let protocol = if matches!(mptcp, MpTcp::Disabled) {
Protocol::TCP
} else {
Protocol::MPTCP
};
let socket = match Socket::new(Domain::for_address(addr), Type::STREAM, Some(protocol)) {
Ok(sock) => sock,
Err(err) if matches!(mptcp, MpTcp::TcpFallback) => {
tracing::warn!("binding socket as MPTCP failed: {err}");
tracing::warn!("falling back to TCP");
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?
}
Err(err) => return Err(err),
};
socket.set_reuse_address(true)?; socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
@ -247,7 +269,7 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080"); assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let lst = create_mio_tcp_listener(addr, 128).unwrap(); let lst = create_mio_tcp_listener(addr, 128, &MpTcp::Disabled).unwrap();
let lst = MioListener::Tcp(lst); let lst = MioListener::Tcp(lst);
assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1")); assert!(format!("{}", lst).contains("127.0.0.1"));