1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-05-12 06:12:49 +02:00

feat: server shutdown signal (#676)

This commit is contained in:
Rob Ede 2025-05-09 19:30:11 +01:00 committed by GitHub
parent 270360e095
commit 064da0e3ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 470 additions and 292 deletions

View File

@ -1,3 +1,8 @@
version: "0.2"
words:
- actix
- addrs
- mptcp
- nonblocking
- oneshot
- rustup

View File

@ -8,3 +8,4 @@ updates:
directory: /
schedule:
interval: weekly
versioning-strategy: lockfile-only

519
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -187,10 +187,7 @@ impl SystemRunner {
match exit_code {
0 => Ok(()),
nonzero => Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", nonzero),
)),
nonzero => Err(io::Error::other(format!("Non-zero exit code: {}", nonzero))),
}
}
@ -199,8 +196,7 @@ impl SystemRunner {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
rt.block_on(stop_rx)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
rt.block_on(stop_rx).map_err(io::Error::other)
}
/// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this

View File

@ -2,6 +2,9 @@
## Unreleased
## 2.6.0
- Add `ServerBuilder::shutdown_signal()` method.
- Minimum supported Rust version (MSRV) is now 1.74.
## 2.5.1

View File

@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "2.5.1"
version = "2.6.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
@ -44,7 +44,10 @@ actix-rt = "2.8"
bytes = "1"
futures-util = { version = "0.3.17", default-features = false, features = ["sink", "async-await-macro"] }
pretty_env_logger = "0.5"
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs"] }
static_assertions = "1"
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs", "time"] }
tokio-util = "0.7"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
[lints]
workspace = true

View File

