Skip to content

Commit

Permalink
feat(dask): make dask autoscaler optional (reanahub#600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Sep 20, 2024
1 parent e148d06 commit ac5573d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 27 deletions.
3 changes: 3 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ def _parse_interactive_sessions_environments(env_var):
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether dask is enabled in the cluster or not"""

DASK_AUTOSCALER_ENABLED = strtobool(os.getenv("DASK_AUTOSCALER_ENABLED", "true"))
"""Whether dask autoscaler is enabled in the cluster or not"""

REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT = float(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_CORES_LIMIT", 4)
)
Expand Down
57 changes: 39 additions & 18 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress


Expand Down Expand Up @@ -62,14 +63,13 @@ def __init__(
self.memory = memory
self.single_worker_cores = single_worker_cores
self.single_worker_memory = single_worker_memory
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.workflow_spec = workflow_spec
self.workflow_workspace = workflow_workspace
self.workflow_id = workflow_workspace.split("/")[-1]
self.user_id = user_id

self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body, self.autoscaler_body = self._load_dask_templates()
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = (
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
Expand All @@ -82,29 +82,43 @@ def __init__(
)
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

def _load_dask_templates(self):
"""Load Dask templates from YAML files."""
if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
"""Load Dask cluster template from YAML file."""
with open(
"reana_workflow_controller/templates/dask_cluster.yaml", "r"
) as dask_cluster_yaml, open(
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
) as dask_autoscaler_yaml:
) as dask_cluster_yaml:
dask_cluster_body = yaml.safe_load(dask_cluster_yaml)
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)
dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][
"volumeMounts"
] = []
dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = []

return dask_cluster_body, dask_autoscaler_body
return dask_cluster_body

def _load_dask_autoscaler_template(self):
"""Load Dask autoscaler template from YAML file."""
with open(
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
) as dask_autoscaler_yaml:
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)

return dask_autoscaler_body

def create_dask_resources(self):
"""Create necessary dask resources for the workflow."""
self._prepare_cluster()
self._create_dask_cluster()
self._create_dask_autoscaler()

if DASK_AUTOSCALER_ENABLED:
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)

def _prepare_cluster(self):
Expand All @@ -118,16 +132,10 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# Add the name of the dask autoscaler
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name

# Connect autoscaler to the cluster
self.autoscaler_body["spec"]["cluster"] = self.cluster_name

# Add image to worker and scheduler
self.cluster_body["spec"]["worker"]["spec"]["containers"][0][
"image"
Expand All @@ -149,8 +157,10 @@ def _prepare_cluster(self):
}
}

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers()
# Set number of workers in the cluster
self.cluster_body["spec"]["worker"]["replicas"] = (
0 if DASK_AUTOSCALER_ENABLED else self.calculate_max_allowed_workers()
)

# Add DASK SCHEDULER URI env variable
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
Expand Down Expand Up @@ -182,6 +192,17 @@ def _prepare_cluster(self):
if rucio:
self._add_rucio_init_container()

def _prepare_autoscaler(self):
"""Prepare Dask autoscaler body."""
# Add the name of the dask autoscaler
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

# Connect autoscaler to the cluster
self.autoscaler_body["spec"]["cluster"] = self.cluster_name

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.calculate_max_allowed_workers()

def _add_image_pull_secrets(self):
"""Attach the configured image pull secrets to scheduler and worker containers."""
image_pull_secrets = []
Expand Down
9 changes: 0 additions & 9 deletions tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,6 @@ def test_prepare_cluster(dask_resource_manager):
dask_resource_manager.cluster_body["metadata"]["name"]
== dask_resource_manager.cluster_name
)
assert (
dask_resource_manager.autoscaler_body["metadata"]["name"]
== dask_resource_manager.autoscaler_name
)

assert {
"name": "DASK_SCHEDULER_URI",
Expand Down Expand Up @@ -488,11 +484,6 @@ def test_prepare_cluster(dask_resource_manager):
"worker"
]["spec"]["volumes"]

assert (
dask_resource_manager.autoscaler_body["spec"]["cluster"]
== dask_resource_manager.cluster_name
)

assert (
dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][
"selector"
Expand Down

0 comments on commit ac5573d

Please sign in to comment.