From d66a06b5a4435cc3212896dd9ddbb13b39a62f35 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 29 Sep 2022 16:09:55 -0400 Subject: [PATCH 01/11] Add task_gpp to launch_task --- ipsframework/services.py | 5 ++++- ipsframework/taskManager.py | 27 ++++++++++++++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 5e9e2bd1..6eca0382 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -195,6 +195,7 @@ def __init__(self, fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name): self.binary_fullpath_cache = {} self.ppn = 0 self.cpp = 0 + self.gpp = 0 self.shared_nodes = False def __initialize__(self, component_ref): @@ -613,6 +614,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): * *task_ppn* : the processes per node value for this task * *task_cpp* : the cores per process, only used when ``MPIRUN=srun`` commands + * *task_gpp* : the gpus per process, only used when ``MPIRUN=srun`` commands * *omp* : If ``True`` the task will be launch with the correct OpenMP environment variables set, only used when ``MPIRUN=srun`` * *block* : specifies that this task will block (or raise an @@ -679,6 +681,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): task_ppn = keywords.get('task_ppn', self.ppn) task_cpp = keywords.get('task_cpp', self.cpp) + task_gpp = keywords.get('task_gpp', self.gpp) omp = keywords.get('omp', False) block = keywords.get('block', True) tag = keywords.get('tag', 'None') @@ -691,7 +694,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): msg_id = self._invoke_service(self.fwk.component_id, 'init_task', TaskInit(int(nproc), binary_fullpath, - working_dir, int(task_ppn), task_cpp, block, + working_dir, int(task_ppn), task_cpp, task_gpp, block, omp, whole_nodes, whole_socks, args)) (task_id, command, env_update, cores_allocated) = self._get_service_response(msg_id, block=True) except Exception: diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index c6e2a5fa..ce11953f 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -13,7 +13,7 @@ from .ipsutil import which TaskInit = namedtuple("TaskInit", - ["nproc", "binary", "working_dir", "tppn", "tcpp", "block", "omp", "wnodes", "wsocks", "cmd_args"]) + ["nproc", "binary", "working_dir", "tppn", "tcpp", "tgpp", "block", "omp", "wnodes", "wsocks", "cmd_args"]) class TaskManager: @@ -222,7 +222,7 @@ def init_task(self, init_task_msg): try: return self._init_task(caller_id, int(taskInit.nproc), taskInit.binary, taskInit.working_dir, - int(taskInit.tppn), taskInit.tcpp, taskInit.omp, taskInit.wnodes, taskInit.wsocks, taskInit.cmd_args) + int(taskInit.tppn), taskInit.tcpp, taskInit.omp, taskInit.tgpp, taskInit.wnodes, taskInit.wsocks, taskInit.cmd_args) except InsufficientResourcesException: if taskInit.block: raise BlockedMessageException(init_task_msg, '***%s waiting for %d resources' % @@ -240,7 +240,7 @@ def init_task(self, init_task_msg): except Exception: raise - def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wnodes, wsocks, cmd_args): + def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, tgpp, wnodes, wsocks, cmd_args): # handle for task related things task_id = self.get_task_id() @@ -268,6 +268,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wno task_id, allocation.cpp, omp, + tgpp, allocation.corelist) self.curr_task_table[task_id] = 
{'component': caller_id,
@@ -282,7 +283,7 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, wno
 
     def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
                          max_ppn, nodes, accurateNodes, partial_nodes,
-                         task_id, cpp=0, omp=False, core_list=''):
+                         task_id, cpp=0, omp=False, gpp=0, core_list=''):
         """
         Construct task launch command to be executed by the component.
@@ -500,11 +501,19 @@ def build_launch_cmd(self, nproc, binary, cmd_args, working_dir, ppn,
             else:
                 cpuptask_flag = '-c'
                 cpubind_flag = '--threads-per-core=1 --cpu-bind=cores'
-            cmd = ' '.join([self.task_launch_cmd,
-                            nnodes_flag, str(num_nodes),
-                            nproc_flag, str(nproc),
-                            cpuptask_flag, str(cpp),
-                            cpubind_flag])
+            if gpp:
+                gpuflags = f"--gpus-per-task={gpp}"
+                cmd = ' '.join([self.task_launch_cmd,
+                                nnodes_flag, str(num_nodes),
+                                nproc_flag, str(nproc),
+                                cpuptask_flag, str(cpp),
+                                cpubind_flag, gpuflags])
+            else:
+                cmd = ' '.join([self.task_launch_cmd,
+                                nnodes_flag, str(num_nodes),
+                                nproc_flag, str(nproc),
+                                cpuptask_flag, str(cpp),
+                                cpubind_flag])
         if omp:
             env_update = {'OMP_PLACES': 'threads',
                           'OMP_PROC_BIND': 'spread',

From d30f48d10d4bf0695ebcd8fb3daa1e7f47216478 Mon Sep 17 00:00:00 2001
From: Ross Whitfield
Date: Fri, 30 Sep 2022 12:43:02 -0400
Subject: [PATCH 02/11] Add GPUS_PER_NODE to platform config and validate GPU resources

---
 ipsframework/configurationManager.py |  1 +
 ipsframework/ipsExceptions.py        | 20 ++++++++++++++++++++
 ipsframework/resourceManager.py      | 17 ++++++++++++++++-
 ipsframework/taskManager.py          | 10 ++++++++--
 4 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py
index ff2e592d..ef13e9e6 100644
--- a/ipsframework/configurationManager.py
+++ b/ipsframework/configurationManager.py
@@ -225,6 +225,7 @@ def initialize(self, data_mgr, resource_mgr, task_mgr):
         self.platform_conf['PROCS_PER_NODE'] = int(self.platform_conf.get('PROCS_PER_NODE', 0))
         self.platform_conf['CORES_PER_NODE'] = int(self.platform_conf.get('CORES_PER_NODE', 0))
         self.platform_conf['SOCKETS_PER_NODE'] = int(self.platform_conf.get('SOCKETS_PER_NODE', 0))
+        self.platform_conf['GPUS_PER_NODE'] = int(self.platform_conf.get('GPUS_PER_NODE', 0))
         self.platform_conf['USE_ACCURATE_NODES'] = use_accurate_nodes
         self.platform_conf['MPIRUN_VERSION'] = mpirun_version
diff --git a/ipsframework/ipsExceptions.py b/ipsframework/ipsExceptions.py
index e0c60d09..ba635ec5 100644
--- a/ipsframework/ipsExceptions.py
+++ b/ipsframework/ipsExceptions.py
@@ -74,6 +74,26 @@ def __str__(self):
         return s
 
 
+class GPUResourceRequestMismatchException(Exception):
+    """ Exception raised by the resource manager when it is not possible to launch
+        the requested number of GPUs per task
+    """
+
+    def __init__(self, caller_id, tid, ppn, gpp, max_gpp):
+        super().__init__()
+        self.caller_id = caller_id
+        self.task_id = tid
+        self.ppn = ppn
+        self.gpp = gpp
+        self.max_gpp = max_gpp
+        self.args = (caller_id, tid, ppn, gpp, max_gpp)
+
+    def __str__(self):
+        s = "component %s requested %d processes per node with %d GPUs per process, which is greater than the available %d GPUS_PER_NODE" % (
+            self.caller_id, self.ppn, self.gpp, self.max_gpp)
+        return s
+
+
 class ResourceRequestUnequalPartitioningException(Exception):
     """Exception raised by the resource manager when it is possible to launch
     the requested number of processes, but the requested number
diff --git a/ipsframework/resourceManager.py b/ipsframework/resourceManager.py
index 37485c3a..f41c4eab 100644
--- 
a/ipsframework/resourceManager.py +++ b/ipsframework/resourceManager.py @@ -9,6 +9,7 @@ from .ipsExceptions import (InsufficientResourcesException, BadResourceRequestException, ResourceRequestMismatchException, + GPUResourceRequestMismatchException, ResourceRequestUnequalPartitioningException) from .ips_es_spec import eventManager from .resourceHelper import getResourceList @@ -64,6 +65,7 @@ def __init__(self, fwk): # other stuff self.max_ppn = 1 # the ppn for the whole submission (max ppn allowed by *software*) self.ppn = 1 # platform config ppn for the whole IPS + self.gpn = 0 self.myTopic = None self.service_methods = ['get_allocation', 'release_allocation'] @@ -176,6 +178,11 @@ def initialize(self, dataMngr, taskMngr, configMngr, self.sockets_per_node = 1 self.cores_per_socket = self.cores_per_node + # ------------------------------- + # set gpp + # ------------------------------- + self.gpn = int(self.CM.get_platform_parameter('GPUS_PER_NODE')) + # ------------------------------- # populate nodes # ------------------------------- @@ -260,7 +267,7 @@ def add_nodes(self, listOfNodes): # RM getAllocation # pylint: disable=inconsistent-return-statements def get_allocation(self, comp_id, nproc, task_id, - whole_nodes, whole_socks, task_ppn=0, task_cpp=0): + whole_nodes, whole_socks, task_ppn=0, task_cpp=0, task_gpp=0): """ Traverse available nodes to return: @@ -358,6 +365,11 @@ def get_allocation(self, comp_id, nproc, task_id, self.total_cores, self.max_ppn) else: + if not self.check_gpus(ppn, task_gpp): + raise GPUResourceRequestMismatchException(comp_id, task_id, + ppn, task_gpp, + self.gpn) + try: self.processes += nproc cores_allocated = 0 @@ -594,6 +606,9 @@ def check_core_cap(self, nproc, ppn): else: return False, "mismatch" + def check_gpus(self, ppn, task_gpp): + return ppn * task_gpp <= self.gpn + # RM releaseAllocation def release_allocation(self, task_id, status): """ diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index ce11953f..3dff1040 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -9,7 +9,8 @@ IncompleteCallException, \ InsufficientResourcesException, \ BadResourceRequestException, \ - ResourceRequestMismatchException + ResourceRequestMismatchException, \ + GPUResourceRequestMismatchException from .ipsutil import which TaskInit = namedtuple("TaskInit", @@ -237,6 +238,10 @@ def init_task(self, init_task_msg): self.fwk.error("There has been a fatal error, %s requested too few processors per node to launch task %d (requested: procs = %d, ppn = %d)", caller_id, e.task_id, e.nproc, e.ppn) raise + except GPUResourceRequestMismatchException as e: + self.fwk.error("There has been a fatal error, %s requested too many GPUs per node to launch task %d (requested: ppn = %d, gpp = %d)", + caller_id, e.task_id, e.ppn, e.gpp) + raise except Exception: raise @@ -250,7 +255,8 @@ def _init_task(self, caller_id, nproc, binary, working_dir, tppn, tcpp, omp, tgp wnodes, wsocks, task_ppn=tppn, - task_cpp=tcpp) + task_cpp=tcpp, + task_gpp=tgpp) self.fwk.debug('RM: get_allocation() returned %s', str(allocation)) if allocation.partial_node or allocation.accurateNodes: From cf0a95b879ee89b1bcce64ec9467b3873583e84a Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 30 Sep 2022 13:53:33 -0400 Subject: [PATCH 03/11] Fix tests --- ipsframework/services.py | 3 +-- ipsframework/taskManager.py | 2 +- tests/new/test_taskManager.py | 8 ++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/ipsframework/services.py 
b/ipsframework/services.py index 6eca0382..9d40844d 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -195,7 +195,6 @@ def __init__(self, fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name): self.binary_fullpath_cache = {} self.ppn = 0 self.cpp = 0 - self.gpp = 0 self.shared_nodes = False def __initialize__(self, component_ref): @@ -681,7 +680,7 @@ def launch_task(self, nproc, working_dir, binary, *args, **keywords): task_ppn = keywords.get('task_ppn', self.ppn) task_cpp = keywords.get('task_cpp', self.cpp) - task_gpp = keywords.get('task_gpp', self.gpp) + task_gpp = keywords.get('task_gpp', 0) omp = keywords.get('omp', False) block = keywords.get('block', True) tag = keywords.get('tag', 'None') diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 3dff1040..8be39777 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -554,7 +554,7 @@ def init_task_pool(self, init_task_msg): try: ret_dict[task_name] = self._init_task(caller_id, taskInit.nproc, taskInit.binary, taskInit.working_dir, - taskInit.tppn, taskInit.tcpp, taskInit.omp, taskInit.wnodes, taskInit.wsocks, taskInit.cmd_args) + taskInit.tppn, taskInit.tcpp, taskInit.omp, taskInit.tgpp, taskInit.wnodes, taskInit.wsocks, taskInit.cmd_args) except InsufficientResourcesException: continue except BadResourceRequestException as e: diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index bbfd536b..edcf47b6 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -438,7 +438,7 @@ def test_init_task_srun(tmpdir): def init_final_task(nproc, tppn, tcpt=0): task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(nproc, 'exe', '/dir', tppn, tcpt, - True, True, True, False, []))) + True, 0, True, True, False, []))) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return task_id, cmd, cores_allocated @@ -516,17 +516,17 @@ def init_final_task(nproc, tppn, tcpt=0): # start two task, second should fail with Insufficient Resources depending on block task_id, cmd, _, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(4, 'exe', '/dir', 0, 0, - True, True, True, False, []))) + True, 0, True, True, False, []))) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(1, 'exe', '/dir', 0, 0, - True, True, True, False, []))) + True, 0, True, True, False, []))) with pytest.raises(InsufficientResourcesException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(1, 'exe', '/dir', 0, 0, - False, True, True, False, []))) + False, 0, True, True, False, []))) def test_init_task_pool_srun(tmpdir): From 2429d76eb1e57a5330ba341abbce730f163374ae Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 30 Sep 2022 14:43:53 -0400 Subject: [PATCH 04/11] Add task_gpp to task pool --- ipsframework/services.py | 5 +++-- ipsframework/taskManager.py | 7 +++++++ tests/new/test_taskManager.py | 23 +++++++++++------------ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 9d40844d..49aaabf2 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -786,10 +786,11 @@ def launch_task_pool(self, task_pool_name, launch_interval=0.0): wnodes = task.keywords.get('whole_nodes', not self.shared_nodes) wsocks = task.keywords.get('whole_sockets', not self.shared_nodes) task_cpp = 
task.keywords.get('task_cpp', self.cpp) + task_gpp = task.keywords.get('task_gpp', 0) omp = task.keywords.get('omp', False) submit_dict[task_name] = TaskInit(task.nproc, task.binary, - task.working_dir, task_ppn, task_cpp, False, - omp, wnodes, wsocks, task.args) + task.working_dir, task_ppn, task_cpp, task_gpp, + False, omp, wnodes, wsocks, task.args) try: msg_id = self._invoke_service(self.fwk.component_id, diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 8be39777..37d54344 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -571,6 +571,13 @@ def init_task_pool(self, init_task_msg): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise + except GPUResourceRequestMismatchException as e: + self.fwk.error("There has been a fatal error, %s requested too many GPUs per node to launch task %d (requested: ppn = %d, gpp = %d)", + caller_id, e.task_id, e.ppn, e.gpp) + for task_id, _, _, _ in ret_dict.values(): + self.resource_mgr.release_allocation(task_id, -1) + del self.curr_task_table[task_id] + raise except Exception: self.fwk.exception('TM:init_task_pool(): Allocation exception') raise diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index edcf47b6..e2a2d6be 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -438,7 +438,7 @@ def test_init_task_srun(tmpdir): def init_final_task(nproc, tppn, tcpt=0): task_id, cmd, _, cores_allocated = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(nproc, 'exe', '/dir', tppn, tcpt, - True, 0, True, True, False, []))) + 0, True, True, True, False, []))) tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', task_id, None)) return task_id, cmd, cores_allocated @@ -516,17 +516,17 @@ def init_final_task(nproc, tppn, tcpt=0): # start two task, second should fail with Insufficient Resources depending on block task_id, cmd, _, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(4, 'exe', '/dir', 0, 0, - True, 0, True, True, False, []))) + 0, True, True, True, False, []))) with pytest.raises(BlockedMessageException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(1, 'exe', '/dir', 0, 0, - True, 0, True, True, False, []))) + 0, True, True, True, False, []))) with pytest.raises(InsufficientResourcesException): tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', TaskInit(1, 'exe', '/dir', 0, 0, - False, 0, True, True, False, []))) + 0, False, True, True, False, []))) def test_init_task_pool_srun(tmpdir): @@ -554,7 +554,7 @@ def test_init_task_pool_srun(tmpdir): def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): if msg is None: - msg = {f'task{n}': TaskInit(nproc, f'exe{n}', '/dir', tppn, tcpp, False, False, True, False, (f'arg{n}',)) for n in range(number_of_tasks)} + msg = {f'task{n}': TaskInit(nproc, f'exe{n}', '/dir', tppn, tcpp, 0, False, False, True, False, (f'arg{n}',)) for n in range(number_of_tasks)} retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) for task_id, _, _, _ in retval.values(): tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', @@ -683,9 +683,8 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): fwk.warning.assert_called_once_with('task cpp (4) exceeds maximum possible for 1 procs per node with 2 cores per node, using 2 cpus per proc instead') # different size tasks - msg = {'task0': 
TaskInit(1, 'exe0', '/dir', 0, 0, False, False, True, False, ('arg0',)), - 'task1': TaskInit(2, 'exe1', '/dir', 0, 0, False, False, True, False, ('arg1',))} - + msg = {'task0': TaskInit(1, 'exe0', '/dir', 0, 0, 0, False, False, True, False, ('arg0',)), + 'task1': TaskInit(2, 'exe1', '/dir', 0, 0, 0, False, False, True, False, ('arg1',))} retval = init_final_task_pool(msg=msg) assert len(retval) == 2 task_id, cmd, _, cores = retval['task0'] @@ -698,13 +697,13 @@ def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, tcpp=0, msg=None): assert cores == 2 # one good task, one bad task - msg = {'task0': TaskInit(1, 'exe0', '/dir', 0, 0, False, False, True, False, ('arg0',)), - 'task1': TaskInit(5, 'exe1', '/dir', 0, 0, False, False, True, False, ('arg1',))} + msg = {'task0': TaskInit(1, 'exe0', '/dir', 0, 0, 0, False, False, True, False, ('arg0',)), + 'task1': TaskInit(5, 'exe1', '/dir', 0, 0, 0, False, False, True, False, ('arg1',))} with pytest.raises(BadResourceRequestException): init_final_task_pool(msg=msg) # one good task, one bad task - msg = {'task0': TaskInit(1, 'exe0', '/dir', 0, 0, False, False, True, False, ('arg0',)), - 'task1': TaskInit(3, 'exe1', '/dir', 1, 0, False, False, True, False, ('arg1',))} + msg = {'task0': TaskInit(1, 'exe0', '/dir', 0, 0, 0, False, False, True, False, ('arg0',)), + 'task1': TaskInit(3, 'exe1', '/dir', 1, 0, 0, False, False, True, False, ('arg1',))} with pytest.raises(ResourceRequestMismatchException): init_final_task_pool(msg=msg) From f95e46f15da829527f4e64f486e410d69632604b Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 6 Oct 2022 10:16:06 -0400 Subject: [PATCH 05/11] Add tests for GPUs --- ipsframework/taskManager.py | 3 +- tests/new/test_resourceManager.py | 73 ++++++++++++++++++++++++++ tests/new/test_taskManager.py | 87 +++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 37d54344..bfa4b86f 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -554,7 +554,8 @@ def init_task_pool(self, init_task_msg): try: ret_dict[task_name] = self._init_task(caller_id, taskInit.nproc, taskInit.binary, taskInit.working_dir, - taskInit.tppn, taskInit.tcpp, taskInit.omp, taskInit.tgpp, taskInit.wnodes, taskInit.wsocks, taskInit.cmd_args) + taskInit.tppn, taskInit.tcpp, taskInit.omp, taskInit.tgpp, taskInit.wnodes, + taskInit.wsocks, taskInit.cmd_args) except InsufficientResourcesException: continue except BadResourceRequestException as e: diff --git a/tests/new/test_resourceManager.py b/tests/new/test_resourceManager.py index 8f11d400..41c70962 100644 --- a/tests/new/test_resourceManager.py +++ b/tests/new/test_resourceManager.py @@ -5,6 +5,7 @@ from ipsframework.ipsExceptions import (InsufficientResourcesException, BadResourceRequestException, ResourceRequestMismatchException, + GPUResourceRequestMismatchException, ResourceRequestUnequalPartitioningException) @@ -322,3 +323,75 @@ def test_allocations(tmpdir): assert lines[6] == "core: 1 - available" assert lines[7] == "core: 2 - available" assert lines[8] == "core: 3 - available" + + # test GPUs + with pytest.raises(GPUResourceRequestMismatchException) as excinfo: + rm.get_allocation(comp_id='comp0', + nproc=1, + task_gpp=1, + task_id=0, + whole_nodes=True, + whole_socks=False) + + assert str(excinfo.value) == "component comp0 requested 1 processes per node with 1 GPUs per process, which is greater than the available 0 GPUS_PER_NODE" + + # set GPUS_PER_NODE to 2 + rm = 
ResourceManager(fwk) + rm.initialize(dm, tm, cm, + cmd_nodes=2, + cmd_ppn=4) + rm.gpn = 2 + + with pytest.raises(GPUResourceRequestMismatchException) as excinfo: + rm.get_allocation(comp_id='comp0', + nproc=1, + task_gpp=4, + task_id=0, + whole_nodes=True, + whole_socks=False) + + assert str(excinfo.value) == "component comp0 requested 1 processes per node with 4 GPUs per process, which is greater than the available 2 GPUS_PER_NODE" + + with pytest.raises(GPUResourceRequestMismatchException) as excinfo: + rm.get_allocation(comp_id='comp0', + nproc=2, + task_gpp=2, + task_id=0, + whole_nodes=True, + whole_socks=False) + + assert str(excinfo.value) == "component comp0 requested 2 processes per node with 2 GPUs per process, which is greater than the available 2 GPUS_PER_NODE" + + rm.get_allocation(comp_id='comp0', + nproc=2, + task_ppn=1, + task_gpp=2, + task_id=0, + whole_nodes=True, + whole_socks=False) + + with io.StringIO() as output: + rm.nodes['dummy_node0'].print_sockets(output) + lines = [s.strip() for s in output.getvalue().split('\n')] + assert lines[0] == "socket: 0" + assert lines[1] == "availablilty: 0" + assert lines[2] == "task ids: [0]" + assert lines[3] == "owners: ['comp0']" + assert lines[4] == "cores: 4" + assert lines[5] == "core: 0 - task_id: 0 - owner: comp0" + assert lines[6] == "core: 1 - task_id: 0 - owner: comp0" + assert lines[7] == "core: 2 - task_id: 0 - owner: comp0" + assert lines[8] == "core: 3 - task_id: 0 - owner: comp0" + + with io.StringIO() as output: + rm.nodes['dummy_node1'].print_sockets(output) + lines = [s.strip() for s in output.getvalue().split('\n')] + assert lines[0] == "socket: 0" + assert lines[1] == "availablilty: 0" + assert lines[2] == "task ids: [0]" + assert lines[3] == "owners: ['comp0']" + assert lines[4] == "cores: 4" + assert lines[5] == "core: 0 - task_id: 0 - owner: comp0" + assert lines[6] == "core: 1 - task_id: 0 - owner: comp0" + assert lines[7] == "core: 2 - task_id: 0 - owner: comp0" + assert lines[8] == "core: 3 - task_id: 0 - owner: comp0" diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index e2a2d6be..b47b1a6b 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -5,6 +5,7 @@ from ipsframework.messages import ServiceRequestMessage from ipsframework.ipsExceptions import (BadResourceRequestException, ResourceRequestMismatchException, + GPUResourceRequestMismatchException, BlockedMessageException, InsufficientResourcesException, ResourceRequestUnequalPartitioningException) @@ -411,6 +412,40 @@ def test_build_launch_cmd_srun(): assert cmd == ('srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores executable 13 42', {'OMP_PLACES': 'threads', 'OMP_PROC_BIND': 'spread', 'OMP_NUM_THREADS': '2'}) + # now with GPUs + + cmd = tm.build_launch_cmd(nproc=1, + binary='executable', + cmd_args=('13', '42'), + working_dir=None, + ppn=None, + max_ppn=None, + nodes='n1', + accurateNodes=None, + partial_nodes=False, + task_id=None, + cpp=1, + gpp=1, + omp=False) + + assert cmd == ('srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1 executable 13 42', None) + + cmd = tm.build_launch_cmd(nproc=1, + binary='executable', + cmd_args=('13', '42'), + working_dir=None, + ppn=None, + max_ppn=None, + nodes='n1', + accurateNodes=None, + partial_nodes=False, + task_id=None, + cpp=1, + gpp=4, + omp=False) + + assert cmd == ('srun -N 1 -n 1 -c 1 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=4 executable 13 42', None) + def test_init_task_srun(tmpdir): # this will 
combine calls to ResourceManager.get_allocation and @@ -528,6 +563,58 @@ def init_final_task(nproc, tppn, tcpt=0): TaskInit(1, 'exe', '/dir', 0, 0, 0, False, True, True, False, []))) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + + # request GPUs when there are none + with pytest.raises(GPUResourceRequestMismatchException) as e: + tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(1, 'exe', '/dir', 0, 0, + 1, False, True, True, False, []))) + + assert str(e.value) == "component id requested 1 processes per node with 1 GPUs per process, which is greater than the available 0 GPUS_PER_NODE" + + # set GPUS_PER_NODE=2 + rm.gpn = 2 + task_id, cmd, _, _, = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(1, 'exe', '/dir', 0, 0, + 1, False, True, True, False, []))) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + assert task_id == 19 + assert cmd == "srun -N 1 -n 1 -c 2 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1 exe " + + task_id, cmd, _, _, = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(2, 'exe', '/dir', 1, 0, + 1, False, True, True, False, []))) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + assert task_id == 20 + assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1 exe " + + task_id, cmd, _, _, = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(2, 'exe', '/dir', 1, 0, + 2, False, True, True, False, []))) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + assert task_id == 21 + assert cmd == "srun -N 2 -n 2 -c 2 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=2 exe " + + task_id, cmd, _, _, = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(2, 'exe', '/dir', 2, 0, + 1, False, True, True, False, []))) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + assert task_id == 22 + assert cmd == "srun -N 1 -n 2 -c 1 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1 exe " + + with pytest.raises(GPUResourceRequestMismatchException) as e: + tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + TaskInit(2, 'exe', '/dir', 2, 0, + 2, False, True, True, False, []))) + + assert str(e.value) == "component id requested 2 processes per node with 2 GPUs per process, which is greater than the available 2 GPUS_PER_NODE" + def test_init_task_pool_srun(tmpdir): # this will combine calls to ResourceManager.get_allocation and From e6c68b92291c6350a2eabf2d73879cc5e7080f8d Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 6 Oct 2022 14:04:11 -0400 Subject: [PATCH 06/11] GPU docs --- doc/user_guides/advanced_guide.rst | 100 +++++++++++++++++++++++++++++ doc/user_guides/platform.rst | 3 + 2 files changed, 103 insertions(+) diff --git a/doc/user_guides/advanced_guide.rst b/doc/user_guides/advanced_guide.rst index 7316bb9a..c1ece0a3 100644 --- a/doc/user_guides/advanced_guide.rst +++ b/doc/user_guides/advanced_guide.rst @@ -426,6 +426,106 @@ the resulting core affinity of the OpenMP threads are: Hello from rank 7, thread 0, on nid00026. (core affinity = 18) Hello from rank 7, thread 1, on nid00026. (core affinity = 19) + +Slurm with GPUs examples +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. 
note::
+
+   New in 0.8.0
+
+The :py:meth:`~ipsframework.services.ServicesProxy.launch_task` method
+has an option ``task_gpp`` which allows you to set the number of GPUs
+per process, used as ``--gpus-per-task`` in the ``srun``
+command.
+
+IPS will validate that the number of GPUs per node requested does not
+exceed the number specified by the ``GPUS_PER_NODE`` parameter in the
+:ref:`plat-conf-sec`. You need to make sure that the number of GPUs
+per process times the number of processes per node does not exceed the
+``GPUS_PER_NODE`` setting.
+
+Using the `gpus_for_tasks
+`_ program provided for
+Perlmutter (which has 4 GPUs per node) to test the behavior, you will
+see the following:
+
+
+To launch a task with 1 process and 1 GPU per process (``task_gpp``), run:
+
+.. code-block:: python
+
+    self.services.launch_task(1, cwd, "gpus_for_tasks", task_gpp=1)
+
+This will create the command ``srun -N 1 -n 1 -c
+64 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1
+gpus_for_tasks`` and the output will be:
+
+.. code-block:: text
+
+    Rank 0 out of 1 processes: I see 1 GPU(s).
+    0 for rank 0: 0000:03:00.0
+
+To launch 8 processes on 2 nodes (so 4 processes per node) with 1 GPU per process, run:
+
+.. code-block:: python
+
+    self.services.launch_task(8, cwd, "gpus_for_tasks", task_ppn=4, task_gpp=1)
+
+This will create the command ``srun -N 2 -n 8 -c
+16 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1
+gpus_for_tasks`` and the output will be:
+
+.. code-block:: text
+
+    Rank 0 out of 8 processes: I see 1 GPU(s).
+    0 for rank 0: 0000:03:00.0
+    Rank 1 out of 8 processes: I see 1 GPU(s).
+    0 for rank 1: 0000:41:00.0
+    Rank 2 out of 8 processes: I see 1 GPU(s).
+    0 for rank 2: 0000:82:00.0
+    Rank 3 out of 8 processes: I see 1 GPU(s).
+    0 for rank 3: 0000:C1:00.0
+    Rank 4 out of 8 processes: I see 1 GPU(s).
+    0 for rank 4: 0000:03:00.0
+    Rank 5 out of 8 processes: I see 1 GPU(s).
+    0 for rank 5: 0000:41:00.0
+    Rank 6 out of 8 processes: I see 1 GPU(s).
+    0 for rank 6: 0000:82:00.0
+    Rank 7 out of 8 processes: I see 1 GPU(s).
+    0 for rank 7: 0000:C1:00.0
+
+To launch 2 processes on 2 nodes (so 1 process per node) with 4 GPUs per process, run:
+
+.. code-block:: python
+
+    self.services.launch_task(2, cwd, "gpus_for_tasks", task_ppn=1, task_gpp=4)
+
+This will create the command ``srun -N 2 -n 2 -c
+64 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=4
+gpus_for_tasks`` and the output will be:
+
+.. code-block:: text
+
+    Rank 0 out of 2 processes: I see 4 GPU(s).
+    0 for rank 0: 0000:03:00.0
+    1 for rank 0: 0000:41:00.0
+    2 for rank 0: 0000:82:00.0
+    3 for rank 0: 0000:C1:00.0
+    Rank 1 out of 2 processes: I see 4 GPU(s).
+    0 for rank 1: 0000:03:00.0
+    1 for rank 1: 0000:41:00.0
+    2 for rank 1: 0000:82:00.0
+    3 for rank 1: 0000:C1:00.0
+
+If you try to launch a task with too many GPUs per node, *e.g.*:
+
+.. code-block:: python
+
+    self.services.launch_task(8, cwd, "gpus_for_tasks", task_gpp=1)
+
+then it will raise an :class:`~ipsframework.ipsExceptions.GPUResourceRequestMismatchException`.
+
 .. automethod:: ipsframework.services.ServicesProxy.launch_task
    :noindex:
diff --git a/doc/user_guides/platform.rst b/doc/user_guides/platform.rst
index 58ce6159..d6280281 100644
--- a/doc/user_guides/platform.rst
+++ b/doc/user_guides/platform.rst
@@ -423,6 +423,9 @@ The platform configuration file contains platform specific information that the
    one task can share a node [#nochange]_. Simulations, components and
    tasks can set their node usage allocation policies in the
    configuration file and on task launch.
 
+**GPUS_PER_NODE** + number of GPUs per node, used when validating the launch task + commands with ``task_gpp`` set, see :meth:`~ipsframework.services.ServicesProxy.launch_task`. .. [#nochange] This value should not change unless the machine is From 96785c853b8dbde5988c3b0813f745eec6874075 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 20 Oct 2022 16:24:16 -0400 Subject: [PATCH 07/11] Add option for dask-worker per gpu --- ipsframework/services.py | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/ipsframework/services.py b/ipsframework/services.py index 49aaabf2..5dd794d2 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -1880,7 +1880,7 @@ def add_task(self, task_pool_name, task_name, nproc, working_dir, *args, keywords=keywords) def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, - dask_ppn=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None): + dask_ppn=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """ Launch all unfinished tasks in task pool *task_pool_name*. If *block* is ``True``, return when all tasks have been launched. If *block* is ``False``, return when all @@ -1892,7 +1892,7 @@ def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, start_time = time.time() self._send_monitor_event('IPS_TASK_POOL_BEGIN', 'task_pool = %s ' % task_pool_name) task_pool: TaskPool = self.task_pools[task_pool_name] - retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppn, launch_interval, use_shifter, dask_worker_plugin) + retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppn, launch_interval, use_shifter, dask_worker_plugin, dask_worker_per_gpu) elapsed_time = time.time() - start_time self._send_monitor_event('IPS_TASK_POOL_END', 'task_pool = %s elapsed time = %.2f S' % (task_pool_name, elapsed_time), @@ -2098,7 +2098,7 @@ def add_task(self, task_name, nproc, working_dir, binary, *args, **keywords): self.queued_tasks[task_name] = Task(task_name, nproc, working_dir, binary_fullpath, *args, **keywords["keywords"]) - def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter=False, dask_worker_plugin=None): + def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """Launch tasks in *queued_tasks* using dask. 
:param block: Unused, this will always return after tasks are submitted @@ -2111,6 +2111,8 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter :type use_shifter: bool :param dask_worker_plugin: If provided this will be registered as a worker plugin with the dask client :type dask_worker_plugin: distributed.diagnostics.plugin.WorkerPlugin + :param dask_worker_per_gpu: If true then a separate worker will be started for each GPU and binded to that GPU + :type dask_worker_per_gpu: bool """ services: ServicesProxy = self.services self.dask_file_name = os.path.join(os.getcwd(), @@ -2127,7 +2129,18 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter dask_nodes = 1 nthreads = dask_ppn if dask_ppn else services.get_config_param("PROCS_PER_NODE") + + task_ppn = 1 + task_gpp = 0 + if dask_worker_per_gpu: + gpn = services.get_config_param("GPUS_PER_NODE") + dask_nodes *= gpn + nthreads = max(1, nthreads // gpn) + task_ppn = gpn + task_gpp = 1 + nworkers = "--nworkers" if self.distributed.__version__ >= "2022.10" else "--nprocs" + if use_shifter: self.dask_workers_tid = services.launch_task(dask_nodes, os.getcwd(), self.shifter, @@ -2137,7 +2150,8 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter nworkers, 1, "--nthreads", nthreads, "--no-dashboard", - task_ppn=1) + task_ppn=task_ppn, + task_gpp=task_gpp) else: self.dask_workers_tid = services.launch_task(dask_nodes, os.getcwd(), self.dask_worker, @@ -2146,7 +2160,8 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter nworkers, 1, "--nthreads", nthreads, "--no-dashboard", - task_ppn=1) + task_ppn=task_ppn, + task_gpp=task_gpp) self.dask_client = self.dask.distributed.Client(scheduler_file=self.dask_file_name) @@ -2173,7 +2188,8 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter self.queued_tasks = {} return len(self.futures) - def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None): + def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, launch_interval=0.0, + use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """Launch tasks in *queued_tasks*. Finished tasks are handled before launching new ones. 
If *block* is ``True``, the number of tasks submitted is returned after all tasks have been launched @@ -2197,6 +2213,8 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, :type use_shifter: bool :param dask_worker_plugin: If provided this will be registered as a worker plugin with the dask client :type dask_worker_plugin: distributed.diagnostics.plugin.WorkerPlugin + :param dask_worker_per_gpu: If true then a separate worker will be started for each GPU and binded to that GPU + :type dask_worker_per_gpu: bool """ if use_dask: @@ -2206,7 +2224,7 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, self.services.error("Requested to run dask within shifter but shifter not available") raise Exception("shifter not found") else: - return self.submit_dask_tasks(block, dask_nodes, dask_ppn, use_shifter, dask_worker_plugin) + return self.submit_dask_tasks(block, dask_nodes, dask_ppn, use_shifter, dask_worker_plugin, dask_worker_per_gpu) elif not TaskPool.dask: self.services.warning("Requested use_dask but cannot because import dask failed") elif not self.serial_pool: From 50534bc57c3cd4208f8c7a8ae8c32710f0edca45 Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 27 Oct 2022 11:32:21 -0400 Subject: [PATCH 08/11] Add test for GPU dask workers --- .../workflows/conda_env/environment_macos.yml | 2 +- ipsframework/services.py | 3 +- tests/components/workers/dask_worker.py | 3 +- tests/new/test_dask.py | 91 ++++++++++++++++++- 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/.github/workflows/conda_env/environment_macos.yml b/.github/workflows/conda_env/environment_macos.yml index 01661932..1895b5c3 100644 --- a/.github/workflows/conda_env/environment_macos.yml +++ b/.github/workflows/conda_env/environment_macos.yml @@ -5,6 +5,6 @@ dependencies: - pytest-cov<4 - pytest-timeout - psutil -- dask=2022.08.1 +- dask=2022.10.0 - coverage!=6.3 - flask=2.2.2 diff --git a/ipsframework/services.py b/ipsframework/services.py index 5dd794d2..147801d0 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -2139,7 +2139,8 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter task_ppn = gpn task_gpp = 1 - nworkers = "--nworkers" if self.distributed.__version__ >= "2022.10" else "--nprocs" + # --nprocs was removed in version 2022.10.0 and replaced with --nworkers + nworkers = "--nworkers" if tuple(map(int, self.distributed.__version__.split('.'))) >= (2022, 10, 0) else "--nprocs" if use_shifter: self.dask_workers_tid = services.launch_task(dask_nodes, os.getcwd(), diff --git a/tests/components/workers/dask_worker.py b/tests/components/workers/dask_worker.py index d1ddc544..edb50b7c 100644 --- a/tests/components/workers/dask_worker.py +++ b/tests/components/workers/dask_worker.py @@ -31,7 +31,8 @@ def step(self, timestamp=0.0, **keywords): ret_val = self.services.submit_tasks('pool', use_dask=True, use_shifter=self.SHIFTER == 'True', - dask_nodes=nodes) + dask_nodes=nodes, + dask_worker_per_gpu=self.GPU == 'True') self.services.info('ret_val = %d', ret_val) exit_status = self.services.get_finished_tasks('pool') for i in range(total_tasks): diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index 64107639..3d96ee38 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -8,16 +8,18 @@ from ipsframework import Framework -def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False): +def 
write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False, gpus=0): platform_file = tmpdir.join('platform.conf') - platform = """MPIRUN = eval + platform = f"""MPIRUN = eval NODE_DETECTION = manual CORES_PER_NODE = 2 +PROCS_PER_NODE = 2 SOCKETS_PER_NODE = 1 NODE_ALLOCATION_MODE = shared HOST = SCRATCH = +GPUS_PER_NODE = {gpus} """ with open(platform_file, 'w') as f: @@ -55,6 +57,7 @@ def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfil EXECUTABLE = {exe} VALUE = {value} NPROC = {nproc} + GPU = {bool(gpus)} INPUT_FILES = OUTPUT_FILES = SCRIPT = @@ -231,7 +234,7 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): assert lines[0].startswith('Running dask-scheduler --no-dashboard --scheduler-file') assert lines[0].endswith('--port 0 in shifter\n') assert lines[1].startswith('Running dask-worker --scheduler-file') - assert lines[1].endswith('1 --nthreads 0 --no-dashboard in shifter\n') + assert lines[1].endswith('1 --nthreads 2 --no-dashboard in shifter\n') def test_dask_timeout(tmpdir): @@ -467,3 +470,85 @@ def test_dask_shifter_on_cori(tmpdir): assert lines[0] == f'Running {i}\n' assert lines[1] == 'SHIFTER_RUNTIME=1\n' assert lines[2].startswith("SHIFTER_IMAGEREQUEST") + + +def test_dask_with_1_gpu(tmpdir): + pytest.importorskip("dask") + pytest.importorskip("distributed") + platform_file, config_file = write_basic_config_and_platform_files(tmpdir, gpus=1) + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check output log file + with open(str(tmpdir.join('sim.log')), 'r') as f: + lines = f.readlines() + + # remove timestamp + lines = [line[24:] for line in lines] + + log = "DASK__dask_worker_2 INFO {}\n" + assert log.format("ret_val = 4") in lines + + # task successful and return 0 + for i in range(4): + assert log.format(f"task_{i} 0") in lines + + # check simulation_log + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + comments = [json.loads(line)['comment'].split(', ', maxsplit=5)[2:] for line in json_file.readlines()] + + assert comments[10][0] == "nproc = 1 " + assert comments[10][1].startswith("Target = ") + assert "dask-worker --scheduler-file" in comments[10][1] + assert comments[10][1].endswith("s 1 --nthreads 2 --no-dashboard") + + +def test_dask_with_2_gpus(tmpdir): + pytest.importorskip("dask") + pytest.importorskip("distributed") + platform_file, config_file = write_basic_config_and_platform_files(tmpdir, gpus=2) + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check output log file + with open(str(tmpdir.join('sim.log')), 'r') as f: + lines = f.readlines() + + # remove timestamp + lines = [line[24:] for line in lines] + + log = "DASK__dask_worker_2 INFO {}\n" + assert log.format("ret_val = 4") in lines + + # task successful and return 0 + for i in range(4): + assert log.format(f"task_{i} 0") in lines + + # check simulation_log + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + comments = 
[json.loads(line)['comment'].split(', ', maxsplit=5)[2:] for line in json_file.readlines()] + + assert comments[10][0] == "nproc = 2 " + assert comments[10][1].startswith("Target = ") + assert "dask-worker --scheduler-file" in comments[10][1] + assert comments[10][1].endswith("s 1 --nthreads 1 --no-dashboard") From 0f94024121b877382bdf22cfef6df44c348b4cff Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Thu, 27 Oct 2022 16:58:29 -0400 Subject: [PATCH 09/11] Rename dask_ppn to dask_ppw procs-per-node to procs-per-worker --- .../drivers/hello/hello_worker_task_pool.py | 2 +- ipsframework/services.py | 40 ++++++++++++------- .../helloworld/hello_worker_task_pool_dask.py | 2 +- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/components/drivers/hello/hello_worker_task_pool.py b/components/drivers/hello/hello_worker_task_pool.py index 91b68470..f7ba13b0 100755 --- a/components/drivers/hello/hello_worker_task_pool.py +++ b/components/drivers/hello/hello_worker_task_pool.py @@ -52,7 +52,7 @@ def step(self, timeStamp=0.0): cwd, myFun, str(duration[i]), task_env=task_env) - ret_val = self.services.submit_tasks('pool', use_dask=True, dask_nodes=1, dask_ppn=10) + ret_val = self.services.submit_tasks('pool', use_dask=True, dask_nodes=1, dask_ppw=10) print('ret_val = ', ret_val) exit_status = self.services.get_finished_tasks('pool') print(exit_status) diff --git a/ipsframework/services.py b/ipsframework/services.py index 147801d0..f1b4609a 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -1880,7 +1880,7 @@ def add_task(self, task_pool_name, task_name, nproc, working_dir, *args, keywords=keywords) def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, - dask_ppn=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): + dask_ppw=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """ Launch all unfinished tasks in task pool *task_pool_name*. If *block* is ``True``, return when all tasks have been launched. If *block* is ``False``, return when all @@ -1892,7 +1892,7 @@ def submit_tasks(self, task_pool_name, block=True, use_dask=False, dask_nodes=1, start_time = time.time() self._send_monitor_event('IPS_TASK_POOL_BEGIN', 'task_pool = %s ' % task_pool_name) task_pool: TaskPool = self.task_pools[task_pool_name] - retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppn, launch_interval, use_shifter, dask_worker_plugin, dask_worker_per_gpu) + retval = task_pool.submit_tasks(block, use_dask, dask_nodes, dask_ppw, launch_interval, use_shifter, dask_worker_plugin, dask_worker_per_gpu) elapsed_time = time.time() - start_time self._send_monitor_event('IPS_TASK_POOL_END', 'task_pool = %s elapsed time = %.2f S' % (task_pool_name, elapsed_time), @@ -2098,21 +2098,27 @@ def add_task(self, task_name, nproc, working_dir, binary, *args, **keywords): self.queued_tasks[task_name] = Task(task_name, nproc, working_dir, binary_fullpath, *args, **keywords["keywords"]) - def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): + def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppw=None, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """Launch tasks in *queued_tasks* using dask. + One dask worker will be started for each node unless + dask_worker_per_gpu is True where one dask worker will be + started for every GPU. 
So dask_node times GPUS_PER_NODE + workers will be started. + :param block: Unused, this will always return after tasks are submitted :type block: bool :param dask_nodes: Number of task nodes, default 1 :type dask_nodes: int - :param dask_ppn: Number of processes per node, default None - :type dask_ppn: int + :param dask_ppw: Number of processes per dask worker, default is PROCS_PER_NODE + :type dask_ppw: int :param use_shifter: Option to launch dask scheduler and workers in shifter container :type use_shifter: bool :param dask_worker_plugin: If provided this will be registered as a worker plugin with the dask client :type dask_worker_plugin: distributed.diagnostics.plugin.WorkerPlugin :param dask_worker_per_gpu: If true then a separate worker will be started for each GPU and binded to that GPU :type dask_worker_per_gpu: bool + """ services: ServicesProxy = self.services self.dask_file_name = os.path.join(os.getcwd(), @@ -2128,16 +2134,16 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter if services.get_config_param("MPIRUN") == "eval": dask_nodes = 1 - nthreads = dask_ppn if dask_ppn else services.get_config_param("PROCS_PER_NODE") - - task_ppn = 1 - task_gpp = 0 if dask_worker_per_gpu: gpn = services.get_config_param("GPUS_PER_NODE") dask_nodes *= gpn - nthreads = max(1, nthreads // gpn) + nthreads = dask_ppw if dask_ppw else services.get_config_param("PROCS_PER_NODE") // gpn task_ppn = gpn task_gpp = 1 + else: + nthreads = dask_ppw if dask_ppw else services.get_config_param("PROCS_PER_NODE") + task_ppn = 1 + task_gpp = 0 # --nprocs was removed in version 2022.10.0 and replaced with --nworkers nworkers = "--nworkers" if tuple(map(int, self.distributed.__version__.split('.'))) >= (2022, 10, 0) else "--nprocs" @@ -2189,7 +2195,7 @@ def submit_dask_tasks(self, block=True, dask_nodes=1, dask_ppn=None, use_shifter self.queued_tasks = {} return len(self.futures) - def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, launch_interval=0.0, + def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, dask_worker_plugin=None, dask_worker_per_gpu=False): """Launch tasks in *queued_tasks*. Finished tasks are handled before launching new ones. If *block* is ``True``, the number of @@ -2198,7 +2204,10 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, that can immediately be launched is returned. If ``use_dask==True`` then the tasks are launched with - :meth:`submit_dask_tasks`. + :meth:`submit_dask_tasks`. One dask worker will be started for + each node unless dask_worker_per_gpu is True where one dask + worker will be started for every GPU. So dask_node times + GPUS_PER_NODE workers will be started. 
:param block: If True then wait for task to complete, default True :type block: bool @@ -2206,8 +2215,8 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, :type use_dask: bool :param dask_nodes: Number of task nodes, only used it ``use_dask==True`` :type dask_nodes: int - :param dask_ppn: Number of processes per node, only used it ``use_dask==True`` - :type dask_ppn: int + :param dask_ppw: Number of processes per dask worker, default is PROCS_PER_NODE, only used it ``use_dask==True`` + :type dask_ppw: int :param launch_internal: time to wait between launching tasks, default 0.0 :type launch_internal: float :param use_shifter: Option to launch dask scheduler and workers in shifter container @@ -2216,6 +2225,7 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, :type dask_worker_plugin: distributed.diagnostics.plugin.WorkerPlugin :param dask_worker_per_gpu: If true then a separate worker will be started for each GPU and binded to that GPU :type dask_worker_per_gpu: bool + """ if use_dask: @@ -2225,7 +2235,7 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, self.services.error("Requested to run dask within shifter but shifter not available") raise Exception("shifter not found") else: - return self.submit_dask_tasks(block, dask_nodes, dask_ppn, use_shifter, dask_worker_plugin, dask_worker_per_gpu) + return self.submit_dask_tasks(block, dask_nodes, dask_ppw, use_shifter, dask_worker_plugin, dask_worker_per_gpu) elif not TaskPool.dask: self.services.warning("Requested use_dask but cannot because import dask failed") elif not self.serial_pool: diff --git a/tests/helloworld/hello_worker_task_pool_dask.py b/tests/helloworld/hello_worker_task_pool_dask.py index 2412db7e..bc6a7ccc 100644 --- a/tests/helloworld/hello_worker_task_pool_dask.py +++ b/tests/helloworld/hello_worker_task_pool_dask.py @@ -43,7 +43,7 @@ def step(self, timestamp=0.0, **keywords): worker_plugin = DaskWorkerPlugin() - ret_val = self.services.submit_tasks('pool', use_dask=True, dask_nodes=1, dask_ppn=10, + ret_val = self.services.submit_tasks('pool', use_dask=True, dask_nodes=1, dask_ppw=10, dask_worker_plugin=worker_plugin) print('ret_val = ', ret_val) exit_status = self.services.get_finished_tasks('pool') From f86492638b7fa08b0cb93ebb0e3551e7d8f8b88b Mon Sep 17 00:00:00 2001 From: Ross Whitfield Date: Fri, 28 Oct 2022 10:56:12 -0400 Subject: [PATCH 10/11] Make dask a required dependency --- .../conda_env/environment_minimal.yml | 1 + ipsframework/services.py | 8 ++++---- setup.py | 4 +++- tests/helloworld/test_helloworld.py | 3 --- tests/new/test_dask.py | 18 ------------------ 5 files changed, 8 insertions(+), 26 deletions(-) diff --git a/.github/workflows/conda_env/environment_minimal.yml b/.github/workflows/conda_env/environment_minimal.yml index b53a92c1..298eb53f 100644 --- a/.github/workflows/conda_env/environment_minimal.yml +++ b/.github/workflows/conda_env/environment_minimal.yml @@ -6,3 +6,4 @@ dependencies: - pytest-timeout - psutil - coverage!=6.3 +- dask<=2022.10.0 diff --git a/ipsframework/services.py b/ipsframework/services.py index f1b4609a..1f990751 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -2229,15 +2229,15 @@ def submit_tasks(self, block=True, use_dask=False, dask_nodes=1, dask_ppw=None, """ if use_dask: - if TaskPool.dask and self.serial_pool: + if TaskPool.dask and TaskPool.distributed and self.serial_pool: self.dask_pool = True if use_shifter and not self.shifter: 
self.services.error("Requested to run dask within shifter but shifter not available") - raise Exception("shifter not found") + raise RuntimeError("shifter not found") else: return self.submit_dask_tasks(block, dask_nodes, dask_ppw, use_shifter, dask_worker_plugin, dask_worker_per_gpu) - elif not TaskPool.dask: - self.services.warning("Requested use_dask but cannot because import dask failed") + elif not TaskPool.dask or not TaskPool.distributed: + raise RuntimeError("Requested use_dask but cannot because import dask or distributed failed") elif not self.serial_pool: self.services.warning("Requested use_dask but cannot because multiple processors requested") diff --git a/setup.py b/setup.py index e53377d1..d57eaa69 100644 --- a/setup.py +++ b/setup.py @@ -64,6 +64,8 @@ zip_safe=True, install_requires=[ 'urllib3', - 'configobj' + 'configobj', + 'dask', + 'distributed' ] ) diff --git a/tests/helloworld/test_helloworld.py b/tests/helloworld/test_helloworld.py index 058c9d7f..b1ddab43 100644 --- a/tests/helloworld/test_helloworld.py +++ b/tests/helloworld/test_helloworld.py @@ -1,7 +1,6 @@ import os import shutil import json -import pytest from ipsframework import Framework, TaskPool @@ -211,8 +210,6 @@ def test_helloworld_task_pool(tmpdir, capfd): def test_helloworld_task_pool_dask(tmpdir, capfd): - pytest.importorskip("dask") - pytest.importorskip("distributed") assert TaskPool.dask is not None data_dir = os.path.dirname(__file__) diff --git a/tests/new/test_dask.py b/tests/new/test_dask.py index 3d96ee38..264ca6c9 100644 --- a/tests/new/test_dask.py +++ b/tests/new/test_dask.py @@ -75,8 +75,6 @@ def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfil def test_dask(tmpdir): - pytest.importorskip("dask") - pytest.importorskip("distributed") platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1) framework = Framework(config_file_list=[str(config_file)], @@ -128,8 +126,6 @@ def test_dask(tmpdir): @pytest.mark.skipif(shutil.which('shifter') is not None, reason="This tests only works if shifter doesn't exist") def test_dask_shifter_fail(tmpdir): - pytest.importorskip("dask") - pytest.importorskip("distributed") platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1, shifter=True) framework = Framework(config_file_list=[str(config_file)], @@ -164,8 +160,6 @@ def test_dask_shifter_fail(tmpdir): def test_dask_fake_shifter(tmpdir, monkeypatch): - pytest.importorskip("dask") - pytest.importorskip("distributed") shifter = tmpdir.join("shifter") shifter.write("#!/bin/bash\necho Running $@ in shifter >> shifter.log\n$@\n") @@ -238,8 +232,6 @@ def test_dask_fake_shifter(tmpdir, monkeypatch): def test_dask_timeout(tmpdir): - pytest.importorskip("dask") - pytest.importorskip("distributed") platform_file, config_file = write_basic_config_and_platform_files(tmpdir, timeout=1, value=100) framework = Framework(config_file_list=[str(config_file)], @@ -289,8 +281,6 @@ def test_dask_timeout(tmpdir): def test_dask_nproc(tmpdir): - pytest.importorskip("dask") - pytest.importorskip("distributed") platform_file, config_file = write_basic_config_and_platform_files(tmpdir, nproc=2, value=1) # Running with NPROC=2 should prevent dask from running and revert to normal task pool @@ -325,8 +315,6 @@ def test_dask_nproc(tmpdir): def test_dask_logfile(tmpdir): - pytest.importorskip("dask") - pytest.importorskip("distributed") exe = tmpdir.join("stdouterr_write.sh") exe.write("#!/bin/bash\necho Running $1\n>&2 echo ERROR $1\n") @@ -371,8 +359,6 @@ 
def test_dask_logfile(tmpdir):
 
 
 def test_dask_logfile_errfile(tmpdir):
-    pytest.importorskip("dask")
-    pytest.importorskip("distributed")
     exe = tmpdir.join("stdouterr_write.sh")
     exe.write("#!/bin/bash\necho Running $1\n>&2 echo ERROR $1\n")
 
@@ -473,8 +459,6 @@ def test_dask_shifter_on_cori(tmpdir):
 
 
 def test_dask_with_1_gpu(tmpdir):
-    pytest.importorskip("dask")
-    pytest.importorskip("distributed")
     platform_file, config_file = write_basic_config_and_platform_files(tmpdir, gpus=1)
 
     framework = Framework(config_file_list=[str(config_file)],
@@ -514,8 +498,6 @@ def test_dask_with_1_gpu(tmpdir):
 
 
 def test_dask_with_2_gpus(tmpdir):
-    pytest.importorskip("dask")
-    pytest.importorskip("distributed")
     platform_file, config_file = write_basic_config_and_platform_files(tmpdir, gpus=2)
 
     framework = Framework(config_file_list=[str(config_file)],

From 421f9f8a9c3b77f95a54a4df85b44d6e35d78da9 Mon Sep 17 00:00:00 2001
From: Ross Whitfield
Date: Fri, 28 Oct 2022 14:13:56 -0700
Subject: [PATCH 11/11] Add perlmutter test

---
 doc/development.rst                           |  34 +++++-
 .../components/workers/perlmutter_srun_gpu.py |  13 +++
 tests/conftest.py                             |  21 ++--
 tests/new/test_perlmutter_srun.py             | 103 ++++++++++++++++++
 4 files changed, 163 insertions(+), 8 deletions(-)
 create mode 100644 tests/components/workers/perlmutter_srun_gpu.py
 create mode 100644 tests/new/test_perlmutter_srun.py

diff --git a/doc/development.rst b/doc/development.rst
index 7b02658f..9f528d56 100644
--- a/doc/development.rst
+++ b/doc/development.rst
@@ -169,6 +169,38 @@ An example batch script for running the unit tests is:
 Then check the output in ``pytest.out`` to see that all the tests
 passed.
 
+.. _perlmutter-tests:
+
+Perlmutter only tests
+~~~~~~~~~~~~~~~~~~~~~
+
+There are some tests that only run on Perlmutter at NERSC; these are
+not run as part of the :ref:`CI ` and must be
+run manually. To run those tests you need to add the option
+``--runperlmutter`` to the ``pytest`` command. There are also tests
+for the srun commands built with different ``task_ppn``, ``task_cpp``
+and ``task_gpp`` options in
+:meth:`~ipsframework.services.ServicesProxy.launch_task`.
+
+
+An example batch script for running the unit tests is:
+
+.. code-block:: bash
+
+    #!/bin/bash
+    #SBATCH -p debug
+    #SBATCH --nodes=1
+    #SBATCH -t 00:20:00
+    #SBATCH -C gpu
+    #SBATCH -J pytest
+    #SBATCH -e pytest.err
+    #SBATCH -o pytest.out
+    module load python
+    python -m pytest --runperlmutter
+
+Then check the output in ``pytest.out`` to see that all the tests
+passed.
+
 Writing Tests
 ~~~~~~~~~~~~~
 
@@ -258,4 +290,4 @@ release before the full release to allow feedback from users. Patch
 versions will not normally have an release candidate.
 
 Before a release is finalized the :ref:`Cori only tests`
-should be run.
+and :ref:`Perlmutter only tests` should be run.
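
As an illustration of how the ``launch_task`` keywords documented above fit
together, a minimal worker sketch follows; the component name, binary and
argument are placeholders rather than code from this patch, and ``task_cpp``
and ``task_gpp`` only take effect when ``MPIRUN=srun``:

.. code-block:: python

    from ipsframework import Component


    class srun_options_worker(Component):
        # Hypothetical worker, shown only to illustrate the srun-related
        # keywords; the component actually added by this patch is
        # perlmutter_srun_gpu.py below.
        def step(self, timestamp=0.0, **keywords):
            cwd = self.services.get_working_dir()
            # 2 MPI ranks on one node, 16 cores and 2 GPUs per rank
            task_id = self.services.launch_task(2, cwd, self.EXE, "example",
                                                task_ppn=2, task_cpp=16, task_gpp=2)
            self.services.wait_task(task_id)

The test component added below exercises ``task_gpp`` on its own for several
process counts and checks the resulting ``srun`` command lines.
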
diff --git a/tests/components/workers/perlmutter_srun_gpu.py b/tests/components/workers/perlmutter_srun_gpu.py
new file mode 100644
index 00000000..48e6c54b
--- /dev/null
+++ b/tests/components/workers/perlmutter_srun_gpu.py
@@ -0,0 +1,13 @@
+from ipsframework import Component
+
+
+class gpu_task(Component):
+    # pylint: disable=no-member
+    def step(self, timestamp=0.0, **keywords):
+        cwd = self.services.get_working_dir()
+
+        self.services.wait_task(self.services.launch_task(1, cwd, self.EXE, "1_1", task_gpp=1))
+        self.services.wait_task(self.services.launch_task(1, cwd, self.EXE, "1_2", task_gpp=2))
+        self.services.wait_task(self.services.launch_task(1, cwd, self.EXE, "1_4", task_gpp=4))
+        self.services.wait_task(self.services.launch_task(2, cwd, self.EXE, "2_2", task_gpp=2))
+        self.services.wait_task(self.services.launch_task(4, cwd, self.EXE, "4_1", task_gpp=1))
diff --git a/tests/conftest.py b/tests/conftest.py
index 52367f6e..39d7e44c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -44,17 +44,24 @@ def run_around_tests():
 
 def pytest_addoption(parser):
     parser.addoption("--runcori", action="store_true", default=False, help="run Cori tests")
+    parser.addoption("--runperlmutter", action="store_true", default=False, help="run Perlmutter tests")
 
 
 def pytest_configure(config):
     config.addinivalue_line("markers", "cori: mark test to only work on Cori")
+    config.addinivalue_line("markers", "perlmutter: mark test to only work on Perlmutter")
 
 
 def pytest_collection_modifyitems(config, items):
-    if config.getoption("--runcori"):
-        # --runslow given in cli: do not skip slow tests
-        return
-    skip_cori = pytest.mark.skip(reason="need --runcori option to run")
-    for item in items:
-        if "cori" in item.keywords:
-            item.add_marker(skip_cori)
+    if not config.getoption("--runcori"):
+        # --runcori not given on the command line: skip Cori-only tests
+        skip_cori = pytest.mark.skip(reason="need --runcori option to run")
+        for item in items:
+            if "cori" in item.keywords:
+                item.add_marker(skip_cori)
+    if not config.getoption("--runperlmutter"):
+        # --runperlmutter not given on the command line: skip Perlmutter-only tests
+        skip_perlmutter = pytest.mark.skip(reason="need --runperlmutter option to run")
+        for item in items:
+            if "perlmutter" in item.keywords:
+                item.add_marker(skip_perlmutter)
diff --git a/tests/new/test_perlmutter_srun.py b/tests/new/test_perlmutter_srun.py
new file mode 100644
index 00000000..b9122db5
--- /dev/null
+++ b/tests/new/test_perlmutter_srun.py
@@ -0,0 +1,103 @@
+import glob
+import json
+import pytest
+from ipsframework import Framework
+
+
+def write_basic_config_and_platform_files(tmpdir):
+    platform_file = tmpdir.join('perlmutter.platform.conf')
+
+    platform = """MPIRUN = srun
+HOST = perlmutter
+NODE_DETECTION = slurm_env
+CORES_PER_NODE = 64
+PROCS_PER_NODE = 64
+GPUS_PER_NODE = 4
+SOCKETS_PER_NODE = 1
+NODE_ALLOCATION_MODE = EXCLUSIVE
+USE_ACCURATE_NODES = ON
+"""
+
+    with open(platform_file, 'w') as f:
+        f.write(platform)
+
+    config_file = tmpdir.join('ips.config')
+
+    config = f"""RUN_COMMENT = testing
+SIM_NAME = test
+LOG_FILE = {str(tmpdir)}/sim.log
+LOG_LEVEL = INFO
+SIM_ROOT = {str(tmpdir)}
+SIMULATION_MODE = NORMAL
+[PORTS]
+    NAMES = DRIVER
+    [[DRIVER]]
+      IMPLEMENTATION = DRIVER
+[DRIVER]
+    CLASS = OPENMP
+    SUB_CLASS =
+    NAME = gpu_task
+    BIN_PATH =
+    EXE = {str(tmpdir)}/gpu_test.sh
+    NPROC = 1
+    INPUT_FILES =
+    OUTPUT_FILES =
+    SCRIPT =
+    MODULE = components.workers.perlmutter_srun_gpu
+"""
+
+    with open(config_file, 'w') as f:
+        f.write(config)
+
+    return platform_file, config_file
+
+
+@pytest.mark.perlmutter +def test_srun_gpu_on_perlmutter(tmpdir): + + platform_file, config_file = write_basic_config_and_platform_files(tmpdir) + + exe = tmpdir.join("gpu_test.sh") + exe.write("#!/bin/bash\nmkdir -p $1\nnvidia-smi -L > $1/proc_${SLURM_PROCID}_GPUS.log\n") + exe.chmod(448) # 700 + + framework = Framework(config_file_list=[str(config_file)], + log_file_name=str(tmpdir.join('ips.log')), + platform_file_name=str(platform_file), + debug=None, + verbose_debug=None, + cmd_nodes=0, + cmd_ppn=0) + + framework.run() + + # check simulation_log + json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + comments = [json.loads(line)['comment'].split(', ', maxsplit=4)[3:] for line in json_file.readlines()] + + assert comments[5][0].startswith("Target = srun -N 1 -n 1 -c 64 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1") + assert comments[5][0].endswith("gpu_test.sh 1_1") + + assert comments[7][0].startswith("Target = srun -N 1 -n 1 -c 64 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=2") + assert comments[7][0].endswith("gpu_test.sh 1_2") + + assert comments[9][0].startswith("Target = srun -N 1 -n 1 -c 64 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=4") + assert comments[9][0].endswith("gpu_test.sh 1_4") + + assert comments[11][0].startswith("Target = srun -N 1 -n 2 -c 32 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=2") + assert comments[11][0].endswith("gpu_test.sh 2_2") + + assert comments[13][0].startswith("Target = srun -N 1 -n 4 -c 16 --threads-per-core=1 --cpu-bind=cores --gpus-per-task=1") + assert comments[13][0].endswith("gpu_test.sh 4_1") + + # check that the process output log files are created + work_dir = tmpdir.join("work").join("OPENMP__gpu_task_1") + + for nprocs, ngpus in ((1, 1), (1, 2), (1, 4), (2, 2), (4, 1)): + output_files = glob.glob(str(work_dir.join(f'{nprocs}_{ngpus}').join("*.log"))) + assert len(output_files) == nprocs + for n in range(nprocs): + lines = open(output_files[n], 'r').readlines() + assert len(lines) == ngpus