From 1a4b97a445c0de578ccce836c87425335b0454f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelizaveta=20Leme=C5=A1eva?= Date: Wed, 4 Sep 2024 16:38:19 +0200 Subject: [PATCH] feat(job_monitor): log pod errors, disruptions to warning (#468) --- reana_job_controller/job_monitor.py | 30 +++++++++++++-- tests/test_job_monitor.py | 59 +++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index 4308a806..925a58e1 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -171,7 +171,7 @@ def get_job_status(self, job_pod) -> Optional[str]: f"Kubernetes job {backend_job_id}, assuming successful." ) elif reason != "Completed": - logging.info( + logging.warn( f"Kubernetes job id: {backend_job_id} failed, phase 'Succeeded' but " f"container '{container.name}' was terminated because of '{reason}'." ) @@ -199,19 +199,19 @@ def get_job_status(self, job_pod) -> Optional[str]: continue if "ErrImagePull" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " "failed to fetch image." ) status = JobStatus.failed.name elif "InvalidImageName" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " "failed due to invalid image name." ) status = JobStatus.failed.name elif "CreateContainerConfigError" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " f"failed due to container configuration error: {message}" ) @@ -247,6 +247,11 @@ def watch_jobs(self, job_db, app=None): backend_job_id, job_pod=job_pod ) + if job_status == JobStatus.failed.name: + self.log_disruption( + event["object"].status.conditions, backend_job_id + ) + store_job_logs(reana_job_id, logs) update_job_status(reana_job_id, job_status) @@ -260,6 +265,23 @@ def watch_jobs(self, job_db, app=None): logging.error(traceback.format_exc()) logging.error("Unexpected error: {}".format(e)) + def log_disruption(self, conditions, backend_job_id): + """Log disruption message from Kubernetes event conditions. + + Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions. + + :param conditions: List of Kubernetes event conditions. + :param backend_job_id: Backend job ID. + """ + disruption_target = next( + (item for item in conditions if item.type == "DisruptionTarget"), + None, + ) + if disruption_target: + logging.warn( + f"{disruption_target.reason}: Job {backend_job_id} was disrupted: {disruption_target.message}" + ) + condorJobStatus = { "Unexpanded": 0, diff --git a/tests/test_job_monitor.py b/tests/test_job_monitor.py index 34e84e90..4eb8323e 100644 --- a/tests/test_job_monitor.py +++ b/tests/test_job_monitor.py @@ -12,6 +12,7 @@ import mock import pytest +from kubernetes.client.models import V1PodCondition from reana_job_controller.job_monitor import ( JobMonitorHTCondorCERN, @@ -107,3 +108,61 @@ def test_kubernetes_should_process_job( ) assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process + + +@pytest.mark.parametrize( + "conditions,is_call_expected,expected_message", + [ + ( + [ + V1PodCondition( + type="PodScheduled", + status="True", + ), + V1PodCondition( + type="DisruptionTarget", + status="True", + reason="EvictionByEvictionAPI", + message="Eviction API: evicting", + ), + V1PodCondition( + type="Initialized", + status="True", + ), + ], + True, + "EvictionByEvictionAPI: Job backend_job_id was disrupted: Eviction API: evicting", + ), + ( + [ + V1PodCondition( + type="PodScheduled", + status="True", + ), + V1PodCondition( + type="Initialized", + status="True", + ), + ], + False, + "", + ), + ( + [], + False, + "", + ), + ], +) +def test_log_disruption_evicted(conditions, is_call_expected, expected_message): + """Test logging of disruption target condition.""" + with ( + mock.patch("reana_job_controller.job_monitor.threading"), + mock.patch("reana_job_controller.job_monitor.logging.warn") as log_mock, + ): + job_monitor_k8s = JobMonitorKubernetes(app=None) + job_monitor_k8s.log_disruption(conditions, "backend_job_id") + if is_call_expected: + log_mock.assert_called_with(expected_message) + else: + log_mock.assert_not_called()