Skip to content

Commit e88d26f

Browse files
committed
feat(dask): make Dask autoscaler configurable (reanahub#604)
1 parent 1515865 commit e88d26f

File tree

4 files changed

+52
-33
lines changed

4 files changed

+52
-33
lines changed

reana_workflow_controller/config.py

+3
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ def _parse_interactive_sessions_environments(env_var):
290290
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
291291
"""Whether Dask is enabled in the cluster or not"""
292292

293+
DASK_AUTOSCALER_ENABLED = os.getenv("DASK_AUTOSCALER_ENABLED", "true").lower() == "true"
294+
"""Whether Dask autoscaler is enabled in the cluster or not"""
295+
293296
REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
294297
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
295298
)

reana_workflow_controller/consumer.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from reana_workflow_controller.errors import REANAWorkflowControllerError
4848
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress
4949

50+
from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
5051
from reana_workflow_controller.dask import requires_dask
5152

5253
try:
@@ -325,13 +326,15 @@ def _delete_dask_cluster(workflow: Workflow) -> None:
325326
name=f"reana-run-dask-{workflow.id_}",
326327
)
327328

328-
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
329-
group="kubernetes.dask.org",
330-
version="v1",
331-
plural="daskautoscalers",
332-
namespace="default",
333-
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
334-
)
329+
if DASK_AUTOSCALER_ENABLED:
330+
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
331+
group="kubernetes.dask.org",
332+
version="v1",
333+
plural="daskautoscalers",
334+
namespace="default",
335+
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
336+
)
337+
335338
delete_dask_dashboard_ingress(
336339
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
337340
)

reana_workflow_controller/dask.py

+39-17
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
get_workspace_volume,
2828
get_reana_shared_volume,
2929
)
30+
from reana_commons.job_utils import kubernetes_memory_to_bytes
3031

32+
from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
3133
from reana_workflow_controller.k8s import create_dask_dashboard_ingress
3234

3335

@@ -64,7 +66,7 @@ def __init__(
6466
self.user_id = user_id
6567

6668
self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
67-
self.cluster_body, self.autoscaler_body = self._load_dask_templates()
69+
self.cluster_body = self._load_dask_cluster_template()
6870
self.cluster_image = self.cluster_spec["image"]
6971
self.dask_scheduler_uri = (
7072
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
@@ -77,29 +79,43 @@ def __init__(
7779
)
7880
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID
7981

80-
def _load_dask_templates(self):
81-
"""Load Dask templates from YAML files."""
82+
if DASK_AUTOSCALER_ENABLED:
83+
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
84+
self.autoscaler_body = self._load_dask_autoscaler_template()
85+
86+
def _load_dask_cluster_template(self):
87+
"""Load Dask cluster template from YAML file."""
8288
with open(
8389
"reana_workflow_controller/templates/dask_cluster.yaml", "r"
84-
) as dask_cluster_yaml, open(
85-
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
86-
) as dask_autoscaler_yaml:
90+
) as dask_cluster_yaml:
8791
dask_cluster_body = yaml.safe_load(dask_cluster_yaml)
88-
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)
8992
dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = []
9093
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = []
9194
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][
9295
"volumeMounts"
9396
] = []
9497
dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = []
9598

96-
return dask_cluster_body, dask_autoscaler_body
99+
return dask_cluster_body
100+
101+
def _load_dask_autoscaler_template(self):
102+
"""Load Dask autoscaler template from YAML file."""
103+
with open(
104+
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
105+
) as dask_autoscaler_yaml:
106+
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)
107+
108+
return dask_autoscaler_body
97109

98110
def create_dask_resources(self):
99111
"""Create necessary Dask resources for the workflow."""
100112
self._prepare_cluster()
101113
self._create_dask_cluster()
102-
self._create_dask_autoscaler()
114+
115+
if DASK_AUTOSCALER_ENABLED:
116+
self._prepare_autoscaler()
117+
self._create_dask_autoscaler()
118+
103119
create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
104120

105121
def _prepare_cluster(self):
@@ -113,16 +129,10 @@ def _prepare_cluster(self):
113129
# Add the name of the cluster, used in scheduler service name
114130
self.cluster_body["metadata"] = {"name": self.cluster_name}
115131

116-
# Add the name of the dask autoscaler
117-
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}
118-
119132
self.cluster_body["spec"]["scheduler"]["service"]["selector"][
120133
"dask.org/cluster-name"
121134
] = self.cluster_name
122135

123-
# Connect autoscaler to the cluster
124-
self.autoscaler_body["spec"]["cluster"] = self.cluster_name
125-
126136
# Add image to worker and scheduler
127137
self.cluster_body["spec"]["worker"]["spec"]["containers"][0][
128138
"image"
@@ -141,8 +151,9 @@ def _prepare_cluster(self):
141151
"limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"}
142152
}
143153

144-
# Set max limit on autoscaler
145-
self.autoscaler_body["spec"]["maximum"] = self.num_of_workers
154+
self.cluster_body["spec"]["worker"]["replicas"] = (
155+
0 if DASK_AUTOSCALER_ENABLED else self.num_of_workers
156+
)
146157

147158
# Add DASK SCHEDULER URI env variable
148159
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
@@ -174,6 +185,17 @@ def _prepare_cluster(self):
174185
if rucio:
175186
self._add_rucio_init_container()
176187

188+
def _prepare_autoscaler(self):
189+
"""Prepare Dask autoscaler body."""
190+
# Add the name of the dask autoscaler
191+
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}
192+
193+
# Connect autoscaler to the cluster
194+
self.autoscaler_body["spec"]["cluster"] = self.cluster_name
195+
196+
# Set max limit on autoscaler
197+
self.autoscaler_body["spec"]["maximum"] = self.num_of_workers
198+
177199
def _add_image_pull_secrets(self):
178200
"""Attach the configured image pull secrets to scheduler and worker containers."""
179201
image_pull_secrets = []

tests/test_dask.py

-9
Original file line numberDiff line numberDiff line change
@@ -415,10 +415,6 @@ def test_prepare_cluster(dask_resource_manager):
415415
dask_resource_manager.cluster_body["metadata"]["name"]
416416
== dask_resource_manager.cluster_name
417417
)
418-
assert (
419-
dask_resource_manager.autoscaler_body["metadata"]["name"]
420-
== dask_resource_manager.autoscaler_name
421-
)
422418

423419
assert {
424420
"name": "DASK_SCHEDULER_URI",
@@ -462,11 +458,6 @@ def test_prepare_cluster(dask_resource_manager):
462458
"worker"
463459
]["spec"]["volumes"]
464460

465-
assert (
466-
dask_resource_manager.autoscaler_body["spec"]["cluster"]
467-
== dask_resource_manager.cluster_name
468-
)
469-
470461
assert (
471462
dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][
472463
"selector"

0 commit comments

Comments
 (0)