From bd48908792c57f8d55ca20d058803fcf7e7d234d Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 15 Apr 2021 21:59:10 -0700 Subject: [PATCH] Return worker index in WakerInterest::WorkerAvailable (#337) --- actix-server/src/accept.rs | 5 ++--- actix-server/src/builder.rs | 2 +- actix-server/src/waker_queue.rs | 2 +- actix-server/src/worker.rs | 6 ++++-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b152fb2..dbf66b20 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -238,11 +238,10 @@ impl Accept { match guard.pop_front() { // worker notify it becomes available. we may want to recover // from backpressure. - Some(WakerInterest::WorkerAvailable) => { + Some(WakerInterest::WorkerAvailable(idx)) => { drop(guard); - // Assume all worker are avail as no worker index returned. - self.avail.set_available_all(&self.handles); self.maybe_backpressure(&mut sockets, false); + self.avail.set_available(idx, true); } // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index aa18bb22..66aba10c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -320,7 +320,7 @@ impl ServerBuilder { idx: usize, waker: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let avail = WorkerAvailability::new(waker); + let avail = WorkerAvailability::new(idx, waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); ServerWorker::start(idx, services, avail, self.worker_config) diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 8aa493aa..3f8669d4 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -72,7 +72,7 @@ impl WakerQueue { pub(crate) enum WakerInterest { /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker /// available and can accept new tasks. - WorkerAvailable, + WorkerAvailable(usize), /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `ServerCommand` and notify `Accept` to do exactly these tasks. Pause, diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 65951345..3d499382 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -96,13 +96,15 @@ impl WorkerHandleServer { #[derive(Clone)] pub(crate) struct WorkerAvailability { + idx: usize, waker: WakerQueue, available: Arc, } impl WorkerAvailability { - pub fn new(waker: WakerQueue) -> Self { + pub fn new(idx: usize, waker: WakerQueue) -> Self { WorkerAvailability { + idx, waker, available: Arc::new(AtomicBool::new(false)), } @@ -116,7 +118,7 @@ impl WorkerAvailability { let old = self.available.swap(val, Ordering::Release); // notify the accept on switched to available. if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable); + self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); } } }