[WIP] Chunk distribution strategies #824

Open · wants to merge 11 commits into base: dev
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -1362,7 +1362,7 @@ if(openPMD_BUILD_TESTING)
)
add_test(NAME CLI.pipe.py
COMMAND sh -c
"${MPI_TEST_EXE} ${Python_EXECUTABLE} \
"${MPI_TEST_EXE} -n 2 ${Python_EXECUTABLE} \
${openPMD_RUNTIME_OUTPUT_DIRECTORY}/openpmd-pipe \
--infile ../samples/git-sample/data%T.h5 \
--outfile ../samples/git-sample/data%T.bp && \
@@ -1373,7 +1373,7 @@ if(openPMD_BUILD_TESTING)
--outfile \
../samples/git-sample/single_iteration_%T.bp && \
\
${MPI_TEST_EXE} ${Python_EXECUTABLE} \
${MPI_TEST_EXE} -n 2 ${Python_EXECUTABLE} \
${openPMD_RUNTIME_OUTPUT_DIRECTORY}/openpmd-pipe \
--infile ../samples/git-sample/thetaMode/data%T.h5 \
--outfile \
307 changes: 307 additions & 0 deletions include/openPMD/ChunkInfo.hpp
@@ -23,6 +23,8 @@
#include "openPMD/config.hpp"

#include "openPMD/Dataset.hpp" // Offset, Extent
#include "openPMD/benchmark/mpi/BlockSlicer.hpp"
#include <memory>

#if openPMD_HAVE_MPI
#include <mpi.h>
@@ -84,7 +86,312 @@ using ChunkTable = std::vector<WrittenChunkInfo>;

namespace chunk_assignment
{
constexpr char const *HOSTFILE_VARNAME = "MPI_WRITTEN_HOSTFILE";

using RankMeta = std::map<unsigned int, std::string>;

using Assignment = std::map<unsigned int, std::vector<WrittenChunkInfo>>;

template <typename Chunk_t>
void mergeChunks(std::vector<Chunk_t> &);

auto mergeChunksFromSameSourceID(std::vector<WrittenChunkInfo> const &)
-> std::map<unsigned int, std::vector<ChunkInfo>>;

struct PartialAssignment
{
ChunkTable notAssigned;
Assignment assigned;

explicit PartialAssignment() = default;
PartialAssignment(ChunkTable notAssigned);
PartialAssignment(ChunkTable notAssigned, Assignment assigned);
};
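
For orientation, a hedged sketch of the intended flow: strategies consume a PartialAssignment and move chunks from notAssigned into assigned. Here, `table` is an assumed caller-provided ChunkTable, e.g. from BaseRecordComponent::availableChunks():

// All chunks of a fresh table start out unassigned.
PartialAssignment pending(table);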

/**
* @brief Interface for a chunk distribution strategy.
*
* Used for implementing algorithms that read a ChunkTable as produced
* by BaseRecordComponent::availableChunks() and produce as a result an
* Assignment that guides data sinks on how to load data into reading
* processes.
*/
struct Strategy
{
Assignment assign(
ChunkTable,
RankMeta const &rankMetaIn,
RankMeta const &rankMetaOut);
/**
* @brief Assign chunks to be loaded to reading processes.
*
* @param partialAssignment Two chunktables, one of unassigned chunks
* and one of chunks that might have already been assigned
* previously.
* Merge the unassigned chunks into the partially assigned table.
* @param in Meta information on writing processes, e.g. hostnames.
* @param out Meta information on reading processes, e.g. hostnames.
* @return Assignment A table that assigns chunks to reading processes.
*/
virtual Assignment assign(
PartialAssignment partialAssignment,
RankMeta const &in,
RankMeta const &out) = 0;

virtual std::unique_ptr<Strategy> clone() const = 0;

virtual ~Strategy() = default;
};
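
To illustrate the interface, a minimal sketch of a conforming implementation; the class name and its deliberately naive policy (send everything to reading rank 0) are invented for this example and are not part of the PR:

// Illustrative sketch only: a Strategy that hands every
// not-yet-assigned chunk to reading rank 0.
struct EverythingToRankZero : Strategy
{
    Assignment assign(
        PartialAssignment partial,
        RankMeta const &, // writer host metadata, unused here
        RankMeta const &) override
    {
        for (auto &chunk : partial.notAssigned)
        {
            partial.assigned[0].push_back(std::move(chunk));
        }
        return std::move(partial.assigned);
    }

    std::unique_ptr<Strategy> clone() const override
    {
        return std::make_unique<EverythingToRankZero>(*this);
    }
};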

/**
* @brief A chunk distribution strategy that does not guarantee a
* complete distribution.
*
* Combine with a full Strategy using the FromPartialStrategy struct to
* obtain a Strategy that works in two phases:
* 1. Apply the partial strategy.
* 2. Apply the full strategy to assign unassigned leftovers.
*
*/
struct PartialStrategy
{
PartialAssignment
assign(ChunkTable table, RankMeta const &in, RankMeta const &out);
/**
* @brief Assign chunks to be loaded to reading processes.
*
* @param partialAssignment Two chunktables, one of unassigned chunks
* and one of chunks that might have already been assigned
* previously.
* Merge the unassigned chunks into the partially assigned table.
* @param in Meta information on writing processes, e.g. hostnames.
* @param out Meta information on reading processes, e.g. hostnames.
* @return PartialAssignment Two chunktables, one of leftover chunks
* that were not assigned and one that assigns chunks to
* reading processes.
*/
virtual PartialAssignment assign(
PartialAssignment partialAssignment,
RankMeta const &in,
RankMeta const &out) = 0;

virtual std::unique_ptr<PartialStrategy> clone() const = 0;

virtual ~PartialStrategy() = default;
};

/**
* @brief Combine a PartialStrategy and a Strategy to obtain a Strategy
* working in two phases.
*
* 1. Apply the PartialStrategy to obtain a PartialAssignment.
* This may be a heuristic that will not work under all circumstances,
* e.g. trying to distribute chunks within the same compute node.
* 2. Apply the Strategy to assign leftovers.
* This guarantees correctness in case the heuristics in the first phase
* were not applicable e.g. due to a suboptimal setup.
*
*/
struct FromPartialStrategy : Strategy
{
FromPartialStrategy(
std::unique_ptr<PartialStrategy> firstPass,
std::unique_ptr<Strategy> secondPass);

virtual Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;

private:
std::unique_ptr<PartialStrategy> m_firstPass;
std::unique_ptr<Strategy> m_secondPass;
};
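
A hedged usage sketch of the two-phase composition; `table`, `inRanks` and `outRanks` stand for caller-provided chunk and hostname metadata and are assumptions of this example:

// First pass: keep chunks on the host that wrote them, distributing
// round-robin among that host's readers. Second pass: distribute any
// leftover chunks round-robin across all readers.
FromPartialStrategy strategy(
    std::make_unique<ByHostname>(std::make_unique<RoundRobin>()),
    std::make_unique<RoundRobin>());
Assignment result = strategy.assign(table, inRanks, outRanks);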

/**
* @brief Simple strategy that assigns produced chunks to reading processes
* in a round-robin manner.
*
*/
struct RoundRobin : Strategy
{
Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};

struct RoundRobinOfSourceRanks : Strategy
{
Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};

struct Blocks : Strategy
{
private:
unsigned int mpi_size, mpi_rank;

public:
Blocks(unsigned int mpi_rank, unsigned int mpi_size);

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

[[nodiscard]] std::unique_ptr<Strategy> clone() const override;
};

struct BlocksOfSourceRanks : Strategy
{
private:
unsigned int mpi_size, mpi_rank;

public:
BlocksOfSourceRanks(unsigned int mpi_rank, unsigned int mpi_size);

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

[[nodiscard]] std::unique_ptr<Strategy> clone() const override;
};

/**
* @brief Strategy that assigns chunks to be read by processes within
* the same host that produced the chunk.
*
* The distribution strategy within one such host can be flexibly
* chosen.
*
*/
struct ByHostname : PartialStrategy
{
ByHostname(std::unique_ptr<Strategy> withinNode);

PartialAssignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<PartialStrategy> clone() const override;

private:
std::unique_ptr<Strategy> m_withinNode;
};

/**
* @brief Slice the n-dimensional dataset into hyperslabs and distribute
* chunks according to them.
*
* This strategy only produces chunks in the returned ChunkTable for the
* calling parallel process.
* Incoming chunks are intersected with the hyperslab and assigned to the
* current parallel process in case this intersection is non-empty.
*
*/
struct ByCuboidSlice : Strategy
{
ByCuboidSlice(
std::unique_ptr<BlockSlicer> blockSlicer,
Extent totalExtent,
unsigned int mpi_rank,
unsigned int mpi_size);

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;

private:
std::unique_ptr<BlockSlicer> blockSlicer;
Extent totalExtent;
unsigned int mpi_rank, mpi_size;
};
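
A hedged usage sketch, assuming OneDimensionalBlockSlicer.hpp is included and that `totalExtent`, `rank`, `size`, `table`, `inRanks` and `outRanks` are supplied by the caller (e.g. via MPI_Comm_rank/MPI_Comm_size):

// Slice the dataset along dimension 0 into one hyperslab per reading
// rank; each rank keeps the intersections of the written chunks with
// its own hyperslab.
ByCuboidSlice cuboids(
    std::make_unique<OneDimensionalBlockSlicer>(/* dim = */ 0),
    totalExtent,
    rank,
    size);
Assignment mine = cuboids.assign(table, inRanks, outRanks);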

/**
* @brief Strategy that tries to assign chunks in a balanced manner without
* arbitrarily cutting chunks.
*
* Idea:
* Calculate the ideal amount of data to be loaded per parallel process
* and cut chunks such that no chunk is larger than that ideal size.
* The resulting problem is an instance of the Bin-Packing problem, which
* can be solved by a factor-2 approximation, meaning that a reading process
* will be assigned at worst twice the ideal amount of data.
*
*/
struct BinPacking : Strategy
{
size_t splitAlongDimension = 0;

/**
* @param splitAlongDimension If a chunk needs to be split, split it
* along this dimension.
*/
BinPacking(size_t splitAlongDimension = 0);

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};
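
A worked example of the advertised bound, with invented numbers: for 120 GiB of written data and 4 reading processes, the ideal load is 30 GiB per reader; chunks are first split along splitAlongDimension so that none exceeds 30 GiB, and the factor-2 approximation then guarantees that no reader receives more than 60 GiB.

// Hedged usage sketch; `table`, `inRanks`, `outRanks` as in the
// examples above.
BinPacking binPacking(/* splitAlongDimension = */ 0);
Assignment balanced = binPacking.assign(table, inRanks, outRanks);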

/**
* @brief Strategy that purposefully fails when the PartialAssignment has
* leftover chunks.
*
* Useful as second phase in FromPartialStrategy to assert that the first
* pass of the strategy catches all blocks, e.g. to assert that all chunks
* can be assigned within the same compute node.
*
*/
struct FailingStrategy : Strategy
{
explicit FailingStrategy();

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};
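
A sketch of the use case described above, composing ByHostname with FailingStrategy so that assignment fails loudly unless every chunk can be matched within its writing host (`table`, `inRanks`, `outRanks` again assumed):

// Insist on node-local reading; per the doc comment above, the second
// pass fails on purpose if any chunk is left over after the first.
FromPartialStrategy nodeLocalOnly(
    std::make_unique<ByHostname>(std::make_unique<RoundRobin>()),
    std::make_unique<FailingStrategy>());
Assignment strict = nodeLocalOnly.assign(table, inRanks, outRanks);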

/**
* @brief Strategy that purposefully discards leftover chunks from
* the PartialAssignment.
*
* Useful as second phase in FromPartialStrategy when it is known that
* some chunks will go unassigned, but communication should still happen
* only within the same node.
*
*/
struct DiscardingStrategy : Strategy
{
explicit DiscardingStrategy();

Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};
} // namespace chunk_assignment

namespace host_info
1 change: 1 addition & 0 deletions include/openPMD/backend/BaseRecordComponent.hpp
@@ -20,6 +20,7 @@
*/
#pragma once

#include "openPMD/ChunkInfo.hpp"
#include "openPMD/Dataset.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/backend/Attributable.hpp"
4 changes: 4 additions & 0 deletions include/openPMD/benchmark/mpi/BlockSlicer.hpp
@@ -23,6 +23,8 @@

#include "openPMD/Dataset.hpp"

#include <memory>

namespace openPMD
{
/**
@@ -42,6 +44,8 @@ class BlockSlicer
virtual std::pair<Offset, Extent>
sliceBlock(Extent &totalExtent, int size, int rank) = 0;

virtual std::unique_ptr<BlockSlicer> clone() const = 0;

/** This class will be derived from
*/
virtual ~BlockSlicer() = default;
5 changes: 5 additions & 0 deletions include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp
@@ -33,7 +33,12 @@ class OneDimensionalBlockSlicer : public BlockSlicer

explicit OneDimensionalBlockSlicer(Extent::value_type dim = 0);

static std::pair<size_t, size_t>
n_th_block_inside(size_t length, size_t rank, size_t size);

std::pair<Offset, Extent>
sliceBlock(Extent &totalExtent, int size, int rank) override;

virtual std::unique_ptr<BlockSlicer> clone() const override;
};
} // namespace openPMD
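
For orientation, a hedged sketch of the new static helper. The exact placement of remainder elements is an implementation detail not visible from this header, but the returned (offset, length) pairs are expected to tile the range without gaps or overlap:

#include "openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp"

#include <cstddef>
#include <iostream>

int main()
{
    // Split a 1-D range of length 10 among 4 ranks as evenly as
    // possible; each rank obtains its own (offset, length) pair.
    for (std::size_t rank = 0; rank < 4; ++rank)
    {
        auto [offset, length] =
            openPMD::OneDimensionalBlockSlicer::n_th_block_inside(
                10, rank, 4);
        std::cout << "rank " << rank << ": offset " << offset
                  << ", length " << length << '\n';
    }
}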