Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions workflow/calculation/hf_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def args_parser(cmd=None):
except SystemExit as e:
print(e, flush=True)
# invalid arguments or -h
comm.Abort()
comm.Abort(1)

if args.sim_bin is None:
args.sim_bin = binary_version.get_hf_binmod(args.version)
Expand Down Expand Up @@ -381,7 +381,7 @@ def unfinished(out_file):
logger.debug("Checkpoints found.")
initialise(check_only=True)
logger.error("HF Simulation already completed.")
comm.Abort()
comm.Abort(1)
except AssertionError:
return
# seems ok to continue simulation
Expand Down Expand Up @@ -504,7 +504,7 @@ def run_hf(

with open(f"hf_err_{idx_0}", "w") as e:
e.write(stderr)
comm.Abort()
comm.Abort(1)

# write e_dist and vs to file
with open(args.out_file, "r+b") as out:
Expand All @@ -514,20 +514,7 @@ def run_hf(
e_dist[i].tofile(out)
vs.tofile(out)

def validate_end(idx_n):
"""
Verify filesize has been extended by the correct amount.
idx_n: position (starting at 1) of last station to be completed
"""
try:
assert os.stat(args.out_file).st_size == head_total + idx_n * block_size
except AssertionError:
msg = f"Expected size: {head_total + idx_n * block_size} bytes (last stat idx: {idx_n}), actual {os.stat(args.out_file).st_size} bytes."
logger.error("Validation failed: {}".format(msg))
comm.Abort()

# distribute work, must be sequential for optimisation,
# and for validation function above to be thread safe
# distribute work in a round-robin fashion across ranks for optimisation
# if size=4, rank 0 takes [0,4,8...], rank 1 takes [1,5,9...], rank 2 takes [2,6,10...],
# rank 3 takes [3,7,11...]
work = stations_todo[rank::size]
Expand All @@ -551,24 +538,20 @@ def validate_end(idx_n):
in_stats, 1, work_idx[s], v1d_path=v1d_path
) # passing in_stat with the seed adjustment work_idx[s]

if (
len(work_idx) > 0
and len(stations_todo_idx) > 0
and work_idx[-1] == stations_todo_idx[-1]
): # if this rank did the last station in the full list
validate_end(work_idx[-1] + 1)

os.remove(in_stats)
print(
"Process {} of {} completed {} stations ({:.2f}).".format(
rank, size, work.size, MPI.Wtime() - t0
)
)
logger.debug(
"Process {} of {} completed {} stations ({:.2f}).".format(
rank, size, work.size, MPI.Wtime() - t0
)
)
comm.Barrier() # all ranks wait here until rank 0 arrives to announce all completed
if is_master:
logger.debug("Simulation completed.")
actual_size = os.stat(args.out_file).st_size
if actual_size != file_size:
msg = f"CRITICAL: Final file size mismatch! Expected {file_size}, got {actual_size}"
print(msg, flush=True)
logger.error(msg)
comm.Abort(1)
else:
logger.debug("Simulation completed and size verified.")
print("✅ HF completed successfully", flush=True)
Comment on lines 548 to +557

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For increased robustness, it's good practice to wrap the file size check in a try...except OSError block. This will gracefully handle cases where os.stat(args.out_file) might fail (e.g., if the file is unexpectedly deleted or its permissions change during execution). This prevents the master rank from crashing with an unhandled exception; instead, the failure is printed and logged, and all MPI processes are terminated with a non-zero exit code via comm.Abort(1), consistent with the other error paths in this change.

Suggested change
if is_master:
logger.debug("Simulation completed.")
actual_size = os.stat(args.out_file).st_size
if actual_size != file_size:
msg = f"CRITICAL: Final file size mismatch! Expected {file_size}, got {actual_size}"
print(msg, flush=True)
logger.error(msg)
comm.Abort(1)
else:
logger.debug("Simulation completed and size verified.")
print("✅ HF completed successfully", flush=True)
if is_master:
try:
actual_size = os.stat(args.out_file).st_size
if actual_size != file_size:
msg = f"CRITICAL: Final file size mismatch! Expected {file_size}, got {actual_size}"
print(msg, flush=True)
logger.error(msg)
comm.Abort(1)
else:
logger.debug("Simulation completed and size verified.")
print("✅ HF completed successfully", flush=True)
except OSError as e:
msg = f"CRITICAL: Could not stat output file {args.out_file}: {e}"
print(msg, flush=True)
logger.error(msg)
comm.Abort(1)