From 3859e91799aae4a7fdaf667091ab1416a7e66f1e Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 4 Apr 2021 13:53:19 -0700 Subject: [PATCH] Use named type for WorkerState::Restarting and Shutdown (#317) --- actix-server/src/worker.rs | 89 +++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 9409dfb4..6417dd0b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -34,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn); /// and `false` if some connections still alive. pub(crate) struct StopCommand { graceful: bool, - result: oneshot::Sender, + tx: oneshot::Sender, } #[derive(Debug)] @@ -98,8 +98,8 @@ impl WorkerHandle { } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, result }); + let (tx, rx) = oneshot::channel(); + let _ = self.tx2.send(StopCommand { graceful, tx }); rx } } @@ -221,7 +221,7 @@ impl ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, - services: Default::default(), + services: Vec::new(), availability, factories, state: Default::default(), @@ -272,11 +272,15 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } - fn restart_service(&mut self, token: Token, idx: usize) { - let factory = &self.factories[idx]; + fn restart_service(&mut self, token: Token, factory_id: usize) { + let factory = &self.factories[factory_id]; trace!("Service {:?} failed, restarting", factory.name(token)); self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting(idx, token, factory.create()); + self.state = WorkerState::Restarting(Restart { + factory_id, + token, + fut: factory.create(), + }); } fn shutdown(&mut self, force: bool) { @@ -342,16 +346,24 @@ impl ServerWorker { enum WorkerState { Available, Unavailable, - Restarting( - usize, - Token, - LocalBoxFuture<'static, Result, ()>>, - ), - // Shutdown keep states necessary for server shutdown: - // Sleep for interval check the shutdown progress. - // Instant for the start time of shutdown. - // Sender for send back the shutdown outcome(force/grace) to StopCommand caller. - Shutdown(Pin>, Instant, oneshot::Sender), + Restarting(Restart), + Shutdown(Shutdown), +} + +struct Restart { + factory_id: usize, + token: Token, + fut: LocalBoxFuture<'static, Result, ()>>, +} + +// Shutdown keep states necessary for server shutdown: +// Sleep for interval check the shutdown progress. +// Instant for the start time of shutdown. +// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. +struct Shutdown { + timer: Pin>, + start_from: Instant, + tx: oneshot::Sender, } impl Default for WorkerState { @@ -367,27 +379,29 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, result })) = + if let Poll::Ready(Some(StopCommand { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { this.availability.set(false); let num = num_connections(); if num == 0 { info!("Shutting down worker, 0 connections"); - let _ = result.send(true); + let _ = tx.send(true); return Poll::Ready(()); } else if graceful { info!("Graceful worker shutdown, {} connections", num); this.shutdown(false); - let timer = Box::pin(sleep(Duration::from_secs(1))); - let start_from = Instant::now(); - this.state = WorkerState::Shutdown(timer, start_from, result); + this.state = WorkerState::Shutdown(Shutdown { + timer: Box::pin(sleep(Duration::from_secs(1))), + start_from: Instant::now(), + tx, + }); } else { info!("Force shutdown worker, {} connections", num); this.shutdown(true); - let _ = result.send(false); + let _ = tx.send(false); return Poll::Ready(()); } } @@ -405,11 +419,14 @@ impl Future for ServerWorker { self.poll(cx) } }, - WorkerState::Restarting(idx, token, ref mut fut) => { - let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + WorkerState::Restarting(ref mut restart) => { + let factory_id = restart.factory_id; + let token = restart.token; + + let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ) }); @@ -421,7 +438,7 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ); this.services[token.0].created(service); @@ -429,29 +446,29 @@ impl Future for ServerWorker { self.poll(cx) } - WorkerState::Shutdown(ref mut timer, ref start_from, _) => { + WorkerState::Shutdown(ref mut shutdown) => { // Wait for 1 second. - ready!(timer.as_mut().poll(cx)); + ready!(shutdown.timer.as_mut().poll(cx)); if num_connections() == 0 { // Graceful shutdown. - if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { - let _ = sender.send(true); + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(true); } Arbiter::current().stop(); Poll::Ready(()) - } else if start_from.elapsed() >= this.shutdown_timeout { + } else if shutdown.start_from.elapsed() >= this.shutdown_timeout { // Timeout forceful shutdown. - if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { - let _ = sender.send(false); + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(false); } Arbiter::current().stop(); Poll::Ready(()) } else { // Reset timer and wait for 1 second. let time = Instant::now() + Duration::from_secs(1); - timer.as_mut().reset(time); - timer.as_mut().poll(cx) + shutdown.timer.as_mut().reset(time); + shutdown.timer.as_mut().poll(cx) } } // actively poll stream and handle worker command