1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 18:02:58 +01:00

Refactor WorkerState::Shutdown (#310)

This commit is contained in:
fakeshadow 2021-04-04 12:34:52 -07:00 committed by GitHub
parent 8079c50ddb
commit 8aade720ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,17 +1,27 @@
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; mem,
use std::sync::Arc; pin::Pin,
use std::task::{Context, Poll}; sync::{
use std::time::Duration; atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use actix_rt::time::{sleep, Sleep}; use actix_rt::{
use actix_rt::{spawn, Arbiter}; spawn,
time::{sleep, Instant, Sleep},
Arbiter,
};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::{
use tokio::sync::oneshot; mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
@ -132,7 +142,7 @@ pub(crate) struct ServerWorker {
conns: Counter, conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState, state: WorkerState,
config: ServerWorkerConfig, shutdown_timeout: Duration,
} }
struct WorkerService { struct WorkerService {
@ -211,12 +221,12 @@ impl ServerWorker {
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx, rx,
rx2, rx2,
services: Default::default(),
availability, availability,
factories, factories,
config, state: Default::default(),
services: Vec::new(), shutdown_timeout: config.shutdown_timeout,
conns: conns.clone(), conns: conns.clone(),
state: WorkerState::Unavailable,
}); });
let fut = wrk let fut = wrk
@ -337,53 +347,61 @@ enum WorkerState {
Token, Token,
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>, LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
), ),
Shutdown( // Shutdown keep states necessary for server shutdown:
Pin<Box<Sleep>>, // Sleep for interval check the shutdown progress.
Pin<Box<Sleep>>, // Instant for the start time of shutdown.
Option<oneshot::Sender<bool>>, // Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
), Shutdown(Pin<Box<Sleep>>, Instant, oneshot::Sender<bool>),
}
impl Default for WorkerState {
fn default() -> Self {
Self::Unavailable
}
} }
impl Future for ServerWorker { impl Future for ServerWorker {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) = if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_recv(cx) Pin::new(&mut this.rx2).poll_recv(cx)
{ {
self.availability.set(false); this.availability.set(false);
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = result.send(true); let _ = result.send(true);
return Poll::Ready(()); return Poll::Ready(());
} else if graceful { } else if graceful {
self.shutdown(false);
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( this.shutdown(false);
Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep(self.config.shutdown_timeout)), let timer = Box::pin(sleep(Duration::from_secs(1)));
Some(result), let start_from = Instant::now();
); this.state = WorkerState::Shutdown(timer, start_from, result);
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); this.shutdown(true);
let _ = result.send(false); let _ = result.send(false);
return Poll::Ready(()); return Poll::Ready(());
} }
} }
match self.state { match this.state {
WorkerState::Unavailable => match self.check_readiness(cx) { WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => { Ok(true) => {
self.state = WorkerState::Available; this.state = WorkerState::Available;
self.availability.set(true); this.availability.set(true);
self.poll(cx) self.poll(cx)
} }
Ok(false) => Poll::Pending, Ok(false) => Poll::Pending,
Err((token, idx)) => { Err((token, idx)) => {
self.restart_service(token, idx); this.restart_service(token, idx);
self.poll(cx) self.poll(cx)
} }
}, },
@ -391,7 +409,7 @@ impl Future for ServerWorker {
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
self.factories[idx].name(token) this.factories[idx].name(token)
) )
}); });
@ -403,60 +421,61 @@ impl Future for ServerWorker {
trace!( trace!(
"Service {:?} has been restarted", "Service {:?} has been restarted",
self.factories[idx].name(token) this.factories[idx].name(token)
); );
self.services[token.0].created(service); this.services[token.0].created(service);
self.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { WorkerState::Shutdown(ref mut timer, ref start_from, _) => {
let num = num_connections(); // Wait for 1 second.
if num == 0 { ready!(timer.as_mut().poll(cx));
let _ = tx.take().unwrap().send(true);
if num_connections() == 0 {
// Graceful shutdown.
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
let _ = sender.send(true);
}
Arbiter::current().stop(); Arbiter::current().stop();
return Poll::Ready(()); Poll::Ready(())
} } else if start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown.
// check graceful timeout if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
if Pin::new(t2).poll(cx).is_ready() { let _ = sender.send(false);
let _ = tx.take().unwrap().send(false); }
self.shutdown(true);
Arbiter::current().stop(); Arbiter::current().stop();
return Poll::Ready(()); Poll::Ready(())
} else {
// Reset timer and wait for 1 second.
let time = Instant::now() + Duration::from_secs(1);
timer.as_mut().reset(time);
timer.as_mut().poll(cx)
} }
// sleep for 1 second and then check again
if t1.as_mut().poll(cx).is_ready() {
*t1 = Box::pin(sleep(Duration::from_secs(1)));
let _ = t1.as_mut().poll(cx);
}
Poll::Pending
} }
// actively poll stream and handle worker command // actively poll stream and handle worker command
WorkerState::Available => loop { WorkerState::Available => loop {
match self.check_readiness(cx) { match this.check_readiness(cx) {
Ok(true) => {} Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
self.availability.set(false); this.availability.set(false);
self.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
return self.poll(cx); return self.poll(cx);
} }
Err((token, idx)) => { Err((token, idx)) => {
self.restart_service(token, idx); this.restart_service(token, idx);
self.availability.set(false); this.availability.set(false);
return self.poll(cx); return self.poll(cx);
} }
} }
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream // handle incoming io stream
Some(WorkerCommand(msg)) => { Some(WorkerCommand(msg)) => {
let guard = self.conns.get(); let guard = this.conns.get();
let _ = self.services[msg.token.0].service.call((guard, msg.io)); let _ = this.services[msg.token.0].service.call((guard, msg.io));
} }
None => return Poll::Ready(()), None => return Poll::Ready(()),
}; };