diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py
index d8e90a97..e78a9304 100644
--- a/reana_workflow_controller/config.py
+++ b/reana_workflow_controller/config.py
@@ -407,3 +407,10 @@ def _parse_interactive_sessions_environments(env_var):
 
 MAX_WORKFLOW_SHARING_MESSAGE_LENGTH = 5000
 """Maximum length of the user-provided message when sharing a workflow."""
+
+
+REANA_JOB_MONITORING_PROMETHEUS_URL = os.getenv(
+    "REANA_JOB_MONITORING_PROMETHEUS_URL",
+    "http://reana-prometheus-server/api/v1/query_range",
+)
+"""URL of the Prometheus range-query endpoint used to collect job monitoring data."""
diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py
index c871934e..8322318e 100644
--- a/reana_workflow_controller/consumer.py
+++ b/reana_workflow_controller/consumer.py
@@ -12,6 +12,7 @@
 
 import json
 import logging
+import time
 import uuid
 from datetime import datetime
 
@@ -40,6 +41,7 @@
     ALIVE_STATUSES,
     PROGRESS_STATUSES,
     REANA_GITLAB_URL,
+    REANA_JOB_MONITORING_PROMETHEUS_URL,
     REANA_URL,
     REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
 )
@@ -142,9 +144,10 @@ def on_message(self, body, message):
                     f"Unexpected error while processing workflow: {e}", exc_info=True
                 )
 
-
 def _update_workflow_status(workflow, status, logs):
     """Update workflow status in DB."""
+
+
     if workflow.status != status:
         Workflow.update_workflow_status(Session, workflow.id_, status, logs, None)
         if workflow.git_ref:
@@ -153,6 +156,8 @@ def _update_workflow_status(workflow, status, logs):
         if status not in ALIVE_STATUSES:
             workflow.run_finished_at = datetime.now()
             workflow.logs = workflow.logs or ""
+
+            _get_job_monitoring_data(workflow)
 
             try:
                 workflow_engine_logs = _get_workflow_engine_pod_logs(workflow)
@@ -296,18 +301,96 @@ def _delete_workflow_job(workflow: Workflow) -> None:
         )
 
 
-def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
+def _get_workflow_pod(workflow: Workflow):
+    """Return the Kubernetes pod of the given workflow, if there is one.
+
+    :param workflow: Workflow whose pod should be looked up.
+    :return: First pod labelled ``reana-run-batch-workflow-uuid=<workflow id>``
+        whose name contains the workflow id, or ``None`` if there is no such pod.
+    """
     pods = current_k8s_corev1_api_client.list_namespaced_pod(
         namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
         label_selector=f"reana-run-batch-workflow-uuid={str(workflow.id_)}",
     )
     for pod in pods.items:
         if str(workflow.id_) in pod.metadata.name:
-            return current_k8s_corev1_api_client.read_namespaced_pod_log(
-                namespace=pod.metadata.namespace,
-                name=pod.metadata.name,
-                container="workflow-engine",
-            )
+            return pod
+
+    return None
+
+
+def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
+    """Return the logs of the ``workflow-engine`` container of the workflow pod.
+
+    :param workflow: Workflow whose engine logs should be fetched.
+    :return: Container logs, or an empty string if the workflow has no pod.
+    """
+    pod = _get_workflow_pod(workflow)
+    if pod is not None:
+        return current_k8s_corev1_api_client.read_namespaced_pod_log(
+            namespace=pod.metadata.namespace,
+            name=pod.metadata.name,
+            container="workflow-engine",
+        )
+
     # There might not be any pod returned by `list_namespaced_pod`, for example
     # when a workflow fails to be scheduled
     return ""
+
+
+def _get_job_monitoring_data(workflow: Workflow) -> dict:
+    """Collect job monitoring data for the workflow pod from Prometheus.
+
+    Prometheus failures are logged and the affected metric is left out of the
+    result instead of raising, so that a monitoring outage does not prevent
+    the caller from finishing the workflow status update. Errors raised by the
+    Kubernetes pod lookup are not handled here.
+
+    :param workflow: Workflow whose pod metrics should be collected.
+    :return: Dictionary mapping each collected metric name to the decoded JSON
+        body of the Prometheus range query for that metric. Empty when the
+        workflow has no pod.
+    """
+    logging.info("Collecting monitoring data for workflow %s", workflow.id_)
+
+    monitoring_dict = {}
+
+    pod = _get_workflow_pod(workflow)
+    if pod is None:
+        # There might not be any pod, for example when a workflow fails to be
+        # scheduled, so there is nothing to query.
+        logging.info("No pod found for workflow %s", workflow.id_)
+        return monitoring_dict
+
+    # Take one timestamp so that every metric covers the same time window.
+    # TODO: add start and end time of the workflow
+    end = time.time()
+    start = end - 600
+
+    for metric in [
+        # TODO: to define which metrics we want to collect really
+        "container_memory_usage_bytes",
+        "container_cpu_usage_seconds_total",
+    ]:
+        query = f'{metric}{{pod="{pod.metadata.name}",image!=""}}'
+        logging.info("Querying Prometheus for workflow pod: %s", query)
+
+        try:
+            response = requests.get(
+                REANA_JOB_MONITORING_PROMETHEUS_URL,
+                params={"query": query, "step": 1, "start": start, "end": end},
+                # Never block the job status consumer on a stuck Prometheus.
+                timeout=10,
+            )
+            response.raise_for_status()
+            monitoring_dict[metric] = response.json()
+        except (requests.exceptions.RequestException, ValueError) as e:
+            logging.warning(
+                "Could not collect %s for workflow %s: %s", metric, workflow.id_, e
+            )
+
+    logging.info(
+        "Monitoring data for workflow %s: %s", workflow.id_, monitoring_dict
+    )
+
+    return monitoring_dict