From 4e0dd091f55ebaf47f5c6345cf15ea75d8ce1449 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Sun, 5 Dec 2021 22:22:47 +0300 Subject: [PATCH] Server: run after await (#426) Co-authored-by: Rob Ede --- actix-server/CHANGES.md | 2 + actix-server/Cargo.toml | 2 +- actix-server/examples/tcp-echo.rs | 2 +- actix-server/src/handle.rs | 1 + actix-server/src/server.rs | 195 +++++++++++++++--------------- actix-server/tests/test_server.rs | 37 ++++-- 6 files changed, 129 insertions(+), 110 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 3c1c32c7..8e7a146e 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,8 +2,10 @@ ## Unreleased - 2021-xx-xx * Hide implementation details of `Server`. [#424] +* `Server` now runs only after awaiting it. [#425] [#424]: https://github.com/actix/actix-net/pull/424 +[#425]: https://github.com/actix/actix-net/pull/425 ## 2.0.0-beta.9 - 2021-11-15 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 96319589..03548023 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -42,5 +42,5 @@ actix-rt = "2.4.0" bytes = "1" env_logger = "0.9" -futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } +futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 930ebf0a..52ae5349 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -82,7 +82,7 @@ async fn run() -> io::Result<()> { ok(size) }) })? - .workers(1) + .workers(2) .run() .await } diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs index 53f00bee..b535e2bf 100644 --- a/actix-server/src/handle.rs +++ b/actix-server/src/handle.rs @@ -46,6 +46,7 @@ impl ServerHandle { let _ = self.cmd_tx.send(ServerCommand::Stop { graceful, completion: Some(tx), + force_system_stop: false, }); async { diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index bc0c9561..fec3b06e 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -7,12 +7,10 @@ use std::{ }; use actix_rt::{time::sleep, System}; -use futures_core::future::BoxFuture; +use futures_core::{future::BoxFuture, Stream}; +use futures_util::stream::StreamExt as _; use log::{error, info}; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc::UnboundedReceiver, oneshot}; use crate::{ accept::Accept, @@ -49,6 +47,9 @@ pub(crate) enum ServerCommand { /// Return channel to notify caller that shutdown is complete. completion: Option>, + + /// Force System exit when true, overriding `ServerBuilder::system_exit()` if it is false. + force_system_stop: bool, }, } @@ -60,8 +61,8 @@ pub(crate) enum ServerCommand { /// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and /// distributes connections with a round-robin strategy. /// -/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve -/// when the server has fully shut down. +/// The [Server] must be awaited or polled in order to start running. It will resolve when the +/// server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a @@ -119,8 +120,11 @@ pub(crate) enum ServerCommand { /// .await /// } /// ``` -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Server(Result>); +#[must_use = "Server does nothing unless you `.await` or poll it"] +pub struct Server { + handle: ServerHandle, + fut: BoxFuture<'static, io::Result<()>>, +} impl Server { /// Create server build. @@ -129,62 +133,26 @@ impl Server { } pub(crate) fn new(builder: ServerBuilder) -> Self { - Server(ServerInner::new(builder).map_err(Some)) + Server { + handle: ServerHandle::new(builder.cmd_tx.clone()), + fut: Box::pin(ServerInner::run(builder)), + } } - /// Get a handle for ServerFuture that can be used to change state of actix server. + /// Get a `Server` handle that can be used issue commands and change it's state. /// /// See [ServerHandle](ServerHandle) for usage. pub fn handle(&self) -> ServerHandle { - match &self.0 { - Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()), - Err(err) => { - // TODO: i don't think this is the best way to handle server startup fail - panic!( - "server handle can not be obtained because server failed to start up: {}", - err.as_ref().unwrap() - ); - } - } + self.handle.clone() } } impl Future for Server { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match &mut self.as_mut().get_mut().0 { - Err(err) => Poll::Ready(Err(err - .take() - .expect("Server future cannot be polled after error"))), - - Ok(inner) => { - // poll Signals - if let Some(ref mut signals) = inner.signals { - if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { - inner.stop_task = inner.handle_signal(signal); - // drop signals listener - inner.signals = None; - } - } - - // handle stop tasks and eager drain command channel - loop { - if let Some(ref mut fut) = inner.stop_task { - // only resolve stop task and exit - return fut.as_mut().poll(cx).map(|_| Ok(())); - } - - match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { - Poll::Ready(Some(cmd)) => { - // if stop task is required, set it and loop - inner.stop_task = inner.handle_cmd(cmd); - } - _ => return Poll::Pending, - } - } - } - } + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut Pin::into_inner(self).fut).poll(cx) } } @@ -192,16 +160,27 @@ pub struct ServerInner { worker_handles: Vec, worker_config: ServerWorkerConfig, services: Vec>, - exit: bool, - cmd_tx: UnboundedSender, - cmd_rx: UnboundedReceiver, - signals: Option, waker_queue: WakerQueue, - stop_task: Option>, + system_stop: bool, + stopping: bool, } impl ServerInner { - fn new(mut builder: ServerBuilder) -> io::Result { + async fn run(builder: ServerBuilder) -> io::Result<()> { + let (mut this, mut mux) = Self::run_sync(builder)?; + + while let Some(cmd) = mux.next().await { + this.handle_cmd(cmd).await; + + if this.stopping { + break; + } + } + + Ok(()) + } + + fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> { let sockets = mem::take(&mut builder.sockets) .into_iter() .map(|t| (t.0, t.2)) @@ -228,67 +207,65 @@ impl ServerInner { let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; - // construct OS signals listener future - let signals = (builder.listen_os_signals).then(Signals::new); - - Ok(ServerInner { - cmd_tx: builder.cmd_tx.clone(), + let mux = ServerEventMultiplexer { + signal_fut: (builder.listen_os_signals).then(Signals::new), cmd_rx: builder.cmd_rx, - signals, + }; + + let server = ServerInner { waker_queue, worker_handles, worker_config: builder.worker_config, services: builder.factories, - exit: builder.exit, - stop_task: None, - }) + system_stop: builder.exit, + stopping: false, + }; + + Ok((server, mux)) } - fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + async fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { self.waker_queue.wake(WakerInterest::Pause); let _ = tx.send(()); - None } ServerCommand::Resume(tx) => { self.waker_queue.wake(WakerInterest::Resume); let _ = tx.send(()); - None } ServerCommand::Stop { graceful, completion, + force_system_stop, } => { - let exit = self.exit; + self.stopping = true; // stop accept thread self.waker_queue.wake(WakerInterest::Stop); - // stop workers + // send stop signal to workers let workers_stop = self .worker_handles .iter() .map(|worker| worker.stop(graceful)) .collect::>(); - Some(Box::pin(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(workers_stop).await; - } + if graceful { + // wait for all workers to shut down + let _ = join_all(workers_stop).await; + } - if let Some(tx) = completion { - let _ = tx.send(()); - } + if let Some(tx) = completion { + let _ = tx.send(()); + } - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - })) + if self.system_stop || force_system_stop { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } } ServerCommand::WorkerFaulted(idx) => { @@ -321,40 +298,60 @@ impl ServerInner { Err(err) => error!("can not restart worker {}: {}", idx, err), }; - - None } } } - fn handle_signal(&mut self, signal: SignalKind) -> Option> { + fn map_signal(signal: SignalKind) -> ServerCommand { match signal { SignalKind::Int => { info!("SIGINT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { + ServerCommand::Stop { graceful: false, completion: None, - }) + force_system_stop: true, + } } SignalKind::Term => { info!("SIGTERM received; starting graceful shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { + ServerCommand::Stop { graceful: true, completion: None, - }) + force_system_stop: true, + } } SignalKind::Quit => { info!("SIGQUIT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { + ServerCommand::Stop { graceful: false, completion: None, - }) + force_system_stop: true, + } } } } } + +struct ServerEventMultiplexer { + cmd_rx: UnboundedReceiver, + signal_fut: Option, +} + +impl Stream for ServerEventMultiplexer { + type Item = ServerCommand; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = Pin::into_inner(self); + + if let Some(signal_fut) = &mut this.signal_fut { + if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) { + this.signal_fut = None; + return Poll::Ready(Some(ServerInner::map_signal(signal))); + } + } + + Pin::new(&mut this.cmd_rx).poll_recv(cx) + } +} diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 07eb2478..e175e325 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -487,27 +487,46 @@ async fn worker_restart() { } #[test] -#[should_panic] -fn no_runtime() { - // test set up in a way that would prevent time out if support for runtime-less init was added +fn no_runtime_on_init() { + use std::{thread::sleep, time::Duration}; let addr = unused_addr(); + let counter = Arc::new(AtomicUsize::new(0)); - let srv = Server::build() - .workers(1) + let mut srv = Server::build() + .workers(2) .disable_signals() - .bind("test", addr, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) + .bind("test", addr, { + let counter = counter.clone(); + move || { + counter.fetch_add(1, Ordering::SeqCst); + fn_service(|_| async { Ok::<_, ()>(()) }) + } }) .unwrap() .run(); + fn is_send(_: &T) {} + is_send(&srv); + is_send(&srv.handle()); + + sleep(Duration::from_millis(1_000)); + assert_eq!(counter.load(Ordering::SeqCst), 0); + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - let _ = srv.handle().stop(true); + rt.block_on(async move { + let _ = futures_util::poll!(&mut srv); - rt.block_on(async { srv.await }).unwrap(); + // available after the first poll + sleep(Duration::from_millis(500)); + assert_eq!(counter.load(Ordering::SeqCst), 2); + + let _ = srv.handle().stop(true); + srv.await + }) + .unwrap(); }