From f9262dbec02dbdb777d055ee475634898b4ee248 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 26 Mar 2021 23:37:01 +0000 Subject: [PATCH] prevent large shutdown timeout from panicking closes #298 --- actix-server/src/accept.rs | 32 +++++++++++++++++--------------- actix-server/src/builder.rs | 37 ++++++++++++++++++------------------- actix-server/src/worker.rs | 8 ++++---- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index c8c1da47..8c64ca38 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::{io, thread}; use actix_rt::{ - time::{sleep_until, Instant}, + time::{sleep, Instant}, System, }; use log::{error, info}; @@ -16,14 +16,17 @@ use crate::worker::{Conn, WorkerHandle}; use crate::Token; struct ServerSocketInfo { - // addr for socket. mainly used for logging. + /// Address of socket. Mainly used for logging. addr: SocketAddr, - // be ware this is the crate token for identify socket and should not be confused with - // mio::Token + + /// Beware this is the crate token for identify socket and should not be confused + /// with `mio::Token`. token: Token, + lst: MioListener, - // timeout is used to mark the deadline when this socket's listener should be registered again - // after an error. + + /// Timeout is used to mark the deadline when this socket's listener should be registered again + /// after an error. timeout: Option, } @@ -226,10 +229,9 @@ impl Accept { Some(WakerInterest::Stop) => { return self.deregister_all(&mut sockets); } - // waker queue is drained. + // waker queue is drained None => { - // Reset the WakerQueue before break so it does not grow - // infinitely. + // Reset the WakerQueue before break so it does not grow infinitely WakerQueue::reset(&mut guard); break 'waker; } @@ -328,8 +330,8 @@ impl Accept { } Err(tmp) => { // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker. + // `ServerBuilder` future to notify it a new worker should be made + // after that remove the fault worker self.srv.worker_faulted(self.handles[self.next].idx); msg = tmp; self.handles.swap_remove(self.next); @@ -403,15 +405,15 @@ impl Accept { error!("Can not deregister server socket {}", err); } - // sleep after error. write the timeout to socket info as later the poll - // would need it mark which socket and when it's listener should be - // registered. + // sleep after error. write the timeout to socket info as later + // the poll would need it mark which socket and when it's + // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); System::current().arbiter().spawn(async move { - sleep_until(Instant::now() + Duration::from_millis(510)).await; + sleep(Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); }); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 78a1323d..c20bb4f5 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,12 +1,12 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use std::{io, mem}; +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; -use actix_rt::net::TcpStream; -use actix_rt::time::{sleep_until, Instant}; -use actix_rt::{self as rt, System}; +use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use log::{error, info}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot; @@ -122,13 +122,13 @@ impl ServerBuilder { self } - /// Stop actix system. + /// Stop Actix system. pub fn system_exit(mut self) -> Self { self.exit = true; self } - /// Disable signal handling + /// Disable signal handling. pub fn disable_signals(mut self) -> Self { self.no_signals = true; self @@ -136,9 +136,8 @@ impl ServerBuilder { /// Timeout for graceful workers shutdown in seconds. /// - /// After receiving a stop signal, workers have this much time to finish - /// serving requests. Workers still alive after the timeout are force - /// dropped. + /// After receiving a stop signal, workers have this much time to finish serving requests. + /// Workers still alive after the timeout are force dropped. /// /// By default shutdown timeout sets to 30 seconds. pub fn shutdown_timeout(mut self, sec: u64) -> Self { @@ -147,11 +146,10 @@ impl ServerBuilder { self } - /// Execute external configuration as part of the server building - /// process. + /// Execute external configuration as part of the server building process. /// - /// This function is useful for moving parts of configuration to a - /// different module or even library. + /// This function is useful for moving parts of configuration to a different module or + /// even library. pub fn configure(mut self, f: F) -> io::Result where F: Fn(&mut ServiceConfig) -> io::Result<()>, @@ -268,6 +266,7 @@ impl ServerBuilder { self.sockets .push((token, name.as_ref().to_string(), MioListener::from(lst))); + Ok(self) } @@ -393,7 +392,7 @@ impl ServerBuilder { } if exit { rt::spawn(async { - sleep_until(Instant::now() + Duration::from_millis(300)).await; + sleep(Duration::from_millis(300)).await; System::current().stop(); }); } @@ -402,7 +401,7 @@ impl ServerBuilder { // we need to stop system if server was spawned if self.exit { rt::spawn(async { - sleep_until(Instant::now() + Duration::from_millis(300)).await; + sleep(Duration::from_millis(300)).await; System::current().stop(); }); } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index defc7306..aa6d31fc 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use actix_rt::time::{sleep_until, Instant, Sleep}; +use actix_rt::time::{sleep, Sleep}; use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; @@ -361,8 +361,8 @@ impl Future for ServerWorker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))), - Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)), + Box::pin(sleep(Duration::from_secs(1))), + Box::pin(sleep(self.config.shutdown_timeout)), Some(result), ); } else { @@ -438,7 +438,7 @@ impl Future for ServerWorker { // sleep for 1 second and then check again if t1.as_mut().poll(cx).is_ready() { - *t1 = Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))); + *t1 = Box::pin(sleep(Duration::from_secs(1))); let _ = t1.as_mut().poll(cx); }