diff --git a/modifiers/allocation/modifier.py b/modifiers/allocation/modifier.py
index 4fb42fabf..9e0ef237a 100644
--- a/modifiers/allocation/modifier.py
+++ b/modifiers/allocation/modifier.py
@@ -287,11 +287,17 @@ def determine_allocation(self, v):
         if v.n_gpus:
             if v.sys_gpus_per_node:
                 gpus_node_request = math.ceil(v.n_gpus / float(v.sys_gpus_per_node))
+                if v.n_nodes and gpus_node_request > v.n_nodes:
+                    raise ValueError(
+                        f"Requested GPUs ({v.n_gpus}) exceeds available GPUs "
+                        f"({v.n_nodes * v.sys_gpus_per_node}) on {v.n_nodes} nodes"
+                    )
             else:
                 raise ValueError(
                     "Experiment requests GPUs, but sys_gpus_per_node "
                     "is not specified for the system"
                 )
+
         v.n_nodes = max(cores_node_request or 0, gpus_node_request or 0)
 
         if not v.n_threads_per_proc:
@@ -466,6 +472,48 @@ def pjm_instructions(self, v):
         v.batch_submit = "pjsub {execute_experiment}"
         v.allocation_directives = "\n".join(batch_directives)
 
+    def pbs_instructions(self, v):
+        batch_opts, cmd_opts = Allocation._init_batch_and_cmd_opts(v)
+
+        if not v.n_ranks_per_node:
+            v.n_ranks_per_node = math.ceil(v.n_ranks / v.n_nodes)
+
+        node_spec = [f"select={v.n_nodes}"]
+
+        if v.n_ranks:
+            cmd_opts.append(f"-np {v.n_ranks}")
+            node_spec.append(f"mpiprocs={v.n_ranks_per_node}")
+
+        if v.n_threads_per_proc and v.n_threads_per_proc != 1:
+            node_spec.append(f"ompthreads={v.n_threads_per_proc}")
+
+        n_cpus_per_node = v.n_ranks_per_node * v.n_threads_per_proc
+        node_spec.append(f"ncpus={n_cpus_per_node}")
+
+        if v.n_gpus:
+            gpus_per_rank = self.gpus_as_gpus_per_rank(v.n_gpus)
+            node_spec.append(f"gpus={gpus_per_rank}")
+
+        if node_spec:
+            batch_opts.append(f"-l {':'.join(node_spec)}")
+        else:
+            raise ValueError("Not enough information to select resources")
+
+        if v.queue:
+            batch_opts.append(f"-q {v.queue}")
+
+        if v.timeout:
+            batch_opts.append(f"-l walltime={TimeFormat.as_hhmmss(v.timeout)}")
+
+        if v.bank:
+            batch_opts.append(f"-A {v.bank}")
+
+        batch_directives = list(f"#PBS {x}" for x in batch_opts)
+
+        v.mpi_command = f"mpiexec {' '.join(cmd_opts)}"
+        v.batch_submit = "qsub {execute_experiment}"
+        v.allocation_directives = "\n".join(batch_directives)
+
     def determine_scheduler_instructions(self, v):
         handler = {
             "slurm": self.slurm_instructions,
@@ -473,6 +521,7 @@ def determine_scheduler_instructions(self, v):
             "flux": self.flux_instructions,
             "mpi": self.mpi_instructions,
             "lsf": self.lsf_instructions,
             "pjm": self.pjm_instructions,
+            "pbs": self.pbs_instructions,
         }
         if v.scheduler not in handler:
             raise ValueError(
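
For illustration only (not part of the patch): assuming _init_batch_and_cmd_opts() returns empty option lists here, an experiment that resolves to n_nodes=2, n_ranks=8, n_ranks_per_node=4, and n_threads_per_proc=2, with a hypothetical queue "pbatch" and bank "guests" and no GPUs or timeout, would have the new pbs_instructions handler render roughly:

    #PBS -l select=2:mpiprocs=4:ompthreads=2:ncpus=8
    #PBS -q pbatch
    #PBS -A guests

with mpi_command set to "mpiexec -np 8" and batch_submit set to "qsub {execute_experiment}"; ncpus is n_ranks_per_node * n_threads_per_proc (4 * 2 = 8). GPU and walltime directives are omitted from this sketch because they depend on gpus_as_gpus_per_rank() and TimeFormat.as_hhmmss(), which are defined elsewhere in the modifier.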