1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 18:02:58 +01:00

Reduce ServerWorker size (#321)

This commit is contained in:
fakeshadow 2021-04-12 17:12:59 -07:00 committed by GitHub
parent ddce2d6d12
commit e0fb67f646
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -127,10 +127,10 @@ impl WorkerAvailability {
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
rx: UnboundedReceiver<Conn>, rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>, rx2: UnboundedReceiver<Stop>,
services: Vec<WorkerService>, services: Box<[WorkerService]>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState, state: WorkerState,
shutdown_timeout: Duration, shutdown_timeout: Duration,
} }
@ -199,6 +199,8 @@ impl ServerWorker {
availability: WorkerAvailability, availability: WorkerAvailability,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
assert!(!availability.available());
let (tx1, rx) = unbounded_channel(); let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel(); let (tx2, rx2) = unbounded_channel();
let avail = availability.clone(); let avail = availability.clone();
@ -213,20 +215,7 @@ impl ServerWorker {
.unwrap() .unwrap()
}) })
.spawn(async move { .spawn(async move {
availability.set(false); let fut = factories
let mut wrk = ServerWorker {
rx,
rx2,
services: Vec::new(),
availability,
conns: Counter::new(config.max_concurrent_connections),
factories,
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
};
let fut = wrk
.factories
.iter() .iter()
.enumerate() .enumerate()
.map(|(idx, factory)| { .map(|(idx, factory)| {
@ -239,29 +228,44 @@ impl ServerWorker {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// a second spawn to make sure worker future runs as non boxed future. // a second spawn to run !Send future tasks.
// As Arbiter::spawn would box the future before send it to arbiter.
spawn(async move { spawn(async move {
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect(); let res = join_all(fut)
match res { .await
Ok(services) => { .into_iter()
for item in services { .collect::<Result<Vec<_>, _>>();
for (factory, token, service) in item { let services = match res {
assert_eq!(token.0, wrk.services.len()); Ok(res) => res
wrk.services.push(WorkerService { .into_iter()
factory, .flatten()
service, .fold(Vec::new(), |mut services, (factory, token, service)| {
status: WorkerServiceStatus::Unavailable, assert_eq!(token.0, services.len());
}); services.push(WorkerService {
} factory,
} service,
} status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => { Err(e) => {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().stop(); Arbiter::current().stop();
return;
} }
} };
wrk.await
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
availability,
conns: Counter::new(config.max_concurrent_connections),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
}); });
}); });