mirror of
https://github.com/fafhrd91/actix-net
synced 2025-01-18 20:01:48 +01:00
Remove MAX_CONN (#316)
This commit is contained in:
parent
3859e91799
commit
f1573931dd
@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals};
|
||||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::socket::{MioTcpListener, MioTcpSocket};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
|
||||
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
|
||||
use crate::{join_all, Token};
|
||||
|
||||
/// Server builder
|
||||
@ -117,8 +117,8 @@ impl ServerBuilder {
|
||||
/// reached for each worker.
|
||||
///
|
||||
/// By default max connections is set to a 25k per worker.
|
||||
pub fn maxconn(self, num: usize) -> Self {
|
||||
worker::max_concurrent_connections(num);
|
||||
pub fn maxconn(mut self, num: usize) -> Self {
|
||||
self.worker_config.max_concurrent_connections(num);
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,7 @@ use std::{
|
||||
mem,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
@ -43,27 +43,6 @@ pub(crate) struct Conn {
|
||||
pub token: Token,
|
||||
}
|
||||
|
||||
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
||||
|
||||
/// Sets the maximum per-worker number of concurrent connections.
|
||||
///
|
||||
/// All socket listeners will stop accepting connections when this limit is
|
||||
/// reached for each worker.
|
||||
///
|
||||
/// By default max connections is set to a 25k per worker.
|
||||
pub fn max_concurrent_connections(num: usize) {
|
||||
MAX_CONNS.store(num, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static MAX_CONNS_COUNTER: Counter =
|
||||
Counter::new(MAX_CONNS.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
pub(crate) fn num_connections() -> usize {
|
||||
MAX_CONNS_COUNTER.with(|conns| conns.total())
|
||||
}
|
||||
|
||||
// a handle to worker that can send message to worker and share the availability of worker to other
|
||||
// thread.
|
||||
#[derive(Clone)]
|
||||
@ -173,6 +152,7 @@ enum WorkerServiceStatus {
|
||||
pub(crate) struct ServerWorkerConfig {
|
||||
shutdown_timeout: Duration,
|
||||
max_blocking_threads: usize,
|
||||
max_concurrent_connections: usize,
|
||||
}
|
||||
|
||||
impl Default for ServerWorkerConfig {
|
||||
@ -182,6 +162,7 @@ impl Default for ServerWorkerConfig {
|
||||
Self {
|
||||
shutdown_timeout: Duration::from_secs(30),
|
||||
max_blocking_threads,
|
||||
max_concurrent_connections: 25600,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -191,6 +172,10 @@ impl ServerWorkerConfig {
|
||||
self.max_blocking_threads = num;
|
||||
}
|
||||
|
||||
pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
|
||||
self.max_concurrent_connections = num;
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
|
||||
self.shutdown_timeout = dur;
|
||||
}
|
||||
@ -218,16 +203,16 @@ impl ServerWorker {
|
||||
})
|
||||
.spawn(async move {
|
||||
availability.set(false);
|
||||
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
|
||||
let mut wrk = ServerWorker {
|
||||
rx,
|
||||
rx2,
|
||||
services: Vec::new(),
|
||||
availability,
|
||||
conns: Counter::new(config.max_concurrent_connections),
|
||||
factories,
|
||||
state: Default::default(),
|
||||
shutdown_timeout: config.shutdown_timeout,
|
||||
conns: conns.clone(),
|
||||
});
|
||||
};
|
||||
|
||||
let fut = wrk
|
||||
.factories
|
||||
@ -383,7 +368,7 @@ impl Future for ServerWorker {
|
||||
Pin::new(&mut this.rx2).poll_recv(cx)
|
||||
{
|
||||
this.availability.set(false);
|
||||
let num = num_connections();
|
||||
let num = this.conns.total();
|
||||
if num == 0 {
|
||||
info!("Shutting down worker, 0 connections");
|
||||
let _ = tx.send(true);
|
||||
@ -450,7 +435,7 @@ impl Future for ServerWorker {
|
||||
// Wait for 1 second.
|
||||
ready!(shutdown.timer.as_mut().poll(cx));
|
||||
|
||||
if num_connections() == 0 {
|
||||
if this.conns.total() == 0 {
|
||||
// Graceful shutdown.
|
||||
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
|
||||
let _ = shutdown.tx.send(true);
|
||||
|
Loading…
x
Reference in New Issue
Block a user