@ -5,11 +5,11 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/actix-server?label=latest)](https://crates.io/crates/actix-server)
[![Documentation](https://docs.rs/actix-server/badge.svg?version=2.5.1)](https://docs.rs/actix-server/2.5.1)
[![Documentation](https://docs.rs/actix-server/badge.svg?version=2.6.0)](https://docs.rs/actix-server/2.6.0)
[![Version](https://img.shields.io/badge/rustc-1.52+-ab6000.svg)](https://blog.rust-lang.org/2021/05/06/Rust-1.52.0.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-server.svg)
<br />
[![Dependency Status](https://deps.rs/crate/actix-server/2.5.1/status.svg)](https://deps.rs/crate/actix-server/2.5.1)
[![Dependency Status](https://deps.rs/crate/actix-server/2.6.0/status.svg)](https://deps.rs/crate/actix-server/2.6.0)
![Download](https://img.shields.io/crates/d/actix-server.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -0,0 +1,51 @@
//! Demonstrates use of the `ServerBuilder::shutdown_signal` method using `tokio-util`s
//! `CancellationToken` helper using a nonsensical timer. In practice, this cancellation token would
//! be wired throughout your application and typically triggered by OS signals elsewhere.
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;
use tokio_util::sync::CancellationToken;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{prelude::*, EnvFilter};
async fn run(stop_signal: CancellationToken) -> io::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();
let addr = ("127.0.0.1", 8080);
tracing::info!("starting server on port: {}", &addr.0);
Server::build()
.bind("shutdown-signal", addr, || {
fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
})?
.shutdown_signal(stop_signal.cancelled_owned())
.workers(2)
.run()
.await
}
#[tokio::main]
async fn main() -> io::Result<()> {
let stop_signal = CancellationToken::new();
tokio::spawn({
let stop_signal = stop_signal.clone();
async move {
tokio::time::sleep(Duration::from_secs(10)).await;
stop_signal.cancel();
}
});
run(stop_signal).await?;
Ok(())
}

View File

@ -76,7 +76,7 @@ impl Accept {
let accept_handle = thread::Builder::new()
.name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(&mut sockets))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
.map_err(io::Error::other)?;
Ok((waker_queue, handles_server, accept_handle))
}

View File

@ -1,6 +1,7 @@
use std::{io, num::NonZeroUsize, time::Duration};
use std::{future::Future, io, num::NonZeroUsize, time::Duration};
use actix_rt::net::TcpStream;
use futures_core::future::BoxFuture;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{
@ -39,6 +40,7 @@ pub struct ServerBuilder {
pub(crate) mptcp: MpTcp,
pub(crate) exit: bool,
pub(crate) listen_os_signals: bool,
pub(crate) shutdown_signal: Option<BoxFuture<'static, ()>>,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(crate) worker_config: ServerWorkerConfig,
@ -64,6 +66,7 @@ impl ServerBuilder {
mptcp: MpTcp::Disabled,
exit: false,
listen_os_signals: true,
shutdown_signal: None,
cmd_tx,
cmd_rx,
worker_config: ServerWorkerConfig::default(),
@ -170,6 +173,41 @@ impl ServerBuilder {
self
}
/// Specify shutdown signal from a future.
///
/// Using this method will prevent OS signal handlers being set up.
///
/// Typically, a `CancellationToken` will be used, but any future _can_ be.
///
/// # Examples
///
/// ```
/// # use std::io;
/// # use tokio::net::TcpStream;
/// # use actix_server::Server;
/// # async fn run() -> io::Result<()> {
/// use actix_service::fn_service;
/// use tokio_util::sync::CancellationToken;
///
/// let stop_signal = CancellationToken::new();
///
/// Server::build()
/// .bind("shutdown-signal", "127.0.0.1:12345", || {
/// fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
/// })?
/// .shutdown_signal(stop_signal.cancelled_owned())
/// .run()
/// .await
/// # }
/// ```
pub fn shutdown_signal<Fut>(mut self, shutdown_signal: Fut) -> Self
where
Fut: Future<Output = ()> + Send + 'static,
{
self.shutdown_signal = Some(Box::pin(shutdown_signal));
self
}
/// Timeout for graceful workers shutdown in seconds.
///
/// After receiving a stop signal, workers have this much time to finish serving requests.
@ -370,9 +408,6 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
} else if let Some(err) = opt_err.take() {
Err(err)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
Err(io::Error::other("Can not bind to address."))
}
}

View File

@ -18,7 +18,7 @@ use crate::{
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
signals::{SignalKind, Signals},
signals::{OsSignals, SignalKind, StopSignal},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
@ -210,7 +210,12 @@ impl ServerInner {
let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
let mux = ServerEventMultiplexer {
signal_fut: (builder.listen_os_signals).then(Signals::new),
signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| {
builder
.listen_os_signals
.then(OsSignals::new)
.map(StopSignal::Os)
}),
cmd_rx: builder.cmd_rx,
};
@ -315,7 +320,16 @@ impl ServerInner {
fn map_signal(signal: SignalKind) -> ServerCommand {
match signal {
SignalKind::Int => {
SignalKind::Cancel => {
info!("Cancellation token/channel received; starting graceful shutdown");
ServerCommand::Stop {
graceful: true,
completion: None,
force_system_stop: true,
}
}
SignalKind::OsInt => {
info!("SIGINT received; starting forced shutdown");
ServerCommand::Stop {
graceful: false,
@ -324,7 +338,7 @@ impl ServerInner {
}
}
SignalKind::Term => {
SignalKind::OsTerm => {
info!("SIGTERM received; starting graceful shutdown");
ServerCommand::Stop {
graceful: true,
@ -333,7 +347,7 @@ impl ServerInner {
}
}
SignalKind::Quit => {
SignalKind::OsQuit => {
info!("SIGQUIT received; starting forced shutdown");
ServerCommand::Stop {
graceful: false,
@ -347,7 +361,7 @@ impl ServerInner {
struct ServerEventMultiplexer {
cmd_rx: UnboundedReceiver<ServerCommand>,
signal_fut: Option<Signals>,
signal_fut: Option<StopSignal>,
}
impl Stream for ServerEventMultiplexer {

View File

@ -1,10 +1,11 @@
use std::{
fmt,
future::Future,
pin::Pin,
pin::{pin, Pin},
task::{Context, Poll},
};
use futures_core::future::BoxFuture;
use tracing::trace;
/// Types of process signals.
@ -12,28 +13,51 @@ use tracing::trace;
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)] // variants are never constructed on non-unix
pub(crate) enum SignalKind {
/// `SIGINT`
Int,
/// Cancellation token or channel.
Cancel,
/// `SIGTERM`
Term,
/// OS `SIGINT`.
OsInt,
/// `SIGQUIT`
Quit,
/// OS `SIGTERM`.
OsTerm,
/// OS `SIGQUIT`.
OsQuit,
}
impl fmt::Display for SignalKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
SignalKind::Int => "SIGINT",
SignalKind::Term => "SIGTERM",
SignalKind::Quit => "SIGQUIT",
SignalKind::Cancel => "Cancellation token or channel",
SignalKind::OsInt => "SIGINT",
SignalKind::OsTerm => "SIGTERM",
SignalKind::OsQuit => "SIGQUIT",
})
}
}
pub(crate) enum StopSignal {
/// OS signal handling is configured.
Os(OsSignals),
/// Cancellation token or channel.
Cancel(BoxFuture<'static, ()>),
}
impl Future for StopSignal {
type Output = SignalKind;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
StopSignal::Os(os_signals) => pin!(os_signals).poll(cx),
StopSignal::Cancel(cancel) => pin!(cancel).poll(cx).map(|()| SignalKind::Cancel),
}
}
}
/// Process signal listener.
pub(crate) struct Signals {
pub(crate) struct OsSignals {
#[cfg(not(unix))]
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
@ -41,14 +65,14 @@ pub(crate) struct Signals {
signals: Vec<(SignalKind, actix_rt::signal::unix::Signal)>,
}
impl Signals {
impl OsSignals {
/// Constructs an OS signal listening future.
pub(crate) fn new() -> Self {
trace!("setting up OS signal listener");
#[cfg(not(unix))]
{
Signals {
OsSignals {
signals: Box::pin(actix_rt::signal::ctrl_c()),
}
}
@ -58,9 +82,9 @@ impl Signals {
use actix_rt::signal::unix;
let sig_map = [
(unix::SignalKind::interrupt(), SignalKind::Int),
(unix::SignalKind::terminate(), SignalKind::Term),
(unix::SignalKind::quit(), SignalKind::Quit),
(unix::SignalKind::interrupt(), SignalKind::OsInt),
(unix::SignalKind::terminate(), SignalKind::OsTerm),
(unix::SignalKind::quit(), SignalKind::OsQuit),
];
let signals = sig_map
@ -79,18 +103,18 @@ impl Signals {
})
.collect::<Vec<_>>();
Signals { signals }
OsSignals { signals }
}
}
}
impl Future for Signals {
impl Future for OsSignals {
type Output = SignalKind;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
{
self.signals.as_mut().poll(cx).map(|_| SignalKind::Int)
self.signals.as_mut().poll(cx).map(|_| SignalKind::OsInt)
}
#[cfg(unix)]
@ -106,3 +130,10 @@ impl Future for Signals {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
static_assertions::assert_impl_all!(StopSignal: Send, Unpin);
}

View File

@ -341,11 +341,10 @@ impl ServerWorker {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("can not start worker: {:?}", err);
return Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
));
error!("can not start worker: {err:?}");
return Err(io::Error::other(format!(
"can not start server service {idx}",
)));
}
}
}
@ -440,13 +439,12 @@ impl ServerWorker {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("can not start worker: {:?}", err);
error!("can not start worker: {err:?}");
Arbiter::current().stop();
factory_tx
.send(Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
)))
.send(Err(io::Error::other(format!(
"can not start server service {idx}",
))))
.unwrap();
return;
}

View File

@ -1,6 +1,8 @@
_list:
@just --list
toolchain := ""
# Check project.
check: && clippy
just --unstable --fmt --check
@ -42,8 +44,6 @@ non_linux_all_features_list := ```
```
all_crate_features := if os() == "linux" { "--all-features" } else { "--features='" + non_linux_all_features_list + "'" }
toolchain := ""
# Run Clippy over workspace.
clippy:
cargo {{ toolchain }} clippy --workspace --all-targets {{ all_crate_features }}