Skip to content

Commit

Permalink
trsh - trsh_job_queue
Browse files Browse the repository at this point in the history
A new function that attempts to select another queue with a higher walltime limit, if one was provided by the user, for a job that failed because it exceeded its walltime.
If no such queue was provided, it attempts to query the server (PBS only) to see whether any other queues are available.
It also checks whether the user has provided queue names to exclude.

tests:
Created multiple tests to make sure the function queries the servers and the settings correctly.
  • Loading branch information
calvinp0 committed Dec 14, 2023
1 parent 1a606ec commit 6ec3439
Show file tree
Hide file tree
Showing 2 changed files with 490 additions and 184 deletions.
151 changes: 151 additions & 0 deletions arc/job/trsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
is_same_pivot,
is_same_sequence_sublist,
is_str_float,
convert_to_hours
)
from arc.exceptions import InputError, SpeciesError, TrshError
from arc.imports import settings
Expand Down Expand Up @@ -1114,6 +1115,156 @@ def trsh_conformer_isomorphism(software: str,
break
return level_of_theory

def trsh_job_queue(server: str,
                   job_name: str,
                   max_time: int = 24,
                   attempted_queues: list = None,
                   ) -> Tuple[dict, bool]:
    """
    Troubleshoot a job that failed for exceeding its walltime by looking for alternative queues.

    The queues the user defined for the server (the ``'queue'`` entry of the server's settings)
    are considered first: those that were not attempted yet and whose walltime, converted to
    hours, is at least ``max_time`` are returned. If none qualifies and the server runs PBS,
    the server is queried (``groups``, ``qstat -q``, ``qstat -Qf <queue>``) for enabled queues
    that report a default walltime and that the current user appears to be allowed to use.
    Queues listed under the server's ``'exclude_queues'`` entry are removed from the queried result.

    Args:
        server (str): Name of the server (a key of ``servers``).
        job_name (str): Name of the job, only used in log messages.
        max_time (int, optional): The max time, in hours, that the queue the job failed on provided.
                                  Defaults to 24.
        attempted_queues (list, optional): Queues the job was already attempted on. Defaults to None.

    Returns:
        Tuple[dict, bool]:
            - A dictionary mapping queue names to their walltime, or ``None`` if no queue was found.
            - Whether the function was successful.
    """
    # The configured queues are indexed by name below, so the fallback must be a mapping as well.
    server_queues = servers[server].get('queue', dict())
    cluster_soft = servers[server].get('cluster_soft', 'Undefined')
    exclude_queues = servers[server].get('exclude_queues', list())
    attempted = attempted_queues or list()

    # Check whether any of the user-defined queues hasn't been tried yet and provides enough time.
    if len(server_queues) > 1:
        # Return a name -> walltime mapping, the same shape as the PBS query below returns.
        candidate_queues = {
            queue: walltime for queue, walltime in server_queues.items()
            if queue not in attempted and convert_to_hours(walltime) >= max_time
        }
        if candidate_queues:
            return candidate_queues, True
        logger.error(f' Could not troubleshoot {job_name} on {server} as all available queues have been tried. Will attempt to query the server for additional queues.')

    # Only PBS servers can be queried for additional queues.
    if cluster_soft.lower() != 'pbs':
        logger.error(f'Could not troubleshoot {job_name} on {server} as server is not PBS.')
        return None, False

    # First determine which groups the current user belongs to.
    # Example output: (['users zeus-users vkm-users gaussian grinberg-dana_prj docker'], [])
    # NOTE(review): given the example above, the 'Error' membership test only matches an element
    # that equals 'Error' - confirm how execute_command reports a failure.
    output = execute_command('groups', shell=True)
    if not output or 'Error' in output:
        logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command failed.')
        return None, False
    groups_stdout = output[0]
    user_groups = groups_stdout[0].split() if groups_stdout else list()
    if not user_groups:
        logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command did not return any groups.')
        return None, False
    # Drop the '-users' term from group names, e.g., 'zeus-users' -> 'zeus'.
    user_groups = [group.replace('-users', '') for group in user_groups]

    # Now query the queues.
    output_queues = execute_command('qstat -q', shell=True)
    if not output_queues or 'Error' in output_queues:
        logger.error(f'Could not troubleshoot {job_name} on {server} as the qstat command failed.')
        return None, False

    # Example `qstat -q` output:
    # Queue            Memory CPU Time Walltime Node   Run   Que   Lm  State
    # ---------------- ------ -------- -------- ---- ----- ----- ----  -----
    # workq              --      --       --     --      0     0   --   D S
    # maytal_q           --      --       --     --      7     0   --   E R
    # zeus_all_q         --      --    24:00:00  --      4     0   --   E R
    # zeus_long_q        --      --    168:00:0  --      0     0   --   E R
    # mafat_new_q        --      --       --     --     46     0   --   E R
    #                                                ----- -----
    #                                                   81     0
    # 1. Take the queue name from the first column and keep only enabled queues
    #    (first State letter is 'E' for enabled, 'D' for disabled).
    # 2. Make sure the user may submit to the queue by inspecting acl_groups in `qstat -Qf <queue_name>`.
    # 3. Get the walltime of each queue from the same output.
    # NOTE(review): unlike the user-defined queues above, the queues discovered here are not filtered
    # by max_time nor by attempted_queues - confirm that callers handle this.
    queues = dict()
    for line in output_queues[0]:
        if not line.strip() or line.startswith(('Queue', '----', 'server')):
            continue
        parts = line.split()
        if len(parts) < 9 or parts[8] != 'E':
            continue
        queue_name = parts[0]
        output_specific_queue = execute_command(f'qstat -Qf {queue_name}', shell=True)
        if not output_specific_queue:
            continue
        # Example `qstat -Qf <queue_name>` output:
        # Queue: mafat_new_q
        #     queue_type = Execution
        #     total_jobs = 44
        #     resources_default.walltime = 3600:00:00
        #     acl_group_enable = True
        #     acl_groups = grinberg-dana_prj,halupovich_prj,yuvallevy_prj
        #     enabled = True
        #     started = True
        # Collect both fields before deciding, so the result doesn't depend on the order of the lines.
        walltime = None
        acl_listed, acl_permitted = False, False
        for line_queue in output_specific_queue[0]:
            stripped = line_queue.strip()
            if '=' not in stripped:
                continue
            value = stripped.split('=', 1)[1].strip()
            if stripped.startswith('resources_default.walltime'):
                walltime = value
            elif stripped.startswith('acl_groups') and not acl_listed:
                acl_listed = True
                acl_permitted = any(group.strip() in user_groups for group in value.split(','))
        if walltime is None:
            # Only queues that report a default walltime are considered.
            continue
        if acl_listed:
            # The queue restricts access: keep it only if the user is in one of the listed groups.
            permitted = acl_permitted
        else:
            # No access list: keep the queue only if its name contains one of the user's groups.
            permitted = any(group in queue_name for group in user_groups)
        if permitted:
            queues[queue_name] = walltime

    # Remove queues that the user asked to exclude.
    for queue in exclude_queues or list():
        queues.pop(queue, None)

    if not queues:
        logger.error(f'Could not troubleshoot {job_name} on {server} as no queues were found.')
        return None, False
    return queues, True

def trsh_job_on_server(server: str,
job_name: str,
Expand Down
Loading

0 comments on commit 6ec3439

Please sign in to comment.