1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 02:21:07 +01:00

prevent large shutdown timeout from panicking

closes #298
This commit is contained in:
Rob Ede 2021-03-26 23:37:01 +00:00
parent 12d3942b98
commit f9262dbec0
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
3 changed files with 39 additions and 38 deletions

View File

@ -2,7 +2,7 @@ use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
use actix_rt::{ use actix_rt::{
time::{sleep_until, Instant}, time::{sleep, Instant},
System, System,
}; };
use log::{error, info}; use log::{error, info};
@ -16,14 +16,17 @@ use crate::worker::{Conn, WorkerHandle};
use crate::Token; use crate::Token;
struct ServerSocketInfo { struct ServerSocketInfo {
// addr for socket. mainly used for logging. /// Address of socket. Mainly used for logging.
addr: SocketAddr, addr: SocketAddr,
// be ware this is the crate token for identify socket and should not be confused with
// mio::Token /// Beware this is the crate token for identify socket and should not be confused
/// with `mio::Token`.
token: Token, token: Token,
lst: MioListener, lst: MioListener,
// timeout is used to mark the deadline when this socket's listener should be registered again
// after an error. /// Timeout is used to mark the deadline when this socket's listener should be registered again
/// after an error.
timeout: Option<Instant>, timeout: Option<Instant>,
} }
@ -226,10 +229,9 @@ impl Accept {
Some(WakerInterest::Stop) => { Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets); return self.deregister_all(&mut sockets);
} }
// waker queue is drained. // waker queue is drained
None => { None => {
// Reset the WakerQueue before break so it does not grow // Reset the WakerQueue before break so it does not grow infinitely
// infinitely.
WakerQueue::reset(&mut guard); WakerQueue::reset(&mut guard);
break 'waker; break 'waker;
} }
@ -328,8 +330,8 @@ impl Accept {
} }
Err(tmp) => { Err(tmp) => {
// worker lost contact and could be gone. a message is sent to // worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made. // `ServerBuilder` future to notify it a new worker should be made
// after that remove the fault worker. // after that remove the fault worker
self.srv.worker_faulted(self.handles[self.next].idx); self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp; msg = tmp;
self.handles.swap_remove(self.next); self.handles.swap_remove(self.next);
@ -403,15 +405,15 @@ impl Accept {
error!("Can not deregister server socket {}", err); error!("Can not deregister server socket {}", err);
} }
// sleep after error. write the timeout to socket info as later the poll // sleep after error. write the timeout to socket info as later
// would need it mark which socket and when it's listener should be // the poll would need it mark which socket and when it's
// registered. // listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500)); info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
System::current().arbiter().spawn(async move { System::current().arbiter().spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
}); });

View File

@ -1,12 +1,12 @@
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::task::{Context, Poll}; io, mem,
use std::time::Duration; pin::Pin,
use std::{io, mem}; task::{Context, Poll},
time::Duration,
};
use actix_rt::net::TcpStream; use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_rt::time::{sleep_until, Instant};
use actix_rt::{self as rt, System};
use log::{error, info}; use log::{error, info};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -122,13 +122,13 @@ impl ServerBuilder {
self self
} }
/// Stop actix system. /// Stop Actix system.
pub fn system_exit(mut self) -> Self { pub fn system_exit(mut self) -> Self {
self.exit = true; self.exit = true;
self self
} }
/// Disable signal handling /// Disable signal handling.
pub fn disable_signals(mut self) -> Self { pub fn disable_signals(mut self) -> Self {
self.no_signals = true; self.no_signals = true;
self self
@ -136,9 +136,8 @@ impl ServerBuilder {
/// Timeout for graceful workers shutdown in seconds. /// Timeout for graceful workers shutdown in seconds.
/// ///
/// After receiving a stop signal, workers have this much time to finish /// After receiving a stop signal, workers have this much time to finish serving requests.
/// serving requests. Workers still alive after the timeout are force /// Workers still alive after the timeout are force dropped.
/// dropped.
/// ///
/// By default shutdown timeout sets to 30 seconds. /// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u64) -> Self { pub fn shutdown_timeout(mut self, sec: u64) -> Self {
@ -147,11 +146,10 @@ impl ServerBuilder {
self self
} }
/// Execute external configuration as part of the server building /// Execute external configuration as part of the server building process.
/// process.
/// ///
/// This function is useful for moving parts of configuration to a /// This function is useful for moving parts of configuration to a different module or
/// different module or even library. /// even library.
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder> pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut ServiceConfig) -> io::Result<()>,
@ -268,6 +266,7 @@ impl ServerBuilder {
self.sockets self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst))); .push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self) Ok(self)
} }
@ -393,7 +392,7 @@ impl ServerBuilder {
} }
if exit { if exit {
rt::spawn(async { rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
System::current().stop(); System::current().stop();
}); });
} }
@ -402,7 +401,7 @@ impl ServerBuilder {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
rt::spawn(async { rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await; sleep(Duration::from_millis(300)).await;
System::current().stop(); System::current().stop();
}); });
} }

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::time::{sleep, Sleep};
use actix_rt::{spawn, Arbiter}; use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
@ -361,8 +361,8 @@ impl Future for ServerWorker {
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( self.state = WorkerState::Shutdown(
Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))), Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)), Box::pin(sleep(self.config.shutdown_timeout)),
Some(result), Some(result),
); );
} else { } else {
@ -438,7 +438,7 @@ impl Future for ServerWorker {
// sleep for 1 second and then check again // sleep for 1 second and then check again
if t1.as_mut().poll(cx).is_ready() { if t1.as_mut().poll(cx).is_ready() {
*t1 = Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))); *t1 = Box::pin(sleep(Duration::from_secs(1)));
let _ = t1.as_mut().poll(cx); let _ = t1.as_mut().poll(cx);
} }