diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 57acb82c..a68aa266 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -290,6 +290,9 @@ def _parse_interactive_sessions_environments(env_var): DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true")) """Whether Dask is enabled in the cluster or not""" +DASK_AUTOSCALER_ENABLED = strtobool(os.getenv("DASK_AUTOSCALER_ENABLED", "true")) +"""Whether dask autoscaler is enabled in the cluster or not""" + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv( "REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi" ) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index d096e9d6..ec2d5296 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -47,6 +47,7 @@ from reana_workflow_controller.errors import REANAWorkflowControllerError from reana_workflow_controller.k8s import delete_dask_dashboard_ingress +from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.dask import requires_dask try: @@ -325,13 +326,15 @@ def _delete_dask_cluster(workflow: Workflow) -> None: name=f"reana-run-dask-{workflow.id_}", ) - current_k8s_custom_objects_api_client.delete_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskautoscalers", - namespace="default", - name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", - ) + if DASK_AUTOSCALER_ENABLED: + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", + ) + delete_dask_dashboard_ingress( f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ ) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 96e84433..3c74a837 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -27,7 +27,9 @@ get_workspace_volume, get_reana_shared_volume, ) +from reana_commons.job_utils import kubernetes_memory_to_bytes +from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.k8s import create_dask_dashboard_ingress @@ -64,7 +66,7 @@ def __init__( self.user_id = user_id self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) - self.cluster_body, self.autoscaler_body = self._load_dask_templates() + self.cluster_body = self._load_dask_cluster_template() self.cluster_image = self.cluster_spec["image"] self.dask_scheduler_uri = ( f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" @@ -77,15 +79,16 @@ def __init__( ) self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID - def _load_dask_templates(self): - """Load Dask templates from YAML files.""" + if DASK_AUTOSCALER_ENABLED: + self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.autoscaler_body = self._load_dask_autoscaler_template() + + def _load_dask_cluster_template(self): + """Load Dask cluster template from YAML file.""" with open( "reana_workflow_controller/templates/dask_cluster.yaml", "r" - ) as dask_cluster_yaml, open( - "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" - ) as dask_autoscaler_yaml: + ) as dask_cluster_yaml: dask_cluster_body = yaml.safe_load(dask_cluster_yaml) - dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][ @@ -93,13 +96,26 @@ def _load_dask_templates(self): ] = [] dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = [] - return dask_cluster_body, dask_autoscaler_body + return dask_cluster_body + + def _load_dask_autoscaler_template(self): + """Load Dask autoscaler template from YAML file.""" + with open( + "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" + ) as dask_autoscaler_yaml: + dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) + + return dask_autoscaler_body def create_dask_resources(self): """Create necessary Dask resources for the workflow.""" self._prepare_cluster() self._create_dask_cluster() - self._create_dask_autoscaler() + + if DASK_AUTOSCALER_ENABLED: + self._prepare_autoscaler() + self._create_dask_autoscaler() + create_dask_dashboard_ingress(self.cluster_name, self.workflow_id) def _prepare_cluster(self): @@ -113,16 +129,10 @@ def _prepare_cluster(self): # Add the name of the cluster, used in scheduler service name self.cluster_body["metadata"] = {"name": self.cluster_name} - # Add the name of the dask autoscaler - self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} - self.cluster_body["spec"]["scheduler"]["service"]["selector"][ "dask.org/cluster-name" ] = self.cluster_name - # Connect autoscaler to the cluster - self.autoscaler_body["spec"]["cluster"] = self.cluster_name - # Add image to worker and scheduler self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ "image" @@ -141,8 +151,9 @@ def _prepare_cluster(self): "limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"} } - # Set max limit on autoscaler - self.autoscaler_body["spec"]["maximum"] = self.num_of_workers + self.cluster_body["spec"]["worker"]["replicas"] = ( + 0 if DASK_AUTOSCALER_ENABLED else self.num_of_workers + ) # Add DASK SCHEDULER URI env variable self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( @@ -174,6 +185,17 @@ def _prepare_cluster(self): if rucio: self._add_rucio_init_container() + def _prepare_autoscaler(self): + """Prepare Dask autoscaler body.""" + # Add the name of the dask autoscaler + self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} + + # Connect autoscaler to the cluster + self.autoscaler_body["spec"]["cluster"] = self.cluster_name + + # Set max limit on autoscaler + self.autoscaler_body["spec"]["maximum"] = self.num_of_workers + def _add_image_pull_secrets(self): """Attach the configured image pull secrets to scheduler and worker containers.""" image_pull_secrets = [] diff --git a/tests/test_dask.py b/tests/test_dask.py index 142e2420..50bfc197 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -415,10 +415,6 @@ def test_prepare_cluster(dask_resource_manager): dask_resource_manager.cluster_body["metadata"]["name"] == dask_resource_manager.cluster_name ) - assert ( - dask_resource_manager.autoscaler_body["metadata"]["name"] - == dask_resource_manager.autoscaler_name - ) assert { "name": "DASK_SCHEDULER_URI", @@ -462,11 +458,6 @@ def test_prepare_cluster(dask_resource_manager): "worker" ]["spec"]["volumes"] - assert ( - dask_resource_manager.autoscaler_body["spec"]["cluster"] - == dask_resource_manager.cluster_name - ) - assert ( dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][ "selector"