1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 20:01:48 +01:00

Fix memory ordering of WorkerAvailability (#340)

This commit is contained in:
fakeshadow 2021-04-16 03:20:08 -07:00 committed by GitHub
parent bd48908792
commit 19468feef8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 12 deletions

View File

@ -128,7 +128,7 @@ impl Availability {
/// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx, true);
self.set_available(handle.idx(), true);
})
}
}
@ -248,7 +248,7 @@ impl Accept {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.avail.set_available(handle.idx, true);
self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
}
// got timer interest and it's time to try register socket(s) again
@ -400,7 +400,7 @@ impl Accept {
} else {
while self.avail.available() {
let next = self.next();
let idx = next.idx;
let idx = next.idx();
if next.available() {
self.avail.set_available(idx, true);
match self.send_connection(sockets, conn) {
@ -503,7 +503,7 @@ impl Accept {
/// Remove next worker handle that fail to accept connection.
fn remove_next(&mut self) {
let handle = self.handles.swap_remove(self.next);
let idx = handle.idx;
let idx = handle.idx();
// A message is sent to `ServerBuilder` future to notify it a new worker
// should be made.
self.srv.worker_faulted(idx);

View File

@ -47,11 +47,7 @@ fn handle_pair(
tx2: UnboundedSender<Stop>,
avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept {
idx,
tx: tx1,
avail,
};
let accept = WorkerHandleAccept { tx: tx1, avail };
let server = WorkerHandleServer { idx, tx: tx2 };
@ -63,16 +59,22 @@ fn handle_pair(
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
pub idx: usize,
tx: UnboundedSender<Conn>,
avail: WorkerAvailability,
}
impl WorkerHandleAccept {
#[inline(always)]
pub(crate) fn idx(&self) -> usize {
self.avail.idx
}
#[inline(always)]
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0)
}
#[inline(always)]
pub(crate) fn available(&self) -> bool {
self.avail.available()
}
@ -110,13 +112,18 @@ impl WorkerAvailability {
}
}
#[inline(always)]
pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release);
// notify the accept on switched to available.
// Ordering:
//
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
// Order is important between them.
let old = self.available.swap(val, Ordering::AcqRel);
// Notify the accept on switched to available.
if !old && val {
self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
}
@ -374,6 +381,10 @@ impl Default for WorkerState {
impl Drop for ServerWorker {
fn drop(&mut self) {
// Set availability to true so if accept try to send connection to this worker
// it would find worker is gone and remove it.
// This is helpful when worker is dropped unexpected.
self.availability.set(true);
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop();
}