Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job_monitor): log job pod errors, disruptions to warning (#468) #468

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
- [Elena Gazzarrini](https://orcid.org/0000-0001-5772-5166)
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Kenyi Hurtado-Anampa](https://orcid.org/0000-0002-9779-3566)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)
Expand Down
8 changes: 7 additions & 1 deletion reana_job_controller/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from reana_job_controller import config
from reana_job_controller.spec import build_openapi_spec
from reana_job_controller.utils import MultilineFormatter


@event.listens_for(db_engine, "checkin")
Expand Down Expand Up @@ -55,7 +56,12 @@ def shutdown_session(response_or_exc):

def create_app(config_mapping=None):
"""Create REANA-Job-Controller application."""
logging.basicConfig(level=REANA_LOG_LEVEL, format=REANA_LOG_FORMAT)
handler = logging.StreamHandler()
handler.setFormatter(MultilineFormatter(REANA_LOG_FORMAT))
logging.basicConfig(
level=REANA_LOG_LEVEL, format=REANA_LOG_FORMAT, handlers=[handler]
)

app = Flask(__name__)
app.secret_key = "mega secret key"
app.session = Session
Expand Down
30 changes: 26 additions & 4 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
f"Kubernetes job {backend_job_id}, assuming successful."
)
elif reason != "Completed":
logging.info(
logging.warn(
f"Kubernetes job id: {backend_job_id} failed, phase 'Succeeded' but "
f"container '{container.name}' was terminated because of '{reason}'."
)
Expand Down Expand Up @@ -199,19 +199,19 @@
continue

if "ErrImagePull" in reason:
logging.info(
logging.warn(
f"Container {container.name} in Kubernetes job {backend_job_id} "
"failed to fetch image."
)
status = JobStatus.failed.name
elif "InvalidImageName" in reason:
logging.info(
logging.warn(
f"Container {container.name} in Kubernetes job {backend_job_id} "
"failed due to invalid image name."
)
status = JobStatus.failed.name
elif "CreateContainerConfigError" in reason:
logging.info(
logging.warn(

Check warning on line 214 in reana_job_controller/job_monitor.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/job_monitor.py#L214

Added line #L214 was not covered by tests
f"Container {container.name} in Kubernetes job {backend_job_id} "
f"failed due to container configuration error: {message}"
)
Expand Down Expand Up @@ -247,6 +247,11 @@
backend_job_id, job_pod=job_pod
)

if job_status == JobStatus.failed.name:
self.log_disruption(

Check warning on line 251 in reana_job_controller/job_monitor.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/job_monitor.py#L250-L251

Added lines #L250 - L251 were not covered by tests
event["object"].status.conditions, backend_job_id
)

store_job_logs(reana_job_id, logs)
update_job_status(reana_job_id, job_status)

Expand All @@ -260,6 +265,23 @@
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))

def log_disruption(self, conditions, backend_job_id):
    """Log disruption message from Kubernetes event conditions.

    Usually it is pod eviction but can be any of https://kubernetes.io/docs/concepts/workloads/pods/disruptions/#pod-disruption-conditions.

    Only the first condition whose ``type`` is ``"DisruptionTarget"`` is
    reported. Nothing is logged when no such condition is present.

    :param conditions: List of Kubernetes event conditions. ``None`` or an
        empty list is accepted and results in no log output.
    :param backend_job_id: Backend job ID.
    """
    # ``conditions`` may be ``None`` when the pod status carries no
    # conditions; treat that the same as an empty list instead of raising
    # ``TypeError`` while iterating.
    disruption_target = next(
        (item for item in conditions or () if item.type == "DisruptionTarget"),
        None,
    )
    if disruption_target is None:
        return

    # NOTE(review): ``logging.warn`` is a deprecated alias of
    # ``logging.warning``; it is kept here because the existing unit test
    # patches ``logging.warn`` by name.
    logging.warn(
        f"{disruption_target.reason}: Job {backend_job_id} was disrupted: {disruption_target.message}"
    )


condorJobStatus = {
"Unexpanded": 0,
Expand Down
20 changes: 16 additions & 4 deletions reana_job_controller/kubernetes_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,21 @@
)
pod_logs += "{}: :\n {}\n".format(container.name, container_log)
if hasattr(container.state.terminated, "reason"):
if container.state.terminated.reason != "Completed":
message = "Job pod {} was terminated, reason: {}, message: {}".format(
job_pod.metadata.name,
container.state.terminated.reason,
container.state.terminated.message,
)
logging.warn(message)
pod_logs += "\n{}\n".format(container.state.terminated.reason)
elif container.state.waiting:
# No need to fetch logs, as the container has not started yet.
pod_logs += "Container {} failed, error: {}".format(
message = "Container {} failed, error: {}".format(
container.name, container.state.waiting.message
)
logging.warn(message)
pod_logs += message

return pod_logs
except client.rest.ApiException as e:
Expand Down Expand Up @@ -334,7 +343,9 @@
if not logs:
logs = ""

message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout"
message = (

Check warning on line 346 in reana_job_controller/kubernetes_job_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/kubernetes_job_manager.py#L346

Added line #L346 was not covered by tests
f"{job_pod.status.reason}: The job was killed due to exceeding timeout"
)

try:
specified_timeout = job_pod.spec.active_deadline_seconds
Expand All @@ -345,8 +356,9 @@
f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec."
)

logs += message
logging.info(
# f-string: append the timeout message itself, not the literal text "{message}".
logs += f"\n{message}\n"
logging.warn(message)
logging.warn(

Check warning on line 361 in reana_job_controller/kubernetes_job_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_job_controller/kubernetes_job_manager.py#L359-L361

Added lines #L359 - L361 were not covered by tests
f"Kubernetes job id: {backend_job_id} was killed due to timeout."
)

Expand Down
28 changes: 27 additions & 1 deletion reana_job_controller/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017, 2018, 2019, 2020, 2022, 2023 CERN.
# Copyright (C) 2017, 2018, 2019, 2020, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -13,11 +13,37 @@
import socket
import subprocess
import sys
from logging import Formatter, LogRecord

from reana_db.database import Session
from reana_db.models import Workflow


class MultilineFormatter(Formatter):
    """Logging formatter that applies the log format to every message line.

    A multiline message is split into lines and each line is formatted as if
    it were its own record, so every output line carries the configured
    prefix (timestamp, logger name, level, ...).
    """

    def format(self, record: LogRecord) -> str:
        """Format a possibly multiline log message.

        The message is first resolved with its %-style arguments (via
        ``record.getMessage()``) and only then split into lines, so
        placeholders are substituted once for the whole message rather than
        once per line. An empty message still yields one formatted (prefix
        only) line. Leading and trailing whitespace of the complete output
        is stripped.

        :param record: LogRecord object.
        :type record: logging.LogRecord
        :return: Formatted log message.
        :rtype: str
        """
        saved_msg, saved_args = record.msg, record.args

        # ``splitlines()`` returns an empty list for "", which would drop the
        # record entirely; fall back to a single empty line instead.
        lines = record.getMessage().splitlines() or [""]

        # The arguments are already merged into ``lines``; clear them so the
        # base class does not try to apply them again to each single line.
        record.args = None
        try:
            formatted_lines = []
            for line in lines:
                record.msg = line
                formatted_lines.append(super().format(record))
        finally:
            # Leave the record as we found it for any other handler.
            record.msg, record.args = saved_msg, saved_args

        output = "\n".join(formatted_lines).strip()
        record.message = output

        return output


def singleton(cls):
"""Singelton decorator."""
instances = {}
Expand Down
59 changes: 59 additions & 0 deletions tests/test_job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import mock
import pytest
from kubernetes.client.models import V1PodCondition

from reana_job_controller.job_monitor import (
JobMonitorHTCondorCERN,
Expand Down Expand Up @@ -107,3 +108,61 @@ def test_kubernetes_should_process_job(
)

assert bool(job_monitor_k8s.should_process_job(job_pod_event)) == should_process


@pytest.mark.parametrize(
    "conditions,is_call_expected,expected_message",
    [
        # A DisruptionTarget condition among others: one warning is expected.
        (
            [
                V1PodCondition(type="PodScheduled", status="True"),
                V1PodCondition(
                    type="DisruptionTarget",
                    status="True",
                    reason="EvictionByEvictionAPI",
                    message="Eviction API: evicting",
                ),
                V1PodCondition(type="Initialized", status="True"),
            ],
            True,
            "EvictionByEvictionAPI: Job backend_job_id was disrupted: Eviction API: evicting",
        ),
        # Conditions present, but none of them is a DisruptionTarget.
        (
            [
                V1PodCondition(type="PodScheduled", status="True"),
                V1PodCondition(type="Initialized", status="True"),
            ],
            False,
            "",
        ),
        # No conditions at all.
        ([], False, ""),
    ],
)
def test_log_disruption_evicted(conditions, is_call_expected, expected_message):
    """Check that ``log_disruption`` warns only for a DisruptionTarget condition."""
    threading_patch = mock.patch("reana_job_controller.job_monitor.threading")
    warn_patch = mock.patch("reana_job_controller.job_monitor.logging.warn")

    with threading_patch, warn_patch as warn_mock:
        monitor = JobMonitorKubernetes(app=None)
        monitor.log_disruption(conditions, "backend_job_id")

        if not is_call_expected:
            warn_mock.assert_not_called()
            return
        warn_mock.assert_called_with(expected_message)
62 changes: 62 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

import logging
import pytest

from reana_job_controller.utils import MultilineFormatter

"""REANA-Job-Controller utils tests."""


@pytest.mark.parametrize(
    "message,expected_output",
    [
        # Single line, with and without a trailing newline.
        ("test", "name | INFO | test"),
        ("test\n", "name | INFO | test"),
        # Every line gets its own prefix.
        ("test\ntest", "name | INFO | test\nname | INFO | test"),
        # Blank lines are kept; only the very end of the output is stripped.
        (
            "test\ntest\n\n\n",
            "name | INFO | test\nname | INFO | test\nname | INFO | \nname | INFO |",
        ),
        # Leading whitespace of a line is part of the message and is kept
        # after the prefix; trailing whitespace of the whole output is
        # stripped.
        (
            " test\ntest ",
            "name | INFO |  test\nname | INFO | test",
        ),
        (
            " t e s\tt\n t e s t ",
            "name | INFO |  t e s\tt\nname | INFO |  t e s t",
        ),
    ],
)
def test_multiline_formatter_format(message, expected_output):
    """Test MultilineFormatter formatting.

    Each case builds a bare INFO ``LogRecord`` named ``name`` carrying
    ``message`` and compares the formatter output with ``expected_output``.
    """
    formatter = MultilineFormatter("%(name)s | %(levelname)s | %(message)s")
    record = logging.LogRecord(
        "name",
        logging.INFO,
        "pathname",
        1,
        message,
        None,
        None,
    )
    assert formatter.format(record) == expected_output
Loading