mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-21 06:55:37 +02:00
refactor connection counter (#343)
* Remove restart_worker test * Remove Slab * Rework counter * Make counter limit switch accurate * Remove backpressure. Add pause state * make changes for review * fix doc comment for counter
This commit is contained in:
@@ -7,18 +7,14 @@ use actix_rt::{
|
||||
};
|
||||
use log::{error, info};
|
||||
use mio::{Interest, Poll, Token as MioToken};
|
||||
use slab::Slab;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::socket::MioListener;
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
|
||||
use crate::worker::{Conn, WorkerHandleAccept};
|
||||
use crate::Token;
|
||||
|
||||
struct ServerSocketInfo {
|
||||
/// Beware this is the crate token for identify socket and should not be confused
|
||||
/// with `mio::Token`.
|
||||
token: Token,
|
||||
token: usize,
|
||||
|
||||
lst: MioListener,
|
||||
|
||||
@@ -62,7 +58,7 @@ impl AcceptLoop {
|
||||
|
||||
pub(crate) fn start(
|
||||
&mut self,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
) {
|
||||
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
|
||||
@@ -81,7 +77,7 @@ struct Accept {
|
||||
srv: Server,
|
||||
next: usize,
|
||||
avail: Availability,
|
||||
backpressure: bool,
|
||||
paused: bool,
|
||||
}
|
||||
|
||||
/// Array of u128 with every bit as marker for a worker handle's availability.
|
||||
@@ -95,23 +91,22 @@ impl Default for Availability {
|
||||
|
||||
impl Availability {
|
||||
/// Check if any worker handle is available
|
||||
#[inline(always)]
|
||||
fn available(&self) -> bool {
|
||||
self.0.iter().any(|a| *a != 0)
|
||||
}
|
||||
|
||||
/// Check if worker handle is available by index
|
||||
#[inline(always)]
|
||||
fn get_available(&self, idx: usize) -> bool {
|
||||
let (offset, idx) = Self::offset(idx);
|
||||
|
||||
self.0[offset] & (1 << idx as u128) != 0
|
||||
}
|
||||
|
||||
/// Set worker handle available state by index.
|
||||
fn set_available(&mut self, idx: usize, avail: bool) {
|
||||
let (offset, idx) = if idx < 128 {
|
||||
(0, idx)
|
||||
} else if idx < 128 * 2 {
|
||||
(1, idx - 128)
|
||||
} else if idx < 128 * 3 {
|
||||
(2, idx - 128 * 2)
|
||||
} else if idx < 128 * 4 {
|
||||
(3, idx - 128 * 3)
|
||||
} else {
|
||||
panic!("Max WorkerHandle count is 512")
|
||||
};
|
||||
let (offset, idx) = Self::offset(idx);
|
||||
|
||||
let off = 1 << idx as u128;
|
||||
if avail {
|
||||
@@ -128,6 +123,21 @@ impl Availability {
|
||||
self.set_available(handle.idx(), true);
|
||||
})
|
||||
}
|
||||
|
||||
/// Get offset and adjusted index of given worker handle index.
|
||||
fn offset(idx: usize) -> (usize, usize) {
|
||||
if idx < 128 {
|
||||
(0, idx)
|
||||
} else if idx < 128 * 2 {
|
||||
(1, idx - 128)
|
||||
} else if idx < 128 * 3 {
|
||||
(2, idx - 128 * 2)
|
||||
} else if idx < 128 * 4 {
|
||||
(3, idx - 128 * 3)
|
||||
} else {
|
||||
panic!("Max WorkerHandle count is 512")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This function defines errors that are per-connection. Which basically
|
||||
@@ -147,7 +157,7 @@ impl Accept {
|
||||
pub(crate) fn start(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
srv: Server,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
) {
|
||||
@@ -158,10 +168,10 @@ impl Accept {
|
||||
.name("actix-server accept loop".to_owned())
|
||||
.spawn(move || {
|
||||
System::set_current(sys);
|
||||
let (mut accept, sockets) =
|
||||
let (mut accept, mut sockets) =
|
||||
Accept::new_with_sockets(poll, waker, socks, handles, srv);
|
||||
|
||||
accept.poll_with(sockets);
|
||||
accept.poll_with(&mut sockets);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
@@ -169,26 +179,25 @@ impl Accept {
|
||||
fn new_with_sockets(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
srv: Server,
|
||||
) -> (Accept, Slab<ServerSocketInfo>) {
|
||||
let mut sockets = Slab::new();
|
||||
for (hnd_token, mut lst) in socks.into_iter() {
|
||||
let entry = sockets.vacant_entry();
|
||||
let token = entry.key();
|
||||
) -> (Accept, Vec<ServerSocketInfo>) {
|
||||
let sockets = socks
|
||||
.into_iter()
|
||||
.map(|(token, mut lst)| {
|
||||
// Start listening for incoming connections
|
||||
poll.registry()
|
||||
.register(&mut lst, MioToken(token), Interest::READABLE)
|
||||
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
|
||||
|
||||
// Start listening for incoming connections
|
||||
poll.registry()
|
||||
.register(&mut lst, MioToken(token), Interest::READABLE)
|
||||
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
|
||||
|
||||
entry.insert(ServerSocketInfo {
|
||||
token: hnd_token,
|
||||
lst,
|
||||
timeout: None,
|
||||
});
|
||||
}
|
||||
ServerSocketInfo {
|
||||
token,
|
||||
lst,
|
||||
timeout: None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut avail = Availability::default();
|
||||
|
||||
@@ -202,19 +211,19 @@ impl Accept {
|
||||
srv,
|
||||
next: 0,
|
||||
avail,
|
||||
backpressure: false,
|
||||
paused: false,
|
||||
};
|
||||
|
||||
(accept, sockets)
|
||||
}
|
||||
|
||||
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
|
||||
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
|
||||
let mut events = mio::Events::with_capacity(128);
|
||||
|
||||
loop {
|
||||
if let Err(e) = self.poll.poll(&mut events, None) {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::Interrupted => continue,
|
||||
io::ErrorKind::Interrupted => continue,
|
||||
_ => panic!("Poll error: {}", e),
|
||||
}
|
||||
}
|
||||
@@ -222,122 +231,146 @@ impl Accept {
|
||||
for event in events.iter() {
|
||||
let token = event.token();
|
||||
match token {
|
||||
// This is a loop because interests for command from previous version was
|
||||
// a loop that would try to drain the command channel. It's yet unknown
|
||||
// if it's necessary/good practice to actively drain the waker queue.
|
||||
WAKER_TOKEN => 'waker: loop {
|
||||
// take guard with every iteration so no new interest can be added
|
||||
// until the current task is done.
|
||||
let mut guard = self.waker.guard();
|
||||
match guard.pop_front() {
|
||||
// worker notify it becomes available. we may want to recover
|
||||
// from backpressure.
|
||||
Some(WakerInterest::WorkerAvailable(idx)) => {
|
||||
drop(guard);
|
||||
self.maybe_backpressure(&mut sockets, false);
|
||||
self.avail.set_available(idx, true);
|
||||
}
|
||||
// a new worker thread is made and it's handle would be added to Accept
|
||||
Some(WakerInterest::Worker(handle)) => {
|
||||
drop(guard);
|
||||
// maybe we want to recover from a backpressure.
|
||||
self.maybe_backpressure(&mut sockets, false);
|
||||
self.avail.set_available(handle.idx(), true);
|
||||
self.handles.push(handle);
|
||||
}
|
||||
// got timer interest and it's time to try register socket(s) again
|
||||
Some(WakerInterest::Timer) => {
|
||||
drop(guard);
|
||||
self.process_timer(&mut sockets)
|
||||
}
|
||||
Some(WakerInterest::Pause) => {
|
||||
drop(guard);
|
||||
self.deregister_all(&mut sockets);
|
||||
}
|
||||
Some(WakerInterest::Resume) => {
|
||||
drop(guard);
|
||||
sockets.iter_mut().for_each(|(token, info)| {
|
||||
self.register_logged(token, info);
|
||||
});
|
||||
}
|
||||
Some(WakerInterest::Stop) => {
|
||||
return self.deregister_all(&mut sockets);
|
||||
}
|
||||
// waker queue is drained
|
||||
None => {
|
||||
// Reset the WakerQueue before break so it does not grow infinitely
|
||||
WakerQueue::reset(&mut guard);
|
||||
break 'waker;
|
||||
}
|
||||
WAKER_TOKEN => {
|
||||
let exit = self.handle_waker(sockets);
|
||||
if exit {
|
||||
info!("Accept is stopped.");
|
||||
return;
|
||||
}
|
||||
},
|
||||
}
|
||||
_ => {
|
||||
let token = usize::from(token);
|
||||
self.accept(&mut sockets, token);
|
||||
self.accept(sockets, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
|
||||
fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
|
||||
// This is a loop because interests for command from previous version was
|
||||
// a loop that would try to drain the command channel. It's yet unknown
|
||||
// if it's necessary/good practice to actively drain the waker queue.
|
||||
loop {
|
||||
// take guard with every iteration so no new interest can be added
|
||||
// until the current task is done.
|
||||
let mut guard = self.waker.guard();
|
||||
match guard.pop_front() {
|
||||
// worker notify it becomes available.
|
||||
Some(WakerInterest::WorkerAvailable(idx)) => {
|
||||
drop(guard);
|
||||
|
||||
self.avail.set_available(idx, true);
|
||||
|
||||
if !self.paused {
|
||||
self.accept_all(sockets);
|
||||
}
|
||||
}
|
||||
// a new worker thread is made and it's handle would be added to Accept
|
||||
Some(WakerInterest::Worker(handle)) => {
|
||||
drop(guard);
|
||||
|
||||
self.avail.set_available(handle.idx(), true);
|
||||
self.handles.push(handle);
|
||||
|
||||
if !self.paused {
|
||||
self.accept_all(sockets);
|
||||
}
|
||||
}
|
||||
// got timer interest and it's time to try register socket(s) again
|
||||
Some(WakerInterest::Timer) => {
|
||||
drop(guard);
|
||||
|
||||
self.process_timer(sockets)
|
||||
}
|
||||
Some(WakerInterest::Pause) => {
|
||||
drop(guard);
|
||||
|
||||
self.paused = true;
|
||||
|
||||
self.deregister_all(sockets);
|
||||
}
|
||||
Some(WakerInterest::Resume) => {
|
||||
drop(guard);
|
||||
|
||||
self.paused = false;
|
||||
|
||||
sockets.iter_mut().for_each(|info| {
|
||||
self.register_logged(info);
|
||||
});
|
||||
|
||||
self.accept_all(sockets);
|
||||
}
|
||||
Some(WakerInterest::Stop) => {
|
||||
self.deregister_all(sockets);
|
||||
|
||||
return true;
|
||||
}
|
||||
// waker queue is drained
|
||||
None => {
|
||||
// Reset the WakerQueue before break so it does not grow infinitely
|
||||
WakerQueue::reset(&mut guard);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
|
||||
let now = Instant::now();
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Only sockets that had an associated timeout were deregistered.
|
||||
.filter(|(_, info)| info.timeout.is_some())
|
||||
.for_each(|(token, info)| {
|
||||
.filter(|info| info.timeout.is_some())
|
||||
.for_each(|info| {
|
||||
let inst = info.timeout.take().unwrap();
|
||||
|
||||
if now < inst {
|
||||
info.timeout = Some(inst);
|
||||
} else if !self.backpressure {
|
||||
self.register_logged(token, info);
|
||||
} else if !self.paused {
|
||||
self.register_logged(info);
|
||||
}
|
||||
|
||||
// Drop the timeout if server is in backpressure and socket timeout is expired.
|
||||
// When server recovers from backpressure it will register all sockets without
|
||||
// Drop the timeout if server is paused and socket timeout is expired.
|
||||
// When server recovers from pause it will register all sockets without
|
||||
// a timeout value so this socket register will be delayed till then.
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
|
||||
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
|
||||
let token = MioToken(info.token);
|
||||
self.poll
|
||||
.registry()
|
||||
.register(&mut info.lst, MioToken(token), Interest::READABLE)
|
||||
.register(&mut info.lst, token, Interest::READABLE)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
|
||||
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
|
||||
// On windows, calling register without deregister cause an error.
|
||||
// See https://github.com/actix/actix-web/issues/905
|
||||
// Calling reregister seems to fix the issue.
|
||||
let token = MioToken(info.token);
|
||||
self.poll
|
||||
.registry()
|
||||
.register(&mut info.lst, mio::Token(token), Interest::READABLE)
|
||||
.register(&mut info.lst, token, Interest::READABLE)
|
||||
.or_else(|_| {
|
||||
self.poll.registry().reregister(
|
||||
&mut info.lst,
|
||||
mio::Token(token),
|
||||
Interest::READABLE,
|
||||
)
|
||||
self.poll
|
||||
.registry()
|
||||
.reregister(&mut info.lst, token, Interest::READABLE)
|
||||
})
|
||||
}
|
||||
|
||||
fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
|
||||
match self.register(token, info) {
|
||||
fn register_logged(&self, info: &mut ServerSocketInfo) {
|
||||
match self.register(info) {
|
||||
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
|
||||
Err(e) => error!("Can not register server socket {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
|
||||
self.poll.registry().deregister(&mut info.lst)
|
||||
}
|
||||
|
||||
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
|
||||
match self.deregister(info) {
|
||||
match self.poll.registry().deregister(&mut info.lst) {
|
||||
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
|
||||
Err(e) => {
|
||||
error!("Can not deregister server socket {}", e)
|
||||
@@ -345,7 +378,7 @@ impl Accept {
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
|
||||
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
|
||||
// This is a best effort implementation with following limitation:
|
||||
//
|
||||
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout
|
||||
@@ -358,70 +391,23 @@ impl Accept {
|
||||
.iter_mut()
|
||||
// Take all timeout.
|
||||
// This is to prevent Accept::process_timer method re-register a socket afterwards.
|
||||
.map(|(_, info)| (info.timeout.take(), info))
|
||||
.map(|info| (info.timeout.take(), info))
|
||||
// Socket info with a timeout is already deregistered so skip them.
|
||||
.filter(|(timeout, _)| timeout.is_none())
|
||||
.for_each(|(_, info)| self.deregister_logged(info));
|
||||
}
|
||||
|
||||
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
|
||||
// Only operate when server is in a different backpressure than the given flag.
|
||||
if self.backpressure != on {
|
||||
self.backpressure = on;
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Only operate on sockets without associated timeout.
|
||||
// Sockets with it should be handled by `accept` and `process_timer` methods.
|
||||
// They are already deregistered or need to be reregister in the future.
|
||||
.filter(|(_, info)| info.timeout.is_none())
|
||||
.for_each(|(token, info)| {
|
||||
if on {
|
||||
self.deregister_logged(info);
|
||||
} else {
|
||||
self.register_logged(token, info);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
|
||||
if self.backpressure {
|
||||
// send_connection would remove fault worker from handles.
|
||||
// worst case here is conn get dropped after all handles are gone.
|
||||
while let Err(c) = self.send_connection(sockets, conn) {
|
||||
conn = c
|
||||
}
|
||||
} else {
|
||||
while self.avail.available() {
|
||||
let next = self.next();
|
||||
let idx = next.idx();
|
||||
if next.available() {
|
||||
self.avail.set_available(idx, true);
|
||||
match self.send_connection(sockets, conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
} else {
|
||||
self.avail.set_available(idx, false);
|
||||
self.set_next();
|
||||
}
|
||||
}
|
||||
|
||||
// Sending Conn failed due to either all workers are in error or not available.
|
||||
// Enter backpressure state and try again.
|
||||
self.maybe_backpressure(sockets, true);
|
||||
self.accept_one(sockets, conn);
|
||||
}
|
||||
}
|
||||
|
||||
// Send connection to worker and handle error.
|
||||
fn send_connection(
|
||||
&mut self,
|
||||
sockets: &mut Slab<ServerSocketInfo>,
|
||||
conn: Conn,
|
||||
) -> Result<(), Conn> {
|
||||
match self.next().send(conn) {
|
||||
fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
|
||||
let next = self.next();
|
||||
match next.send(conn) {
|
||||
Ok(_) => {
|
||||
// Increment counter of WorkerHandle.
|
||||
// Set worker to unavailable with it hit max (Return false).
|
||||
if !next.inc_counter() {
|
||||
let idx = next.idx();
|
||||
self.avail.set_available(idx, false);
|
||||
}
|
||||
self.set_next();
|
||||
Ok(())
|
||||
}
|
||||
@@ -432,7 +418,6 @@ impl Accept {
|
||||
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
self.maybe_backpressure(sockets, true);
|
||||
// All workers are gone and Conn is nowhere to be sent.
|
||||
// Treat this situation as Ok and drop Conn.
|
||||
return Ok(());
|
||||
@@ -445,19 +430,38 @@ impl Accept {
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
|
||||
fn accept_one(&mut self, mut conn: Conn) {
|
||||
loop {
|
||||
let info = sockets
|
||||
.get_mut(token)
|
||||
.expect("ServerSocketInfo is removed from Slab");
|
||||
let next = self.next();
|
||||
let idx = next.idx();
|
||||
|
||||
if self.avail.get_available(idx) {
|
||||
match self.send_connection(conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
} else {
|
||||
self.avail.set_available(idx, false);
|
||||
self.set_next();
|
||||
|
||||
if !self.avail.available() {
|
||||
while let Err(c) = self.send_connection(conn) {
|
||||
conn = c;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
|
||||
while self.avail.available() {
|
||||
let info = &mut sockets[token];
|
||||
|
||||
match info.lst.accept() {
|
||||
Ok(io) => {
|
||||
let msg = Conn {
|
||||
io,
|
||||
token: info.token,
|
||||
};
|
||||
self.accept_one(sockets, msg);
|
||||
let conn = Conn { io, token };
|
||||
self.accept_one(conn);
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
|
||||
Err(ref e) if connection_error(e) => continue,
|
||||
@@ -485,11 +489,22 @@ impl Accept {
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
|
||||
sockets
|
||||
.iter_mut()
|
||||
.map(|info| info.token)
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
.for_each(|idx| self.accept(sockets, idx))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn next(&self) -> &WorkerHandleAccept {
|
||||
&self.handles[self.next]
|
||||
}
|
||||
|
||||
/// Set next worker handle that would accept connection.
|
||||
#[inline(always)]
|
||||
fn set_next(&mut self) {
|
||||
self.next = (self.next + 1) % self.handles.len();
|
||||
}
|
||||
|
Reference in New Issue
Block a user