Skip to content

Commit

Permalink
Specify distribution strategy per env var
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Aug 9, 2021
1 parent 43eac71 commit 28a74de
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .. import openpmd_api_cxx as io
import argparse
import os # os.path.basename
import re
import sys # sys.stderr.write

# MPI is an optional dependency
Expand Down Expand Up @@ -44,6 +45,12 @@ def parse_args(program_name):
as soon as the mpi4py package is found and this tool is called in an MPI
context. In that case, each dataset will be equally sliced along the dimension
with the largest extent.
A chunk distribution strategy may be selected via the environment variable
OPENPMD_CHUNK_DISTRIBUTION. Options include "roundrobin", "binpacking",
"slicedataset" and "hostname_<1>_<2>", where <1> should be replaced with a
strategy to be applied within a compute node and <2> with a secondary strategy
in case the hostname strategy does not distribute all chunks.
Examples:
{0} --infile simData.h5 --outfile simData_%T.bp
Expand Down Expand Up @@ -97,6 +104,40 @@ def run(self):
self.dest.store(index, item)


def distribution_strategy(dataset_extent,
                          mpi_rank,
                          mpi_size,
                          strategy_identifier=None):
    """
    Build a chunk distribution strategy from a textual identifier.

    Parameters
    ----------
    dataset_extent:
        Extent of the dataset; only forwarded to the 'slicedataset' strategy.
    mpi_rank, mpi_size:
        Rank of this process and total number of processes; only forwarded
        to the 'slicedataset' strategy.
    strategy_identifier:
        One of 'roundrobin', 'binpacking', 'slicedataset' or
        'hostname_<1>_<2>', compared case-insensitively. In the last form,
        <1> is the strategy applied within a compute node and <2> is the
        secondary strategy for chunks that the hostname strategy did not
        distribute; both are resolved recursively by this function.
        If None or empty, the environment variable
        OPENPMD_CHUNK_DISTRIBUTION is consulted; if that is unset or empty
        too, 'hostname_binpacking_slicedataset' is used.

    Returns
    -------
    The strategy object created from the ``io`` module.

    Raises
    ------
    RuntimeError
        If the identifier is not recognized, or if a 'hostname_<1>_<2>'
        identifier has an empty <1> or <2> part.
    """
    identifier = (strategy_identifier or '').strip().lower()
    if not identifier:
        # Nothing passed explicitly: fall back to the environment variable.
        identifier = os.environ.get('OPENPMD_CHUNK_DISTRIBUTION',
                                    '').strip().lower()
    if not identifier:
        identifier = 'hostname_binpacking_slicedataset'  # default

    # fullmatch (not search) so that identifiers with a stray prefix such as
    # 'xhostname_a_b' are rejected rather than treated as hostname strategies.
    # The first group is greedy, so nested hostname strategies are only
    # possible for the in-node part: 'hostname_hostname_a_b_c' -> ('hostname_a_b', 'c').
    match = re.fullmatch(r'hostname_(.*)_(.*)', identifier)
    if match is not None:
        inside_node_id, second_phase_id = match.groups()
        # An empty sub-identifier must be rejected here: passing '' down
        # would make the recursive call re-read the environment variable or
        # default, leading to a silently wrong strategy or to infinite
        # recursion when the environment variable itself is malformed.
        if not inside_node_id or not second_phase_id:
            raise RuntimeError(
                "Unknown distribution strategy: " + identifier +
                " (expected 'hostname_<1>_<2>' with two non-empty"
                " strategies)")
        inside_node = distribution_strategy(
            dataset_extent,
            mpi_rank,
            mpi_size,
            strategy_identifier=inside_node_id)
        second_phase = distribution_strategy(
            dataset_extent,
            mpi_rank,
            mpi_size,
            strategy_identifier=second_phase_id)
        return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase)
    if identifier == 'roundrobin':
        return io.RoundRobin()
    if identifier == 'binpacking':
        return io.BinPacking()
    if identifier == 'slicedataset':
        return io.ByCuboidSlice(io.OneDimensionalBlockSlicer(), dataset_extent,
                                mpi_rank, mpi_size)
    raise RuntimeError("Unknown distribution strategy: " + identifier)


class pipe:
"""
Represents the configuration of one "pipe" pass.
Expand Down Expand Up @@ -226,7 +267,8 @@ def __copy(self, src, dest, current_path="/data/"):
dest.make_constant(src.get_attribute("value"))
else:
chunk_table = src.available_chunks()
strategy = io.BinPacking()
strategy = distribution_strategy(shape, self.comm.rank,
self.comm.size)
my_chunks = strategy.assign_chunks(chunk_table, self.inranks,
self.outranks)
for chunk in [
Expand Down

0 comments on commit 28a74de

Please sign in to comment.