From 13c3228a0b6ffad3d018cb789ce82e5b0c540ac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelizaveta=20Leme=C5=A1eva?= Date: Wed, 9 Oct 2024 10:35:14 +0200 Subject: [PATCH] feat(opensearch): fetch logs from OpenSearch (#602) --- AUTHORS.md | 1 + docs/conf.py | 9 + reana_workflow_controller/config.py | 30 +++ reana_workflow_controller/opensearch.py | 176 +++++++++++++++ reana_workflow_controller/rest/utils.py | 15 +- .../rest/workflows_status.py | 16 +- requirements.txt | 11 +- setup.py | 1 + tests/test_opensearch.py | 210 ++++++++++++++++++ tests/test_views.py | 163 +++++++++++--- 10 files changed, 593 insertions(+), 39 deletions(-) create mode 100644 reana_workflow_controller/opensearch.py create mode 100644 tests/test_opensearch.py diff --git a/AUTHORS.md b/AUTHORS.md index f3b9bb24..6cd4cac7 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -13,6 +13,7 @@ The list of contributors in alphabetical order: - [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553) - [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X) - [Jan Okraska](https://orcid.org/0000-0002-1416-3244) +- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270) - [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630) - [Lukas Heinrich](https://orcid.org/0000-0002-4048-7584) - [Marco Donadoni](https://orcid.org/0000-0003-2922-5505) diff --git a/docs/conf.py b/docs/conf.py index 7d606c52..7d5b9861 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,6 +33,15 @@ # # needs_sphinx = '1.0' +# Define the canonical URL if you are using a custom domain on Read the Docs +html_baseurl = os.environ.get("READTHEDOCS_CANONICAL_URL", "") + +# Tell Jinja2 templates the build is running on Read the Docs +if os.environ.get("READTHEDOCS", "") == "True": + if "html_context" not in globals(): + html_context = {} + html_context["READTHEDOCS"] = True + # Do not warn on external images. suppress_warnings = ["image.nonlocal_uri"] diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 56d5ebdd..d9542bdd 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -154,6 +154,36 @@ def _env_vars_dict_to_k8s_list(env_vars): ) """Common to all workflow engines environment variables for debug mode.""" +REANA_OPENSEARCH_ENABLED = ( + os.getenv("REANA_OPENSEARCH_ENABLED", "false").lower() == "true" +) +"""OpenSearch enabled flag.""" + +REANA_OPENSEARCH_HOST = os.getenv( + "REANA_OPENSEARCH_HOST", "reana-opensearch-master.default.svc.cluster.local" +) +"""OpenSearch host.""" + +REANA_OPENSEARCH_PORT = os.getenv("REANA_OPENSEARCH_PORT", "9200") +"""OpenSearch port.""" + +REANA_OPENSEARCH_URL_PREFIX = os.getenv("REANA_OPENSEARCH_URL_PREFIX", "") +"""OpenSearch URL prefix.""" + +REANA_OPENSEARCH_USER = os.getenv("REANA_OPENSEARCH_USER", "admin") +"""OpenSearch user.""" + +REANA_OPENSEARCH_PASSWORD = os.getenv("REANA_OPENSEARCH_PASSWORD", "admin") +"""OpenSearch password.""" + +REANA_OPENSEARCH_USE_SSL = ( + os.getenv("REANA_OPENSEARCH_USE_SSL", "false").lower() == "true" +) +"""OpenSearch SSL flag.""" + +REANA_OPENSEARCH_CA_CERTS = os.getenv("REANA_OPENSEARCH_CA_CERTS") +"""OpenSearch CA certificates.""" + def _parse_interactive_sessions_environments(env_var): config = {} diff --git a/reana_workflow_controller/opensearch.py b/reana_workflow_controller/opensearch.py new file mode 100644 index 00000000..aea2d6ca --- /dev/null +++ b/reana_workflow_controller/opensearch.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""OpenSearch client and log fetcher.""" + +import logging +from opensearchpy import OpenSearch + +from reana_workflow_controller.config import ( + REANA_OPENSEARCH_CA_CERTS, + REANA_OPENSEARCH_HOST, + REANA_OPENSEARCH_PASSWORD, + REANA_OPENSEARCH_PORT, + REANA_OPENSEARCH_URL_PREFIX, + REANA_OPENSEARCH_USE_SSL, + REANA_OPENSEARCH_USER, + REANA_OPENSEARCH_ENABLED, +) + + +def build_opensearch_client( + host: str = REANA_OPENSEARCH_HOST, + port: str = REANA_OPENSEARCH_PORT, + url_prefix: str = REANA_OPENSEARCH_URL_PREFIX, + http_auth: tuple | None = (REANA_OPENSEARCH_USER, REANA_OPENSEARCH_PASSWORD), + use_ssl: bool = REANA_OPENSEARCH_USE_SSL, + ca_certs: str | None = REANA_OPENSEARCH_CA_CERTS, +) -> OpenSearch: + """ + Build an OpenSearch client object. + + :param host: OpenSearch host. + :param port: OpenSearch port. + :param url_prefix: URL prefix. + :param http_auth: HTTP authentication credentials. + :param use_ssl: Use SSL/TLS for connection. + :param ca_certs: Path to CA certificates. + + :return: OpenSearch client object. + """ + opensearch_client = OpenSearch( + hosts=f"{host}:{port}", + http_compress=True, # enables gzip compression for request bodies + http_auth=http_auth, + use_ssl=use_ssl, + ca_certs=ca_certs, + url_prefix=url_prefix, + verify_certs=True, + ) + return opensearch_client + + +class OpenSearchLogFetcher(object): + """Retrieves job and workflow logs from OpenSearch API.""" + + def __init__( + self, + os_client: OpenSearch | None = None, + job_index: str = "fluentbit-job_log", + workflow_index: str = "fluentbit-workflow_log", + max_rows: int = 5000, + log_key: str = "log", + order: str = "asc", + job_log_matcher: str = "kubernetes.labels.job-name.keyword", + workflow_log_matcher: str = "kubernetes.labels.reana-run-batch-workflow-uuid.keyword", + timeout: int = 5, + ) -> None: + """ + Initialize the OpenSearchLogFetcher object. + + :param os_client: OpenSearch client object. + :param job_index: Index name for job logs. + :param workflow_index: Index name for workflow logs. + :param max_rows: Maximum number of rows to fetch. + :param log_key: Key for log message in the response. + :param order: Order of logs (asc/desc). + :param job_log_matcher: Job log matcher. + :param workflow_log_matcher: Workflow log matcher. + :param timeout: Timeout for OpenSearch queries. + + :return: None + """ + if os_client is None: + os_client = build_opensearch_client() + + self.os_client = os_client + self.job_index = job_index + self.workflow_index = workflow_index + self.max_rows = max_rows + self.log_key = log_key + self.order = order + self.job_log_matcher = job_log_matcher + self.workflow_log_matcher = workflow_log_matcher + self.timeout = timeout + + def fetch_logs(self, id: str, index: str, match: str) -> str | None: + """ + Fetch logs of a specific job or workflow. + + :param id: Job or workflow ID. + :param index: Index name for logs. + :param match: Matcher for logs. + + :return: Job or workflow logs. + """ + query = { + "query": {"match": {match: id}}, + "sort": [{"@timestamp": {"order": self.order}}], + } + + try: + response = self.os_client.search( + index=index, body=query, size=self.max_rows, timeout=self.timeout + ) + except Exception as e: + logging.error("Failed to fetch logs for {0}: {1}".format(id, e)) + return None + + return self._concat_rows(response["hits"]["hits"]) + + def fetch_job_logs(self, backend_job_id: str) -> str: + """ + Fetch logs of a specific job. + + :param backend_job_id: Job ID. + + :return: Job logs. + """ + return self.fetch_logs( + backend_job_id, + self.job_index, + self.job_log_matcher, + ) + + def fetch_workflow_logs(self, workflow_id: str) -> str | None: + """ + Fetch logs of a specific workflow. + + :param workflow_id: Workflow ID. + + :return: Workflow logs. + """ + return self.fetch_logs( + workflow_id, + self.workflow_index, + self.workflow_log_matcher, + ) + + def _concat_rows(self, rows: list) -> str | None: + """ + Concatenate log messages from rows. + + :param rows: List of rows. + + :return: Concatenated log messages. + """ + logs = "" + + for hit in rows: + logs += hit["_source"][self.log_key] + "\n" + + return logs + + +def build_opensearch_log_fetcher() -> OpenSearchLogFetcher | None: + """ + Build OpenSearchLogFetcher object. + + :return: OpenSearchLogFetcher object. + """ + return OpenSearchLogFetcher() if REANA_OPENSEARCH_ENABLED else None diff --git a/reana_workflow_controller/rest/utils.py b/reana_workflow_controller/rest/utils.py index 4f754e6d..9a7a7990 100644 --- a/reana_workflow_controller/rest/utils.py +++ b/reana_workflow_controller/rest/utils.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2020, 2021, 2022, 2023 CERN. +# Copyright (C) 2020, 2021, 2022, 2023, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -166,6 +166,8 @@ def is_uuid_v4(uuid_or_name: str) -> bool: def build_workflow_logs(workflow, steps=None, paginate=None): """Return the logs for all jobs of a workflow.""" + from reana_workflow_controller.opensearch import build_opensearch_log_fetcher + query = Session.query(Job).filter_by(workflow_uuid=workflow.id_) if steps: query = query.filter(Job.job_name.in_(steps)) @@ -179,6 +181,15 @@ def build_workflow_logs(workflow, steps=None, paginate=None): finished_at = ( job.finished_at.strftime(WORKFLOW_TIME_FORMAT) if job.finished_at else None ) + + open_search_log_fetcher = build_opensearch_log_fetcher() + + logs = ( + open_search_log_fetcher.fetch_job_logs(job.backend_job_id) + if open_search_log_fetcher + else None + ) + item = { "workflow_uuid": str(job.workflow_uuid) or "", "job_name": job.job_name or "", @@ -187,7 +198,7 @@ def build_workflow_logs(workflow, steps=None, paginate=None): "docker_img": job.docker_img or "", "cmd": job.prettified_cmd or "", "status": job.status.name or "", - "logs": job.logs or "", + "logs": logs or job.logs or "", "started_at": started_at, "finished_at": finished_at, } diff --git a/reana_workflow_controller/rest/workflows_status.py b/reana_workflow_controller/rest/workflows_status.py index 1361dbb1..19652bb5 100644 --- a/reana_workflow_controller/rest/workflows_status.py +++ b/reana_workflow_controller/rest/workflows_status.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2020, 2021, 2022 CERN. +# Copyright (C) 2020, 2021, 2022, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -150,8 +150,20 @@ def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs): # noqa "engine_specific": None, } else: + from reana_workflow_controller.opensearch import ( + build_opensearch_log_fetcher, + ) + + open_search_log_fetcher = build_opensearch_log_fetcher() + + logs = ( + open_search_log_fetcher.fetch_workflow_logs(workflow.id_) + if open_search_log_fetcher + else None + ) + workflow_logs = { - "workflow_logs": workflow.logs, + "workflow_logs": logs or workflow.logs, "job_logs": build_workflow_logs(workflow, paginate=paginate), "engine_specific": workflow.engine_specific, } diff --git a/requirements.txt b/requirements.txt index 5ec6cec7..84350cef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,19 +13,19 @@ bracex==2.4 # via wcmatch bravado==10.3.2 # via reana-commons bravado-core==6.1.0 # via bravado, reana-commons cachetools==5.4.0 # via google-auth -certifi==2024.7.4 # via kubernetes, requests +certifi==2024.7.4 # via kubernetes, opensearch-py, requests cffi==1.16.0 # via cryptography charset-normalizer==3.3.2 # via requests checksumdir==1.1.9 # via reana-commons click==8.1.7 # via flask, reana-commons cryptography==43.0.0 # via sqlalchemy-utils +events==0.5 # via opensearch-py flask==2.2.5 # via reana-workflow-controller (setup.py) fqdn==1.5.1 # via jsonschema fs==2.4.16 # via reana-commons gitdb==4.0.11 # via gitpython gitpython==3.1.43 # via reana-workflow-controller (setup.py) google-auth==2.32.0 # via kubernetes -greenlet==3.0.3 # via sqlalchemy idna==3.7 # via jsonschema, requests importlib-resources==6.4.0 # via swagger-spec-validator isoduration==20.11.0 # via jsonschema @@ -46,18 +46,19 @@ monotonic==1.6 # via bravado msgpack==1.0.8 # via bravado-core msgpack-python==0.5.6 # via bravado oauthlib==3.2.2 # via requests-oauthlib +opensearch-py==2.7.1 # via reana-workflow-controller (setup.py) packaging==24.1 # via reana-workflow-controller (setup.py) psycopg2-binary==2.9.9 # via reana-db pyasn1==0.6.0 # via pyasn1-modules, rsa pyasn1-modules==0.4.0 # via google-auth pycparser==2.22 # via cffi -python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes +python-dateutil==2.9.0.post0 # via arrow, bravado, bravado-core, kubernetes, opensearch-py pytz==2024.1 # via bravado-core pyyaml==6.0.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator reana-commons[kubernetes]==0.95.0a3 # via reana-db, reana-workflow-controller (setup.py) reana-db==0.95.0a4 # via reana-workflow-controller (setup.py) referencing==0.35.1 # via jsonschema, jsonschema-specifications -requests==2.32.3 # via bravado, bravado-core, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib +requests==2.32.3 # via bravado, bravado-core, kubernetes, opensearch-py, reana-workflow-controller (setup.py), requests-oauthlib requests-oauthlib==2.0.0 # via kubernetes rfc3339-validator==0.1.4 # via jsonschema rfc3987==1.3.8 # via jsonschema @@ -72,7 +73,7 @@ swagger-spec-validator==3.0.4 # via bravado-core types-python-dateutil==2.9.0.20240316 # via arrow typing-extensions==4.12.2 # via alembic, bravado, swagger-spec-validator uri-template==1.3.0 # via jsonschema -urllib3==2.2.2 # via kubernetes, requests +urllib3==2.2.2 # via kubernetes, opensearch-py, requests uwsgi==2.0.26 # via reana-workflow-controller (setup.py) uwsgi-tools==1.1.1 # via reana-workflow-controller (setup.py) uwsgitop==0.12 # via reana-workflow-controller (setup.py) diff --git a/setup.py b/setup.py index cf4a2b8c..44f22ee8 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ "gitpython>=2.1", "jsonpickle>=0.9.6", "marshmallow>2.13.0,<3.0.0", # same upper pin as reana-server + "opensearch-py>=2.7.0,<2.8.0", "packaging>=18.0", "reana-commons[kubernetes] @ git+https://github.com/reanahub/reana-commons.git@0.95.0a4", "reana-db>=0.95.0a4,<0.96.0", diff --git a/tests/test_opensearch.py b/tests/test_opensearch.py new file mode 100644 index 00000000..86859f5d --- /dev/null +++ b/tests/test_opensearch.py @@ -0,0 +1,210 @@ +# -*- coding: utf-8 -*- +# +# This file is part of REANA. +# Copyright (C) 2024 CERN. +# +# REANA is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""REANA-Workflow-Controller OpenSearchLogFetcher tests.""" + +import pytest +from opensearchpy import OpenSearch +from mock import patch + + +def test_fetch_workflow_logs(): + """Test OpenSearchLogFetcher.fetch_workflow_logs.""" + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with patch.object( + OpenSearchLogFetcher, "fetch_logs", return_value="some log" + ) as mock_search: + os_fetcher = OpenSearchLogFetcher() + assert os_fetcher.fetch_workflow_logs("wf_id") == "some log" + + mock_search.assert_called_once_with( + "wf_id", + "fluentbit-workflow_log", + "kubernetes.labels.reana-run-batch-workflow-uuid.keyword", + ) + + +def test_fetch_job_logs(): + """Test OpenSearchLogFetcher.fetch_job_logs.""" + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with patch.object( + OpenSearchLogFetcher, "fetch_logs", return_value="some log" + ) as mock_search: + os_fetcher = OpenSearchLogFetcher() + assert os_fetcher.fetch_job_logs("job_id") == "some log" + + mock_search.assert_called_once_with( + "job_id", "fluentbit-job_log", "kubernetes.labels.job-name.keyword" + ) + + +@pytest.mark.parametrize( + "opensearch_response,expected_logs", + [ + ( + { + "took": 3, + "timed_out": False, + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": {"value": 0, "relation": "eq"}, + "max_score": None, + "hits": [], + }, + }, + "", + ), + ( + { + "took": 6, + "timed_out": False, + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": {"value": 2, "relation": "eq"}, + "max_score": None, + "hits": [ + { + "_index": "fluentbit-job_log", + "_id": "_kTKspEBC9PZpoJqzxwj", + "_score": None, + "_source": { + "@timestamp": "2024-09-02T12:52:00.984Z", + "time": "2024-09-02T12:52:00.984167462Z", + "stream": "stderr", + "_p": "F", + "log": "Executing step 0/1", + }, + "sort": [1725281520984], + }, + { + "_index": "fluentbit-job_log", + "_id": "xETJspEBC9PZpoJqKRtQ", + "_score": None, + "_source": { + "@timestamp": "2024-09-02T12:50:12.705Z", + "time": "2024-09-02T12:50:12.705755718Z", + "stream": "stderr", + "_p": "F", + "log": "Result: 1.3425464", + }, + "sort": [1725281412705], + }, + ], + }, + }, + """Executing step 0/1 +Result: 1.3425464 +""", + ), + ( + { + "took": 6, + "timed_out": False, + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": {"value": 2, "relation": "eq"}, + "max_score": None, + "hits": [ + { + "_index": "fluentbit-workflow_log", + "_id": "_kTKspEBC9PZpoJqzxwj", + "_score": None, + "_source": { + "@timestamp": "2024-09-02T12:52:00.984Z", + "time": "2024-09-02T12:52:00.984167462Z", + "stream": "stderr", + "_p": "F", + "log": "2024-09-02 12:52:00,983 | root | MainThread | INFO | Workflow 567bedbc-31d1-4449-8fc6-48af67e04e68 finished.", + }, + "sort": [1725281520984], + }, + { + "_index": "fluentbit-workflow_log", + "_id": "xETJspEBC9PZpoJqKRtQ", + "_score": None, + "_source": { + "@timestamp": "2024-09-02T12:50:12.705Z", + "time": "2024-09-02T12:50:12.705755718Z", + "stream": "stderr", + "_p": "F", + "log": "2024-09-02 12:50:12,705 | root | MainThread | INFO | Publishing step:0.", + }, + "sort": [1725281412705], + }, + ], + }, + }, + """2024-09-02 12:52:00,983 | root | MainThread | INFO | Workflow 567bedbc-31d1-4449-8fc6-48af67e04e68 finished. +2024-09-02 12:50:12,705 | root | MainThread | INFO | Publishing step:0. +""", + ), + ], +) +def test_fetch_logs(opensearch_response, expected_logs): + """Test OpenSearchLogFetcher.fetch_logs.""" + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with patch.object( + OpenSearch, "search", return_value=opensearch_response + ) as mock_search: + opensearch_client = OpenSearch() + os_fetcher = OpenSearchLogFetcher(opensearch_client) + logs = os_fetcher.fetch_logs( + "job_id", "fluentbit-job_log", "kubernetes.labels.job-name.keyword" + ) + assert logs == expected_logs + + query = { + "query": {"match": {"kubernetes.labels.job-name.keyword": "job_id"}}, + "sort": [{"@timestamp": {"order": "asc"}}], + } + + mock_search.assert_called_once_with( + index="fluentbit-job_log", body=query, size=5000, timeout=5 + ) + + +def test_fetch_logs_error(): + """Test OpenSearchLogFetcher.fetch_logs with error.""" + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with patch.object( + OpenSearch, "search", side_effect=Exception("error") + ) as mock_search: + opensearch_client = OpenSearch() + os_fetcher = OpenSearchLogFetcher(opensearch_client) + logs = os_fetcher.fetch_logs( + "job_id", "fluentbit-job_log", "kubernetes.labels.job-name.keyword" + ) + assert logs is None + + query = { + "query": {"match": {"kubernetes.labels.job-name.keyword": "job_id"}}, + "sort": [{"@timestamp": {"order": "asc"}}], + } + + mock_search.assert_called_once_with( + index="fluentbit-job_log", body=query, size=5000, timeout=5 + ) + + +def test_include_opensearch_disabled(): + """Test OpenSearchLogFetcher inclusion when OpenSearch is disabled (default).""" + from reana_workflow_controller.opensearch import build_opensearch_log_fetcher + + assert build_opensearch_log_fetcher() is None + + +def test_include_opensearch_enabled(): + """Test OpenSearchLogFetcher inclusion when OpenSearch is enabled.""" + with patch("reana_workflow_controller.opensearch.REANA_OPENSEARCH_ENABLED", True): + from reana_workflow_controller.opensearch import build_opensearch_log_fetcher + + assert build_opensearch_log_fetcher() is not None diff --git a/tests/test_views.py b/tests/test_views.py index 06553c69..59e29a51 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1146,39 +1146,142 @@ def test_delete_file(app, user0, sample_serial_workflow_in_db): assert not os.path.exists(abs_path_to_file) +@pytest.mark.parametrize( + "opensearch_return_value", + [ + ("test logs\ntest logs\n"), + (""), + (None), + ], +) def test_get_created_workflow_logs( - app, user0, cwl_workflow_with_name, tmp_shared_volume_path + opensearch_return_value, + app, + user0, + cwl_workflow_with_name, + tmp_shared_volume_path, + session, ): """Test get workflow logs.""" - with app.test_client() as client: - # create workflow - res = client.post( - url_for("workflows.create_workflow"), - query_string={ - "user": user0.id_, - "workspace_root_path": tmp_shared_volume_path, - }, - content_type="application/json", - data=json.dumps(cwl_workflow_with_name), - ) - response_data = json.loads(res.get_data(as_text=True)) - workflow_uuid = response_data.get("workflow_id") - workflow_name = response_data.get("workflow_name") - res = client.get( - url_for("statuses.get_workflow_logs", workflow_id_or_name=workflow_uuid), - query_string={"user": user0.id_}, - content_type="application/json", - data=json.dumps(None), - ) - assert res.status_code == 200 - response_data = json.loads(res.get_data(as_text=True)) - expected_data = { - "workflow_id": workflow_uuid, - "workflow_name": workflow_name, - "user": str(user0.id_), - "logs": '{"workflow_logs": "", "job_logs": {},' ' "engine_specific": null}', - } - assert response_data == expected_data + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with mock.patch.object( + OpenSearchLogFetcher, "fetch_logs", return_value=opensearch_return_value + ) as mock_method, mock.patch( + "reana_workflow_controller.opensearch.REANA_OPENSEARCH_ENABLED", True + ): + with app.test_client() as client: + # create workflow + res = client.post( + url_for("workflows.create_workflow"), + query_string={ + "user": user0.id_, + "workspace_root_path": tmp_shared_volume_path, + }, + content_type="application/json", + data=json.dumps(cwl_workflow_with_name), + ) + response_data = json.loads(res.get_data(as_text=True)) + workflow_uuid = response_data.get("workflow_id") + workflow_name = response_data.get("workflow_name") + + # create a job for the workflow + workflow_job = Job( + id_=uuid.UUID("9a22c3a4-6d72-4812-93e7-7e0efdeb985d"), + workflow_uuid=workflow_uuid, + ) + workflow_job.status = "running" + workflow_job.logs = "test job logs" + session.add(workflow_job) + session.commit() + + res = client.get( + url_for( + "statuses.get_workflow_logs", workflow_id_or_name=workflow_uuid + ), + query_string={"user": user0.id_}, + content_type="application/json", + data=json.dumps(None), + ) + assert res.status_code == 200 + response_data = json.loads(res.get_data(as_text=True)) + expected_data = { + "workflow_id": workflow_uuid, + "workflow_name": workflow_name, + "user": str(user0.id_), + "logs": json.dumps( + { + "workflow_logs": ( + opensearch_return_value if opensearch_return_value else "" + ), + "job_logs": { + str(workflow_job.id_): { + "workflow_uuid": str(workflow_job.workflow_uuid), + "job_name": "", + "compute_backend": "", + "backend_job_id": "", + "docker_img": "", + "cmd": "", + "status": workflow_job.status.name, + "logs": ( + opensearch_return_value + if opensearch_return_value + else workflow_job.logs + ), + "started_at": None, + "finished_at": None, + } + }, + "engine_specific": None, + } + ), + } + assert response_data == expected_data + mock_method.call_count == 2 + + +def test_get_created_workflow_opensearch_disabled( + app, user0, cwl_workflow_with_name, tmp_shared_volume_path +): + """Test get workflow logs when Opensearch is disabled (default).""" + from reana_workflow_controller.opensearch import OpenSearchLogFetcher + + with mock.patch.object( + OpenSearchLogFetcher, "fetch_logs", return_value=None + ) as mock_method: + with app.test_client() as client: + # create workflow + res = client.post( + url_for("workflows.create_workflow"), + query_string={ + "user": user0.id_, + "workspace_root_path": tmp_shared_volume_path, + }, + content_type="application/json", + data=json.dumps(cwl_workflow_with_name), + ) + response_data = json.loads(res.get_data(as_text=True)) + workflow_uuid = response_data.get("workflow_id") + workflow_name = response_data.get("workflow_name") + res = client.get( + url_for( + "statuses.get_workflow_logs", workflow_id_or_name=workflow_uuid + ), + query_string={"user": user0.id_}, + content_type="application/json", + data=json.dumps(None), + ) + assert res.status_code == 200 + response_data = json.loads(res.get_data(as_text=True)) + expected_data = { + "workflow_id": workflow_uuid, + "workflow_name": workflow_name, + "user": str(user0.id_), + "logs": '{"workflow_logs": "", "job_logs": {},' + ' "engine_specific": null}', + } + assert response_data == expected_data + mock_method.assert_not_called() def test_get_unknown_workflow_logs(