From 9888c1c5e6d9338534a11b85ea34e8fcbceb92a1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 14 Sep 2018 00:07:50 -0700 Subject: [PATCH] fix switching to unavailable state for worker --- src/server/worker.rs | 53 ++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/server/worker.rs b/src/server/worker.rs index 7d5a1fc0..eb8f7e48 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -210,6 +210,7 @@ impl Future for Worker { WorkerState::Unavailable(mut conns) => { match self.check_readiness() { Ok(true) => { + trace!("Serveice is available"); self.state = WorkerState::Available; // process requests from wait queue @@ -227,10 +228,12 @@ impl Future for Worker { ) } Ok(false) => { + trace!("Serveice is unavailable"); self.state = WorkerState::Unavailable(conns); return self.poll(); } Err(idx) => { + trace!("Serveice failed, restarting"); self.state = WorkerState::Restarting( idx, self.factories[idx].create(), @@ -247,6 +250,7 @@ impl Future for Worker { return Ok(Async::NotReady); } Err(idx) => { + trace!("Serveice failed, restarting"); self.state = WorkerState::Restarting(idx, self.factories[idx].create()); return self.poll(); } @@ -303,29 +307,34 @@ impl Future for Worker { loop { match self.rx.poll() { // handle incoming tcp stream - Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => match self - .check_readiness() - { - Ok(true) => { - let guard = self.conns.get(); - spawn( - self.services[msg.handler.0] - .call(ServerMessage::Connect(msg.io)) - .map(|val| { - drop(guard); - val - }), - ); - } - Ok(false) => { - self.availability.set(false); - self.state = WorkerState::Unavailable(vec![msg]); - } - Err(idx) => { - self.availability.set(false); - self.state = - WorkerState::Restarting(idx, self.factories[idx].create()); + Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => { + match self.check_readiness() + { + Ok(true) => { + let guard = self.conns.get(); + spawn( + self.services[msg.handler.0] + .call(ServerMessage::Connect(msg.io)) + .map(|val| { + drop(guard); + val + }), + ); + continue + } + Ok(false) => { + trace!("Serveice is unsavailable"); + self.availability.set(false); + self.state = WorkerState::Unavailable(vec![msg]); + } + Err(idx) => { + trace!("Serveice failed, restarting"); + self.availability.set(false); + self.state = + WorkerState::Restarting(idx, self.factories[idx].create()); + } } + return self.poll(); }, // `StopWorker` message handler Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {