1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 18:02:58 +01:00

block and wait for accept thread to exit. (#443)

Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
fakeshadow 2022-03-02 11:52:12 +08:00 committed by GitHub
parent 2a54065fae
commit 7804ed12eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 5 deletions

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
- Wait for accept thread to stop before sending completion signal. [#443]
[#443]: https://github.com/actix/actix-net/pull/443
## 2.0.0 - 2022-01-19 ## 2.0.0 - 2022-01-19

View File

@ -41,7 +41,7 @@ impl Accept {
pub(crate) fn start( pub(crate) fn start(
sockets: Vec<(usize, MioListener)>, sockets: Vec<(usize, MioListener)>,
builder: &ServerBuilder, builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> { ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>, thread::JoinHandle<()>)> {
let handle_server = ServerHandle::new(builder.cmd_tx.clone()); let handle_server = ServerHandle::new(builder.cmd_tx.clone());
// construct poll instance and its waker // construct poll instance and its waker
@ -73,12 +73,12 @@ impl Accept {
handle_server, handle_server,
)?; )?;
thread::Builder::new() let accept_handle = thread::Builder::new()
.name("actix-server acceptor".to_owned()) .name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(&mut sockets)) .spawn(move || accept.poll_with(&mut sockets))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok((waker_queue, handles_server)) Ok((waker_queue, handles_server, accept_handle))
} }
fn new_with_sockets( fn new_with_sockets(

View File

@ -3,6 +3,7 @@ use std::{
io, mem, io, mem,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
thread,
time::Duration, time::Duration,
}; };
@ -158,6 +159,7 @@ impl Future for Server {
pub struct ServerInner { pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>, worker_handles: Vec<WorkerHandleServer>,
accept_handle: Option<thread::JoinHandle<()>>,
worker_config: ServerWorkerConfig, worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue, waker_queue: WakerQueue,
@ -205,7 +207,7 @@ impl ServerInner {
); );
} }
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
let mux = ServerEventMultiplexer { let mux = ServerEventMultiplexer {
signal_fut: (builder.listen_os_signals).then(Signals::new), signal_fut: (builder.listen_os_signals).then(Signals::new),
@ -214,6 +216,7 @@ impl ServerInner {
let server = ServerInner { let server = ServerInner {
waker_queue, waker_queue,
accept_handle: Some(accept_handle),
worker_handles, worker_handles,
worker_config: builder.worker_config, worker_config: builder.worker_config,
services: builder.factories, services: builder.factories,
@ -243,7 +246,8 @@ impl ServerInner {
} => { } => {
self.stopping = true; self.stopping = true;
// stop accept thread // Signal accept thread to stop.
// Signal is non-blocking; we wait for thread to stop later.
self.waker_queue.wake(WakerInterest::Stop); self.waker_queue.wake(WakerInterest::Stop);
// send stop signal to workers // send stop signal to workers
@ -258,6 +262,13 @@ impl ServerInner {
let _ = join_all(workers_stop).await; let _ = join_all(workers_stop).await;
} }
// wait for accept thread stop
self.accept_handle
.take()
.unwrap()
.join()
.expect("Accept thread must not panic in any case");
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }