First commit
calvinp0 committed Nov 30, 2023
1 parent d3f174f commit 1eb2662
Showing 5 changed files with 183 additions and 2 deletions.
9 changes: 9 additions & 0 deletions arc/common.py
@@ -1721,3 +1721,12 @@ def is_xyz_mol_match(mol: 'Molecule',
        if element not in element_dict_xyz or element_dict_xyz[element] != count:
            return False
    return True

def convert_to_hours(time_str: str) -> float:
    """
    Convert a walltime string in the format HH:MM:SS to hours.

    Args:
        time_str (str): A time string in the format HH:MM:SS.

    Returns:
        float: The number of hours the string represents.
    """
    h, m, s = map(int, time_str.split(':'))
    return h + m / 60 + s / 3600
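
A quick sanity check of the conversion (an illustration, not part of the diff), including the truncated walltime strings that qstat prints, e.g. '168:00:0' for zeus_long_q in the listing further down, which still split into three integer fields:

    assert convert_to_hours('03:00:00') == 3.0
    assert convert_to_hours('168:00:0') == 168.0
    assert convert_to_hours('10:30:00') == 10.5
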
38 changes: 37 additions & 1 deletion arc/job/adapter.py
Expand Up @@ -33,7 +33,7 @@
rename_output,
submit_job,
)
from arc.job.trsh import trsh_job_on_server
from arc.job.trsh import trsh_job_on_server, trsh_job_queue
from arc.job.ssh import SSHClient
from arc.job.trsh import determine_ess_status
from arc.species.converter import xyz_to_str
@@ -499,6 +499,7 @@ def write_submit_script(self) -> None:
        submit_script = submit_script.format(
            name=self.job_server_name,
            un=servers[self.server]['un'],
            queue=servers[self.server].get('queue'),
            t_max=self.format_max_job_time(time_format=t_max_format[servers[self.server]['cluster_soft']]),
            memory=int(self.submit_script_memory) if (isinstance(self.submit_script_memory, int) or isinstance(self.submit_script_memory, float)) else self.submit_script_memory,
            cpus=self.cpu_cores,
@@ -894,6 +895,20 @@ def determine_job_status(self):
                                                          'time limit.'
                            self.job_status[1]['line'] = line
                            break
                        # =>> PBS: job killed: walltime 10837 exceeded limit 10800
                        elif 'job killed' in line and 'exceeded limit' in line:
                            logger.warning(f'Looks like the job was killed on {self.server} due to time limit. '
                                           f'Got: {line}')
                            time_limit = int(line.split('limit')[1].split()[0])  # the reported limit is in seconds
                            new_max_job_time = self.max_job_time - 24 if self.max_job_time > 25 else 1
                            logger.warning(f'Setting max job time to {new_max_job_time} (was {self.max_job_time})')
                            self.max_job_time = new_max_job_time
                            self.job_status[1]['status'] = 'errored'
                            self.job_status[1]['keywords'] = ['ServerTimeLimit']
                            self.job_status[1]['error'] = 'Job killed by the server since it reached the maximal ' \
                                                          'time limit.'
                            self.job_status[1]['line'] = line
                            break
        elif self.job_status[0] == 'running':
            self.job_status[1]['status'] = 'running'
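
The walltime handling above hinges on the exact shape of the PBS kill message; a minimal check of the split chain on the sample line from the comment (variable names here are illustrative):

    line = '=>> PBS: job killed: walltime 10837 exceeded limit 10800'
    seconds = int(line.split('limit')[1].split()[0])
    print(seconds)  # 10800, i.e. a 3-hour walltime limit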

@@ -1313,6 +1328,27 @@ def troubleshoot_server(self):
        # resubmit job
        self.execute()

    def troubleshoot_queue(self):
        """
        Troubleshoot queue errors by selecting a queue whose walltime can accommodate the job.
        """
        # trsh_job_queue() is defined in arc/job/trsh.py; convert_to_hours() in arc/common.py.
        queues, run_job = trsh_job_queue(job_name=self.job_name,
                                         job_id=self.job_id,
                                         server=self.server,
                                         server_nodes=self.server_nodes,
                                         )

        if queues is not None:
            # Use self.max_job_time to keep only queues whose walltime is long enough for this job.
            filtered_queues = {queue: walltime for queue, walltime in queues.items()
                               if convert_to_hours(walltime) >= self.max_job_time}
            # Sort the queues by walltime, and choose the one with the longest walltime.
            sorted_queues = sorted(filtered_queues.items(), key=lambda x: convert_to_hours(x[1]), reverse=True)
            if run_job and sorted_queues:
                self.queue = sorted_queues[0][0]
                self.execute()
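
A worked example of the selection logic above, assuming trsh_job_queue() returned the two zeus queues from the qstat listing in arc/job/trsh.py below and that max_job_time is 24 hours:

    queues = {'zeus_long_q': '168:00:00', 'zeus_short_q': '03:00:00'}
    max_job_time = 24
    filtered = {q: w for q, w in queues.items() if convert_to_hours(w) >= max_job_time}
    # filtered == {'zeus_long_q': '168:00:00'}; the 3-hour queue is too short
    chosen = sorted(filtered.items(), key=lambda x: convert_to_hours(x[1]), reverse=True)[0][0]
    print(chosen)  # 'zeus_long_q'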

def save_output_file(self,
key: Optional[str] = None,
val: Optional[Union[float, dict, np.ndarray]] = None,
130 changes: 130 additions & 0 deletions arc/job/trsh.py
@@ -1114,6 +1114,136 @@ def trsh_conformer_isomorphism(software: str,
            break
    return level_of_theory

def trsh_job_queue(server: str,
                   job_name: str,
                   job_id: Union[int, str],
                   server_nodes: list = None,
                   ) -> Tuple[Optional[dict], bool]:
    """
    Troubleshoot queue errors by determining which queues the user may submit to, and their walltimes.

    Args:
        server (str): The server name.
        job_name (str): The job's name (e.g., 'opt_a103').
        job_id (Union[int, str]): The job's ID on the server.
        server_nodes (list, optional): A list of nodes already tried on this server. Defaults to None.

    Returns:
        Tuple[Optional[dict], bool]:
            A dictionary of available queues and their walltimes (or None if none could be determined),
            and whether to re-run the job.
    """

    server_nodes = server_nodes if server_nodes is not None else list()
    cluster_soft = servers[server]['cluster_soft']

    # If the server uses PBS, query which queues are available.
    if cluster_soft.lower() == 'pbs':
        # First, determine which groups the current user belongs to.
        cmd = 'groups'
        output = execute_command(cmd, shell=True)
        if 'Error' in output:
            logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command failed.')
            return None, False
        else:
            # Example output: (['users zeus-users vkm-users gaussian grinberg-dana_prj docker'], [])
            user_groups = output[0][0].split()
            if len(user_groups) == 0:
                logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command did not return any groups.')
                return None, False
            # If the term '-users' appears in a group name, remove it.
            elif any('-users' in group for group in user_groups):
                user_groups = [group.replace('-users', '') for group in user_groups]
        # Now query the queues.
        cmd = 'qstat -q'
        output_queues = execute_command(cmd, shell=True)
        if 'Error' in output_queues:
            logger.error(f'Could not troubleshoot {job_name} on {server} as the qstat command failed.')
            return None, False
        else:
            # Parse the output. Example:
            # Queue            Memory CPU Time Walltime Node  Run   Que   Lm  State
            # ---------------- ------ -------- -------- ---- ----- ----- ----  -----
            # workq               --     --       --     --      0     0   --   D S
            # maytal_q            --     --       --     --      7     0   --   E R
            # vkm_all_q           --     --       --     --      0     0   --   D R
            # zeus_temp           --     --    24:00:00  --      0     0   --   D S
            # dagan_q             --     --       --     --      0     0   --   E R
            # frankel_q           --     --       --     --      0     0   --   E R
            # vkm_gm_q            --     --       --     --      0     0   --   E R
            # zeus_all_scalar     --     --    24:00:00  --      0     0   --   D S
            # yuval_q             --     --       --     --      0     0   --   E R
            # dan_q               --     --       --     --      7     0   --   E R
            # zeus_all_q          --     --    24:00:00  --      4     0   --   E R
            # zeus_long_q         --     --    168:00:0  --      0     0   --   E R
            # zeus_short_q        --     --    03:00:00  --      0     0   --   E R
            # gpu_v100_q          --     --    480:00:0  --      0     0   --   E R
            # brandon_q           --     --       --     --      0     0   --   E R
            # karp_q              --     --       --     --      0     0   --   E R
            # mafat_gm_q          --     --       --     --      0     0   --   E R
            # training_q          --     --    24:00:00  --      0     0   --   E R
            # mafat4_q            --     --       --     --      0     0   --   D R
            # zeus_new_q          --     --    72:00:00  --      0     0   --   E R
            # zeus_combined_q     --     --    24:00:00  --     17     0   --   E R
            # zeus_comb_short     --     --    03:00:00  --      0     0   --   E R
            # mafat_new_q         --     --       --     --     46     0   --   E R
            # dagan_comb_q        --     --       --     --      0     0   --   E R
            # dan_comb_q          --     --       --     --      0     0   --   E R
            # karp_comb_q         --     --       --     --      0     0   --   D R
            # brandon_comb_q      --     --       --     --      0     0   --   E R
            # train_gpu_q         --     --    24:00:00  --      0     0   --   E R
            #                                                ----- -----
            #                                                   81     0
            # 1. Get the queue names in the first column, and check whether the state is 'E' (enabled)
            #    or 'D' (disabled) - select only enabled queues.
            # 2. Once we have a list of queues, make sure we can submit to them by checking the
            #    acl_groups output of qstat -Qf <queue_name>.
            # 3. Also get the walltime for each queue.
            queues = {}
            for line in output_queues[0]:
                if line.strip() and not line.startswith('Queue') and not line.startswith('----') and not line.startswith('server'):
                    parts = line.split()
                    if len(parts) >= 9 and parts[8] == 'E':
                        queue_name = parts[0]
                        acl_groups = None
                        cmd = f'qstat -Qf {queue_name}'
                        output_specific_queue = execute_command(cmd, shell=True)
                        # Parse example:
                        # Queue: mafat_new_q
                        #     queue_type = Execution
                        #     total_jobs = 44
                        #     state_count = Transit:0 Queued:0 Held:0 Waiting:0 Running:44 Exiting:0 Begun:0
                        #     resources_default.walltime = 3600:00:00
                        #     acl_group_enable = True
                        #     acl_groups = grinberg-dana_prj,halupovich_prj,yuvallevy_prj
                        #     default_chunk.qlist = mafat_new_q
                        #     resources_assigned.mem = 376625977kb
                        #     resources_assigned.mpiprocs = 132
                        #     resources_assigned.ncpus = 3254
                        #     resources_assigned.nodect = 47
                        #     max_run_res.ncpus = [o:PBS_ALL=3584]
                        #     enabled = True
                        #     started = True
                        for line_queue in output_specific_queue[0]:
                            # Check the walltime.
                            if line_queue.strip().startswith('resources_default.walltime'):
                                walltime = line_queue.split('=')[1].strip()
                                queues[queue_name] = walltime
                            if line_queue.strip().startswith('acl_groups'):
                                acl_groups = True
                                groups = line_queue.split('=')[1].strip().split(',')
                                if any(group in user_groups for group in groups):
                                    break  # The user is in one of the acl_groups, keep the queue.
                                else:
                                    queues.pop(queue_name, None)  # The user is not in the acl_groups, remove the queue.
                                    break
                        if not any(group in queue_name for group in user_groups) and acl_groups is None:
                            queues.pop(queue_name, None)  # The queue name does not contain any of the user's groups, remove the queue.
            if len(queues) == 0:
                logger.error(f'Could not troubleshoot {job_name} on {server} as no queues were found.')
                return None, False
            else:
                return queues, True
    # Queue troubleshooting is currently only implemented for PBS; signal "do not re-run" otherwise.
    return None, False
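
A condensed sketch of the qstat -q parsing above, run on two rows of the sample listing (assuming the same whitespace-delimited layout, where the first state letter is 'E' or 'D'):

    sample = ['zeus_long_q  --  --  168:00:0  --  0  0  --  E R',
              'workq        --  --     --     --  0  0  --  D S']
    enabled = [row.split()[0] for row in sample if row.split()[8] == 'E']
    print(enabled)  # ['zeus_long_q']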

def trsh_job_on_server(server: str,
                       job_name: str,
6 changes: 6 additions & 0 deletions arc/scheduler.py
@@ -938,6 +938,12 @@ def end_job(self, job: 'JobAdapter',
                           f'Was {original_mem} GB, rerunning job with {job.job_memory_gb} GB.')
            job.job_memory_gb = used_mem * 4.5 if used_mem is not None else job.job_memory_gb * 0.5
            self._run_a_job(job=job, label=label)
        if job.job_status[1]['status'] == 'errored' and job.job_status[1]['keywords'] == ['ServerTimeLimit']:
            logger.warning(f'Job {job.job_name} errored because of a server time limit. '
                           f'Rerunning job with {job.max_job_time * 2} hours.')
            job.max_job_time *= 2
            self._run_a_job(job=job, label=label)

        if not os.path.isfile(job.local_path_to_output_file) and not job.execution_type == 'incore':
            job.rename_output_file()
2 changes: 1 addition & 1 deletion arc/settings/submit.py
@@ -827,7 +827,7 @@
    },
    'pbs_sample': {
        'gaussian': """#!/bin/bash -l
#PBS -q batch
#PBS -q {queue}
#PBS -l nodes=1:ppn={cpus}
#PBS -l mem={memory}mb
#PBS -l walltime=48:00:00
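The {queue} placeholder above is filled in write_submit_script() from servers[self.server].get('queue'). A hedged sketch of a matching server entry in the user's settings (the 'queue' value and any keys not used in this diff are illustrative assumptions):

    servers = {
        'pbs_sample': {
            'cluster_soft': 'PBS',
            'un': '<username>',
            'queue': 'zeus_all_q',  # assumed default; troubleshoot_queue() may later override self.queue
        },
    }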
