diff --git a/src/server/worker.rs b/src/server/worker.rs index 889a2ac5..33d3bc79 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -114,6 +114,7 @@ pub(crate) struct Worker { services: Vec, availability: WorkerAvailability, conns: Connections, + factories: Vec>, } impl Actor for Worker { @@ -122,17 +123,23 @@ impl Actor for Worker { impl Worker { pub(crate) fn new( - ctx: &mut Context, services: Vec>, + ctx: &mut Context, factories: Vec>, availability: WorkerAvailability, ) -> Self { + availability.set(false); let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { availability, + factories, services: Vec::new(), conns: conns.clone(), }); + let mut fut = Vec::new(); + for factory in &wrk.factories { + fut.push(factory.create()); + } ctx.wait( - future::join_all(services.into_iter().map(|s| s.create())) + future::join_all(fut) .into_actor(&wrk) .map_err(|e, _, ctx| { error!("Can not start worker: {:?}", e); @@ -140,8 +147,13 @@ impl Worker { ctx.stop(); }).and_then(|services, act, ctx| { act.services.extend(services); - act.availability.set(true); - ctx.spawn(CheckReadiness(true)); + let mut readiness = CheckReadiness { + avail: true, + idx: 0, + fut: None, + }; + let _ = readiness.poll(act, ctx); + ctx.spawn(readiness); fut::ok(()) }), ); @@ -226,26 +238,55 @@ impl Handler for Worker { } } -struct CheckReadiness(bool); +struct CheckReadiness { + avail: bool, + idx: usize, + fut: Option>>, +} impl ActorFuture for CheckReadiness { type Item = (); type Error = (); type Actor = Worker; - fn poll(&mut self, act: &mut Worker, _: &mut Context) -> Poll<(), ()> { - let mut val = act.conns.check(); - if val { - for service in &mut act.services { - if let Ok(Async::NotReady) = service.poll_ready() { - val = false; - break; + fn poll(&mut self, act: &mut Worker, ctx: &mut Context) -> Poll<(), ()> { + if self.fut.is_some() { + match self.fut.as_mut().unwrap().poll() { + Ok(Async::Ready(service)) => { + trace!("Service has been restarted"); + act.services[self.idx] = service; + self.fut.take(); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_) => { + panic!("Can not restart service"); } } } - if self.0 != val { - self.0 = val; - act.availability.set(val); + + let mut ready = act.conns.check(); + if ready { + // check if service is restarting + let mut failed = None; + for (idx, service) in act.services.iter_mut().enumerate() { + match service.poll_ready() { + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => ready = false, + Err(_) => { + error!("Service readiness check returned error, restarting"); + failed = Some(idx); + } + } + } + if let Some(idx) = failed { + self.idx = idx; + self.fut = Some(act.factories[idx].create()); + return self.poll(act, ctx); + } + } + if self.avail != ready { + self.avail = ready; + act.availability.set(ready); } Ok(Async::NotReady) }