1212
1313import json
1414import logging
15+ import time
1516import uuid
1617from datetime import datetime
1718
4041 ALIVE_STATUSES ,
4142 PROGRESS_STATUSES ,
4243 REANA_GITLAB_URL ,
44+ REANA_JOB_MONITORING_PROMETHEUS_URL ,
4345 REANA_URL ,
4446 REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT ,
4547)
@@ -142,13 +144,9 @@ def on_message(self, body, message):
142144 f"Unexpected error while processing workflow: { e } " , exc_info = True
143145 )
144146
145-
146147def _update_workflow_status (workflow , status , logs ):
147148 """Update workflow status in DB."""
148149
149- logging .warning (
150- "some test debug"
151- )
152150
153151 if workflow .status != status :
154152 Workflow .update_workflow_status (Session , workflow .id_ , status , logs , None )
@@ -158,6 +156,8 @@ def _update_workflow_status(workflow, status, logs):
158156 if status not in ALIVE_STATUSES :
159157 workflow .run_finished_at = datetime .now ()
160158 workflow .logs = workflow .logs or ""
159+
160+ _get_job_monitoring_data (workflow )
161161
162162 try :
163163 workflow_engine_logs = _get_workflow_engine_pod_logs (workflow )
@@ -301,18 +301,72 @@ def _delete_workflow_job(workflow: Workflow) -> None:
301301 )
302302
303303
304- def _get_workflow_engine_pod_logs (workflow : Workflow ) -> str :
def _get_workflow_pod(workflow: Workflow):
    """Return the Kubernetes pod running the given workflow, if any.

    Pods are listed in the REANA runtime namespace using the
    ``reana-run-batch-workflow-uuid`` label selector, and the first pod whose
    name contains the workflow ID is returned.

    :param workflow: Workflow whose pod is looked up.
    :return: The matching pod object taken from the ``list_namespaced_pod``
        result (an object exposing ``metadata.name`` and
        ``metadata.namespace``, not a string), or ``None`` when no pod
        matches, for example when the workflow failed to be scheduled.
    """
    # The ID is needed both for the label selector and for the name check, so
    # convert it to a string only once.
    workflow_id = str(workflow.id_)

    pods = current_k8s_corev1_api_client.list_namespaced_pod(
        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
        label_selector=f"reana-run-batch-workflow-uuid={workflow_id}",
    )
    for pod in pods.items:
        if workflow_id in pod.metadata.name:
            return pod

    return None
314+
315+
def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
    """Fetch the logs of the ``workflow-engine`` container of a workflow.

    :param workflow: Workflow whose engine logs are requested.
    :return: Log text read from the workflow's pod, or an empty string when
        no pod is found for the workflow.
    """
    engine_pod = _get_workflow_pod(workflow)

    if engine_pod is None:
        # There might not be any pod returned by `list_namespaced_pod`, for
        # example when a workflow fails to be scheduled
        return ""

    pod_meta = engine_pod.metadata
    return current_k8s_corev1_api_client.read_namespaced_pod_log(
        namespace=pod_meta.namespace,
        name=pod_meta.name,
        container="workflow-engine",
    )
330+
331+
def _get_job_monitoring_data(workflow) -> dict:
    """Collect job monitoring data from Prometheus for the workflow pod.

    For every metric of interest, a query restricted to the workflow pod is
    sent to ``REANA_JOB_MONITORING_PROMETHEUS_URL`` covering the last ten
    minutes, and the decoded JSON answer is stored under the metric name.

    Collection is best effort: this helper is called while the workflow
    status is being updated, so a missing pod or an unreachable Prometheus is
    logged and skipped instead of interrupting the caller.

    :param workflow: Workflow whose pod metrics are collected.
    :return: Dictionary mapping each successfully queried metric name to the
        JSON payload returned by Prometheus. Empty when the workflow has no
        pod or when every query failed.
    """
    logging.info("collecting monitoring data for workflow %s", workflow.id_)

    pod = _get_workflow_pod(workflow)
    if pod is None:
        # No pod exists, e.g. when the workflow failed to be scheduled, so
        # there is nothing to query.
        logging.info(
            "no pod found for workflow %s, skipping monitoring data",
            workflow.id_,
        )
        return {}

    pod_name = pod.metadata.name

    # Compute the time window once so that all metrics cover the same range.
    # TODO: add start and end time of the workflow
    end = time.time()
    start = end - 600

    monitoring_dict = {}

    for metric in (
        # TODO: to define which metrics we want to collect really
        "container_memory_usage_bytes",
        "container_cpu_usage_seconds_total",
    ):
        query = f'{metric}{{pod="{pod_name}",image!=""}}'

        logging.info(
            "querying prometheus for workflow pod %s",
            query,
        )

        try:
            r = requests.get(
                REANA_JOB_MONITORING_PROMETHEUS_URL,
                params={
                    "query": query,
                    "step": 1,
                    "start": start,
                    "end": end,
                },
                # Never block the status consumer on a stuck Prometheus.
                timeout=10,
            )
            r.raise_for_status()
            monitoring_dict[metric] = r.json()
        except (requests.RequestException, ValueError):
            # ValueError covers undecodable JSON bodies on older versions of
            # requests, where it is not a RequestException subclass.
            logging.warning(
                "could not collect metric %s for workflow %s",
                metric,
                workflow.id_,
                exc_info=True,
            )

    logging.info(
        "monitoring data for workflow %s: %s",
        workflow.id_,
        monitoring_dict,
    )

    return monitoring_dict
0 commit comments