Commit

try to fix issues with subprocess monitoring job
Acribbs committed Jan 17, 2025
1 parent 13dd954 commit 0ba0662
Showing 1 changed file with 53 additions and 6 deletions.
cgatcore/pipeline/executors.py (59 changes: 53 additions & 6 deletions)
@@ -390,10 +390,46 @@ def _monitor_job(self, job_id, job_script):
         FAILED_STATES = ["FAILED", "TIMEOUT", "CANCELLED", "NODE_FAIL", "OUT_OF_MEMORY",
                          "BOOT_FAIL", "DEADLINE", "PREEMPTED", "REVOKED", "SPECIAL_EXIT"]
 
+        def get_job_error():
+            """Get detailed error information from SLURM."""
+            error_info = []
+
+            # Try sacct first for detailed state
+            cmd = f"sacct -j {job_id} --format=JobID,State,ExitCode,DerivedExitCode,Comment,Reason --noheader --parsable2"
+            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)
+            if process.returncode == 0 and process.stdout.strip():
+                error_info.append(f"SACCT details: {process.stdout.strip()}")
+
+            # Try scontrol for more details
+            cmd = f"scontrol show job {job_id}"
+            process = subprocess.run(cmd, shell=True, capture_output=True, text=True)
+            if process.returncode == 0:
+                for line in process.stdout.split('\n'):
+                    if any(x in line for x in ['Reason=', 'ExitCode=', 'Comment=']):
+                        error_info.append(line.strip())
+
+            # Check job script output/error files
+            script_dir = os.path.dirname(job_script)
+            for ext in ['.out', '.err']:
+                output_file = os.path.join(script_dir, f"slurm-{job_id}{ext}")
+                if os.path.exists(output_file):
+                    try:
+                        with open(output_file, 'r') as f:
+                            content = f.read().strip()
+                        if content:
+                            error_info.append(f"Content of {output_file}:")
+                            error_info.append(content)
+                    except Exception as e:
+                        error_info.append(f"Could not read {output_file}: {str(e)}")
+
+            return '\n'.join(error_info)
+
         while True:
             if time.time() - start_time > max_wait_time:
+                error_details = get_job_error()
                 self.logger.error(f"Job {job_id} exceeded maximum wait time of {max_wait_time} seconds")
-                raise RuntimeError(f"Job {job_id} exceeded maximum wait time")
+                self.logger.error(f"Error details:\n{error_details}")
+                raise RuntimeError(f"Job {job_id} exceeded maximum wait time. Details:\n{error_details}")
 
             # First try squeue to check if job is still running
             cmd = f"squeue -j {job_id} -h -o %T"
@@ -410,8 +446,10 @@
                 process = subprocess.run(cmd, shell=True, capture_output=True, text=True)
 
                 if process.returncode != 0:
+                    error_details = get_job_error()
                     self.logger.error(f"Failed to get job status: {process.stderr}")
-                    raise RuntimeError(f"Failed to get job status: {process.stderr}")
+                    self.logger.error(f"Error details:\n{error_details}")
+                    raise RuntimeError(f"Failed to get job status: {process.stderr}\nDetails:\n{error_details}")
 
                 # Parse scontrol output
                 for line in process.stdout.split('\n'):
@@ -428,11 +466,15 @@
                             self.logger.info(f"Job {job_id} completed successfully")
                             break
                         elif any(s in status for s in FAILED_STATES):
+                            error_details = get_job_error()
                             self.logger.error(f"Job {job_id} failed with status: {status}")
-                            raise RuntimeError(f"Job {job_id} failed with status: {status}")
+                            self.logger.error(f"Error details:\n{error_details}")
+                            raise RuntimeError(f"Job {job_id} failed with status: {status}\nDetails:\n{error_details}")
                         elif status == "UNKNOWN":
-                            # If we can't determine the status, assume it's still running
-                            self.logger.debug(f"Could not determine status for job {job_id}, assuming still running")
+                            self.logger.debug(f"Could not determine status for job {job_id}, checking error details")
+                            error_details = get_job_error()
+                            if error_details:
+                                self.logger.error(f"Found error details for unknown status:\n{error_details}")
             else:
                 # Job still in queue or running
                 status = process.stdout.strip().upper()
@@ -449,15 +491,20 @@
                 self.logger.debug(f"Job {job_id} status: {status}")
 
                 if status in FAILED_STATES:
+                    error_details = get_job_error()
                     self.logger.error(f"Job {job_id} failed with status: {status}")
-                    raise RuntimeError(f"Job {job_id} failed with status: {status}")
+                    self.logger.error(f"Error details:\n{error_details}")
+                    raise RuntimeError(f"Job {job_id} failed with status: {status}\nDetails:\n{error_details}")
                 elif status in COMPLETED_STATES:
                     self.logger.info(f"Job {job_id} completed successfully")
                     break
                 elif status in RUNNING_STATES or not status:  # Include empty status here
                     self.logger.debug(f"Job {job_id} is {status or 'in unknown state'}")
                 else:
                     self.logger.debug(f"Job {job_id} has unknown status: {status}")
+                    error_details = get_job_error()
+                    if error_details:
+                        self.logger.debug(f"Additional status details:\n{error_details}")
 
             time.sleep(check_interval)
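
To see what the new helper gathers in isolation, here is a minimal, standalone sketch of the same error-collection pattern (not part of the commit): collect_slurm_error, the job id 123456, and the script-directory default are illustrative assumptions, and the snippet expects the SLURM client tools (sacct, scontrol) on PATH.

import os
import subprocess


def collect_slurm_error(job_id, script_dir="."):
    """Best-effort collection of failure details for a SLURM job (illustrative sketch)."""
    details = []

    # sacct reports the accounted final state and exit code even after
    # the job has left the queue.
    result = subprocess.run(
        f"sacct -j {job_id} --format=JobID,State,ExitCode --noheader --parsable2",
        shell=True, capture_output=True, text=True)
    if result.returncode == 0 and result.stdout.strip():
        details.append(f"sacct: {result.stdout.strip()}")

    # scontrol only knows about recently finished jobs, so treat failure as non-fatal.
    result = subprocess.run(
        f"scontrol show job {job_id}",
        shell=True, capture_output=True, text=True)
    if result.returncode == 0:
        details.extend(line.strip() for line in result.stdout.splitlines()
                       if "Reason=" in line or "ExitCode=" in line)

    # By default SLURM writes job output to slurm-<jobid>.out; a separate
    # .err file exists only if the submission used --error.
    for ext in (".out", ".err"):
        path = os.path.join(script_dir, f"slurm-{job_id}{ext}")
        if os.path.exists(path):
            try:
                with open(path) as handle:
                    content = handle.read().strip()
                if content:
                    details.append(f"{path}:\n{content}")
            except OSError as exc:
                details.append(f"Could not read {path}: {exc}")

    return "\n".join(details)


if __name__ == "__main__":
    print(collect_slurm_error(123456) or "no details found")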

