diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index bd28ccda..9409dfb4 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,17 +1,27 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::{ + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; -use actix_rt::time::{sleep, Sleep}; -use actix_rt::{spawn, Arbiter}; +use actix_rt::{ + spawn, + time::{sleep, Instant, Sleep}, + Arbiter, +}; use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; @@ -132,7 +142,7 @@ pub(crate) struct ServerWorker { conns: Counter, factories: Vec>, state: WorkerState, - config: ServerWorkerConfig, + shutdown_timeout: Duration, } struct WorkerService { @@ -211,12 +221,12 @@ impl ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, + services: Default::default(), availability, factories, - config, - services: Vec::new(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, conns: conns.clone(), - state: WorkerState::Unavailable, }); let fut = wrk @@ -337,53 +347,61 @@ enum WorkerState { Token, LocalBoxFuture<'static, Result, ()>>, ), - Shutdown( - Pin>, - Pin>, - Option>, - ), + // Shutdown keep states necessary for server shutdown: + // Sleep for interval check the shutdown progress. + // Instant for the start time of shutdown. + // Sender for send back the shutdown outcome(force/grace) to StopCommand caller. + Shutdown(Pin>, Instant, oneshot::Sender), +} + +impl Default for WorkerState { + fn default() -> Self { + Self::Unavailable + } } impl Future for ServerWorker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_recv(cx) + Pin::new(&mut this.rx2).poll_recv(cx) { - self.availability.set(false); + this.availability.set(false); let num = num_connections(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = result.send(true); return Poll::Ready(()); } else if graceful { - self.shutdown(false); info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(self.config.shutdown_timeout)), - Some(result), - ); + this.shutdown(false); + + let timer = Box::pin(sleep(Duration::from_secs(1))); + let start_from = Instant::now(); + this.state = WorkerState::Shutdown(timer, start_from, result); } else { info!("Force shutdown worker, {} connections", num); - self.shutdown(true); + this.shutdown(true); + let _ = result.send(false); return Poll::Ready(()); } } - match self.state { - WorkerState::Unavailable => match self.check_readiness(cx) { + match this.state { + WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { - self.state = WorkerState::Available; - self.availability.set(true); + this.state = WorkerState::Available; + this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, Err((token, idx)) => { - self.restart_service(token, idx); + this.restart_service(token, idx); self.poll(cx) } }, @@ -391,7 +409,7 @@ impl Future for ServerWorker { let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - self.factories[idx].name(token) + this.factories[idx].name(token) ) }); @@ -403,60 +421,61 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - self.factories[idx].name(token) + this.factories[idx].name(token) ); - self.services[token.0].created(service); - self.state = WorkerState::Unavailable; + this.services[token.0].created(service); + this.state = WorkerState::Unavailable; self.poll(cx) } - WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { - let num = num_connections(); - if num == 0 { - let _ = tx.take().unwrap().send(true); + WorkerState::Shutdown(ref mut timer, ref start_from, _) => { + // Wait for 1 second. + ready!(timer.as_mut().poll(cx)); + + if num_connections() == 0 { + // Graceful shutdown. + if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { + let _ = sender.send(true); + } Arbiter::current().stop(); - return Poll::Ready(()); - } - - // check graceful timeout - if Pin::new(t2).poll(cx).is_ready() { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); + Poll::Ready(()) + } else if start_from.elapsed() >= this.shutdown_timeout { + // Timeout forceful shutdown. + if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { + let _ = sender.send(false); + } Arbiter::current().stop(); - return Poll::Ready(()); + Poll::Ready(()) + } else { + // Reset timer and wait for 1 second. + let time = Instant::now() + Duration::from_secs(1); + timer.as_mut().reset(time); + timer.as_mut().poll(cx) } - - // sleep for 1 second and then check again - if t1.as_mut().poll(cx).is_ready() { - *t1 = Box::pin(sleep(Duration::from_secs(1))); - let _ = t1.as_mut().poll(cx); - } - - Poll::Pending } // actively poll stream and handle worker command WorkerState::Available => loop { - match self.check_readiness(cx) { + match this.check_readiness(cx) { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable; + this.availability.set(false); + this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { - self.restart_service(token, idx); - self.availability.set(false); + this.restart_service(token, idx); + this.availability.set(false); return self.poll(cx); } } - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream Some(WorkerCommand(msg)) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.conns.get(); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), };