1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-31 10:22:09 +01:00

326 lines
9.9 KiB
Rust
Raw Normal View History

2021-11-04 20:30:43 +00:00
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2022-03-08 22:13:55 +00:00
use tracing::{info, trace};
2021-11-04 20:30:43 +00:00
use crate::{
server::ServerCommand,
2021-12-26 22:32:35 +00:00
service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
2023-07-17 03:05:39 +01:00
socket::{create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs},
2021-11-04 20:30:43 +00:00
worker::ServerWorkerConfig,
Server,
};
/// Multipath TCP (MPTCP) preference.
///
/// Selects whether MPTCP is used when binding sockets and whether a plain TCP fallback is
/// attempted when MPTCP fails.
///
/// The type is a plain, field-less enum, so it is `Copy` and can be compared with `==`.
///
/// Also see [`ServerBuilder::mptcp()`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MpTcp {
    /// MPTCP will not be used when binding sockets.
    Disabled,

    /// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
    /// attempted, too.
    TcpFallback,

    /// MPTCP will be used when binding sockets (with no fallback).
    NoFallback,
}
2021-11-04 20:30:43 +00:00
/// [Server] builder.
2018-12-09 20:30:04 -08:00
pub struct ServerBuilder {
2021-11-04 20:30:43 +00:00
pub(crate) threads: usize,
pub(crate) token: usize,
pub(crate) backlog: u32,
pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(crate) sockets: Vec<(usize, String, MioListener)>,
pub(crate) mptcp: MpTcp,
2021-11-04 20:30:43 +00:00
pub(crate) exit: bool,
pub(crate) listen_os_signals: bool,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(crate) worker_config: ServerWorkerConfig,
2018-12-09 21:51:35 -08:00
}
impl Default for ServerBuilder {
fn default() -> Self {
Self::new()
}
2018-08-19 10:47:04 -07:00
}
2018-12-09 20:30:04 -08:00
impl ServerBuilder {
2018-12-09 21:51:35 -08:00
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
2021-11-04 20:30:43 +00:00
let (cmd_tx, cmd_rx) = unbounded_channel();
2018-12-09 21:51:35 -08:00
2018-12-09 20:30:04 -08:00
ServerBuilder {
threads: num_cpus::get_physical(),
token: 0,
2021-11-04 20:30:43 +00:00
factories: Vec::new(),
2018-08-19 10:47:04 -07:00
sockets: Vec::new(),
2019-03-11 12:01:55 -07:00
backlog: 2048,
mptcp: MpTcp::Disabled,
2018-08-19 10:47:04 -07:00
exit: false,
2021-11-04 20:30:43 +00:00
listen_os_signals: true,
cmd_tx,
cmd_rx,
2021-02-04 07:01:51 -08:00
worker_config: ServerWorkerConfig::default(),
2018-08-19 10:47:04 -07:00
}
}
/// Set number of workers to start.
///
/// `num` must be greater than 0.
///
/// The default worker count is the number of physical CPU cores available. If your benchmark
/// testing indicates that simultaneous multi-threading is beneficial to your app, you can use
/// the [`num_cpus`] crate to acquire the _logical_ core count instead.
2018-08-19 10:47:04 -07:00
pub fn workers(mut self, num: usize) -> Self {
assert_ne!(num, 0, "workers must be greater than 0");
2018-08-19 10:47:04 -07:00
self.threads = num;
self
}
2021-02-04 07:01:51 -08:00
/// Set max number of threads for each worker's blocking task thread pool.
///
/// One thread pool is set up **per worker**; not shared across workers.
///
/// # Examples:
/// ```
/// # use actix_server::ServerBuilder;
/// let builder = ServerBuilder::new()
/// .workers(4) // server has 4 worker thread.
/// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
/// ```
///
/// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
self.worker_config.max_blocking_threads(num);
self
}
2019-03-11 12:01:55 -07:00
/// Set the maximum number of pending connections.
///
2021-11-01 23:36:51 +00:00
/// This refers to the number of clients that can be waiting to be served. Exceeding this number
/// results in the client getting an error when attempting to connect. It should only affect
/// servers under significant load.
2019-03-11 12:01:55 -07:00
///
/// Generally set in the 64-2048 range. Default value is 2048.
///
/// This method should be called before `bind()` method call.
pub fn backlog(mut self, num: u32) -> Self {
2019-03-11 12:01:55 -07:00
self.backlog = num;
self
}
/// Sets MultiPath TCP (MPTCP) preference on bound sockets.
///
/// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
/// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
/// for more info about MPTCP itself.
///
/// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
/// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
///
/// This method will have no effect if called after a `bind()`.
///
/// [mptcp.dev]: https://www.mptcp.dev
#[cfg(target_os = "linux")]
pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
self.mptcp = mptcp_enabled;
self
}
2018-08-19 10:47:04 -07:00
/// Sets the maximum per-worker number of concurrent connections.
///
2021-11-01 23:36:51 +00:00
/// All socket listeners will stop accepting connections when this limit is reached for
/// each worker.
2018-08-19 10:47:04 -07:00
///
2018-09-07 11:35:25 -07:00
/// By default max connections is set to a 25k per worker.
2021-11-01 23:36:51 +00:00
pub fn max_concurrent_connections(mut self, num: usize) -> Self {
2021-04-04 15:00:12 -07:00
self.worker_config.max_concurrent_connections(num);
2018-08-19 10:47:04 -07:00
self
}
2021-11-01 23:36:51 +00:00
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
pub fn maxconn(self, num: usize) -> Self {
self.max_concurrent_connections(num)
}
2021-11-15 02:33:13 +00:00
/// Stop Actix `System` after server shutdown.
2018-08-19 10:47:04 -07:00
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
2021-11-04 20:30:43 +00:00
/// Disable OS signal handling.
2018-08-19 10:47:04 -07:00
pub fn disable_signals(mut self) -> Self {
2021-11-04 20:30:43 +00:00
self.listen_os_signals = false;
2018-08-19 10:47:04 -07:00
self
}
2018-09-26 20:40:45 -07:00
/// Timeout for graceful workers shutdown in seconds.
2018-08-19 10:47:04 -07:00
///
/// After receiving a stop signal, workers have this much time to finish serving requests.
/// Workers still alive after the timeout are force dropped.
2018-08-19 10:47:04 -07:00
///
/// By default shutdown timeout sets to 30 seconds.
2019-05-18 10:56:41 -07:00
pub fn shutdown_timeout(mut self, sec: u64) -> Self {
2021-02-04 07:01:51 -08:00
self.worker_config
.shutdown_timeout(Duration::from_secs(sec));
2018-08-19 10:47:04 -07:00
self
}
2018-12-09 21:51:35 -08:00
/// Add new service to the server.
2021-12-27 18:27:54 +00:00
pub fn bind<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
2018-08-19 10:47:04 -07:00
where
2021-12-26 22:32:35 +00:00
F: ServerServiceFactory<TcpStream>,
U: ToSocketAddrs,
2021-12-27 18:27:54 +00:00
N: AsRef<str>,
2018-08-19 10:47:04 -07:00
{
let sockets = bind_addr(addr, self.backlog, &self.mptcp)?;
2018-08-19 10:47:04 -07:00
2021-11-04 20:30:43 +00:00
trace!("binding server to: {:?}", &sockets);
2018-08-19 10:47:04 -07:00
for lst in sockets {
let token = self.next_token();
2021-11-04 20:30:43 +00:00
self.factories.push(StreamNewService::create(
2019-03-09 07:27:56 -08:00
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));
2019-12-29 10:07:46 +06:00
self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
2018-08-19 10:47:04 -07:00
}
2021-11-04 20:30:43 +00:00
2018-08-19 10:47:04 -07:00
Ok(self)
}
2021-11-04 20:30:43 +00:00
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
2021-12-26 22:32:35 +00:00
F: ServerServiceFactory<TcpStream>,
2021-11-04 20:30:43 +00:00
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let token = self.next_token();
self.factories.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Starts processing incoming connections and return server controller.
pub fn run(self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("starting {} workers", self.threads);
2021-11-04 20:30:43 +00:00
Server::new(self)
}
}
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
}
#[cfg(unix)]
impl ServerBuilder {
2019-07-18 17:05:40 +06:00
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
2019-07-18 17:05:40 +06:00
where
2021-12-26 22:32:35 +00:00
F: ServerServiceFactory<actix_rt::net::UnixStream>,
2019-07-18 17:05:40 +06:00
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
2022-01-28 21:46:17 +00:00
if let Err(err) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
2022-01-28 21:46:17 +00:00
if err.kind() != std::io::ErrorKind::NotFound {
return Err(err);
}
}
2019-07-18 17:05:40 +06:00
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
2019-07-18 17:05:40 +06:00
/// Add new unix domain service to the server.
2021-11-01 23:36:51 +00:00
///
/// Useful when running as a systemd service and a socket FD is acquired externally.
pub fn listen_uds<F, N: AsRef<str>>(
mut self,
name: N,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
2021-12-26 22:32:35 +00:00
F: ServerServiceFactory<actix_rt::net::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let token = self.next_token();
2023-07-17 03:05:39 +01:00
let addr = crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
2021-11-04 20:30:43 +00:00
self.factories.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
2019-12-29 10:07:46 +06:00
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
2018-09-26 20:40:45 -07:00
}
2018-08-19 10:47:04 -07:00
}
pub(super) fn bind_addr<S: ToSocketAddrs>(
2019-03-11 12:01:55 -07:00
addr: S,
backlog: u32,
mptcp: &MpTcp,
) -> io::Result<Vec<MioTcpListener>> {
2022-01-28 22:13:10 +00:00
let mut opt_err = None;
2021-11-01 23:41:28 +00:00
let mut success = false;
2018-08-19 10:47:04 -07:00
let mut sockets = Vec::new();
2022-01-28 22:13:10 +00:00
2018-08-19 10:47:04 -07:00
for addr in addr.to_socket_addrs()? {
match create_mio_tcp_listener(addr, backlog, mptcp) {
2018-08-19 10:47:04 -07:00
Ok(lst) => {
2021-11-01 23:41:28 +00:00
success = true;
2018-08-19 10:47:04 -07:00
sockets.push(lst);
}
2022-01-28 22:13:10 +00:00
Err(err) => opt_err = Some(err),
2018-08-19 10:47:04 -07:00
}
}
2021-11-01 23:41:28 +00:00
if success {
2018-08-19 10:47:04 -07:00
Ok(sockets)
2022-01-28 22:13:10 +00:00
} else if let Some(err) = opt_err.take() {
2021-11-01 23:41:28 +00:00
Err(err)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
2018-08-19 10:47:04 -07:00
}
}