1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

fix switching to unavailable state for worker

This commit is contained in:
Nikolay Kim 2018-09-14 00:07:50 -07:00
parent ec7757f032
commit 9888c1c5e6

View File

@ -210,6 +210,7 @@ impl Future for Worker {
WorkerState::Unavailable(mut conns) => { WorkerState::Unavailable(mut conns) => {
match self.check_readiness() { match self.check_readiness() {
Ok(true) => { Ok(true) => {
trace!("Serveice is available");
self.state = WorkerState::Available; self.state = WorkerState::Available;
// process requests from wait queue // process requests from wait queue
@ -227,10 +228,12 @@ impl Future for Worker {
) )
} }
Ok(false) => { Ok(false) => {
trace!("Serveice is unavailable");
self.state = WorkerState::Unavailable(conns); self.state = WorkerState::Unavailable(conns);
return self.poll(); return self.poll();
} }
Err(idx) => { Err(idx) => {
trace!("Serveice failed, restarting");
self.state = WorkerState::Restarting( self.state = WorkerState::Restarting(
idx, idx,
self.factories[idx].create(), self.factories[idx].create(),
@ -247,6 +250,7 @@ impl Future for Worker {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(idx) => { Err(idx) => {
trace!("Serveice failed, restarting");
self.state = WorkerState::Restarting(idx, self.factories[idx].create()); self.state = WorkerState::Restarting(idx, self.factories[idx].create());
return self.poll(); return self.poll();
} }
@ -303,29 +307,34 @@ impl Future for Worker {
loop { loop {
match self.rx.poll() { match self.rx.poll() {
// handle incoming tcp stream // handle incoming tcp stream
Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => match self Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => {
.check_readiness() match self.check_readiness()
{ {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
spawn( spawn(
self.services[msg.handler.0] self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io)) .call(ServerMessage::Connect(msg.io))
.map(|val| { .map(|val| {
drop(guard); drop(guard);
val val
}), }),
); );
} continue
Ok(false) => { }
self.availability.set(false); Ok(false) => {
self.state = WorkerState::Unavailable(vec![msg]); trace!("Serveice is unsavailable");
} self.availability.set(false);
Err(idx) => { self.state = WorkerState::Unavailable(vec![msg]);
self.availability.set(false); }
self.state = Err(idx) => {
WorkerState::Restarting(idx, self.factories[idx].create()); trace!("Serveice failed, restarting");
self.availability.set(false);
self.state =
WorkerState::Restarting(idx, self.factories[idx].create());
}
} }
return self.poll();
}, },
// `StopWorker` message handler // `StopWorker` message handler
Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => { Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {