Skip to content

Commit f5514de

Browse files
committed
feat(opensearch): fetch logs from opensearch (reanahub#602)
1 parent e36c6a2 commit f5514de

File tree

9 files changed

+547
-39
lines changed

9 files changed

+547
-39
lines changed

AUTHORS.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The list of contributors in alphabetical order:
1313
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
1414
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
1515
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
16+
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
1617
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
1718
- [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584)
1819
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)

reana_workflow_controller/config.py

+30
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars):
154154
)
155155
"""Common to all workflow engines environment variables for debug mode."""
156156

157+
REANA_OPENSEARCH_ENABLED = (
158+
os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true"
159+
)
160+
"""OpenSearch enabled flag."""
161+
162+
REANA_OPENSEARCH_HOST = os.getenv(
163+
"REANA_OPENSEARCH_HOST", "opensearch-cluster-master.default.svc.cluster.local"
164+
)
165+
"""OpenSearch host."""
166+
167+
REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200")
168+
"""OpenSearch port."""
169+
170+
REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "")
171+
"""OpenSearch URL prefix."""
172+
173+
REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin")
174+
"""OpenSearch user."""
175+
176+
REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin")
177+
"""OpenSearch password."""
178+
179+
REANA_OPENSEARCH_USE_SSL = (
180+
os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true"
181+
)
182+
"""OpenSearch SSL flag."""
183+
184+
REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS")
185+
"""OpenSearch CA certificates."""
186+
157187

158188
def _parse_interactive_sessions_environments(env_var):
159189
config = {}
+176
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# This file is part of REANA.
4+
# Copyright (C) 2024 CERN.
5+
#
6+
# REANA is free software; you can redistribute it and/or modify it
7+
# under the terms of the MIT License; see LICENSE file for more details.
8+
9+
"""OpenSearch client and log fetcher."""
10+
11+
import logging
12+
from opensearchpy import OpenSearch
13+
14+
from reana_workflow_controller.config import (
15+
REANA_OPENSEARCH_CA_CERTS,
16+
REANA_OPENSEARCH_HOST,
17+
REANA_OPENSEARCH_PASSWORD,
18+
REANA_OPENSEARCH_PORT,
19+
REANA_OPENSEARCH_URL_PREFIX,
20+
REANA_OPENSEARCH_USE_SSL,
21+
REANA_OPENSEARCH_USER,
22+
REANA_OPENSEARCH_ENABLED,
23+
)
24+
25+
26+
def build_opensearch_client(
    host: str = REANA_OPENSEARCH_HOST,
    port: str = REANA_OPENSEARCH_PORT,
    url_prefix: str = REANA_OPENSEARCH_URL_PREFIX,
    http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD),
    use_ssl: bool = REANA_OPENSEARCH_USE_SSL,
    ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS,
) -> OpenSearch:
    """
    Create an OpenSearch client from the given connection settings.

    Every parameter defaults to the matching ``REANA_OPENSEARCH_*`` value
    imported from ``reana_workflow_controller.config``.

    :param host: OpenSearch host.
    :param port: OpenSearch port.
    :param url_prefix: URL prefix.
    :param http_auth: HTTP authentication credentials.
    :param use_ssl: Use SSL/TLS for connection.
    :param ca_certs: Path to CA certificates.

    :return: OpenSearch client object.
    """
    # The client is addressed through a single "host:port" string.
    endpoint = f"{host}:{port}"

    connection_options = {
        "http_compress": True,  # enables gzip compression for request bodies
        "http_auth": http_auth,
        "use_ssl": use_ssl,
        "ca_certs": ca_certs,
        "url_prefix": url_prefix,
        # Certificate verification is always requested, whatever the caller passes.
        "verify_certs": True,
    }
    return OpenSearch(hosts=endpoint, **connection_options)
56+
57+
58+
class OpenSearchLogFetcher:
    """Retrieves job and workflow logs from OpenSearch API."""

    def __init__(
        self,
        os_client: OpenSearch | None = None,
        job_index: str = "fluentbit-job_log",
        workflow_index: str = "fluentbit-workflow_log",
        max_rows: int = 5000,
        log_key: str = "log",
        order: str = "asc",
        job_log_matcher: str = "kubernetes.labels.job-name.keyword",
        workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword",
        timeout: int = 5,
    ) -> None:
        """
        Initialize the OpenSearchLogFetcher object.

        :param os_client: OpenSearch client object. When ``None``, a client is
            created with ``build_opensearch_client()``.
        :param job_index: Index name for job logs.
        :param workflow_index: Index name for workflow logs.
        :param max_rows: Maximum number of rows to fetch in one query.
        :param log_key: Key for log message in the ``_source`` of each hit.
        :param order: Order of logs by ``@timestamp`` (asc/desc).
        :param job_log_matcher: Field matched against the job ID.
        :param workflow_log_matcher: Field matched against the workflow ID.
        :param timeout: Value passed as ``timeout`` to the OpenSearch search call.

        :return: None
        """
        if os_client is None:
            os_client = build_opensearch_client()

        self.os_client = os_client
        self.job_index = job_index
        self.workflow_index = workflow_index
        self.max_rows = max_rows
        self.log_key = log_key
        self.order = order
        self.job_log_matcher = job_log_matcher
        self.workflow_log_matcher = workflow_log_matcher
        self.timeout = timeout

    def fetch_logs(self, id: str, index: str, match: str) -> str | None:
        """
        Fetch logs of a specific job or workflow.

        :param id: Job or workflow ID.
        :param index: Index name for logs.
        :param match: Field name that must match ``id``.

        :return: Concatenated log lines (possibly an empty string when there
            are no hits), or ``None`` if the search request failed.
        """
        query = {
            "query": {"match": {match: id}},
            "sort": [{"@timestamp": {"order": self.order}}],
        }

        try:
            response = self.os_client.search(
                index=index, body=query, size=self.max_rows, timeout=self.timeout
            )
        except Exception as e:
            # Best effort: a failing search is logged and reported as ``None``
            # instead of being propagated to the caller.
            logging.error("Failed to fetch logs for %s: %s", id, e)
            return None

        return self._concat_rows(response["hits"]["hits"])

    def fetch_job_logs(self, backend_job_id: str) -> str | None:
        """
        Fetch logs of a specific job.

        :param backend_job_id: Job ID.

        :return: Job logs, or ``None`` if the search request failed.
        """
        return self.fetch_logs(
            backend_job_id,
            self.job_index,
            self.job_log_matcher,
        )

    def fetch_workflow_logs(self, workflow_id: str) -> str | None:
        """
        Fetch logs of a specific workflow.

        :param workflow_id: Workflow ID.

        :return: Workflow logs, or ``None`` if the search request failed.
        """
        return self.fetch_logs(
            workflow_id,
            self.workflow_index,
            self.workflow_log_matcher,
        )

    def _concat_rows(self, rows: list) -> str:
        """
        Concatenate log messages from rows.

        :param rows: List of search hits; each one must provide
            ``hit["_source"][self.log_key]``.

        :return: All log messages, each followed by a newline; an empty
            string when ``rows`` is empty.
        """
        # A single join avoids rebuilding the string once per row.
        return "".join(hit["_source"][self.log_key] + "\n" for hit in rows)
168+
169+
170+
def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None:
    """
    Create a log fetcher when OpenSearch support is switched on.

    :return: A new ``OpenSearchLogFetcher`` built with its default arguments
        if ``REANA_OPENSEARCH_ENABLED`` is truthy, otherwise ``None``.
    """
    if not REANA_OPENSEARCH_ENABLED:
        return None
    return OpenSearchLogFetcher()

reana_workflow_controller/rest/utils.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022, 2023 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool:
166166

167167
def build_workflow_logs(workflow, steps=None, paginate=None):
168168
"""Return the logs for all jobs of a workflow."""
169+
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher
170+
169171
query = Session.query(Job).filter_by(workflow_uuid=workflow.id_)
170172
if steps:
171173
query = query.filter(Job.job_name.in_(steps))
@@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
179181
finished_at = (
180182
job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None
181183
)
184+
185+
open_search_log_fetcher = build_opensearch_log_fetcher()
186+
187+
logs = (
188+
open_search_log_fetcher.fetch_job_logs(job.backend_job_id)
189+
if open_search_log_fetcher
190+
else None
191+
)
192+
182193
item = {
183194
"workflow_uuid": str(job.workflow_uuid) or "",
184195
"job_name": job.job_name or "",
@@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None):
187198
"docker_img": job.docker_img or "",
188199
"cmd": job.prettified_cmd or "",
189200
"status": job.status.name or "",
190-
"logs": job.logs or "",
201+
"logs": logs or job.logs or "",
191202
"started_at": started_at,
192203
"finished_at": finished_at,
193204
}

reana_workflow_controller/rest/workflows_status.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# -*- coding: utf-8 -*-
22
#
33
# This file is part of REANA.
4-
# Copyright (C) 2020, 2021, 2022 CERN.
4+
# Copyright (C) 2020, 2021, 2022, 2024 CERN.
55
#
66
# REANA is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
@@ -150,8 +150,18 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa
150150
"engine_specific": None,
151151
}
152152
else:
153+
from reana_workflow_controller.opensearch import build_opensearch_log_fetcher
154+
155+
open_search_log_fetcher = build_opensearch_log_fetcher()
156+
157+
logs = (
158+
open_search_log_fetcher.fetch_workflow_logs(workflow.id_)
159+
if open_search_log_fetcher
160+
else None
161+
)
162+
153163
workflow_logs = {
154-
"workflow_logs": workflow.logs,
164+
"workflow_logs": logs or workflow.logs,
155165
"job_logs": build_workflow_logs(workflow, paginate=paginate),
156166
"engine_specific": workflow.engine_specific,
157167
}

requirements.txt

+6-5
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ bracex==2.4 # via wcmatch
1313
bravado==10.3.2 # via reana-commons
1414
bravado-core==6.1.0 # via bravado, reana-commons
1515
cachetools==5.4.0 # via google-auth
16-
certifi==2024.7.4 # via kubernetes, requests
16+
certifi==2024.7.4 # via kubernetes, opensearch-py, requests
1717
cffi==1.16.0 # via cryptography
1818
charset-normalizer==3.3.2 # via requests
1919
checksumdir==1.1.9 # via reana-commons
2020
click==8.1.7 # via flask, reana-commons
2121
cryptography==43.0.0 # via sqlalchemy-utils
22+
events==0.5 # via opensearch-py
2223
flask==2.2.5 # via reana-workflow-controller (setup.py)
2324
fqdn==1.5.1 # via jsonschema
2425
fs==2.4.16 # via reana-commons
2526
gitdb==4.0.11 # via gitpython
2627
gitpython==3.1.43 # via reana-workflow-controller (setup.py)
2728
google-auth==2.32.0 # via kubernetes
28-
greenlet==3.0.3 # via sqlalchemy
2929
idna==3.7 # via jsonschema, requests
3030
importlib-resources==6.4.0 # via swagger-spec-validator
3131
isoduration==20.11.0 # via jsonschema
@@ -46,18 +46,19 @@ monotonic==1.6 # via bravado
4646
msgpack==1.0.8 # via bravado-core
4747
msgpack-python==0.5.6 # via bravado
4848
oauthlib==3.2.2 # via requests-oauthlib
49+
opensearch-py==2.7.1 # via reana-workflow-controller (setup.py)
4950
packaging==24.1 # via reana-workflow-controller (setup.py)
5051
psycopg2-binary==2.9.9 # via reana-db
5152
pyasn1==0.6.0 # via pyasn1-modules, rsa
5253
pyasn1-modules==0.4.0 # via google-auth
5354
pycparser==2.22 # via cffi
54-
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes
55+
python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes, opensearch-py
5556
pytz==2024.1 # via bravado-core
5657
pyyaml==6.0.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator
5758
reana-commons[kubernetes]==0.95.0a3 # via reana-db, reana-workflow-controller (setup.py)
5859
reana-db==0.95.0a4 # via reana-workflow-controller (setup.py)
5960
referencing==0.35.1 # via jsonschema, jsonschema-specifications
60-
requests==2.32.3 # via bravado, bravado-core, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib
61+
requests==2.32.3 # via bravado, bravado-core, kubernetes, opensearch-py, reana-workflow-controller (setup.py), requests-oauthlib
6162
requests-oauthlib==2.0.0 # via kubernetes
6263
rfc3339-validator==0.1.4 # via jsonschema
6364
rfc3987==1.3.8 # via jsonschema
@@ -72,7 +73,7 @@ swagger-spec-validator==3.0.4 # via bravado-core
7273
types-python-dateutil==2.9.0.20240316 # via arrow
7374
typing-extensions==4.12.2 # via alembic, bravado, swagger-spec-validator
7475
uri-template==1.3.0 # via jsonschema
75-
urllib3==2.2.2 # via kubernetes, requests
76+
urllib3==2.2.2 # via kubernetes, opensearch-py, requests
7677
uwsgi==2.0.26 # via reana-workflow-controller (setup.py)
7778
uwsgi-tools==1.1.1 # via reana-workflow-controller (setup.py)
7879
uwsgitop==0.12 # via reana-workflow-controller (setup.py)

setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
"gitpython>=2.1",
5050
"jsonpickle>=0.9.6",
5151
"marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server
52+
"opensearch-py>=2.7.0,<2.8.0",
5253
"packaging>=18.0",
5354
"reana-commons[kubernetes] @ git+https://github.com/reanahub/[email protected]",
5455
"reana-db>=0.95.0a4,<0.96.0",

0 commit comments

Comments
 (0)