From d40538a48c495ba6a4747e2759556e0476a6a3dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelizaveta=20Leme=C5=A1eva?= Date: Wed, 4 Sep 2024 16:36:34 +0200 Subject: [PATCH 1/3] feat(utils): add multiline log formatter (#468) --- AUTHORS.md | 1 + reana_job_controller/factory.py | 8 ++++- reana_job_controller/utils.py | 28 ++++++++++++++- tests/test_utils.py | 62 +++++++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 tests/test_utils.py diff --git a/AUTHORS.md b/AUTHORS.md index a531210e..c91699a1 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -13,6 +13,7 @@ The list of contributors in alphabetical order: - [Elena Gazzarrini](https://orcid.org/0000-0001-5772-5166) - [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553) - [Jan Okraska](https://orcid.org/0000-0002-1416-3244) +- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270) - [Kenyi Hurtado-Anampa](https://orcid.org/0000-0002-9779-3566) - [Marco Donadoni](https://orcid.org/0000-0003-2922-5505) - [Marco Vidal](https://orcid.org/0000-0002-9363-4971) diff --git a/reana_job_controller/factory.py b/reana_job_controller/factory.py index fd87f720..49077298 100644 --- a/reana_job_controller/factory.py +++ b/reana_job_controller/factory.py @@ -18,6 +18,7 @@ from reana_job_controller import config from reana_job_controller.spec import build_openapi_spec +from reana_job_controller.utils import MultilineFormatter @event.listens_for(db_engine, "checkin") @@ -55,7 +56,12 @@ def shutdown_session(response_or_exc): def create_app(config_mapping=None): """Create REANA-Job-Controller application.""" - logging.basicConfig(level=REANA_LOG_LEVEL, format=REANA_LOG_FORMAT) + handler = logging.StreamHandler() + handler.setFormatter(MultilineFormatter(REANA_LOG_FORMAT)) + logging.basicConfig( + level=REANA_LOG_LEVEL, format=REANA_LOG_FORMAT, handlers=[handler] + ) + app = Flask(__name__) app.secret_key = "mega secret key" app.session = Session diff --git a/reana_job_controller/utils.py b/reana_job_controller/utils.py index 2f6389f7..db5605a3 100644 --- a/reana_job_controller/utils.py +++ b/reana_job_controller/utils.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2017, 2018, 2019, 2020, 2022, 2023 CERN. +# Copyright (C) 2017, 2018, 2019, 2020, 2022, 2023, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -13,11 +13,37 @@ import socket import subprocess import sys +from logging import Formatter, LogRecord from reana_db.database import Session from reana_db.models import Workflow +class MultilineFormatter(Formatter): + """Logging formatter for multiline logs.""" + + def format(self, record: LogRecord): + """Format multiline log message. + + :param record: LogRecord object. + :type record: logging.LogRecord + + :return: Formatted log message. + :rtype: str + """ + save_msg = str(record.msg) + output = "" + lines = save_msg.splitlines() + for line in lines: + record.msg = line + output += super().format(record) + "\n" + output = output.strip() + record.msg = save_msg + record.message = output + + return output + + def singleton(cls): """Singelton decorator.""" instances = {} diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..69841e64 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +import logging +import pytest + +from reana_job_controller.utils import MultilineFormatter + +"""REANA-Job-Controller utils tests.""" + + +@pytest.mark.parametrize( + "message,expected_output", + [ + ( + "test", + "name | INFO | test", + ), + ( + "test\n", + "name | INFO | test", + ), + ( + "test\ntest", + "name | INFO | test\nname | INFO | test", + ), + ( + "test\ntest\n\n\n", + "name | INFO | test\nname | INFO | test\nname | INFO | \nname | INFO |", + ), + ( + " test\ntest ", + "name | INFO | test\nname | INFO | test", + ), + ( + " t e s\tt\n t e s t ", + "name | INFO | t e s\tt\nname | INFO | t e s t", + ), + ], +) +def test_multiline_formatter_format(message, expected_output): + """Test MultilineFormatter formatting.""" + formatter = MultilineFormatter("%(name)s | " "%(levelname)s | %(message)s") + assert ( + formatter.format( + logging.LogRecord( + "name", + logging.INFO, + "pathname", + 1, + message, + None, + None, + ), + ) + == expected_output + ) From 88b0acbb8ae6a40889314f2dc68394561db4dccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelizaveta=20Leme=C5=A1eva?= Date: Wed, 4 Sep 2024 16:37:42 +0200 Subject: [PATCH 2/3] feat(job_manager): log pod errors as warnings (#468) --- .../kubernetes_job_manager.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/reana_job_controller/kubernetes_job_manager.py b/reana_job_controller/kubernetes_job_manager.py index 8f1783bd..09309e29 100644 --- a/reana_job_controller/kubernetes_job_manager.py +++ b/reana_job_controller/kubernetes_job_manager.py @@ -287,12 +287,21 @@ def _get_containers_logs(cls, job_pod) -> Optional[str]: ) pod_logs += "{}: :\n {}\n".format(container.name, container_log) if hasattr(container.state.terminated, "reason"): + if container.state.terminated.reason != "Completed": + message = "Job pod {} was terminated, reason: {}, message: {}".format( + job_pod.metadata.name, + container.state.terminated.reason, + container.state.terminated.message, + ) + logging.warn(message) pod_logs += "\n{}\n".format(container.state.terminated.reason) elif container.state.waiting: # No need to fetch logs, as the container has not started yet. - pod_logs += "Container {} failed, error: {}".format( + message = "Container {} failed, error: {}".format( container.name, container.state.waiting.message ) + logging.warn(message) + pod_logs += message return pod_logs except client.rest.ApiException as e: @@ -334,7 +343,9 @@ def get_logs(cls, backend_job_id, **kwargs): if not logs: logs = "" - message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout" + message = ( + f"{job_pod.status.reason}: The job was killed due to exceeding timeout" + ) try: specified_timeout = job_pod.spec.active_deadline_seconds @@ -345,8 +356,9 @@ def get_logs(cls, backend_job_id, **kwargs): f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec." ) - logs += message - logging.info( + logs += "\n{message}\n" + logging.warn(message) + logging.warn( f"Kubernetes job id: {backend_job_id} was killed due to timeout." ) From db9c258f8b0c55c899ad1b457fc260ff8a1aa820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelizaveta=20Leme=C5=A1eva?= Date: Wed, 4 Sep 2024 16:38:19 +0200 Subject: [PATCH 3/3] feat(job_monitor): log pod errors and disruptions as warnings (#468) --- reana_job_controller/job_monitor.py | 30 +++++++++++++-- tests/test_job_monitor.py | 59 +++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index 4308a806..925a58e1 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -171,7 +171,7 @@ def get_job_status(self, job_pod) -> Optional[str]: f"Kubernetes job {backend_job_id}, assuming successful." ) elif reason != "Completed": - logging.info( + logging.warn( f"Kubernetes job id: {backend_job_id} failed, phase 'Succeeded' but " f"container '{container.name}' was terminated because of '{reason}'." ) @@ -199,19 +199,19 @@ def get_job_status(self, job_pod) -> Optional[str]: continue if "ErrImagePull" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " "failed to fetch image." ) status = JobStatus.failed.name elif "InvalidImageName" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " "failed due to invalid image name." ) status = JobStatus.failed.name elif "CreateContainerConfigError" in reason: - logging.info( + logging.warn( f"Container {container.name} in Kubernetes job {backend_job_id} " f"failed due to container configuration error: {message}" ) @@ -247,6 +247,11 @@ def watch_jobs(self, job_db, app=None): backend_job_id, job_pod=job_pod ) + if job_status == JobStatus.failed.name: + self.log_disruption( + event["object"].status.conditions, backend_job_id + ) + store_job_logs(reana_job_id, logs) update_job_status(reana_job_id, job_status) @@ -260,6 +265,23 @@ def watch_jobs(self, job_db, app=None): logging.error(traceback.format_exc()) logging.error("Unexpected error: {}".format(e)) + def log_disruption(self, conditions, backend_job_id): + """Log disruption message from Kubernetes event conditions. + + Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions. + + :param conditions: List of Kubernetes event conditions. + :param backend_job_id: Backend job ID. + """ + disruption_target = next( + (item for item in conditions if item.type == "DisruptionTarget"), + None, + ) + if disruption_target: + logging.warn( + f"{disruption_target.reason}: Job {backend_job_id} was disrupted: {disruption_target.message}" + ) + condorJobStatus = { "Unexpanded": 0, diff --git a/tests/test_job_monitor.py b/tests/test_job_monitor.py index 34e84e90..4eb8323e 100644 --- a/tests/test_job_monitor.py +++ b/tests/test_job_monitor.py @@ -12,6 +12,7 @@ import mock import pytest +from kubernetes.client.models import V1PodCondition from reana_job_controller.job_monitor import ( JobMonitorHTCondorCERN, @@ -107,3 +108,61 @@ def test_kubernetes_should_process_job( ) assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process + + +@pytest.mark.parametrize( + "conditions,is_call_expected,expected_message", + [ + ( + [ + V1PodCondition( + type="PodScheduled", + status="True", + ), + V1PodCondition( + type="DisruptionTarget", + status="True", + reason="EvictionByEvictionAPI", + message="Eviction API: evicting", + ), + V1PodCondition( + type="Initialized", + status="True", + ), + ], + True, + "EvictionByEvictionAPI: Job backend_job_id was disrupted: Eviction API: evicting", + ), + ( + [ + V1PodCondition( + type="PodScheduled", + status="True", + ), + V1PodCondition( + type="Initialized", + status="True", + ), + ], + False, + "", + ), + ( + [], + False, + "", + ), + ], +) +def test_log_disruption_evicted(conditions, is_call_expected, expected_message): + """Test logging of disruption target condition.""" + with ( + mock.patch("reana_job_controller.job_monitor.threading"), + mock.patch("reana_job_controller.job_monitor.logging.warn") as log_mock, + ): + job_monitor_k8s = JobMonitorKubernetes(app=None) + job_monitor_k8s.log_disruption(conditions, "backend_job_id") + if is_call_expected: + log_mock.assert_called_with(expected_message) + else: + log_mock.assert_not_called()