diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 8e122623..cd7491d7 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -127,10 +127,10 @@ impl WorkerAvailability { pub(crate) struct ServerWorker { rx: UnboundedReceiver, rx2: UnboundedReceiver, - services: Vec, + services: Box<[WorkerService]>, availability: WorkerAvailability, conns: Counter, - factories: Vec>, + factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, } @@ -199,6 +199,8 @@ impl ServerWorker { availability: WorkerAvailability, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { + assert!(!availability.available()); + let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); @@ -213,20 +215,7 @@ impl ServerWorker { .unwrap() }) .spawn(async move { - availability.set(false); - let mut wrk = ServerWorker { - rx, - rx2, - services: Vec::new(), - availability, - conns: Counter::new(config.max_concurrent_connections), - factories, - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }; - - let fut = wrk - .factories + let fut = factories .iter() .enumerate() .map(|(idx, factory)| { @@ -239,29 +228,44 @@ impl ServerWorker { }) .collect::>(); - // a second spawn to make sure worker future runs as non boxed future. - // As Arbiter::spawn would box the future before send it to arbiter. + // a second spawn to run !Send future tasks. spawn(async move { - let res: Result, _> = join_all(fut).await.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - } - } - } + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + let services = match res { + Ok(res) => res + .into_iter() + .flatten() + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token.0, services.len()); + services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) + .into_boxed_slice(), Err(e) => { error!("Can not start worker: {:?}", e); Arbiter::current().stop(); + return; } - } - wrk.await + }; + + // a third spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + rx, + rx2, + services, + availability, + conns: Counter::new(config.max_concurrent_connections), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }); }); });