From 0c5ac179f9cb345f8a1e2aa6362ffdac987b2bbe Mon Sep 17 00:00:00 2001
From: Nathan Swain
Date: Fri, 13 Sep 2024 16:25:50 -0600
Subject: [PATCH] Fixes for jobs table async compatibility (#1091)

* Fixes for jobs table async compatibility

* update tests

---------

Co-authored-by: sdc50
---
 .../test_views/test_gizmos/test_jobs_table.py |  35 ++++-
 tethys_gizmos/views/gizmos/jobs_table.py      | 143 ++++++++++--------
 2 files changed, 115 insertions(+), 63 deletions(-)

diff --git a/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py b/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py
index a84d25c02..a80382e30 100644
--- a/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py
+++ b/tests/unit_tests/test_tethys_gizmos/test_views/test_gizmos/test_jobs_table.py
@@ -188,7 +188,7 @@ async def test_show_log_exception(self, mock_tj, mock_log):
         )
 
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
-    async def test_get_log_content(self, mock_tj):
+    async def test_get_log_content_async(self, mock_tj):
         mock_tj.return_value = mock.MagicMock(
             get_logs=partial(mock_async_func, {"log": "log content"}),
             safe_close=mock_async_func,
@@ -201,7 +201,19 @@ async def test_get_log_content(self, mock_tj):
         self.assertEqual(200, result.status_code)
 
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
-    async def test_get_log_content_key2(self, mock_tj):
+    async def test_get_log_content(self, mock_tj):
+        mock_tj.return_value = mock.MagicMock(
+            get_logs=lambda: {"log": "log content"},
+        )
+        request = mock.MagicMock(user=mock.MagicMock(is_authenticated=True))
+
+        result = await gizmo_jobs_table.get_log_content(
+            request=request, job_id="1", key1="log"
+        )
+        self.assertEqual(200, result.status_code)
+
+    @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
+    async def test_get_log_content_key2_async(self, mock_tj):
         log_func = partial(mock_async_func, "log content")
         mock_tj.return_value = mock.MagicMock(
             get_logs=partial(
@@ -217,6 +229,25 @@ async def test_get_log_content_key2(self, mock_tj):
         )
         self.assertEqual(200, result.status_code)
 
+    @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
+    async def test_get_log_content_key2(self, mock_tj):
+        def log_func():
+            return "log content"
+
+        mock_tj.return_value = mock.MagicMock(
+            get_logs=partial(
+                mock_async_func,
+                {"log": {"sub_log_1": log_func, "sub_log_2": "log content"}},
+            ),
+            safe_close=mock_async_func,
+        )
+        request = mock.MagicMock(user=mock.MagicMock(is_authenticated=True))
+
+        result = await gizmo_jobs_table.get_log_content(
+            request=request, job_id="1", key1="log", key2="sub_log_1"
+        )
+        self.assertEqual(200, result.status_code)
+
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.logger")
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
     async def test_get_log_content_exception(self, mock_tj, mock_log):
diff --git a/tethys_gizmos/views/gizmos/jobs_table.py b/tethys_gizmos/views/gizmos/jobs_table.py
index 9229e0c35..5785fe7d8 100644
--- a/tethys_gizmos/views/gizmos/jobs_table.py
+++ b/tethys_gizmos/views/gizmos/jobs_table.py
@@ -38,21 +38,88 @@ def get_job(job_id, user):
     return TethysJob.objects.get_subclass(id=job_id, user=user)
 
 
+async def do_job_action(job, action):
+    func = getattr(job, action)
+    if inspect.iscoroutinefunction(func):
+        ret = await func()
+        await job.safe_close()
+    else:
+        ret = await database_sync_to_async(func)()
+    return ret
+
+
+async def _get_log_content(job, key1, key2):
+    # Get the Job logs.
+    if inspect.iscoroutinefunction(job.get_logs):
+        data = await job.get_logs()
+        needs_safe_close = True
+    else:
+        data = await database_sync_to_async(job.get_logs)()
+        needs_safe_close = False
+
+    log_func = data[key1]
+    if key2 is not None:
+        log_func = log_func[key2]
+    if callable(log_func):
+        if inspect.iscoroutinefunction(log_func):
+            content = await log_func()
+            needs_safe_close = True
+        else:
+            content = await database_sync_to_async(log_func)()
+    else:
+        content = log_func
+
+    if needs_safe_close:
+        await job.safe_close()
+    return content
+
+
 @database_sync_to_async
 def get_dask_scheduler(scheduler_id):
     return DaskScheduler.objects.get(id=scheduler_id)
 
 
+@database_sync_to_async
+def get_condor_job_nodes(job):
+    dag = {}
+    nodes = job.condor_object.node_set
+
+    for node in nodes:
+        parents = []
+        for parent in node.parent_nodes:
+            parents.append(parent.job.name)
+
+        job_name = node.job.name
+        display_job_name = job_name.replace("_", " ").replace("-", " ").title()
+        dag[node.job.name] = {
+            "cluster_id": node.job.cluster_id,
+            "display": display_job_name,
+            "status": CondorWorkflow.STATUS_MAP[node.job.status].lower(),
+            "parents": parents,
+        }
+
+    return dag
+
+
+@database_sync_to_async
+def get_job_statuses(job):
+    num_statuses = 0
+    statuses = {"Completed": 0, "Error": 0, "Running": 0, "Aborted": 0}
+    for key, value in job.statuses.items():
+        if key in statuses:
+            num_statuses += value
+            statuses[key] = float(value) / float(job.num_jobs) * 100.0
+
+    return statuses, num_statuses
+
+
 @async_login_required
 async def perform_action(
     request, job_id, action, success_message="", error_message=None
 ):
     try:
         job = await get_job(job_id, request.user)
-        result = getattr(job, action)()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, action)
         success = True
         message = success_message
     except Exception as e:
@@ -75,10 +142,7 @@ async def delete(request, job_id):
     try:
         job = await get_job(job_id, request.user)
         job.clean_on_delete = True
-        result = job.delete()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "delete")
         success = True
         message = ""
     except Exception as e:
@@ -95,10 +159,7 @@ async def show_log(request, job_id):
     try:
         job = await get_job(job_id, request.user)
         # Get the Job logs.
-        data = job.get_logs()
-        if inspect.iscoroutine(data):
-            data = await data
-        await job.safe_close()
+        data = await do_job_action(job, "get_logs")
 
         sub_job_options = [(k, k) for k in data.keys()]
         sub_job_select = SelectInput(
@@ -158,21 +219,7 @@ async def get_log_content(request, job_id, key1, key2=None):
     try:
         job = await get_job(job_id, request.user)
         # Get the Job logs.
-        data = job.get_logs()
-        needs_safe_close = False
-        if inspect.iscoroutine(data):
-            data = await data
-            needs_safe_close = True
-        log_func = data[key1]
-        if key2 is not None:
-            log_func = log_func[key2]
-        content = log_func() if callable(log_func) else log_func
-        if inspect.iscoroutine(content):
-            content = await content
-            needs_safe_close = True
-
-        if needs_safe_close:
-            await job.safe_close()
+        content = await _get_log_content(job, key1, key2)
 
         return JsonResponse({"success": True, "content": content})
     except Exception as e:
@@ -198,10 +245,7 @@ async def update_row(request, job_id):
     data = reconstruct_post_dict(request)
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
         status_msg = job.status_message
         statuses = None
@@ -230,12 +274,7 @@
                 "Aborted": 5,
             }
         elif isinstance(job, CondorWorkflow):
-            num_statuses = 0
-            statuses = {"Completed": 0, "Error": 0, "Running": 0, "Aborted": 0}
-            for key, value in job.statuses.items():
-                if key in statuses:
-                    num_statuses += value
-                    statuses[key] = float(value) / float(job.num_jobs) * 100.0
+            statuses, num_statuses = await get_job_statuses(job)
 
             # Handle case with CondorWorkflows where DAG has started working,
             # but jobs have not necessarily started yet.
@@ -247,7 +286,9 @@
         if status == "Results-Ready":
            status = "Running"
 
-        row = JobsTable.get_row(job, data["column_fields"], data.get("actions"))
+        row = JobsTable.get_row(
+            job, data["column_fields"], data.get("actions"), delay_loading_status=True
+        )
         data.update(
             {
                 "job": job,
@@ -286,10 +327,7 @@ async def update_workflow_nodes_row(request, job_id):
     dag = {}
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
 
         # Hard code example for gizmo_showcase
@@ -336,21 +374,7 @@ async def update_workflow_nodes_row(request, job_id):
             }
 
         else:
-            nodes = job.condor_object.node_set
-
-            for node in nodes:
-                parents = []
-                for parent in node.parent_nodes:
-                    parents.append(parent.job.name)
-
-                job_name = node.job.name
-                display_job_name = job_name.replace("_", " ").replace("-", " ").title()
-                dag[node.job.name] = {
-                    "cluster_id": node.job.cluster_id,
-                    "display": display_job_name,
-                    "status": CondorWorkflow.STATUS_MAP[node.job.status].lower(),
-                    "parents": parents,
-                }
+            dag = await get_condor_job_nodes(job)
 
         success = True
     except Exception as e:
@@ -372,10 +396,7 @@ async def bokeh_row(request, job_id, type="individual-graph"):
     """
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
 
         # Get dashboard link for a given job_id
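
Note on the pattern: the `do_job_action` helper added above is the crux of the async compatibility fix. The job method named by `action` (e.g. `update_status`, `delete`, `get_logs`) may be a coroutine function, which is awaited directly and then followed by `job.safe_close()` to release the async database connection, or a plain sync function backed by the Django ORM, which must run on a worker thread via channels' `database_sync_to_async`. Below is a minimal, self-contained sketch of the same dispatch, illustrative only: `Job` is a toy stand-in for `TethysJob`, and `asyncio.to_thread` substitutes for `database_sync_to_async` purely so the sketch runs without Django installed.

    import asyncio
    import inspect


    class Job:
        """Toy stand-in for TethysJob with one sync and one async method."""

        def update_status(self):
            # Sync method; in Tethys this would touch the Django ORM,
            # so it must not run directly on the event loop.
            return "status updated (sync path)"

        async def get_logs(self):
            # Async method; in Tethys this leaves an async DB connection
            # open that safe_close() must release.
            return {"log": "log content (async path)"}

        async def safe_close(self):
            # Stand-in for TethysJob.safe_close().
            pass


    async def do_job_action(job, action):
        # Same dispatch as the patch: coroutine functions are awaited and
        # followed by safe_close(); sync functions run on a worker thread
        # (channels' database_sync_to_async in Tethys; asyncio.to_thread here).
        func = getattr(job, action)
        if inspect.iscoroutinefunction(func):
            ret = await func()
            await job.safe_close()
        else:
            ret = await asyncio.to_thread(func)
        return ret


    async def main():
        job = Job()
        print(await do_job_action(job, "update_status"))  # sync dispatch
        print(await do_job_action(job, "get_logs"))       # async dispatch


    if __name__ == "__main__":
        asyncio.run(main())

`_get_log_content` applies the same idea with one extra piece of bookkeeping: `needs_safe_close` flips to True only when an async call actually ran, so `safe_close()` is invoked exactly when an async database connection could have been opened.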