
Fixes for jobs table async compatibility (tethysplatform#1091)
* Fixes for jobs table async compatibility

* update tests

---------

Co-authored-by: sdc50 <[email protected]>
swainn and sdc50 authored Sep 13, 2024
1 parent ce55da9 commit 0c5ac17
Showing 2 changed files with 115 additions and 63 deletions.
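The first file shown is the updated jobs-table tests; the second is tethys_gizmos/views/gizmos/jobs_table.py. The diff centralizes the sync-vs-async dispatch that each view previously repeated inline. The key move is checking inspect.iscoroutinefunction on the job method before calling it, rather than inspect.iscoroutine on its result, so that synchronous methods can be routed through Channels' database_sync_to_async instead of blocking the event loop. Below is a self-contained sketch of that pattern; the SyncJob and AsyncJob classes are hypothetical, and asyncio.to_thread stands in for database_sync_to_async:

    import asyncio
    import inspect

    class SyncJob:
        def update_status(self):
            return "sync status"

    class AsyncJob:
        async def update_status(self):
            return "async status"

        async def safe_close(self):
            pass  # e.g. release an event-loop-bound DB connection

    async def do_job_action(job, action):
        # Decide how to call *before* calling: iscoroutinefunction inspects
        # the method itself, so a sync method can be pushed off the event
        # loop, instead of invoking it first and awaiting only if it happened
        # to return a coroutine (the old approach).
        func = getattr(job, action)
        if inspect.iscoroutinefunction(func):
            ret = await func()
            await job.safe_close()
        else:
            ret = await asyncio.to_thread(func)  # stand-in for database_sync_to_async
        return ret

    print(asyncio.run(do_job_action(SyncJob(), "update_status")))   # sync status
    print(asyncio.run(do_job_action(AsyncJob(), "update_status")))  # async status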
@@ -188,7 +188,7 @@ async def test_show_log_exception(self, mock_tj, mock_log):
         )
 
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
-    async def test_get_log_content(self, mock_tj):
+    async def test_get_log_content_async(self, mock_tj):
         mock_tj.return_value = mock.MagicMock(
             get_logs=partial(mock_async_func, {"log": "log content"}),
             safe_close=mock_async_func,
@@ -201,7 +201,19 @@ async def test_get_log_content(self, mock_tj):
         self.assertEqual(200, result.status_code)
 
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
-    async def test_get_log_content_key2(self, mock_tj):
+    async def test_get_log_content(self, mock_tj):
+        mock_tj.return_value = mock.MagicMock(
+            get_logs=lambda: {"log": "log content"},
+        )
+        request = mock.MagicMock(user=mock.MagicMock(is_authenticated=True))
+
+        result = await gizmo_jobs_table.get_log_content(
+            request=request, job_id="1", key1="log"
+        )
+        self.assertEqual(200, result.status_code)
+
+    @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
+    async def test_get_log_content_key2_async(self, mock_tj):
         log_func = partial(mock_async_func, "log content")
         mock_tj.return_value = mock.MagicMock(
             get_logs=partial(
@@ -217,6 +229,25 @@ async def test_get_log_content_key2(self, mock_tj):
         )
         self.assertEqual(200, result.status_code)
 
+    @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
+    async def test_get_log_content_key2(self, mock_tj):
+        def log_func():
+            return "log content"
+
+        mock_tj.return_value = mock.MagicMock(
+            get_logs=partial(
+                mock_async_func,
+                {"log": {"sub_log_1": log_func, "sub_log_2": "log content"}},
+            ),
+            safe_close=mock_async_func,
+        )
+        request = mock.MagicMock(user=mock.MagicMock(is_authenticated=True))
+
+        result = await gizmo_jobs_table.get_log_content(
+            request=request, job_id="1", key1="log", key2="sub_log_1"
+        )
+        self.assertEqual(200, result.status_code)
+
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.logger")
     @mock.patch("tethys_gizmos.views.gizmos.jobs_table.get_job")
     async def test_get_log_content_exception(self, mock_tj, mock_log):
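The *_async test variants above bind return values to a mock_async_func helper with functools.partial. The helper's definition sits outside this diff; presumably it is a trivial coroutine along these lines:

    # Presumed shape of the shared test helper (not defined in this diff):
    async def mock_async_func(ret_value=None, *args, **kwargs):
        return ret_value

Binding it with partial(mock_async_func, {...}) still registers as a coroutine function in the view code, because inspect.iscoroutinefunction unwraps functools.partial objects (Python 3.8+). These mocks therefore exercise the await-then-safe_close branch, while the plain lambda in test_get_log_content exercises the database_sync_to_async branch.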
143 changes: 82 additions & 61 deletions tethys_gizmos/views/gizmos/jobs_table.py
@@ -38,21 +38,88 @@ def get_job(job_id, user):
     return TethysJob.objects.get_subclass(id=job_id, user=user)
 
 
+async def do_job_action(job, action):
+    func = getattr(job, action)
+    if inspect.iscoroutinefunction(func):
+        ret = await func()
+        await job.safe_close()
+    else:
+        ret = await database_sync_to_async(func)()
+    return ret
+
+
+async def _get_log_content(job, key1, key2):
+    # Get the Job logs.
+    if inspect.iscoroutinefunction(job.get_logs):
+        data = await job.get_logs()
+        needs_safe_close = True
+    else:
+        data = await database_sync_to_async(job.get_logs)()
+        needs_safe_close = False
+
+    log_func = data[key1]
+    if key2 is not None:
+        log_func = log_func[key2]
+    if callable(log_func):
+        if inspect.iscoroutinefunction(log_func):
+            content = await log_func()
+            needs_safe_close = True
+        else:
+            content = await database_sync_to_async(log_func)()
+    else:
+        content = log_func
+
+    if needs_safe_close:
+        await job.safe_close()
+    return content
+
+
 @database_sync_to_async
 def get_dask_scheduler(scheduler_id):
     return DaskScheduler.objects.get(id=scheduler_id)
 
 
+@database_sync_to_async
+def get_condor_job_nodes(job):
+    dag = {}
+    nodes = job.condor_object.node_set
+
+    for node in nodes:
+        parents = []
+        for parent in node.parent_nodes:
+            parents.append(parent.job.name)
+
+        job_name = node.job.name
+        display_job_name = job_name.replace("_", " ").replace("-", " ").title()
+        dag[node.job.name] = {
+            "cluster_id": node.job.cluster_id,
+            "display": display_job_name,
+            "status": CondorWorkflow.STATUS_MAP[node.job.status].lower(),
+            "parents": parents,
+        }
+
+    return dag
+
+
+@database_sync_to_async
+def get_job_statuses(job):
+    num_statuses = 0
+    statuses = {"Completed": 0, "Error": 0, "Running": 0, "Aborted": 0}
+    for key, value in job.statuses.items():
+        if key in statuses:
+            num_statuses += value
+            statuses[key] = float(value) / float(job.num_jobs) * 100.0
+
+    return statuses, num_statuses
+
+
 @async_login_required
 async def perform_action(
     request, job_id, action, success_message="", error_message=None
 ):
     try:
         job = await get_job(job_id, request.user)
-        result = getattr(job, action)()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, action)
         success = True
         message = success_message
     except Exception as e:
@@ -75,10 +142,7 @@ async def delete(request, job_id):
     try:
         job = await get_job(job_id, request.user)
         job.clean_on_delete = True
-        result = job.delete()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "delete")
         success = True
         message = ""
     except Exception as e:
@@ -95,10 +159,7 @@ async def show_log(request, job_id):
     try:
         job = await get_job(job_id, request.user)
         # Get the Job logs.
-        data = job.get_logs()
-        if inspect.iscoroutine(data):
-            data = await data
-        await job.safe_close()
+        data = await do_job_action(job, "get_logs")
 
         sub_job_options = [(k, k) for k in data.keys()]
         sub_job_select = SelectInput(
@@ -158,21 +219,7 @@ async def get_log_content(request, job_id, key1, key2=None):
     try:
         job = await get_job(job_id, request.user)
         # Get the Job logs.
-        data = job.get_logs()
-        needs_safe_close = False
-        if inspect.iscoroutine(data):
-            data = await data
-            needs_safe_close = True
-        log_func = data[key1]
-        if key2 is not None:
-            log_func = log_func[key2]
-        content = log_func() if callable(log_func) else log_func
-        if inspect.iscoroutine(content):
-            content = await content
-            needs_safe_close = True
-
-        if needs_safe_close:
-            await job.safe_close()
+        content = await _get_log_content(job, key1, key2)
 
         return JsonResponse({"success": True, "content": content})
     except Exception as e:
@@ -198,10 +245,7 @@ async def update_row(request, job_id):
     data = reconstruct_post_dict(request)
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
         status_msg = job.status_message
         statuses = None
@@ -230,12 +274,7 @@
                 "Aborted": 5,
             }
         elif isinstance(job, CondorWorkflow):
-            num_statuses = 0
-            statuses = {"Completed": 0, "Error": 0, "Running": 0, "Aborted": 0}
-            for key, value in job.statuses.items():
-                if key in statuses:
-                    num_statuses += value
-                    statuses[key] = float(value) / float(job.num_jobs) * 100.0
+            statuses, num_statuses = await get_job_statuses(job)
 
             # Handle case with CondorWorkflows where DAG has started working,
             # but jobs have not necessarily started yet.
@@ -247,7 +286,9 @@
         if status == "Results-Ready":
             status = "Running"
 
-        row = JobsTable.get_row(job, data["column_fields"], data.get("actions"))
+        row = JobsTable.get_row(
+            job, data["column_fields"], data.get("actions"), delay_loading_status=True
+        )
         data.update(
             {
                 "job": job,
@@ -286,10 +327,7 @@ async def update_workflow_nodes_row(request, job_id):
     dag = {}
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
 
         # Hard code example for gizmo_showcase
@@ -336,21 +374,7 @@
             }
 
         else:
-            nodes = job.condor_object.node_set
-
-            for node in nodes:
-                parents = []
-                for parent in node.parent_nodes:
-                    parents.append(parent.job.name)
-
-                job_name = node.job.name
-                display_job_name = job_name.replace("_", " ").replace("-", " ").title()
-                dag[node.job.name] = {
-                    "cluster_id": node.job.cluster_id,
-                    "display": display_job_name,
-                    "status": CondorWorkflow.STATUS_MAP[node.job.status].lower(),
-                    "parents": parents,
-                }
+            dag = await get_condor_job_nodes(job)
 
         success = True
     except Exception as e:
@@ -372,10 +396,7 @@ async def bokeh_row(request, job_id, type="individual-graph"):
     """
     try:
         job = await get_job(job_id, request.user)
-        result = job.update_status()
-        if inspect.iscoroutine(result):
-            await result
-        await job.safe_close()
+        await do_job_action(job, "update_status")
         status = job.cached_status
 
         # Get dashboard link for a given job_id
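A note on the new @database_sync_to_async helpers: get_dask_scheduler wraps a Django ORM lookup (DaskScheduler.objects.get), which would raise SynchronousOnlyOperation if executed directly on the event loop, and get_condor_job_nodes / get_job_statuses move potentially blocking job-object traversals off the loop the same way. A minimal sketch of the pattern, assuming the decorator is Channels' channels.db.database_sync_to_async (the module's imports are outside the hunks shown):

    from channels.db import database_sync_to_async  # presumed import

    @database_sync_to_async
    def get_dask_scheduler(scheduler_id):
        # Runs in a worker thread with its own DB connection; calling the ORM
        # directly inside an async view raises SynchronousOnlyOperation.
        # DaskScheduler is imported at the top of jobs_table.py.
        return DaskScheduler.objects.get(id=scheduler_id)

    # Usage inside an async view:
    #     scheduler = await get_dask_scheduler(scheduler_id)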
