Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opensearch): fetch logs from opensearch (#602) #602

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
Expand Down
10 changes: 10 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
#
# needs_sphinx = '1.0'

# https://about.readthedocs.com/blog/2024/07/addons-by-default/
# Define the canonical URL if you are using a custom domain on Read the Docs
html_baseurl = os.environ.get("READTHEDOCS_CANONICAL_URL", "")

# Tell Jinja2 templates the build is running on Read the Docs
if os.environ.get("READTHEDOCS", "") == "True":
if "html_context" not in globals():
html_context = {}
html_context["READTHEDOCS"] = True

# Do not warn on external images.
suppress_warnings = ["image.nonlocal_uri"]

Expand Down
4 changes: 4 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@
"description": "Request succeeded. Info about workflow, including the status is returned.",
"examples": {
"application/json": {
"live_logs_enabled": false,
"logs": "{'workflow_logs': string, 'job_logs': { '256b25f4-4cfb-4684-b7a8-73872ef455a2': string, '256b25f4-4cfb-4684-b7a8-73872ef455a3': string, }, 'engine_specific': object, }",
"user": "00000000-0000-0000-0000-000000000000",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
Expand All @@ -695,6 +696,9 @@
},
"schema": {
"properties": {
"live_logs_enabled": {
"type": "boolean"
},
"logs": {
"type": "string"
},
Expand Down
30 changes: 30 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
)
"""Common to all workflow engines environment variables for debug mode."""

REANA_OPENSEARCH_ENABLED = (
os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
)
"""OpenSearch enabled flag."""

REANA_OPENSEARCH_HOST = os.getenv(
"REANA_OPENSEARCH_HOST", "reana-opensearch-master.default.svc.cluster.local"
)
"""OpenSearch host."""

REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200")
"""OpenSearch port."""

REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "")
"""OpenSearch URL prefix."""

REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin")
"""OpenSearch user."""

REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin")
"""OpenSearch password."""

REANA_OPENSEARCH_USE_SSL = (
os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
)
"""OpenSearch SSL flag."""

REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS")
"""OpenSearch CA certificates."""


def _parse_interactive_sessions_environments(env_var):
config = {}
Expand Down
176 changes: 176 additions & 0 deletions reana_workflow_controller/opensearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""OpenSearch client and log fetcher."""

import logging
from opensearchpy import OpenSearch

from reana_workflow_controller.config import (
REANA_OPENSEARCH_CA_CERTS,
REANA_OPENSEARCH_HOST,
REANA_OPENSEARCH_PASSWORD,
REANA_OPENSEARCH_PORT,
REANA_OPENSEARCH_URL_PREFIX,
REANA_OPENSEARCH_USE_SSL,
REANA_OPENSEARCH_USER,
REANA_OPENSEARCH_ENABLED,
)


def build_opensearch_client(
host: str = REANA_OPENSEARCH_HOST,
port: str = REANA_OPENSEARCH_PORT,
url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
) -> OpenSearch:
"""
Build an OpenSearch client object.
:param host: OpenSearch host.
:param port: OpenSearch port.
:param url_prefix: URL prefix.
:param http_auth: HTTP authentication credentials.
:param use_ssl: Use SSL/TLS for connection.
:param ca_certs: Path to CA certificates.
:return: OpenSearch client object.
"""
opensearch_client = OpenSearch(
hosts=f"{host}:{port}",
http_compress=True, # enables gzip compression for request bodies
http_auth=http_auth,
use_ssl=use_ssl,
ca_certs=ca_certs,
url_prefix=url_prefix,
verify_certs=True,
)
return opensearch_client


class OpenSearchLogFetcher(object):
"""Retrieves job and workflow logs from OpenSearch API."""

def __init__(
self,
os_client: OpenSearch | None = None,
job_index: str = "fluentbit-job_log",
workflow_index: str = "fluentbit-workflow_log",
max_rows: int = 5000,
log_key: str = "log",
order: str = "asc",
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
timeout: int = 5,
) -> None:
"""
Initialize the OpenSearchLogFetcher object.
:param os_client: OpenSearch client object.
:param job_index: Index name for job logs.
:param workflow_index: Index name for workflow logs.
:param max_rows: Maximum number of rows to fetch.
:param log_key: Key for log message in the response.
:param order: Order of logs (asc/desc).
:param job_log_matcher: Job log matcher.
:param workflow_log_matcher: Workflow log matcher.
:param timeout: Timeout for OpenSearch queries.
:return: None
"""
if os_client is None:
os_client = build_opensearch_client()

self.os_client = os_client
self.job_index = job_index
self.workflow_index = workflow_index
self.max_rows = max_rows
self.log_key = log_key
self.order = order
self.job_log_matcher = job_log_matcher
self.workflow_log_matcher = workflow_log_matcher
self.timeout = timeout

def fetch_logs(self, id: str, index: str, match: str) -> str | None:
"""
Fetch logs of a specific job or workflow.
:param id: Job or workflow ID.
:param index: Index name for logs.
:param match: Matcher for logs.
:return: Job or workflow logs.
"""
query = {
"query": {"match": {match: id}},
"sort": [{"@timestamp": {"order": self.order}}],
}

try:
response = self.os_client.search(
index=index, body=query, size=self.max_rows, timeout=self.timeout
)
except Exception as e:
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
return None

return self._concat_rows(response["hits"]["hits"])

def fetch_job_logs(self, backend_job_id: str) -> str:
"""
Fetch logs of a specific job.
:param backend_job_id: Job ID.
:return: Job logs.
"""
return self.fetch_logs(
backend_job_id,
self.job_index,
self.job_log_matcher,
)

def fetch_workflow_logs(self, workflow_id: str) -> str | None:
"""
Fetch logs of a specific workflow.
:param workflow_id: Workflow ID.
:return: Workflow logs.
"""
return self.fetch_logs(
workflow_id,
self.workflow_index,
self.workflow_log_matcher,
)

def _concat_rows(self, rows: list) -> str | None:
"""
Concatenate log messages from rows.
:param rows: List of rows.
:return: Concatenated log messages.
"""
logs = ""

for hit in rows:
logs += hit["_source"][self.log_key] + "\n"

return logs


def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None:
"""
Build OpenSearchLogFetcher object.
:return: OpenSearchLogFetcher object.
"""
return OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None
15 changes: 13 additions & 2 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool:

def build_workflow_logs(workflow, steps=None, paginate=None):
"""Return the logs for all jobs of a workflow."""
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher

query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
if steps:
query = query.filter(Job.job_name.in_(steps))
Expand All @@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
finished_at = (
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
if open_search_log_fetcher
else None
)

item = {
"workflow_uuid": str(job.workflow_uuid) or "",
"job_name": job.job_name or "",
Expand All @@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
"docker_img": job.docker_img or "",
"cmd": job.prettified_cmd or "",
"status": job.status.name or "",
"logs": job.logs or "",
"logs": logs or job.logs or "",
"started_at": started_at,
"finished_at": finished_at,
}
Expand Down
23 changes: 20 additions & 3 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022 CERN.
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -16,6 +16,7 @@
from reana_commons.errors import REANASecretDoesNotExist
from reana_db.utils import _get_workflow_with_uuid_or_name

from reana_workflow_controller.config import REANA_OPENSEARCH_ENABLED
from reana_workflow_controller.errors import (
REANAExternalCallError,
REANAWorkflowControllerError,
Expand Down Expand Up @@ -100,6 +101,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
type: string
user:
type: string
live_logs_enabled:
type: boolean
examples:
application/json:
{
Expand All @@ -112,7 +115,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
},
'engine_specific': object,
}",
"user": "00000000-0000-0000-0000-000000000000"
"user": "00000000-0000-0000-0000-000000000000",
"live_logs_enabled": false
}
400:
description: >-
Expand Down Expand Up @@ -150,8 +154,20 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
"engine_specific": None,
}
else:
from reana_workflow_controller.opensearch import (
build_opensearch_log_fetcher,
)

open_search_log_fetcher = build_opensearch_log_fetcher()

logs = (
open_search_log_fetcher.fetch_workflow_logs(workflow.id_)
if open_search_log_fetcher
else None
)

workflow_logs = {
"workflow_logs": workflow.logs,
"workflow_logs": logs or workflow.logs,
"job_logs": build_workflow_logs(workflow, paginate=paginate),
"engine_specific": workflow.engine_specific,
}
Expand All @@ -162,6 +178,7 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
"workflow_name": get_workflow_name(workflow),
"logs": json.dumps(workflow_logs),
"user": user_uuid,
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
}
),
200,
Expand Down
Loading
Loading