mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-24 00:01:11 +01:00
Server: run after await (#426)
Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
parent
8c4ec34cd4
commit
4e0dd091f5
@ -2,8 +2,10 @@
|
|||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
* Hide implementation details of `Server`. [#424]
|
* Hide implementation details of `Server`. [#424]
|
||||||
|
* `Server` now runs only after awaiting it. [#425]
|
||||||
|
|
||||||
[#424]: https://github.com/actix/actix-net/pull/424
|
[#424]: https://github.com/actix/actix-net/pull/424
|
||||||
|
[#425]: https://github.com/actix/actix-net/pull/425
|
||||||
|
|
||||||
|
|
||||||
## 2.0.0-beta.9 - 2021-11-15
|
## 2.0.0-beta.9 - 2021-11-15
|
||||||
|
@ -42,5 +42,5 @@ actix-rt = "2.4.0"
|
|||||||
|
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.9"
|
env_logger = "0.9"
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
|
||||||
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
|
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
|
||||||
|
@ -82,7 +82,7 @@ async fn run() -> io::Result<()> {
|
|||||||
ok(size)
|
ok(size)
|
||||||
})
|
})
|
||||||
})?
|
})?
|
||||||
.workers(1)
|
.workers(2)
|
||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -46,6 +46,7 @@ impl ServerHandle {
|
|||||||
let _ = self.cmd_tx.send(ServerCommand::Stop {
|
let _ = self.cmd_tx.send(ServerCommand::Stop {
|
||||||
graceful,
|
graceful,
|
||||||
completion: Some(tx),
|
completion: Some(tx),
|
||||||
|
force_system_stop: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
async {
|
async {
|
||||||
|
@ -7,12 +7,10 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::{time::sleep, System};
|
use actix_rt::{time::sleep, System};
|
||||||
use futures_core::future::BoxFuture;
|
use futures_core::{future::BoxFuture, Stream};
|
||||||
|
use futures_util::stream::StreamExt as _;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use tokio::sync::{
|
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
|
||||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
|
||||||
oneshot,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
accept::Accept,
|
accept::Accept,
|
||||||
@ -49,6 +47,9 @@ pub(crate) enum ServerCommand {
|
|||||||
|
|
||||||
/// Return channel to notify caller that shutdown is complete.
|
/// Return channel to notify caller that shutdown is complete.
|
||||||
completion: Option<oneshot::Sender<()>>,
|
completion: Option<oneshot::Sender<()>>,
|
||||||
|
|
||||||
|
/// Force System exit when true, overriding `ServerBuilder::system_exit()` if it is false.
|
||||||
|
force_system_stop: bool,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,8 +61,8 @@ pub(crate) enum ServerCommand {
|
|||||||
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
|
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
|
||||||
/// distributes connections with a round-robin strategy.
|
/// distributes connections with a round-robin strategy.
|
||||||
///
|
///
|
||||||
/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
|
/// The [Server] must be awaited or polled in order to start running. It will resolve when the
|
||||||
/// when the server has fully shut down.
|
/// server has fully shut down.
|
||||||
///
|
///
|
||||||
/// # Shutdown Signals
|
/// # Shutdown Signals
|
||||||
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
|
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
|
||||||
@ -119,8 +120,11 @@ pub(crate) enum ServerCommand {
|
|||||||
/// .await
|
/// .await
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
#[must_use = "Server does nothing unless you `.await` or poll it"]
|
||||||
pub struct Server(Result<ServerInner, Option<io::Error>>);
|
pub struct Server {
|
||||||
|
handle: ServerHandle,
|
||||||
|
fut: BoxFuture<'static, io::Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
/// Create server build.
|
/// Create server build.
|
||||||
@ -129,62 +133,26 @@ impl Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn new(builder: ServerBuilder) -> Self {
|
pub(crate) fn new(builder: ServerBuilder) -> Self {
|
||||||
Server(ServerInner::new(builder).map_err(Some))
|
Server {
|
||||||
|
handle: ServerHandle::new(builder.cmd_tx.clone()),
|
||||||
|
fut: Box::pin(ServerInner::run(builder)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a handle for ServerFuture that can be used to change state of actix server.
|
/// Get a `Server` handle that can be used issue commands and change it's state.
|
||||||
///
|
///
|
||||||
/// See [ServerHandle](ServerHandle) for usage.
|
/// See [ServerHandle](ServerHandle) for usage.
|
||||||
pub fn handle(&self) -> ServerHandle {
|
pub fn handle(&self) -> ServerHandle {
|
||||||
match &self.0 {
|
self.handle.clone()
|
||||||
Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()),
|
|
||||||
Err(err) => {
|
|
||||||
// TODO: i don't think this is the best way to handle server startup fail
|
|
||||||
panic!(
|
|
||||||
"server handle can not be obtained because server failed to start up: {}",
|
|
||||||
err.as_ref().unwrap()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for Server {
|
impl Future for Server {
|
||||||
type Output = io::Result<()>;
|
type Output = io::Result<()>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
#[inline]
|
||||||
match &mut self.as_mut().get_mut().0 {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
Err(err) => Poll::Ready(Err(err
|
Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
|
||||||
.take()
|
|
||||||
.expect("Server future cannot be polled after error"))),
|
|
||||||
|
|
||||||
Ok(inner) => {
|
|
||||||
// poll Signals
|
|
||||||
if let Some(ref mut signals) = inner.signals {
|
|
||||||
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
|
|
||||||
inner.stop_task = inner.handle_signal(signal);
|
|
||||||
// drop signals listener
|
|
||||||
inner.signals = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle stop tasks and eager drain command channel
|
|
||||||
loop {
|
|
||||||
if let Some(ref mut fut) = inner.stop_task {
|
|
||||||
// only resolve stop task and exit
|
|
||||||
return fut.as_mut().poll(cx).map(|_| Ok(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
|
|
||||||
Poll::Ready(Some(cmd)) => {
|
|
||||||
// if stop task is required, set it and loop
|
|
||||||
inner.stop_task = inner.handle_cmd(cmd);
|
|
||||||
}
|
|
||||||
_ => return Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,16 +160,27 @@ pub struct ServerInner {
|
|||||||
worker_handles: Vec<WorkerHandleServer>,
|
worker_handles: Vec<WorkerHandleServer>,
|
||||||
worker_config: ServerWorkerConfig,
|
worker_config: ServerWorkerConfig,
|
||||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||||
exit: bool,
|
|
||||||
cmd_tx: UnboundedSender<ServerCommand>,
|
|
||||||
cmd_rx: UnboundedReceiver<ServerCommand>,
|
|
||||||
signals: Option<Signals>,
|
|
||||||
waker_queue: WakerQueue,
|
waker_queue: WakerQueue,
|
||||||
stop_task: Option<BoxFuture<'static, ()>>,
|
system_stop: bool,
|
||||||
|
stopping: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerInner {
|
impl ServerInner {
|
||||||
fn new(mut builder: ServerBuilder) -> io::Result<Self> {
|
async fn run(builder: ServerBuilder) -> io::Result<()> {
|
||||||
|
let (mut this, mut mux) = Self::run_sync(builder)?;
|
||||||
|
|
||||||
|
while let Some(cmd) = mux.next().await {
|
||||||
|
this.handle_cmd(cmd).await;
|
||||||
|
|
||||||
|
if this.stopping {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
|
||||||
let sockets = mem::take(&mut builder.sockets)
|
let sockets = mem::take(&mut builder.sockets)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|t| (t.0, t.2))
|
.map(|t| (t.0, t.2))
|
||||||
@ -228,67 +207,65 @@ impl ServerInner {
|
|||||||
|
|
||||||
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;
|
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;
|
||||||
|
|
||||||
// construct OS signals listener future
|
let mux = ServerEventMultiplexer {
|
||||||
let signals = (builder.listen_os_signals).then(Signals::new);
|
signal_fut: (builder.listen_os_signals).then(Signals::new),
|
||||||
|
|
||||||
Ok(ServerInner {
|
|
||||||
cmd_tx: builder.cmd_tx.clone(),
|
|
||||||
cmd_rx: builder.cmd_rx,
|
cmd_rx: builder.cmd_rx,
|
||||||
signals,
|
};
|
||||||
|
|
||||||
|
let server = ServerInner {
|
||||||
waker_queue,
|
waker_queue,
|
||||||
worker_handles,
|
worker_handles,
|
||||||
worker_config: builder.worker_config,
|
worker_config: builder.worker_config,
|
||||||
services: builder.factories,
|
services: builder.factories,
|
||||||
exit: builder.exit,
|
system_stop: builder.exit,
|
||||||
stop_task: None,
|
stopping: false,
|
||||||
})
|
};
|
||||||
|
|
||||||
|
Ok((server, mux))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
|
async fn handle_cmd(&mut self, item: ServerCommand) {
|
||||||
match item {
|
match item {
|
||||||
ServerCommand::Pause(tx) => {
|
ServerCommand::Pause(tx) => {
|
||||||
self.waker_queue.wake(WakerInterest::Pause);
|
self.waker_queue.wake(WakerInterest::Pause);
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerCommand::Resume(tx) => {
|
ServerCommand::Resume(tx) => {
|
||||||
self.waker_queue.wake(WakerInterest::Resume);
|
self.waker_queue.wake(WakerInterest::Resume);
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
None
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerCommand::Stop {
|
ServerCommand::Stop {
|
||||||
graceful,
|
graceful,
|
||||||
completion,
|
completion,
|
||||||
|
force_system_stop,
|
||||||
} => {
|
} => {
|
||||||
let exit = self.exit;
|
self.stopping = true;
|
||||||
|
|
||||||
// stop accept thread
|
// stop accept thread
|
||||||
self.waker_queue.wake(WakerInterest::Stop);
|
self.waker_queue.wake(WakerInterest::Stop);
|
||||||
|
|
||||||
// stop workers
|
// send stop signal to workers
|
||||||
let workers_stop = self
|
let workers_stop = self
|
||||||
.worker_handles
|
.worker_handles
|
||||||
.iter()
|
.iter()
|
||||||
.map(|worker| worker.stop(graceful))
|
.map(|worker| worker.stop(graceful))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
Some(Box::pin(async move {
|
if graceful {
|
||||||
if graceful {
|
// wait for all workers to shut down
|
||||||
// wait for all workers to shut down
|
let _ = join_all(workers_stop).await;
|
||||||
let _ = join_all(workers_stop).await;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(tx) = completion {
|
if let Some(tx) = completion {
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if exit {
|
if self.system_stop || force_system_stop {
|
||||||
sleep(Duration::from_millis(300)).await;
|
sleep(Duration::from_millis(300)).await;
|
||||||
System::try_current().as_ref().map(System::stop);
|
System::try_current().as_ref().map(System::stop);
|
||||||
}
|
}
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerCommand::WorkerFaulted(idx) => {
|
ServerCommand::WorkerFaulted(idx) => {
|
||||||
@ -321,40 +298,60 @@ impl ServerInner {
|
|||||||
|
|
||||||
Err(err) => error!("can not restart worker {}: {}", idx, err),
|
Err(err) => error!("can not restart worker {}: {}", idx, err),
|
||||||
};
|
};
|
||||||
|
|
||||||
None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_signal(&mut self, signal: SignalKind) -> Option<BoxFuture<'static, ()>> {
|
fn map_signal(signal: SignalKind) -> ServerCommand {
|
||||||
match signal {
|
match signal {
|
||||||
SignalKind::Int => {
|
SignalKind::Int => {
|
||||||
info!("SIGINT received; starting forced shutdown");
|
info!("SIGINT received; starting forced shutdown");
|
||||||
self.exit = true;
|
ServerCommand::Stop {
|
||||||
self.handle_cmd(ServerCommand::Stop {
|
|
||||||
graceful: false,
|
graceful: false,
|
||||||
completion: None,
|
completion: None,
|
||||||
})
|
force_system_stop: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SignalKind::Term => {
|
SignalKind::Term => {
|
||||||
info!("SIGTERM received; starting graceful shutdown");
|
info!("SIGTERM received; starting graceful shutdown");
|
||||||
self.exit = true;
|
ServerCommand::Stop {
|
||||||
self.handle_cmd(ServerCommand::Stop {
|
|
||||||
graceful: true,
|
graceful: true,
|
||||||
completion: None,
|
completion: None,
|
||||||
})
|
force_system_stop: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SignalKind::Quit => {
|
SignalKind::Quit => {
|
||||||
info!("SIGQUIT received; starting forced shutdown");
|
info!("SIGQUIT received; starting forced shutdown");
|
||||||
self.exit = true;
|
ServerCommand::Stop {
|
||||||
self.handle_cmd(ServerCommand::Stop {
|
|
||||||
graceful: false,
|
graceful: false,
|
||||||
completion: None,
|
completion: None,
|
||||||
})
|
force_system_stop: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ServerEventMultiplexer {
|
||||||
|
cmd_rx: UnboundedReceiver<ServerCommand>,
|
||||||
|
signal_fut: Option<Signals>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for ServerEventMultiplexer {
|
||||||
|
type Item = ServerCommand;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let this = Pin::into_inner(self);
|
||||||
|
|
||||||
|
if let Some(signal_fut) = &mut this.signal_fut {
|
||||||
|
if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
|
||||||
|
this.signal_fut = None;
|
||||||
|
return Poll::Ready(Some(ServerInner::map_signal(signal)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Pin::new(&mut this.cmd_rx).poll_recv(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -487,27 +487,46 @@ async fn worker_restart() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic]
|
fn no_runtime_on_init() {
|
||||||
fn no_runtime() {
|
use std::{thread::sleep, time::Duration};
|
||||||
// test set up in a way that would prevent time out if support for runtime-less init was added
|
|
||||||
|
|
||||||
let addr = unused_addr();
|
let addr = unused_addr();
|
||||||
|
let counter = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
let srv = Server::build()
|
let mut srv = Server::build()
|
||||||
.workers(1)
|
.workers(2)
|
||||||
.disable_signals()
|
.disable_signals()
|
||||||
.bind("test", addr, move || {
|
.bind("test", addr, {
|
||||||
fn_service(|_| async { Ok::<_, ()>(()) })
|
let counter = counter.clone();
|
||||||
|
move || {
|
||||||
|
counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
fn_service(|_| async { Ok::<_, ()>(()) })
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
|
fn is_send<T: Send>(_: &T) {}
|
||||||
|
is_send(&srv);
|
||||||
|
is_send(&srv.handle());
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(1_000));
|
||||||
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
||||||
|
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
let rt = tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let _ = srv.handle().stop(true);
|
rt.block_on(async move {
|
||||||
|
let _ = futures_util::poll!(&mut srv);
|
||||||
|
|
||||||
rt.block_on(async { srv.await }).unwrap();
|
// available after the first poll
|
||||||
|
sleep(Duration::from_millis(500));
|
||||||
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
||||||
|
|
||||||
|
let _ = srv.handle().stop(true);
|
||||||
|
srv.await
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user