From d49ecf72036a42da406a8fcf1009b78bf42415ee Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 14 Apr 2021 06:48:05 -0700 Subject: [PATCH] Fix bug where backpressure happen too early (#332) --- actix-server/src/accept.rs | 193 +++++++++++++++++++++++++++++++------ 1 file changed, 165 insertions(+), 28 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b9f99c7..026bcc37 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -83,9 +83,59 @@ struct Accept { handles: Vec, srv: Server, next: usize, + avail: Availability, backpressure: bool, } +/// Array of u128 with every bit as marker for a worker handle's availability. +struct Availability([u128; 4]); + +impl Default for Availability { + fn default() -> Self { + Self([0; 4]) + } +} + +impl Availability { + /// Check if any worker handle is available + fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Set worker handle available state by index. + fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + }; + + if avail { + self.0[offset] |= 1 << idx as u128; + } else { + let shift = 1 << idx as u128; + + debug_assert_ne!(self.0[offset] & shift, 0); + + self.0[offset] ^= shift; + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx, true); + }) + } +} + /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. @@ -116,6 +166,7 @@ impl Accept { System::set_current(sys); let (mut accept, sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); + accept.poll_with(sockets); }) .unwrap(); @@ -148,12 +199,18 @@ impl Accept { }); } + let mut avail = Availability::default(); + + // Assume all handles are avail at construct time. + avail.set_available_all(&handles); + let accept = Accept { poll, waker, handles, srv, next: 0, + avail, backpressure: false, }; @@ -166,12 +223,8 @@ impl Accept { loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { - std::io::ErrorKind::Interrupted => { - continue; - } - _ => { - panic!("Poll error: {}", e); - } + std::io::ErrorKind::Interrupted => continue, + _ => panic!("Poll error: {}", e), } } @@ -190,6 +243,8 @@ impl Accept { // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); + // Assume all worker are avail as no worker index returned. + self.avail.set_available_all(&self.handles); self.maybe_backpressure(&mut sockets, false); } // a new worker thread is made and it's handle would be added to Accept @@ -197,6 +252,7 @@ impl Accept { drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); + self.avail.set_available(handle.idx, true); self.handles.push(handle); } // got timer interest and it's time to try register socket(s) again @@ -342,27 +398,25 @@ impl Accept { if self.backpressure { // send_connection would remove fault worker from handles. // worst case here is conn get dropped after all handles are gone. - while !self.handles.is_empty() { - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } + while let Err(c) = self.send_connection(sockets, conn) { + conn = c } } else { - // Do one round and try to send conn to all workers until it succeed. - // Start from self.next. - let mut idx = 0; - while idx < self.handles.len() { - idx += 1; - if self.handles[self.next].available() { + while self.avail.available() { + let next = self.next(); + let idx = next.idx; + if next.available() { + self.avail.set_available(idx, true); match self.send_connection(sockets, conn) { Ok(_) => return, Err(c) => conn = c, } } else { + self.avail.set_available(idx, false); self.set_next(); } } + // Sending Conn failed due to either all workers are in error or not available. // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); @@ -370,28 +424,22 @@ impl Accept { } } - // Set next worker handle that would accept work. - fn set_next(&mut self) { - self.next = (self.next + 1) % self.handles.len(); - } - // Send connection to worker and handle error. fn send_connection( &mut self, sockets: &mut Slab, conn: Conn, ) -> Result<(), Conn> { - match self.handles[self.next].send(conn) { + match self.next().send(conn) { Ok(_) => { self.set_next(); Ok(()) } Err(conn) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - self.srv.worker_faulted(self.handles[self.next].idx); - self.handles.swap_remove(self.next); + // Worker thread is error and could be gone. + // Remove worker handle and notify `ServerBuilder`. + self.remove_next(); + if self.handles.is_empty() { error!("No workers"); self.maybe_backpressure(sockets, true); @@ -401,6 +449,7 @@ impl Accept { } else if self.handles.len() <= self.next { self.next = 0; } + Err(conn) } } @@ -445,4 +494,92 @@ impl Accept { }; } } + + fn next(&self) -> &WorkerHandleAccept { + &self.handles[self.next] + } + + /// Set next worker handle that would accept connection. + fn set_next(&mut self) { + self.next = (self.next + 1) % self.handles.len(); + } + + /// Remove next worker handle that fail to accept connection. + fn remove_next(&mut self) { + let handle = self.handles.swap_remove(self.next); + let idx = handle.idx; + // A message is sent to `ServerBuilder` future to notify it a new worker + // should be made. + self.srv.worker_faulted(idx); + self.avail.set_available(idx, false); + } +} + +#[cfg(test)] +mod test { + use super::Availability; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + #[should_panic] + fn double_set_unavailable() { + let mut aval = Availability::default(); + aval.set_available(233, false); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } }