Skip to content

Commit a2167cb

Browse files
committed
feat(opensearch): fetch logs from opensearch (reanahub#602)
1 parent e36c6a2 commit a2167cb

File tree

7 files changed

+411
-13
lines changed

7 files changed

+411
-13
lines changed

reana_workflow_controller/config.py

+30
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
154154
)
155155
"""Common to all workflow engines environment variables for debug mode."""
156156

157+
REANA_OPENSEARCH_ENABLED = (
158+
os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
159+
)
160+
"""OpenSearch enabled flag."""
161+
162+
REANA_OPENSEARCH_HOST = os.getenv(
163+
"REANA_OPENSEARCH_HOST", "opensearch-cluster-master.default.svc.cluster.local"
164+
)
165+
"""OpenSearch host."""
166+
167+
REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200")
168+
"""OpenSearch port."""
169+
170+
REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "")
171+
"""OpenSearch URL prefix."""
172+
173+
REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin")
174+
"""OpenSearch user."""
175+
176+
REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin")
177+
"""OpenSearch password."""
178+
179+
REANA_OPENSEARCH_USE_SSL = (
180+
os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
181+
)
182+
"""OpenSearch SSL flag."""
183+
184+
REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS")
185+
"""OpenSearch CA certificates."""
186+
157187

158188
def _parse_interactive_sessions_environments(env_var):
159189
config = {}
+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of REANA.
4+
# Copyright (C) 2024 CERN.
5+
#
6+
# REANA is free software; you can redistribute it and/or modify it
7+
# under the terms of the MIT License; see LICENSE file for more details.
8+
9+
"""OpenSearch client and log fetcher."""
10+
11+
import logging
12+
from opensearchpy import OpenSearch
13+
14+
from reana_workflow_controller.config import (
15+
REANA_OPENSEARCH_CA_CERTS,
16+
REANA_OPENSEARCH_HOST,
17+
REANA_OPENSEARCH_PASSWORD,
18+
REANA_OPENSEARCH_PORT,
19+
REANA_OPENSEARCH_URL_PREFIX,
20+
REANA_OPENSEARCH_USE_SSL,
21+
REANA_OPENSEARCH_USER,
22+
)
23+
24+
25+
def build_opensearch_client(
26+
host: str = REANA_OPENSEARCH_HOST,
27+
port: str = REANA_OPENSEARCH_PORT,
28+
url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
29+
http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
30+
use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
31+
ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
32+
) -> OpenSearch:
33+
"""
34+
Build an OpenSearch client object.
35+
36+
:param host: OpenSearch host.
37+
:param port: OpenSearch port.
38+
:param url_prefix: URL prefix.
39+
:param http_auth: HTTP authentication credentials.
40+
:param use_ssl: Use SSL/TLS for connection.
41+
:param ca_certs: Path to CA certificates.
42+
43+
:return: OpenSearch client object.
44+
"""
45+
opensearch_client = OpenSearch(
46+
hosts=f"{host}:{port}",
47+
http_compress=True, # enables gzip compression for request bodies
48+
http_auth=http_auth,
49+
use_ssl=use_ssl,
50+
ca_certs=ca_certs,
51+
url_prefix=url_prefix,
52+
verify_certs=True,
53+
)
54+
return opensearch_client
55+
56+
57+
class OpenSearchLogFetcher(object):
58+
"""Retrieves job and workflow logs from OpenSearch API."""
59+
60+
def __init__(
61+
self,
62+
os_client: OpenSearch | None = None,
63+
job_index: str = "fluentbit-job_log",
64+
workflow_index: str = "fluentbit-workflow_log",
65+
max_rows: int = 5000,
66+
log_key: str = "log",
67+
order: str = "asc",
68+
job_log_matcher: str = "kubernetes.labels.job-name.keyword",
69+
workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
70+
timeout: int = 5,
71+
) -> None:
72+
"""
73+
Initialize the OpenSearchLogFetcher object.
74+
75+
:param os_client: OpenSearch client object.
76+
:param job_index: Index name for job logs.
77+
:param workflow_index: Index name for workflow logs.
78+
:param max_rows: Maximum number of rows to fetch.
79+
:param log_key: Key for log message in the response.
80+
:param order: Order of logs (asc/desc).
81+
:param job_log_matcher: Job log matcher.
82+
:param workflow_log_matcher: Workflow log matcher.
83+
:param timeout: Timeout for OpenSearch queries.
84+
85+
:return: None
86+
"""
87+
if os_client is None:
88+
os_client = build_opensearch_client()
89+
90+
self.os_client = os_client
91+
self.job_index = job_index
92+
self.workflow_index = workflow_index
93+
self.max_rows = max_rows
94+
self.log_key = log_key
95+
self.order = order
96+
self.job_log_matcher = job_log_matcher
97+
self.workflow_log_matcher = workflow_log_matcher
98+
self.timeout = timeout
99+
100+
def fetch_logs(self, id: str, index: str, match: str) -> str | None:
101+
"""
102+
Fetch logs of a specific job or workflow.
103+
104+
:param id: Job or workflow ID.
105+
:param index: Index name for logs.
106+
:param match: Matcher for logs.
107+
108+
:return: Job or workflow logs.
109+
"""
110+
query = {
111+
"query": {"match": {match: id}},
112+
"sort": [{"@timestamp": {"order": self.order}}],
113+
}
114+
115+
try:
116+
response = self.os_client.search(
117+
index=index, body=query, size=self.max_rows, timeout=self.timeout
118+
)
119+
except Exception as e:
120+
logging.error("Failed to fetch logs for {0}: {1}".format(id, e))
121+
return None
122+
123+
return self._concat_rows(response["hits"]["hits"])
124+
125+
def fetch_job_logs(self, backend_job_id: str) -> str:
126+
"""
127+
Fetch logs of a specific job.
128+
129+
:param backend_job_id: Job ID.
130+
131+
:return: Job logs.
132+
"""
133+
return self.fetch_logs(
134+
backend_job_id,
135+
self.job_index,
136+
self.job_log_matcher,
137+
)
138+
139+
def fetch_workflow_logs(self, workflow_id: str) -> str | None:
140+
"""
141+
Fetch logs of a specific workflow.
142+
143+
:param workflow_id: Workflow ID.
144+
145+
:return: Workflow logs.
146+
"""
147+
return self.fetch_logs(
148+
workflow_id,
149+
self.workflow_index,
150+
self.workflow_log_matcher,
151+
)
152+
153+
def _concat_rows(self, rows: list) -> str | None:
154+
"""
155+
Concatenate log messages from rows.
156+
157+
:param rows: List of rows.
158+
159+
:return: Concatenated log messages.
160+
"""
161+
logs = ""
162+
163+
for hit in rows:
164+
logs += hit["_source"][self.log_key] + "\n"
165+
166+
return logs

reana_workflow_controller/rest/utils.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -164,7 +164,7 @@ def is_uuid_v4(uuid_or_name: str) -> bool:
164164
return uuid.hex == uuid_or_name.replace("-", "")
165165

166166

167-
def build_workflow_logs(workflow, steps=None, paginate=None):
167+
def build_workflow_logs(workflow, steps=None, paginate=None, fetcher=None):
168168
"""Return the logs for all jobs of a workflow."""
169169
query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
170170
if steps:
@@ -179,6 +179,9 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
179179
finished_at = (
180180
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
181181
)
182+
183+
logs = fetcher.fetch_job_logs(job.backend_job_id) if fetcher else None
184+
182185
item = {
183186
"workflow_uuid": str(job.workflow_uuid) or "",
184187
"job_name": job.job_name or "",
@@ -187,7 +190,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
187190
"docker_img": job.docker_img or "",
188191
"cmd": job.prettified_cmd or "",
189192
"status": job.status.name or "",
190-
"logs": job.logs or "",
193+
"logs": logs or job.logs or "",
191194
"started_at": started_at,
192195
"finished_at": finished_at,
193196
}

reana_workflow_controller/rest/workflows_status.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -16,11 +16,14 @@
1616
from reana_commons.errors import REANASecretDoesNotExist
1717
from reana_db.utils import _get_workflow_with_uuid_or_name
1818

19+
from reana_workflow_controller.config import REANA_OPENSEARCH_ENABLED
20+
1921
from reana_workflow_controller.errors import (
2022
REANAExternalCallError,
2123
REANAWorkflowControllerError,
2224
REANAWorkflowStatusError,
2325
)
26+
from reana_workflow_controller.opensearch import OpenSearchLogFetcher
2427
from reana_workflow_controller.rest.utils import (
2528
build_workflow_logs,
2629
delete_workflow,
@@ -140,19 +143,27 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
140143

141144
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid, True)
142145

146+
fetcher = OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None
147+
143148
steps = None
144149
if request.is_json:
145150
steps = request.json
146151
if steps:
147152
workflow_logs = {
148153
"workflow_logs": None,
149-
"job_logs": build_workflow_logs(workflow, steps, paginate=paginate),
154+
"job_logs": build_workflow_logs(
155+
workflow, steps, paginate=paginate, fetcher=fetcher
156+
),
150157
"engine_specific": None,
151158
}
152159
else:
160+
logs = fetcher.fetch_workflow_logs(workflow.id_) if fetcher else None
161+
153162
workflow_logs = {
154-
"workflow_logs": workflow.logs,
155-
"job_logs": build_workflow_logs(workflow, paginate=paginate),
163+
"workflow_logs": logs or workflow.logs,
164+
"job_logs": build_workflow_logs(
165+
workflow, paginate=paginate, fetcher=fetcher
166+
),
156167
"engine_specific": workflow.engine_specific,
157168
}
158169
return (
@@ -279,7 +290,8 @@ def get_workflow_status(workflow_id_or_name): # noqa
279290
try:
280291
user_uuid = request.args["user"]
281292
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid, True)
282-
workflow_logs = build_workflow_logs(workflow)
293+
fetcher = OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None
294+
workflow_logs = build_workflow_logs(workflow, fetcher=fetcher)
283295

284296
return (
285297
jsonify(

requirements.txt

+6-5
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ bracex==2.4 # via wcmatch
1313
bravado==10.3.2 # via reana-commons
1414
bravado-core==6.1.0 # via bravado, reana-commons
1515
cachetools==5.4.0 # via google-auth
16-
certifi==2024.7.4 # via kubernetes, requests
16+
certifi==2024.7.4 # via kubernetes, opensearch-py, requests
1717
cffi==1.16.0 # via cryptography
1818
charset-normalizer==3.3.2 # via requests
1919
checksumdir==1.1.9 # via reana-commons
2020
click==8.1.7 # via flask, reana-commons
2121
cryptography==43.0.0 # via sqlalchemy-utils
22+
events==0.5 # via opensearch-py
2223
flask==2.2.5 # via reana-workflow-controller (setup.py)
2324
fqdn==1.5.1 # via jsonschema
2425
fs==2.4.16 # via reana-commons
2526
gitdb==4.0.11 # via gitpython
2627
gitpython==3.1.43 # via reana-workflow-controller (setup.py)
2728
google-auth==2.32.0 # via kubernetes
28-
greenlet==3.0.3 # via sqlalchemy
2929
idna==3.7 # via jsonschema, requests
3030
importlib-resources==6.4.0 # via swagger-spec-validator
3131
isoduration==20.11.0 # via jsonschema
@@ -46,18 +46,19 @@ monotonic==1.6 # via bravado
4646
msgpack==1.0.8 # via bravado-core
4747
msgpack-python==0.5.6 # via bravado
4848
oauthlib==3.2.2 # via requests-oauthlib
49+
opensearch-py==2.7.1 # via reana-workflow-controller (setup.py)
4950
packaging==24.1 # via reana-workflow-controller (setup.py)
5051
psycopg2-binary==2.9.9 # via reana-db
5152
pyasn1==0.6.0 # via pyasn1-modules, rsa
5253
pyasn1-modules==0.4.0 # via google-auth
5354
pycparser==2.22 # via cffi
54-
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes
55+
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes, opensearch-py
5556
pytz==2024.1 # via bravado-core
5657
pyyaml==6.0.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator
5758
reana-commons[kubernetes]==0.95.0a3 # via reana-db, reana-workflow-controller (setup.py)
5859
reana-db==0.95.0a4 # via reana-workflow-controller (setup.py)
5960
referencing==0.35.1 # via jsonschema, jsonschema-specifications
60-
requests==2.32.3 # via bravado, bravado-core, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib
61+
requests==2.32.3 # via bravado, bravado-core, kubernetes, opensearch-py, reana-workflow-controller (setup.py), requests-oauthlib
6162
requests-oauthlib==2.0.0 # via kubernetes
6263
rfc3339-validator==0.1.4 # via jsonschema
6364
rfc3987==1.3.8 # via jsonschema
@@ -72,7 +73,7 @@ swagger-spec-validator==3.0.4 # via bravado-core
7273
types-python-dateutil==2.9.0.20240316 # via arrow
7374
typing-extensions==4.12.2 # via alembic, bravado, swagger-spec-validator
7475
uri-template==1.3.0 # via jsonschema
75-
urllib3==2.2.2 # via kubernetes, requests
76+
urllib3==2.2.2 # via kubernetes, opensearch-py, requests
7677
uwsgi==2.0.26 # via reana-workflow-controller (setup.py)
7778
uwsgi-tools==1.1.1 # via reana-workflow-controller (setup.py)
7879
uwsgitop==0.12 # via reana-workflow-controller (setup.py)

setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"gitpython>=2.1",
5050
"jsonpickle>=0.9.6",
5151
"marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server
52+
"opensearch-py>=2.7.0,<2.8.0",
5253
"packaging>=18.0",
5354
"reana-commons[kubernetes] @ git+https://github.com/reanahub/[email protected]",
5455
"reana-db>=0.95.0a4,<0.96.0",

0 commit comments

Comments
 (0)