1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 03:42:59 +01:00

Fix bug where backpressure happen too early (#332)

This commit is contained in:
fakeshadow 2021-04-14 06:48:05 -07:00 committed by GitHub
parent e0fb67f646
commit d49ecf7203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -83,9 +83,59 @@ struct Accept {
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
next: usize, next: usize,
avail: Availability,
backpressure: bool, backpressure: bool,
} }
/// Array of u128 with every bit as marker for a worker handle's availability.
struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
};
if avail {
self.0[offset] |= 1 << idx as u128;
} else {
let shift = 1 << idx as u128;
debug_assert_ne!(self.0[offset] & shift, 0);
self.0[offset] ^= shift;
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx, true);
})
}
}
/// This function defines errors that are per-connection. Which basically /// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means /// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted. /// next connection might be ready to be accepted.
@ -116,6 +166,7 @@ impl Accept {
System::set_current(sys); System::set_current(sys);
let (mut accept, sockets) = let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv); Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(sockets); accept.poll_with(sockets);
}) })
.unwrap(); .unwrap();
@ -148,12 +199,18 @@ impl Accept {
}); });
} }
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
avail.set_available_all(&handles);
let accept = Accept { let accept = Accept {
poll, poll,
waker, waker,
handles, handles,
srv, srv,
next: 0, next: 0,
avail,
backpressure: false, backpressure: false,
}; };
@ -166,12 +223,8 @@ impl Accept {
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() { match e.kind() {
std::io::ErrorKind::Interrupted => { std::io::ErrorKind::Interrupted => continue,
continue; _ => panic!("Poll error: {}", e),
}
_ => {
panic!("Poll error: {}", e);
}
} }
} }
@ -190,6 +243,8 @@ impl Accept {
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable) => { Some(WakerInterest::WorkerAvailable) => {
drop(guard); drop(guard);
// Assume all worker are avail as no worker index returned.
self.avail.set_available_all(&self.handles);
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
@ -197,6 +252,7 @@ impl Accept {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(handle.idx, true);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) again // got timer interest and it's time to try register socket(s) again
@ -342,27 +398,25 @@ impl Accept {
if self.backpressure { if self.backpressure {
// send_connection would remove fault worker from handles. // send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone. // worst case here is conn get dropped after all handles are gone.
while !self.handles.is_empty() { while let Err(c) = self.send_connection(sockets, conn) {
match self.send_connection(sockets, conn) { conn = c
Ok(_) => return,
Err(c) => conn = c,
}
} }
} else { } else {
// Do one round and try to send conn to all workers until it succeed. while self.avail.available() {
// Start from self.next. let next = self.next();
let mut idx = 0; let idx = next.idx;
while idx < self.handles.len() { if next.available() {
idx += 1; self.avail.set_available(idx, true);
if self.handles[self.next].available() {
match self.send_connection(sockets, conn) { match self.send_connection(sockets, conn) {
Ok(_) => return, Ok(_) => return,
Err(c) => conn = c, Err(c) => conn = c,
} }
} else { } else {
self.avail.set_available(idx, false);
self.set_next(); self.set_next();
} }
} }
// Sending Conn failed due to either all workers are in error or not available. // Sending Conn failed due to either all workers are in error or not available.
// Enter backpressure state and try again. // Enter backpressure state and try again.
self.maybe_backpressure(sockets, true); self.maybe_backpressure(sockets, true);
@ -370,28 +424,22 @@ impl Accept {
} }
} }
// Set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
// Send connection to worker and handle error. // Send connection to worker and handle error.
fn send_connection( fn send_connection(
&mut self, &mut self,
sockets: &mut Slab<ServerSocketInfo>, sockets: &mut Slab<ServerSocketInfo>,
conn: Conn, conn: Conn,
) -> Result<(), Conn> { ) -> Result<(), Conn> {
match self.handles[self.next].send(conn) { match self.next().send(conn) {
Ok(_) => { Ok(_) => {
self.set_next(); self.set_next();
Ok(()) Ok(())
} }
Err(conn) => { Err(conn) => {
// worker lost contact and could be gone. a message is sent to // Worker thread is error and could be gone.
// `ServerBuilder` future to notify it a new worker should be made. // Remove worker handle and notify `ServerBuilder`.
// after that remove the fault worker and enter backpressure if necessary. self.remove_next();
self.srv.worker_faulted(self.handles[self.next].idx);
self.handles.swap_remove(self.next);
if self.handles.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
self.maybe_backpressure(sockets, true); self.maybe_backpressure(sockets, true);
@ -401,6 +449,7 @@ impl Accept {
} else if self.handles.len() <= self.next { } else if self.handles.len() <= self.next {
self.next = 0; self.next = 0;
} }
Err(conn) Err(conn)
} }
} }
@ -445,4 +494,92 @@ impl Accept {
}; };
} }
} }
fn next(&self) -> &WorkerHandleAccept {
&self.handles[self.next]
}
/// Set next worker handle that would accept connection.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
/// Remove next worker handle that fail to accept connection.
fn remove_next(&mut self) {
let handle = self.handles.swap_remove(self.next);
let idx = handle.idx;
// A message is sent to `ServerBuilder` future to notify it a new worker
// should be made.
self.srv.worker_faulted(idx);
self.avail.set_available(idx, false);
}
}
#[cfg(test)]
mod test {
use super::Availability;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
#[should_panic]
fn double_set_unavailable() {
let mut aval = Availability::default();
aval.set_available(233, false);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
} }