Skip to content

Commit eaf2b59

Browse files
committed
feat(dask): add nthreads option to dask-worker command (#636)
Following the integration of Dask into REANA and discussions with several REANA users, we identified the need to make the number of threads configurable. In this commit, we propagate the specified number of threads per worker or the default value to the `dask-worker` command. Closes reanahub/reana#874
1 parent e4a64d8 commit eaf2b59

File tree

7 files changed

+23
-11
lines changed

7 files changed

+23
-11
lines changed

reana_workflow_controller/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ def _parse_interactive_sessions_environments(env_var):
346346
)
347347
"""Maximum memory for one Dask worker."""
348348

349+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS = int(
350+
os.getenv("REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS", 4)
351+
)
352+
"""Number of threads for one Dask worker by default."""
353+
349354
VOMSPROXY_CONTAINER_IMAGE = os.getenv(
350355
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.3.0"
351356
)

reana_workflow_controller/dask.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __init__(
5858
workflow_workspace,
5959
user_id,
6060
num_of_workers,
61+
num_of_threads,
6162
single_worker_memory,
6263
kerberos=False,
6364
voms_proxy=False,
@@ -77,6 +78,7 @@ def __init__(
7778
self.cluster_name = get_dask_component_name(workflow_id, "cluster")
7879
self.num_of_workers = num_of_workers
7980
self.single_worker_memory = single_worker_memory
81+
self.num_of_threads = num_of_threads
8082
self.workflow_spec = workflow_spec
8183
self.workflow_workspace = workflow_workspace
8284
self.workflow_id = str(workflow_id)
@@ -179,9 +181,9 @@ def _prepare_cluster(self):
179181
] = self.cluster_image
180182

181183
# Create the worker command
182-
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][
183-
0
184-
] = f'cd {self.workflow_workspace} && {self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"][0]}'
184+
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["args"] = [
185+
f"cd {self.workflow_workspace} && exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 --nthreads {self.num_of_threads} --memory-limit {self.single_worker_memory}"
186+
]
185187

186188
# Set resource limits for workers
187189
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["resources"] = {

reana_workflow_controller/templates/dask_cluster.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ spec:
88
- name: worker
99
imagePullPolicy: "IfNotPresent"
1010
command: ["/bin/sh", "-c"]
11-
args:
12-
- exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788
1311
ports:
1412
- name: http-dashboard
1513
containerPort: 8788

reana_workflow_controller/workflow_run_manager.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
WORKFLOW_ENGINE_YADAGE_ENV_VARS,
9292
REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS,
9393
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
94+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
9495
)
9596

9697

@@ -415,6 +416,13 @@ def start_batch_workflow_run(
415416
"single_worker_memory",
416417
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
417418
),
419+
num_of_threads=self.workflow.reana_specification["workflow"]
420+
.get("resources", {})
421+
.get("dask", {})
422+
.get(
423+
"single_worker_threads",
424+
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_THREADS,
425+
),
418426
kerberos=self.requires_kerberos(),
419427
voms_proxy=self.requires_voms_proxy(),
420428
rucio=self.requires_rucio(),

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
"marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server
5252
"opensearch-py>=2.7.0,<2.8.0",
5353
"packaging>=18.0",
54-
"reana-commons[kubernetes]>=0.95.0a8,<0.96.0",
54+
"reana-commons[kubernetes]>=0.95.0a9,<0.96.0",
5555
"reana-db>=0.95.0a5,<0.96.0",
5656
"requests>=2.25.0",
5757
"sqlalchemy-utils>=0.31.0",

tests/conftest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2024 CERN.
4+
# Copyright (C) 2017, 2018, 2019, 2020, 2021, 2022, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -209,6 +209,7 @@ def dask_resource_manager(sample_serial_workflow_in_db_with_dask, mock_user_secr
209209
workflow_workspace="/path/to/workspace",
210210
user_id="user-123",
211211
num_of_workers=2,
212+
num_of_threads=8,
212213
single_worker_memory="256Mi",
213214
)
214215
return manager

tests/test_dask.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# This file is part of REANA.
2-
# Copyright (C) 2024 CERN.
2+
# Copyright (C) 2024, 2025 CERN.
33
#
44
# REANA is free software; you can redistribute it and/or modify it
55
# under the terms of the MIT License; see LICENSE file for more details.
@@ -425,9 +425,7 @@ def test_prepare_cluster(dask_resource_manager):
425425
"env"
426426
]
427427

428-
expected_command = (
429-
f"cd {dask_resource_manager.workflow_workspace} && worker-command"
430-
)
428+
expected_command = f"cd {dask_resource_manager.workflow_workspace} && exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 --nthreads 8 --memory-limit 256Mi"
431429
assert (
432430
dask_resource_manager.cluster_body["spec"]["worker"]["spec"]["containers"][
433431
0

0 commit comments

Comments
 (0)