diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index dbf66b20..23ba616c 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -128,7 +128,7 @@ impl Availability { /// This would result in a re-check on all workers' availability. fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { handles.iter().for_each(|handle| { - self.set_available(handle.idx, true); + self.set_available(handle.idx(), true); }) } } @@ -248,7 +248,7 @@ impl Accept { drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(handle.idx, true); + self.avail.set_available(handle.idx(), true); self.handles.push(handle); } // got timer interest and it's time to try register socket(s) again @@ -400,7 +400,7 @@ impl Accept { } else { while self.avail.available() { let next = self.next(); - let idx = next.idx; + let idx = next.idx(); if next.available() { self.avail.set_available(idx, true); match self.send_connection(sockets, conn) { @@ -503,7 +503,7 @@ impl Accept { /// Remove next worker handle that fail to accept connection. fn remove_next(&mut self) { let handle = self.handles.swap_remove(self.next); - let idx = handle.idx; + let idx = handle.idx(); // A message is sent to `ServerBuilder` future to notify it a new worker // should be made. self.srv.worker_faulted(idx); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 3d499382..7bc211b1 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -47,11 +47,7 @@ fn handle_pair( tx2: UnboundedSender, avail: WorkerAvailability, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { - idx, - tx: tx1, - avail, - }; + let accept = WorkerHandleAccept { tx: tx1, avail }; let server = WorkerHandleServer { idx, tx: tx2 }; @@ -63,16 +59,22 @@ fn handle_pair( /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { - pub idx: usize, tx: UnboundedSender, avail: WorkerAvailability, } impl WorkerHandleAccept { + #[inline(always)] + pub(crate) fn idx(&self) -> usize { + self.avail.idx + } + + #[inline(always)] pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { self.tx.send(msg).map_err(|msg| msg.0) } + #[inline(always)] pub(crate) fn available(&self) -> bool { self.avail.available() } @@ -110,13 +112,18 @@ impl WorkerAvailability { } } + #[inline(always)] pub fn available(&self) -> bool { self.available.load(Ordering::Acquire) } pub fn set(&self, val: bool) { - let old = self.available.swap(val, Ordering::Release); - // notify the accept on switched to available. + // Ordering: + // + // There could be multiple set calls happen in one ::poll. + // Order is important between them. + let old = self.available.swap(val, Ordering::AcqRel); + // Notify the accept on switched to available. if !old && val { self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); } @@ -374,6 +381,10 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + // Set availability to true so if accept try to send connection to this worker + // it would find worker is gone and remove it. + // This is helpful when worker is dropped unexpected. + self.availability.set(true); // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); }