Skip to content

Commit f56f057

Browse files
jlemeshtiborsimko
authored andcommitted
feat(opensearch): fetch live logs from OpenSearch (reanahub#602)
1 parent e36c6a2 commit f56f057

File tree

10 files changed

+594
-40
lines changed

10 files changed

+594
-40
lines changed

AUTHORS.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
1313
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
1414
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
1515
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
16+
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
1617
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
1718
- [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584)
1819
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)

docs/openapi.json

+4
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@
687687
"description": "Request succeeded. Info about workflow, including the status is returned.",
688688
"examples": {
689689
"application/json": {
690+
"live_logs_enabled": false,
690691
"logs": "{'workflow_logs': string, 'job_logs': { '256b25f4-4cfb-4684-b7a8-73872ef455a2': string, '256b25f4-4cfb-4684-b7a8-73872ef455a3': string, }, 'engine_specific': object, }",
691692
"user": "00000000-0000-0000-0000-000000000000",
692693
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
@@ -695,6 +696,9 @@
695696
},
696697
"schema": {
697698
"properties": {
699+
"live_logs_enabled": {
700+
"type": "boolean"
701+
},
698702
"logs": {
699703
"type": "string"
700704
},

reana_workflow_controller/config.py

+30
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
154154
)
155155
"""Common to all workflow engines environment variables for debug mode."""
156156

157+
REANA_OPENSEARCH_ENABLED = (
158+
os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
159+
)
160+
"""OpenSearch enabled flag."""
161+
162+
REANA_OPENSEARCH_HOST = os.getenv(
163+
"REANA_OPENSEARCH_HOST", "reana-opensearch-master.default.svc.cluster.local"
164+
)
165+
"""OpenSearch host."""
166+
167+
REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200")
168+
"""OpenSearch port."""
169+
170+
REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "")
171+
"""OpenSearch URL prefix."""
172+
173+
REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin")
174+
"""OpenSearch user."""
175+
176+
REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin")
177+
"""OpenSearch password."""
178+
179+
REANA_OPENSEARCH_USE_SSL = (
180+
os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
181+
)
182+
"""OpenSearch SSL flag."""
183+
184+
REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS")
185+
"""OpenSearch CA certificates."""
186+
157187

158188
def _parse_interactive_sessions_environments(env_var):
159189
config = {}
+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of REANA.
4+
# Copyright (C) 2024 CERN.
5+
#
6+
# REANA is free software; you can redistribute it and/or modify it
7+
# under the terms of the MIT License; see LICENSE file for more details.
8+
9+
"""OpenSearch client and log fetcher."""
10+
11+
import logging
12+
from opensearchpy import OpenSearch
13+
14+
from reana_workflow_controller.config import (
15+
REANA_OPENSEARCH_CA_CERTS,
16+
REANA_OPENSEARCH_HOST,
17+
REANA_OPENSEARCH_PASSWORD,
18+
REANA_OPENSEARCH_PORT,
19+
REANA_OPENSEARCH_URL_PREFIX,
20+
REANA_OPENSEARCH_USE_SSL,
21+
REANA_OPENSEARCH_USER,
22+
REANA_OPENSEARCH_ENABLED,
23+
)
24+
25+
26+
def build_opensearch_client(
27+
host: str = REANA_OPENSEARCH_HOST,
28+
port: str = REANA_OPENSEARCH_PORT,
29+
url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
30+
http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
31+
use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
32+
ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
33+
) -> OpenSearch:
34+
"""
35+
Build an OpenSearch client object.
36+
37+
:param host: OpenSearch host.
38+
:param port: OpenSearch port.
39+
:param url_prefix: URL prefix.
40+
:param http_auth: HTTP authentication credentials.
41+
:param use_ssl: Use SSL/TLS for connection.
42+
:param ca_certs: Path to CA certificates.
43+
44+
:return: OpenSearch client object.
45+
"""
46+
opensearch_client = OpenSearch(
47+
hosts=f"{host}:{port}",
48+
http_compress=True, # enables gzip compression for request bodies
49+
http_auth=http_auth,
50+
use_ssl=use_ssl,
51+
ca_certs=ca_certs,
52+
url_prefix=url_prefix,
53+
verify_certs=True,
54+
)
55+
return opensearch_client
56+
57+
58+
class OpenSearchLogFetcher(object):
59+
"""Retrieves job and workflow logs from OpenSearch API."""
60+
61+
def __init__(
62+
self,
63+
os_client: OpenSearch | None = None,
64+
job_index: str = "fluentbit-job_log",
65+
workflow_index: str = "fluentbit-workflow_log",
66+
max_rows: int = 5000,
67+
log_key: str = "log",
68+
order: str = "asc",
69+
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
70+
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
71+
timeout: int = 5,
72+
) -> None:
73+
"""
74+
Initialize the OpenSearchLogFetcher object.
75+
76+
:param os_client: OpenSearch client object.
77+
:param job_index: Index name for job logs.
78+
:param workflow_index: Index name for workflow logs.
79+
:param max_rows: Maximum number of rows to fetch.
80+
:param log_key: Key for log message in the response.
81+
:param order: Order of logs (asc/desc).
82+
:param job_log_matcher: Job log matcher.
83+
:param workflow_log_matcher: Workflow log matcher.
84+
:param timeout: Timeout for OpenSearch queries.
85+
86+
:return: None
87+
"""
88+
if os_client is None:
89+
os_client = build_opensearch_client()
90+
91+
self.os_client = os_client
92+
self.job_index = job_index
93+
self.workflow_index = workflow_index
94+
self.max_rows = max_rows
95+
self.log_key = log_key
96+
self.order = order
97+
self.job_log_matcher = job_log_matcher
98+
self.workflow_log_matcher = workflow_log_matcher
99+
self.timeout = timeout
100+
101+
def fetch_logs(self, id: str, index: str, match: str) -> str | None:
102+
"""
103+
Fetch logs of a specific job or workflow.
104+
105+
:param id: Job or workflow ID.
106+
:param index: Index name for logs.
107+
:param match: Matcher for logs.
108+
109+
:return: Job or workflow logs.
110+
"""
111+
query = {
112+
"query": {"match": {match: id}},
113+
"sort": [{"@timestamp": {"order": self.order}}],
114+
}
115+
116+
try:
117+
response = self.os_client.search(
118+
index=index, body=query, size=self.max_rows, timeout=self.timeout
119+
)
120+
except Exception as e:
121+
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
122+
return None
123+
124+
return self._concat_rows(response["hits"]["hits"])
125+
126+
def fetch_job_logs(self, backend_job_id: str) -> str:
127+
"""
128+
Fetch logs of a specific job.
129+
130+
:param backend_job_id: Job ID.
131+
132+
:return: Job logs.
133+
"""
134+
return self.fetch_logs(
135+
backend_job_id,
136+
self.job_index,
137+
self.job_log_matcher,
138+
)
139+
140+
def fetch_workflow_logs(self, workflow_id: str) -> str | None:
141+
"""
142+
Fetch logs of a specific workflow.
143+
144+
:param workflow_id: Workflow ID.
145+
146+
:return: Workflow logs.
147+
"""
148+
return self.fetch_logs(
149+
workflow_id,
150+
self.workflow_index,
151+
self.workflow_log_matcher,
152+
)
153+
154+
def _concat_rows(self, rows: list) -> str | None:
155+
"""
156+
Concatenate log messages from rows.
157+
158+
:param rows: List of rows.
159+
160+
:return: Concatenated log messages.
161+
"""
162+
logs = ""
163+
164+
for hit in rows:
165+
logs += hit["_source"][self.log_key] + "\n"
166+
167+
return logs
168+
169+
170+
def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None:
171+
"""
172+
Build OpenSearchLogFetcher object.
173+
174+
:return: OpenSearchLogFetcher object.
175+
"""
176+
return OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None

reana_workflow_controller/rest/utils.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool:
166166

167167
def build_workflow_logs(workflow, steps=None, paginate=None):
168168
"""Return the logs for all jobs of a workflow."""
169+
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher
170+
169171
query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
170172
if steps:
171173
query = query.filter(Job.job_name.in_(steps))
@@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
179181
finished_at = (
180182
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
181183
)
184+
185+
open_search_log_fetcher = build_opensearch_log_fetcher()
186+
187+
logs = (
188+
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
189+
if open_search_log_fetcher
190+
else None
191+
)
192+
182193
item = {
183194
"workflow_uuid": str(job.workflow_uuid) or "",
184195
"job_name": job.job_name or "",
@@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
187198
"docker_img": job.docker_img or "",
188199
"cmd": job.prettified_cmd or "",
189200
"status": job.status.name or "",
190-
"logs": job.logs or "",
201+
"logs": logs or job.logs or "",
191202
"started_at": started_at,
192203
"finished_at": finished_at,
193204
}

reana_workflow_controller/rest/workflows_status.py

+20-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -16,6 +16,7 @@
1616
from reana_commons.errors import REANASecretDoesNotExist
1717
from reana_db.utils import _get_workflow_with_uuid_or_name
1818

19+
from reana_workflow_controller.config import REANA_OPENSEARCH_ENABLED
1920
from reana_workflow_controller.errors import (
2021
REANAExternalCallError,
2122
REANAWorkflowControllerError,
@@ -100,6 +101,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
100101
type: string
101102
user:
102103
type: string
104+
live_logs_enabled:
105+
type: boolean
103106
examples:
104107
application/json:
105108
{
@@ -112,7 +115,8 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
112115
},
113116
'engine_specific': object,
114117
}",
115-
"user": "00000000-0000-0000-0000-000000000000"
118+
"user": "00000000-0000-0000-0000-000000000000",
119+
"live_logs_enabled": false
116120
}
117121
400:
118122
description: >-
@@ -150,8 +154,20 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
150154
"engine_specific": None,
151155
}
152156
else:
157+
from reana_workflow_controller.opensearch import (
158+
build_opensearch_log_fetcher,
159+
)
160+
161+
open_search_log_fetcher = build_opensearch_log_fetcher()
162+
163+
logs = (
164+
open_search_log_fetcher.fetch_workflow_logs(workflow.id_)
165+
if open_search_log_fetcher
166+
else None
167+
)
168+
153169
workflow_logs = {
154-
"workflow_logs": workflow.logs,
170+
"workflow_logs": logs or workflow.logs,
155171
"job_logs": build_workflow_logs(workflow, paginate=paginate),
156172
"engine_specific": workflow.engine_specific,
157173
}
@@ -162,6 +178,7 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
162178
"workflow_name": get_workflow_name(workflow),
163179
"logs": json.dumps(workflow_logs),
164180
"user": user_uuid,
181+
"live_logs_enabled": REANA_OPENSEARCH_ENABLED,
165182
}
166183
),
167184
200,

0 commit comments

Comments
 (0)