Skip to content

Commit 4727aec

Browse files
committed
feat(dask): update service status upon cluster deletion (reanahub#648)
Mark the Dask service as deleted instead of removing it when the Dask cluster terminates.
1 parent 12a9742 commit 4727aec

File tree

4 files changed

+34
-20
lines changed

4 files changed

+34
-20
lines changed

reana_workflow_controller/consumer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
calculate_hash_of_dir,
3030
calculate_job_input_hash,
3131
build_unique_component_name,
32-
get_dask_component_name,
3332
)
3433
from reana_db.database import Session
3534
from reana_db.models import Job, JobCache, Workflow, RunStatus, Service

reana_workflow_controller/dask.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
from flask import current_app
1313

1414
from kubernetes import client
15-
from kubernetes.client.exceptions import ApiException
1615

1716
from reana_db.database import Session
18-
from reana_db.models import Service
17+
from reana_db.models import Service, ServiceStatus
1918
from reana_db.utils import _get_workflow_with_uuid_or_name
2019
from reana_commons.config import (
2120
K8S_CERN_EOS_AVAILABLE,
@@ -677,16 +676,16 @@ def delete_dask_cluster(workflow_id, user_id) -> None:
677676
)
678677
if dask_service:
679678
workflow = _get_workflow_with_uuid_or_name(str(workflow_id), user_id)
680-
workflow.services.remove(dask_service)
681-
Session.delete(dask_service)
679+
dask_service.status = ServiceStatus.deleted
682680
Session.object_session(workflow).commit()
681+
683682
logging.info(
684-
f"Dask service model for workflow {workflow_id} deleted successfully from database."
683+
f"Dask service model for workflow {workflow_id} status updated to 'deleted' in database."
685684
)
686685

687686
except Exception as e:
688687
errors.append(
689-
f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}"
688+
f"Error updating Dask Service model status in database for workflow {workflow_id}: {e}"
690689
)
691690

692691
# Raise collected errors if any

reana_workflow_controller/rest/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,13 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
249249
return all_logs
250250

251251

252+
def build_service_logs(workflow):
253+
"""Return the logs for all services of a workflow."""
254+
# Currently we have only Dask cluster service, need a more generic implementation if we introduce additional services.
255+
256+
return json.loads(workflow.services[0].logs)
257+
258+
252259
def remove_workflow_jobs_from_cache(workflow):
253260
"""Remove any cached jobs from given workflow.
254261

reana_workflow_controller/rest/workflows_status.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2024, 2025 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -27,6 +27,7 @@
2727
)
2828
from reana_workflow_controller.rest.utils import (
2929
build_workflow_logs,
30+
build_service_logs,
3031
delete_workflow,
3132
get_workflow_name,
3233
get_workflow_progress,
@@ -150,6 +151,7 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
150151
steps = None
151152
if request.is_json:
152153
steps = request.json
154+
153155
if steps:
154156
workflow_logs = {
155157
"workflow_logs": None,
@@ -172,20 +174,27 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
172174
workflow_logs = {
173175
"workflow_logs": logs or workflow.logs,
174176
"job_logs": build_workflow_logs(workflow, paginate=paginate),
177+
"service_logs": {},
175178
"engine_specific": workflow.engine_specific,
176179
}
177-
return (
178-
jsonify(
179-
{
180-
"workflow_id": workflow.id_,
181-
"workflow_name": get_workflow_name(workflow),
182-
"logs": json.dumps(workflow_logs),
183-
"user": user_uuid,
184-
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
185-
}
186-
),
187-
200,
188-
)
180+
181+
# Get all services logs
182+
workflow_logs["service_logs"] = {
183+
s.name: [log.log for log in s.logs] for s in workflow.services
184+
}
185+
186+
return (
187+
jsonify(
188+
{
189+
"workflow_id": workflow.id_,
190+
"workflow_name": get_workflow_name(workflow),
191+
"logs": json.dumps(workflow_logs),
192+
"user": user_uuid,
193+
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
194+
}
195+
),
196+
200,
197+
)
189198

190199
except ValueError:
191200
return (

0 commit comments

Comments
 (0)