1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-04 09:21:55 +01:00

Use named type for WorkerState::Restarting and Shutdown (#317)

This commit is contained in:
fakeshadow 2021-04-04 13:53:19 -07:00 committed by GitHub
parent 8aade720ed
commit 3859e91799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -34,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn);
/// and `false` if some connections still alive. /// and `false` if some connections still alive.
pub(crate) struct StopCommand { pub(crate) struct StopCommand {
graceful: bool, graceful: bool,
result: oneshot::Sender<bool>, tx: oneshot::Sender<bool>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -98,8 +98,8 @@ impl WorkerHandle {
} }
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, result }); let _ = self.tx2.send(StopCommand { graceful, tx });
rx rx
} }
} }
@ -221,7 +221,7 @@ impl ServerWorker {
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx, rx,
rx2, rx2,
services: Default::default(), services: Vec::new(),
availability, availability,
factories, factories,
state: Default::default(), state: Default::default(),
@ -272,11 +272,15 @@ impl ServerWorker {
WorkerHandle::new(idx, tx1, tx2, avail) WorkerHandle::new(idx, tx1, tx2, avail)
} }
fn restart_service(&mut self, token: Token, idx: usize) { fn restart_service(&mut self, token: Token, factory_id: usize) {
let factory = &self.factories[idx]; let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token)); trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting; self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(idx, token, factory.create()); self.state = WorkerState::Restarting(Restart {
factory_id,
token,
fut: factory.create(),
});
} }
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
@ -342,16 +346,24 @@ impl ServerWorker {
enum WorkerState { enum WorkerState {
Available, Available,
Unavailable, Unavailable,
Restarting( Restarting(Restart),
usize, Shutdown(Shutdown),
Token, }
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
), struct Restart {
// Shutdown keep states necessary for server shutdown: factory_id: usize,
// Sleep for interval check the shutdown progress. token: Token,
// Instant for the start time of shutdown. fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. }
Shutdown(Pin<Box<Sleep>>, Instant, oneshot::Sender<bool>),
// Shutdown keep states necessary for server shutdown:
// Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
struct Shutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
} }
impl Default for WorkerState { impl Default for WorkerState {
@ -367,27 +379,29 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) = if let Poll::Ready(Some(StopCommand { graceful, tx })) =
Pin::new(&mut this.rx2).poll_recv(cx) Pin::new(&mut this.rx2).poll_recv(cx)
{ {
this.availability.set(false); this.availability.set(false);
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = result.send(true); let _ = tx.send(true);
return Poll::Ready(()); return Poll::Ready(());
} else if graceful { } else if graceful {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
this.shutdown(false); this.shutdown(false);
let timer = Box::pin(sleep(Duration::from_secs(1))); this.state = WorkerState::Shutdown(Shutdown {
let start_from = Instant::now(); timer: Box::pin(sleep(Duration::from_secs(1))),
this.state = WorkerState::Shutdown(timer, start_from, result); start_from: Instant::now(),
tx,
});
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
this.shutdown(true); this.shutdown(true);
let _ = result.send(false); let _ = tx.send(false);
return Poll::Ready(()); return Poll::Ready(());
} }
} }
@ -405,11 +419,14 @@ impl Future for ServerWorker {
self.poll(cx) self.poll(cx)
} }
}, },
WorkerState::Restarting(idx, token, ref mut fut) => { WorkerState::Restarting(ref mut restart) => {
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { let factory_id = restart.factory_id;
let token = restart.token;
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
this.factories[idx].name(token) this.factories[factory_id].name(token)
) )
}); });
@ -421,7 +438,7 @@ impl Future for ServerWorker {
trace!( trace!(
"Service {:?} has been restarted", "Service {:?} has been restarted",
this.factories[idx].name(token) this.factories[factory_id].name(token)
); );
this.services[token.0].created(service); this.services[token.0].created(service);
@ -429,29 +446,29 @@ impl Future for ServerWorker {
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut timer, ref start_from, _) => { WorkerState::Shutdown(ref mut shutdown) => {
// Wait for 1 second. // Wait for 1 second.
ready!(timer.as_mut().poll(cx)); ready!(shutdown.timer.as_mut().poll(cx));
if num_connections() == 0 { if num_connections() == 0 {
// Graceful shutdown. // Graceful shutdown.
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = sender.send(true); let _ = shutdown.tx.send(true);
} }
Arbiter::current().stop(); Arbiter::current().stop();
Poll::Ready(()) Poll::Ready(())
} else if start_from.elapsed() >= this.shutdown_timeout { } else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown. // Timeout forceful shutdown.
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = sender.send(false); let _ = shutdown.tx.send(false);
} }
Arbiter::current().stop(); Arbiter::current().stop();
Poll::Ready(()) Poll::Ready(())
} else { } else {
// Reset timer and wait for 1 second. // Reset timer and wait for 1 second.
let time = Instant::now() + Duration::from_secs(1); let time = Instant::now() + Duration::from_secs(1);
timer.as_mut().reset(time); shutdown.timer.as_mut().reset(time);
timer.as_mut().poll(cx) shutdown.timer.as_mut().poll(cx)
} }
} }
// actively poll stream and handle worker command // actively poll stream and handle worker command