From d6209b83e8bee69293cfb09f8b364c4363ad91fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 2 Mar 2023 13:34:07 +0100 Subject: [PATCH 01/11] Add chunk distribution algorithms --- include/openPMD/ChunkInfo.hpp | 237 +++++++++ include/openPMD/benchmark/mpi/BlockSlicer.hpp | 4 + .../mpi/OneDimensionalBlockSlicer.hpp | 2 + src/ChunkInfo.cpp | 492 ++++++++++++++++++ .../mpi/OneDimensionalBlockSlicer.cpp | 5 + 5 files changed, 740 insertions(+) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 9bc6e94972..b44379b2aa 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -23,6 +23,7 @@ #include "openPMD/config.hpp" #include "openPMD/Dataset.hpp" // Offset, Extent +#include "openPMD/benchmark/mpi/BlockSlicer.hpp" #if openPMD_HAVE_MPI #include @@ -84,7 +85,243 @@ using ChunkTable = std::vector; namespace chunk_assignment { + constexpr char const *HOSTFILE_VARNAME = "MPI_WRITTEN_HOSTFILE"; + using RankMeta = std::map; + + using Assignment = std::map>; + + struct PartialAssignment + { + ChunkTable notAssigned; + Assignment assigned; + + explicit PartialAssignment() = default; + PartialAssignment(ChunkTable notAssigned); + PartialAssignment(ChunkTable notAssigned, Assignment assigned); + }; + + /** + * @brief Interface for a chunk distribution strategy. + * + * Used for implementing algorithms that read a ChunkTable as produced + * by BaseRecordComponent::availableChunks() and produce as result a + * ChunkTable that guides data sinks on how to load data into reading + * processes. + */ + struct Strategy + { + Assignment assign( + ChunkTable, + RankMeta const &rankMetaIn, + RankMeta const &rankMetaOut); + /** + * @brief Assign chunks to be loaded to reading processes. + * + * @param partialAssignment Two chunktables, one of unassigned chunks + * and one of chunks that might have already been assigned + * previously. + * Merge the unassigned chunks into the partially assigned table. + * @param in Meta information on writing processes, e.g. hostnames. + * @param out Meta information on reading processes, e.g. hostnames. + * @return ChunkTable A table that assigns chunks to reading processes. + */ + virtual Assignment assign( + PartialAssignment partialAssignment, + RankMeta const &in, + RankMeta const &out) = 0; + + virtual std::unique_ptr clone() const = 0; + + virtual ~Strategy() = default; + }; + + /** + * @brief A chunk distribution strategy that guarantees no complete + * distribution. + * + * Combine with a full Strategy using the FromPartialStrategy struct to + * obtain a Strategy that works in two phases: + * 1. Apply the partial strategy. + * 2. Apply the full strategy to assign unassigned leftovers. + * + */ + struct PartialStrategy + { + PartialAssignment + assign(ChunkTable table, RankMeta const &in, RankMeta const &out); + /** + * @brief Assign chunks to be loaded to reading processes. + * + * @param partialAssignment Two chunktables, one of unassigned chunks + * and one of chunks that might have already been assigned + * previously. + * Merge the unassigned chunks into the partially assigned table. + * @param in Meta information on writing processes, e.g. hostnames. + * @param out Meta information on reading processes, e.g. hostnames. + * @return PartialAssignment Two chunktables, one of leftover chunks + * that were not assigned and one that assigns chunks to + * reading processes. + */ + virtual PartialAssignment assign( + PartialAssignment partialAssignment, + RankMeta const &in, + RankMeta const &out) = 0; + + virtual std::unique_ptr clone() const = 0; + + virtual ~PartialStrategy() = default; + }; + + /** + * @brief Combine a PartialStrategy and a Strategy to obtain a Strategy + * working in two phases. + * + * 1. Apply the PartialStrategy to obtain a PartialAssignment. + * This may be a heuristic that will not work under all circumstances, + * e.g. trying to distribute chunks within the same compute node. + * 2. Apply the Strategy to assign leftovers. + * This guarantees correctness in case the heuristics in the first phase + * were not applicable e.g. due to a suboptimal setup. + * + */ + struct FromPartialStrategy : Strategy + { + FromPartialStrategy( + std::unique_ptr firstPass, + std::unique_ptr secondPass); + + virtual Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + + private: + std::unique_ptr m_firstPass; + std::unique_ptr m_secondPass; + }; + + /** + * @brief Simple strategy that assigns produced chunks to reading processes + * in a round-Robin manner. + * + */ + struct RoundRobin : Strategy + { + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; + + /** + * @brief Strategy that assigns chunks to be read by processes within + * the same host that produced the chunk. + * + * The distribution strategy within one such chunk can be flexibly + * chosen. + * + */ + struct ByHostname : PartialStrategy + { + ByHostname(std::unique_ptr withinNode); + + PartialAssignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + + private: + std::unique_ptr m_withinNode; + }; + + /** + * @brief Slice the n-dimensional dataset into hyperslabs and distribute + * chunks according to them. + * + * This strategy only produces chunks in the returned ChunkTable for the + * calling parallel process. + * Incoming chunks are intersected with the hyperslab and assigned to the + * current parallel process in case this intersection is non-empty. + * + */ + struct ByCuboidSlice : Strategy + { + ByCuboidSlice( + std::unique_ptr blockSlicer, + Extent totalExtent, + unsigned int mpi_rank, + unsigned int mpi_size); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + + private: + std::unique_ptr blockSlicer; + Extent totalExtent; + unsigned int mpi_rank, mpi_size; + }; + + /** + * @brief Strategy that tries to assign chunks in a balanced manner without + * arbitrarily cutting chunks. + * + * Idea: + * Calculate the ideal amount of data to be loaded per parallel process + * and cut chunks s.t. no chunk is larger than that ideal size. + * The resulting problem is an instance of the Bin-Packing problem which + * can be solved by a factor-2 approximation, meaning that a reading process + * will be assigned at worst twice the ideal amount of data. + * + */ + struct BinPacking : Strategy + { + size_t splitAlongDimension = 0; + + /** + * @param splitAlongDimension If a chunk needs to be split, split it + * along this dimension. + */ + BinPacking(size_t splitAlongDimension = 0); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; + + /** + * @brief Strategy that purposefully fails when the PartialAssignment has + * leftover chunks. + * + * Useful as second phase in FromPartialStrategy to assert that the first + * pass of the strategy catches all blocks, e.g. to assert that all chunks + * can be assigned within the same compute node. + * + */ + struct FailingStrategy : Strategy + { + explicit FailingStrategy(); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; } // namespace chunk_assignment namespace host_info diff --git a/include/openPMD/benchmark/mpi/BlockSlicer.hpp b/include/openPMD/benchmark/mpi/BlockSlicer.hpp index c66716217a..a720793b41 100644 --- a/include/openPMD/benchmark/mpi/BlockSlicer.hpp +++ b/include/openPMD/benchmark/mpi/BlockSlicer.hpp @@ -23,6 +23,8 @@ #include "openPMD/Dataset.hpp" +#include + namespace openPMD { /** @@ -42,6 +44,8 @@ class BlockSlicer virtual std::pair sliceBlock(Extent &totalExtent, int size, int rank) = 0; + virtual std::unique_ptr clone() const = 0; + /** This class will be derived from */ virtual ~BlockSlicer() = default; diff --git a/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp b/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp index 78f955524b..cb12da9350 100644 --- a/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp +++ b/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp @@ -35,5 +35,7 @@ class OneDimensionalBlockSlicer : public BlockSlicer std::pair sliceBlock(Extent &totalExtent, int size, int rank) override; + + virtual std::unique_ptr clone() const override; }; } // namespace openPMD diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 5acb1ea07e..7b6c1e32ca 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -23,6 +23,10 @@ #include "openPMD/auxiliary/Mpi.hpp" +#include // std::sort +#include +#include +#include #include #ifdef _WIN32 @@ -62,6 +66,494 @@ bool WrittenChunkInfo::operator==(WrittenChunkInfo const &other) const this->ChunkInfo::operator==(other); } +namespace chunk_assignment +{ + namespace + { + std::map > + ranksPerHost(RankMeta const &rankMeta) + { + std::map > res; + for (auto const &pair : rankMeta) + { + auto &list = res[pair.second]; + list.emplace_back(pair.first); + } + return res; + } + } // namespace + + Assignment Strategy::assign( + ChunkTable table, RankMeta const &rankIn, RankMeta const &rankOut) + { + if (rankOut.size() == 0) + { + throw std::runtime_error("[assignChunks] No output ranks defined"); + } + return this->assign( + PartialAssignment(std::move(table)), rankIn, rankOut); + } + + PartialAssignment::PartialAssignment( + ChunkTable notAssigned_in, Assignment assigned_in) + : notAssigned(std::move(notAssigned_in)) + , assigned(std::move(assigned_in)) + {} + + PartialAssignment::PartialAssignment(ChunkTable notAssigned_in) + : PartialAssignment(std::move(notAssigned_in), Assignment()) + {} + + PartialAssignment PartialStrategy::assign( + ChunkTable table, RankMeta const &rankIn, RankMeta const &rankOut) + { + return this->assign( + PartialAssignment(std::move(table)), rankIn, rankOut); + } + + FromPartialStrategy::FromPartialStrategy( + std::unique_ptr firstPass, + std::unique_ptr secondPass) + : m_firstPass(std::move(firstPass)), m_secondPass(std::move(secondPass)) + {} + + Assignment FromPartialStrategy::assign( + PartialAssignment partialAssignment, + RankMeta const &in, + RankMeta const &out) + { + return m_secondPass->assign( + m_firstPass->assign(std::move(partialAssignment), in, out), + in, + out); + } + + std::unique_ptr FromPartialStrategy::clone() const + { + return std::unique_ptr(new FromPartialStrategy( + m_firstPass->clone(), m_secondPass->clone())); + } + + Assignment RoundRobin::assign( + PartialAssignment partialAssignment, + RankMeta const &, // ignored parameter + RankMeta const &out) + { + if (out.size() == 0) + { + throw std::runtime_error( + "[RoundRobin] Cannot round-robin to zero ranks."); + } + auto it = out.begin(); + auto nextRank = [&it, &out]() { + if (it == out.end()) + { + it = out.begin(); + } + auto res = it->first; + it++; + return res; + }; + ChunkTable &sourceChunks = partialAssignment.notAssigned; + Assignment &sinkChunks = partialAssignment.assigned; + for (auto &chunk : sourceChunks) + { + chunk.sourceID = nextRank(); + sinkChunks[chunk.sourceID].push_back(std::move(chunk)); + } + return sinkChunks; + } + + std::unique_ptr RoundRobin::clone() const + { + return std::unique_ptr(new RoundRobin); + } + + ByHostname::ByHostname(std::unique_ptr withinNode) + : m_withinNode(std::move(withinNode)) + {} + + PartialAssignment ByHostname::assign( + PartialAssignment res, RankMeta const &in, RankMeta const &out) + { + // collect chunks by hostname + std::map chunkGroups; + ChunkTable &sourceChunks = res.notAssigned; + Assignment &sinkChunks = res.assigned; + { + ChunkTable leftover; + for (auto &chunk : sourceChunks) + { + auto it = in.find(chunk.sourceID); + if (it == in.end()) + { + leftover.push_back(std::move(chunk)); + } + else + { + std::string const &hostname = it->second; + ChunkTable &chunksOnHost = chunkGroups[hostname]; + chunksOnHost.push_back(std::move(chunk)); + } + } + // undistributed chunks will be put back in later on + sourceChunks.clear(); + for (auto &chunk : leftover) + { + sourceChunks.push_back(std::move(chunk)); + } + } + // chunkGroups will now contain chunks by hostname + // the ranks are the source ranks + + // which ranks live on host in the sink? + std::map > ranksPerHostSink = + ranksPerHost(out); + for (auto &chunkGroup : chunkGroups) + { + std::string const &hostname = chunkGroup.first; + // find reading ranks on the sink host with same name + auto it = ranksPerHostSink.find(hostname); + if (it == ranksPerHostSink.end() || it->second.empty()) + { + /* + * These are leftover, go back to the input. + */ + for (auto &chunk : chunkGroup.second) + { + sourceChunks.push_back(std::move(chunk)); + } + } + else + { + RankMeta ranksOnTargetNode; + for (unsigned int rank : it->second) + { + ranksOnTargetNode[rank] = hostname; + } + Assignment swapped; + swapped.swap(sinkChunks); + sinkChunks = m_withinNode->assign( + PartialAssignment(chunkGroup.second, std::move(swapped)), + in, + ranksOnTargetNode); + } + } + return res; + } + + std::unique_ptr ByHostname::clone() const + { + return std::unique_ptr( + new ByHostname(m_withinNode->clone())); + } + + ByCuboidSlice::ByCuboidSlice( + std::unique_ptr blockSlicer_in, + Extent totalExtent_in, + unsigned int mpi_rank_in, + unsigned int mpi_size_in) + : blockSlicer(std::move(blockSlicer_in)) + , totalExtent(std::move(totalExtent_in)) + , mpi_rank(mpi_rank_in) + , mpi_size(mpi_size_in) + {} + + namespace + { + /** + * @brief Compute the intersection of two chunks. + * + * @param offset Offset of chunk 1, result will be written in place. + * @param extent Extent of chunk 1, result will be written in place. + * @param withinOffset Offset of chunk 2. + * @param withinExtent Extent of chunk 2. + */ + void restrictToSelection( + Offset &offset, + Extent &extent, + Offset const &withinOffset, + Extent const &withinExtent) + { + for (size_t i = 0; i < offset.size(); ++i) + { + if (offset[i] < withinOffset[i]) + { + auto delta = withinOffset[i] - offset[i]; + offset[i] = withinOffset[i]; + if (delta > extent[i]) + { + extent[i] = 0; + } + else + { + extent[i] -= delta; + } + } + auto totalExtent = extent[i] + offset[i]; + auto totalWithinExtent = withinExtent[i] + withinOffset[i]; + if (totalExtent > totalWithinExtent) + { + auto delta = totalExtent - totalWithinExtent; + if (delta > extent[i]) + { + extent[i] = 0; + } + else + { + extent[i] -= delta; + } + } + } + } + + struct SizedChunk + { + WrittenChunkInfo chunk; + size_t dataSize; + + SizedChunk(WrittenChunkInfo chunk_in, size_t dataSize_in) + : chunk(std::move(chunk_in)), dataSize(dataSize_in) + {} + }; + + /** + * @brief Slice chunks to a maximum size and sort those by size. + * + * Chunks are sliced into hyperslabs along a specified dimension. + * Returned chunks may be larger than the specified maximum size + * if hyperslabs of thickness 1 are larger than that size. + * + * @param table Chunks of arbitrary sizes. + * @param maxSize The maximum size that returned chunks should have. + * @param dimension The dimension along which to create hyperslabs. + */ + std::vector splitToSizeSorted( + ChunkTable const &table, size_t maxSize, size_t const dimension = 0) + { + std::vector res; + for (auto const &chunk : table) + { + auto const &extent = chunk.extent; + size_t sliceSize = 1; + for (size_t i = 0; i < extent.size(); ++i) + { + if (i == dimension) + { + continue; + } + sliceSize *= extent[i]; + } + if (sliceSize == 0) + { + std::cerr << "Chunktable::splitToSizeSorted: encountered " + "zero-sized chunk" + << std::endl; + continue; + } + + // this many slices go in one packet before it exceeds the max + // size + size_t streakLength = maxSize / sliceSize; + if (streakLength == 0) + { + // otherwise we get caught in an endless loop + ++streakLength; + } + size_t const slicedDimensionExtent = extent[dimension]; + + for (size_t currentPosition = 0;; + currentPosition += streakLength) + { + WrittenChunkInfo newChunk = chunk; + newChunk.offset[dimension] += currentPosition; + if (currentPosition + streakLength >= slicedDimensionExtent) + { + newChunk.extent[dimension] = + slicedDimensionExtent - currentPosition; + size_t chunkSize = + newChunk.extent[dimension] * sliceSize; + res.emplace_back(std::move(newChunk), chunkSize); + break; + } + else + { + newChunk.extent[dimension] = streakLength; + res.emplace_back( + std::move(newChunk), streakLength * sliceSize); + } + } + } + std::sort( + res.begin(), + res.end(), + [](SizedChunk const &left, SizedChunk const &right) { + return right.dataSize < left.dataSize; // decreasing order + }); + return res; + } + } // namespace + + Assignment ByCuboidSlice::assign( + PartialAssignment res, RankMeta const &, RankMeta const &) + { + ChunkTable &sourceSide = res.notAssigned; + Assignment &sinkSide = res.assigned; + Offset myOffset; + Extent myExtent; + std::tie(myOffset, myExtent) = + blockSlicer->sliceBlock(totalExtent, mpi_size, mpi_rank); + + for (auto &chunk : sourceSide) + { + restrictToSelection(chunk.offset, chunk.extent, myOffset, myExtent); + for (auto ext : chunk.extent) + { + if (ext == 0) + { + goto outer_loop; + } + } + sinkSide[mpi_rank].push_back(std::move(chunk)); + outer_loop:; + } + + return res.assigned; + } + + std::unique_ptr ByCuboidSlice::clone() const + { + return std::unique_ptr(new ByCuboidSlice( + blockSlicer->clone(), totalExtent, mpi_rank, mpi_size)); + } + + BinPacking::BinPacking(size_t splitAlongDimension_in) + : splitAlongDimension(splitAlongDimension_in) + {} + + Assignment BinPacking::assign( + PartialAssignment res, RankMeta const &, RankMeta const &sinkRanks) + { + ChunkTable &sourceChunks = res.notAssigned; + Assignment &sinkChunks = res.assigned; + size_t totalExtent = 0; + for (auto const &chunk : sourceChunks) + { + size_t chunkExtent = 1; + for (auto ext : chunk.extent) + { + chunkExtent *= ext; + } + totalExtent += chunkExtent; + } + size_t const idealSize = totalExtent / sinkRanks.size(); + /* + * Split chunks into subchunks of size at most idealSize. + * The resulting list of chunks is sorted by chunk size in decreasing + * order. This is important for the greedy Bin-Packing approximation + * algorithm. + * Under sub-ideal circumstances, chunks may not be splittable small + * enough. This algorithm will still produce results just fine in that + * case, but it will not keep the factor-2 approximation. + */ + std::vector digestibleChunks = + splitToSizeSorted(sourceChunks, idealSize, splitAlongDimension); + + /* + * Worker lambda: Iterate the reading processes once and greedily assign + * the largest chunks to them without exceeding idealSize amount of + * data per process. + */ + auto worker = + [&sinkRanks, &digestibleChunks, &sinkChunks, idealSize]() { + for (auto const &destRank : sinkRanks) + { + /* + * Within the second call of the worker lambda, this will + * not be true any longer, strictly speaking. The trick of + * this algorithm is to pretend that it is. + */ + size_t leftoverSize = idealSize; + { + auto it = digestibleChunks.begin(); + while (it != digestibleChunks.end()) + { + if (it->dataSize >= idealSize) + { + /* + * This branch is only taken if it was not + * possible to slice chunks small enough -- or + * exactly the right size. In any case, the + * chunk will be the only one assigned to the + * process within this call of the worker + * lambda, so the loop can be broken out of. + */ + sinkChunks[destRank.first].push_back( + std::move(it->chunk)); + digestibleChunks.erase(it); + break; + } + else if (it->dataSize <= leftoverSize) + { + // assign smaller chunks as long as they fit + sinkChunks[destRank.first].push_back( + std::move(it->chunk)); + leftoverSize -= it->dataSize; + it = digestibleChunks.erase(it); + } + else + { + // look for smaller chunks + ++it; + } + } + } + } + }; + // sic! + // run the worker twice to implement a factor-two approximation + // of the bin packing problem + worker(); + worker(); + /* + * By the nature of the greedy approach, each iteration of the outer + * for loop in the worker assigns chunks to the current rank that sum + * up to at least more than half of the allowed idealSize. (Until it + * runs out of chunks). + * This means that calling the worker twice guarantees a full + * distribution. + */ + + return sinkChunks; + } + + std::unique_ptr BinPacking::clone() const + { + return std::unique_ptr(new BinPacking(splitAlongDimension)); + } + + FailingStrategy::FailingStrategy() = default; + + Assignment FailingStrategy::assign( + PartialAssignment assignment, RankMeta const &, RankMeta const &) + { + if (assignment.notAssigned.empty()) + { + return assignment.assigned; + } + else + { + throw std::runtime_error( + "[FailingStrategy] There are unassigned chunks!"); + } + } + + std::unique_ptr FailingStrategy::clone() const + { + return std::make_unique(); + } +} // namespace chunk_assignment + namespace host_info { constexpr size_t MAX_HOSTNAME_LENGTH = 256; diff --git a/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp b/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp index e494b175de..7fbb734faa 100644 --- a/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp +++ b/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp @@ -72,4 +72,9 @@ OneDimensionalBlockSlicer::sliceBlock(Extent &totalExtent, int size, int rank) } return std::make_pair(std::move(offs), std::move(localExtent)); } + +std::unique_ptr OneDimensionalBlockSlicer::clone() const +{ + return std::unique_ptr(new OneDimensionalBlockSlicer(m_dim)); +} } // namespace openPMD From dc814c7c143ade5486fbc09d53b045f8073536f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 2 Mar 2023 13:46:10 +0100 Subject: [PATCH 02/11] Python bindings: Chunk distribution algorithms --- include/openPMD/binding/python/Common.hpp | 10 +++ src/binding/python/ChunkInfo.cpp | 89 +++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/include/openPMD/binding/python/Common.hpp b/include/openPMD/binding/python/Common.hpp index 7b42b919d8..7a94351f62 100644 --- a/include/openPMD/binding/python/Common.hpp +++ b/include/openPMD/binding/python/Common.hpp @@ -8,6 +8,7 @@ */ #pragma once +#include "openPMD/ChunkInfo.hpp" #include "openPMD/Iteration.hpp" #include "openPMD/Mesh.hpp" #include "openPMD/ParticlePatches.hpp" @@ -29,6 +30,15 @@ // not yet used: // pybind11/functional.h // for std::function +using PyVecChunkInfo = std::vector; + +PYBIND11_MAKE_OPAQUE(openPMD::ChunkInfo) +PYBIND11_MAKE_OPAQUE(PyVecChunkInfo) +PYBIND11_MAKE_OPAQUE(openPMD::WrittenChunkInfo) +PYBIND11_MAKE_OPAQUE(openPMD::ChunkTable) +PYBIND11_MAKE_OPAQUE(openPMD::chunk_assignment::Assignment) +PYBIND11_MAKE_OPAQUE(openPMD::chunk_assignment::PartialAssignment) + // used exclusively in all our Python .cpp files namespace py = pybind11; using namespace openPMD; diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index a392cdd3e2..cda8de6653 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -19,12 +19,14 @@ * If not, see . */ #include "openPMD/ChunkInfo.hpp" +#include "openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp" #include "openPMD/binding/python/Mpi.hpp" #include "openPMD/binding/python/Common.hpp" #include #include +#include // std::move void init_Chunk(py::module &m) { @@ -100,4 +102,91 @@ void init_Chunk(py::module &m) return host_info::byMethod(self); }) .def("available", &host_info::methodAvailable); + + + using namespace chunk_assignment; + + (void)py::class_(m, "PartialStrategy"); + + py::class_(m, "PartialStrategy") + .def( + "assign", + py::overload_cast( + &PartialStrategy::assign), + py::arg("chunk_table"), + py::arg("rank_meta_in") = RankMeta(), + py::arg("rank_meta_out") = RankMeta()) + .def( + "assign", + py::overload_cast< + PartialAssignment, + RankMeta const &, + RankMeta const &>(&PartialStrategy::assign), + py::arg("partial_assignment"), + py::arg("rank_meta_in") = RankMeta(), + py::arg("rank_meta_out") = RankMeta()); + + py::class_(m, "Strategy") + .def( + "assign", + py::overload_cast( + &Strategy::assign), + py::arg("chunk_table"), + py::arg("rank_meta_in") = RankMeta(), + py::arg("rank_meta_out") = RankMeta()) + .def( + "assign", + py::overload_cast< + PartialAssignment, + RankMeta const &, + RankMeta const &>(&Strategy::assign), + py::arg("partial_assignment"), + py::arg("rank_meta_in") = RankMeta(), + py::arg("rank_meta_out") = RankMeta()); + + py::class_(m, "FromPartialStrategy") + .def(py::init([](PartialStrategy const &firstPass, + Strategy const &secondPass) { + return FromPartialStrategy(firstPass.clone(), secondPass.clone()); + })); + + py::class_(m, "RoundRobin").def(py::init<>()); + + py::class_(m, "ByHostname") + .def( + py::init([](Strategy const &withinNode) { + return ByHostname(withinNode.clone()); + }), + py::arg("strategy_within_node")); + + (void)py::class_(m, "BlockSlicer"); + + py::class_( + m, "OneDimensionalBlockSlicer") + .def(py::init<>()) + .def(py::init(), py::arg("dim")); + + py::class_(m, "ByCuboidSlice") + .def( + py::init([](BlockSlicer const &blockSlicer, + Extent totalExtent, + unsigned int mpi_rank, + unsigned int mpi_size) { + return ByCuboidSlice( + blockSlicer.clone(), + std::move(totalExtent), + mpi_rank, + mpi_size); + }), + py::arg("block_slicer"), + py::arg("total_extent"), + py::arg("mpi_rank"), + py::arg("mpi_size")); + + py::class_(m, "BinPacking") + .def(py::init<>()) + .def(py::init(), py::arg("split_along_dimension")); + + py::class_(m, "FailingStrategy") + .def(py::init<>()); } From c2ce2b8047ef2022de1b2a88ea757a6a6b8409cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 2 Mar 2023 13:47:06 +0100 Subject: [PATCH 03/11] Use chunk distribtion algorithms in openpmd-pipe --- .../python/openpmd_api/pipe/__main__.py | 145 +++++++++--------- 1 file changed, 70 insertions(+), 75 deletions(-) diff --git a/src/binding/python/openpmd_api/pipe/__main__.py b/src/binding/python/openpmd_api/pipe/__main__.py index 7bd5305c56..96802530b0 100644 --- a/src/binding/python/openpmd_api/pipe/__main__.py +++ b/src/binding/python/openpmd_api/pipe/__main__.py @@ -10,6 +10,7 @@ """ import argparse import os # os.path.basename +import re import sys # sys.stderr.write from .. import openpmd_api_cxx as io @@ -39,8 +40,14 @@ def parse_args(program_name): By default, the openPMD-api will be initialized without an MPI communicator if the MPI size is 1. This is to simplify the use of the JSON backend which is only available in serial openPMD. -With parallelization enabled, each dataset will be equally sliced along -the dimension with the largest extent. +With parallelization enabled, each dataset will be equally sliced according to +a chunk distribution strategy which may be selected via the environment +variable OPENPMD_CHUNK_DISTRIBUTION. Options include "roundrobin", +"binpacking", "slicedataset" and "hostname_<1>_<2>", where <1> should be +replaced with a strategy to be applied within a compute node and <2> with a +secondary strategy in case the hostname strategy does not distribute +all chunks. +The default is `hostname_binpacking_slicedataset`. Examples: {0} --infile simData.h5 --outfile simData_%T.bp @@ -99,65 +106,6 @@ def __init__(self): self.rank = 0 -class Chunk: - """ - A Chunk is an n-dimensional hypercube, defined by an offset and an extent. - Offset and extent must be of the same dimensionality (Chunk.__len__). - """ - def __init__(self, offset, extent): - assert (len(offset) == len(extent)) - self.offset = offset - self.extent = extent - - def __len__(self): - return len(self.offset) - - def slice1D(self, mpi_rank, mpi_size, dimension=None): - """ - Slice this chunk into mpi_size hypercubes along one of its - n dimensions. The dimension is given through the 'dimension' - parameter. If None, the dimension with the largest extent on - this hypercube is automatically picked. - Returns the mpi_rank'th of the sliced chunks. - """ - if dimension is None: - # pick that dimension which has the highest count of items - dimension = 0 - maximum = self.extent[0] - for k, v in enumerate(self.extent): - if v > maximum: - dimension = k - assert (dimension < len(self)) - # no offset - assert (self.offset == [0 for _ in range(len(self))]) - offset = [0 for _ in range(len(self))] - stride = self.extent[dimension] // mpi_size - rest = self.extent[dimension] % mpi_size - - # local function f computes the offset of a rank - # for more equal balancing, we want the start index - # at the upper gaussian bracket of (N/n*rank) - # where N the size of the dataset in dimension dim - # and n the MPI size - # for avoiding integer overflow, this is the same as: - # (N div n)*rank + round((N%n)/n*rank) - def f(rank): - res = stride * rank - padDivident = rest * rank - pad = padDivident // mpi_size - if pad * mpi_size < padDivident: - pad += 1 - return res + pad - - offset[dimension] = f(mpi_rank) - extent = self.extent.copy() - if mpi_rank >= mpi_size - 1: - extent[dimension] -= offset[dimension] - else: - extent[dimension] = f(mpi_rank + 1) - offset[dimension] - return Chunk(offset, extent) - - class deferred_load: def __init__(self, source, dynamicView, offset, extent): self.source = source @@ -166,6 +114,42 @@ def __init__(self, source, dynamicView, offset, extent): self.extent = extent +def distribution_strategy(dataset_extent, + mpi_rank, + mpi_size, + strategy_identifier=None): + if strategy_identifier is None or not strategy_identifier: + if 'OPENPMD_CHUNK_DISTRIBUTION' in os.environ: + strategy_identifier = os.environ[ + 'OPENPMD_CHUNK_DISTRIBUTION'].lower() + else: + strategy_identifier = 'hostname_binpacking_slicedataset' # default + match = re.search('hostname_(.*)_(.*)', strategy_identifier) + if match is not None: + inside_node = distribution_strategy(dataset_extent, + mpi_rank, + mpi_size, + strategy_identifier=match.group(1)) + second_phase = distribution_strategy( + dataset_extent, + mpi_rank, + mpi_size, + strategy_identifier=match.group(2)) + return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase) + elif strategy_identifier == 'roundrobin': + return io.RoundRobin() + elif strategy_identifier == 'binpacking': + return io.BinPacking() + elif strategy_identifier == 'slicedataset': + return io.ByCuboidSlice(io.OneDimensionalBlockSlicer(), dataset_extent, + mpi_rank, mpi_size) + elif strategy_identifier == 'fail': + return io.FailingStrategy() + else: + raise RuntimeError("Unknown distribution strategy: " + + strategy_identifier) + + class pipe: """ Represents the configuration of one "pipe" pass. @@ -177,6 +161,11 @@ def __init__(self, infile, outfile, inconfig, outconfig, comm): self.outconfig = outconfig self.loads = [] self.comm = comm + if HAVE_MPI: + hostinfo = io.HostInfo.MPI_PROCESSOR_NAME + self.outranks = hostinfo.get_collective(self.comm) + else: + self.outranks = {i: str(i) for i in range(self.comm.size)} def run(self): if not HAVE_MPI or (args.mpi is None and self.comm.size == 1): @@ -268,6 +257,9 @@ def __copy(self, src, dest, current_path="/data/"): print("With records:") for r in in_iteration.particles[ps]: print("\t {0}".format(r)) + # With linear read mode, we can only load the source rank table + # inside `read_iterations()` since it's a dataset. + self.inranks = src.get_rank_table(collective=True) out_iteration = write_iterations[in_iteration.iteration_index] sys.stdout.flush() self.__copy( @@ -284,7 +276,6 @@ def __copy(self, src, dest, current_path="/data/"): elif isinstance(src, io.Record_Component) and (not is_container or src.scalar): shape = src.shape - offset = [0 for _ in shape] dtype = src.dtype dest.reset_dataset(io.Dataset(dtype, shape)) if src.empty: @@ -294,19 +285,23 @@ def __copy(self, src, dest, current_path="/data/"): elif src.constant: dest.make_constant(src.get_attribute("value")) else: - chunk = Chunk(offset, shape) - local_chunk = chunk.slice1D(self.comm.rank, self.comm.size) - if debug: - end = local_chunk.offset.copy() - for i in range(len(end)): - end[i] += local_chunk.extent[i] - print("{}\t{}/{}:\t{} -- {}".format( - current_path, self.comm.rank, self.comm.size, - local_chunk.offset, end)) - span = dest.store_chunk(local_chunk.offset, local_chunk.extent) - self.loads.append( - deferred_load(src, span, local_chunk.offset, - local_chunk.extent)) + chunk_table = src.available_chunks() + strategy = distribution_strategy(shape, self.comm.rank, + self.comm.size) + my_chunks = strategy.assign(chunk_table, self.inranks, + self.outranks) + for chunk in my_chunks[ + self.comm.rank] if self.comm.rank in my_chunks else []: + if debug: + end = chunk.offset.copy() + for i in range(len(end)): + end[i] += chunk.extent[i] + print("{}\t{}/{}:\t{} -- {}".format( + current_path, self.comm.rank, self.comm.size, + chunk.offset, end)) + span = dest.store_chunk(chunk.offset, chunk.extent) + self.loads.append( + deferred_load(src, span, chunk.offset, chunk.extent)) elif isinstance(src, io.Iteration): self.__copy(src.meshes, dest.meshes, current_path + "meshes/") self.__copy(src.particles, dest.particles, From e084ac56d42e447e2ca436d02d628120fcf18877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 2 Mar 2023 13:49:13 +0100 Subject: [PATCH 04/11] Testing --- CMakeLists.txt | 4 +- test/CoreTest.cpp | 88 +++++++++++ test/ParallelIOTest.cpp | 331 ++++++++++++++++++++++++++++++++++------ 3 files changed, 376 insertions(+), 47 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c17631c6cb..e5f00357dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1362,7 +1362,7 @@ if(openPMD_BUILD_TESTING) ) add_test(NAME CLI.pipe.py COMMAND sh -c - "${MPI_TEST_EXE} ${Python_EXECUTABLE} \ + "${MPI_TEST_EXE} -n 2 ${Python_EXECUTABLE} \ ${openPMD_RUNTIME_OUTPUT_DIRECTORY}/openpmd-pipe \ --infile ../samples/git-sample/data%T.h5 \ --outfile ../samples/git-sample/data%T.bp && \ @@ -1373,7 +1373,7 @@ if(openPMD_BUILD_TESTING) --outfile \ ../samples/git-sample/single_iteration_%T.bp && \ \ - ${MPI_TEST_EXE} ${Python_EXECUTABLE} \ + ${MPI_TEST_EXE} -n 2 ${Python_EXECUTABLE} \ ${openPMD_RUNTIME_OUTPUT_DIRECTORY}/openpmd-pipe \ --infile ../samples/git-sample/thetaMode/data%T.h5 \ --outfile \ diff --git a/test/CoreTest.cpp b/test/CoreTest.cpp index 17739e0b28..28577a7306 100644 --- a/test/CoreTest.cpp +++ b/test/CoreTest.cpp @@ -5,6 +5,8 @@ #define OPENPMD_private public: #define OPENPMD_protected public: #endif + +#include "openPMD/ChunkInfo.hpp" #include "openPMD/openPMD.hpp" #include "openPMD/IO/ADIOS/macros.hpp" @@ -30,6 +32,92 @@ using namespace openPMD; Dataset globalDataset(Datatype::CHAR, {1}); +namespace test_chunk_assignment +{ +using namespace openPMD::chunk_assignment; +struct Params +{ + ChunkTable table; + RankMeta metaSource; + RankMeta metaSink; + + void init( + size_t sourceRanks, + size_t sinkRanks, + size_t in_per_host, + size_t out_per_host) + { + for (size_t rank = 0; rank < sourceRanks; ++rank) + { + table.emplace_back(Offset{rank, rank}, Extent{rank, rank}, rank); + table.emplace_back( + Offset{rank, 100 * rank}, Extent{rank, 100 * rank}, rank); + metaSource.emplace(rank, std::to_string(rank / in_per_host)); + } + for (size_t rank = 0; rank < sinkRanks; ++rank) + { + metaSink.emplace(rank, std::to_string(rank / out_per_host)); + } + } +}; +void print(RankMeta const &meta, ChunkTable const &table) +{ + for (auto const &chunk : table) + { + std::cout << "[HOST: " << meta.at(chunk.sourceID) + << ",\tRank: " << chunk.sourceID << ",\tOffset: "; + for (auto offset : chunk.offset) + { + std::cout << offset << ", "; + } + std::cout << "\tExtent: "; + for (auto extent : chunk.extent) + { + std::cout << extent << ", "; + } + std::cout << "]" << std::endl; + } +} +void print(RankMeta const &meta, Assignment const &table) +{ + for (auto &[rank, chunkList] : table) + { + std::cout << "[HOST: " << meta.at(rank) << ",\tRank: " << rank << "]" + << std::endl; + for (auto const &chunk : chunkList) + { + std::cout << "\t[Offset: "; + for (auto offset : chunk.offset) + { + std::cout << offset << ", "; + } + std::cout << "\tExtent: "; + for (auto extent : chunk.extent) + { + std::cout << extent << ", "; + } + std::cout << "]" << std::endl; + } + } +} +} // namespace test_chunk_assignment + +TEST_CASE("chunk_assignment", "[core]") +{ + using namespace chunk_assignment; + test_chunk_assignment::Params params; + params.init(6, 2, 2, 1); + test_chunk_assignment::print(params.metaSource, params.table); + ByHostname byHostname(std::make_unique()); + FromPartialStrategy fullStrategy( + std::make_unique(std::move(byHostname)), + std::make_unique()); + Assignment res = + fullStrategy.assign(params.table, params.metaSource, params.metaSink); + std::cout << "\nRESULTS:" << std::endl; + test_chunk_assignment::print(params.metaSink, res); +} + TEST_CASE("versions_test", "[core]") { auto const apiVersion = getVersion(); diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index c6d90d773e..919134b64f 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -6,6 +6,8 @@ #include "openPMD/auxiliary/Environment.hpp" #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/openPMD.hpp" +// @todo change includes +#include "openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp" #include #if openPMD_HAVE_MPI @@ -1190,6 +1192,53 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]") } #endif +#if openPMD_HAVE_MPI +TEST_CASE("unavailable_backend", "[core][parallel]") +{ +#if !openPMD_HAVE_ADIOS2 + { + auto fail = []() { + Series( + "unavailable.bp", + Access::CREATE, + MPI_COMM_WORLD, + R"({"backend": "ADIOS2"})"); + }; + REQUIRE_THROWS_WITH( + fail(), + "Wrong API usage: openPMD-api built without support for backend " + "'ADIOS2'."); + } +#endif +#if !openPMD_HAVE_ADIOS2 + { + auto fail = []() { + Series("unavailable.bp", Access::CREATE, MPI_COMM_WORLD); + }; + REQUIRE_THROWS_WITH( + fail(), + "Wrong API usage: openPMD-api built without support for backend " + "'ADIOS2'."); + } +#endif +#if !openPMD_HAVE_HDF5 + { + auto fail = []() { + Series( + "unavailable.h5", + Access::CREATE, + MPI_COMM_WORLD, + R"({"backend": "HDF5"})"); + }; + REQUIRE_THROWS_WITH( + fail(), + "Wrong API usage: openPMD-api built without support for backend " + "'HDF5'."); + } +#endif +} +#endif + #if openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI void adios2_streaming(bool variableBasedLayout) @@ -1887,51 +1936,6 @@ TEST_CASE("append_mode", "[serial]") } } -TEST_CASE("unavailable_backend", "[core][parallel]") -{ -#if !openPMD_HAVE_ADIOS2 - { - auto fail = []() { - Series( - "unavailable.bp", - Access::CREATE, - MPI_COMM_WORLD, - R"({"backend": "ADIOS2"})"); - }; - REQUIRE_THROWS_WITH( - fail(), - "Wrong API usage: openPMD-api built without support for backend " - "'ADIOS2'."); - } -#endif -#if !openPMD_HAVE_ADIOS2 - { - auto fail = []() { - Series("unavailable.bp", Access::CREATE, MPI_COMM_WORLD); - }; - REQUIRE_THROWS_WITH( - fail(), - "Wrong API usage: openPMD-api built without support for backend " - "'ADIOS2'."); - } -#endif -#if !openPMD_HAVE_HDF5 - { - auto fail = []() { - Series( - "unavailable.h5", - Access::CREATE, - MPI_COMM_WORLD, - R"({"backend": "HDF5"})"); - }; - REQUIRE_THROWS_WITH( - fail(), - "Wrong API usage: openPMD-api built without support for backend " - "'HDF5'."); - } -#endif -} - void joined_dim(std::string const &ext) { using type = float; @@ -2202,4 +2206,241 @@ TEST_CASE("adios2_flush_via_step") } #endif +void adios2_chunk_distribution() +{ + /* + * This test simulates a multi-node streaming setup in order to test some + * of our chunk distribution strategies. + * We don't actually stream (but write a .bp file instead) and also we don't + * actually run anything on multiple nodes, but we can use this for testing + * the distribution strategies anyway. + */ + int mpi_size{-1}; + int mpi_rank{-1}; + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + /* + * Mappings: MPI rank -> hostname where the rank is executed. + * For the writing application as well as for the reading one. + */ + chunk_assignment::RankMeta writingRanksHostnames, readingRanksHostnames; + for (int i = 0; i < mpi_size; ++i) + { + /* + * The mapping is intentionally weird. Nodes "node1", "node3", ... + * do not have instances of the reading application running on them. + * Our distribution strategies will need to deal with that situation. + */ + // 0, 0, 1, 1, 2, 2, 3, 3 ... + writingRanksHostnames[i] = "node" + std::to_string(i / 2); + // 0, 0, 0, 0, 2, 2, 2, 2 ... + readingRanksHostnames[i] = "node" + std::to_string(i / 4 * 2); + } + + std::string filename = "../samples/adios2_chunk_distribution.bp"; + // Simulate a stream: BP4 assigns chunk IDs by subfile (i.e. aggregator). + std::stringstream parameters; + parameters << R"END( +{ + "adios2": + { + "engine": + { + "type": "bp4", + "parameters": + { + "NumAggregators":)END" + << "\"" << std::to_string(mpi_size) << "\"" + << R"END( + } + } + } +} +)END"; + + auto printChunktable = [mpi_rank]( + std::string const &strategyName, + ChunkTable const &table, + chunk_assignment::RankMeta const &meta) { + if (mpi_rank != 0) + { + return; + } + std::cout << "WITH STRATEGY '" << strategyName << "':\n"; + for (auto const &chunk : table) + { + std::cout << "[HOST: " << meta.at(chunk.sourceID) + << ",\tRank: " << chunk.sourceID << ",\tOffset: "; + for (auto offset : chunk.offset) + { + std::cout << offset << ", "; + } + std::cout << "\tExtent: "; + for (auto extent : chunk.extent) + { + std::cout << extent << ", "; + } + std::cout << "]" << std::endl; + } + }; + + auto printAssignment = [mpi_rank]( + std::string const &strategyName, + chunk_assignment::Assignment const &table, + chunk_assignment::RankMeta const &meta) { + if (mpi_rank != 0) + { + return; + } + std::cout << "WITH STRATEGY '" << strategyName << "':\n"; + for (auto &[rank, chunkList] : table) + { + std::cout << "[HOST: " << meta.at(rank) << ",\tRank: " << rank + << "]" << std::endl; + for (auto const &chunk : chunkList) + { + std::cout << "\t[Source rank: " << chunk.sourceID + << "\tOffset: "; + for (auto offset : chunk.offset) + { + std::cout << offset << ", "; + } + std::cout << "\tExtent: "; + for (auto extent : chunk.extent) + { + std::cout << extent << ", "; + } + std::cout << "]" << std::endl; + } + } + }; + + // Create a dataset. + { + Series series( + filename, + openPMD::Access::CREATE, + MPI_COMM_WORLD, + parameters.str()); + /* + * The writing application sets an attribute that tells the reading + * application about the "MPI rank -> hostname" mapping. + * Each rank only needs to set its own value. + * (Some other options like setting all at once or reading from a file + * exist as well.) + */ + series.setRankTable(writingRanksHostnames.at(mpi_rank)); + + auto E_x = series.iterations[0].meshes["E"]["x"]; + openPMD::Dataset ds(openPMD::Datatype::INT, {unsigned(mpi_size), 10}); + E_x.resetDataset(ds); + std::vector data(10, 0); + std::iota(data.begin(), data.end(), 0); + E_x.storeChunk(data, {unsigned(mpi_rank), 0}, {1, 10}); + series.flush(); + } + + { + Series series(filename, openPMD::Access::READ_ONLY, MPI_COMM_WORLD); + /* + * Inquire the writing application's "MPI rank -> hostname" mapping. + * The reading application needs to know about its own mapping. + * Having both of these mappings is the basis for an efficient chunk + * distribution since we can use it to figure out which instances + * are running on the same nodes. + */ + auto rankMetaIn = series.rankTable(/* collective = */ true); + REQUIRE(rankMetaIn == writingRanksHostnames); + + auto E_x = series.iterations[0].meshes["E"]["x"]; + /* + * Ask the backend which chunks are available. + */ + auto const chunkTable = E_x.availableChunks(); + + printChunktable("INPUT", chunkTable, rankMetaIn); + + using namespace chunk_assignment; + + /* + * Assign the chunks by distributing them one after the other to reading + * ranks. Easy, but not particularly efficient. + */ + RoundRobin roundRobinStrategy; + auto roundRobinAssignment = roundRobinStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "ROUND ROBIN", roundRobinAssignment, readingRanksHostnames); + + /* + * Assign chunks by hostname. + * Two difficulties: + * * A distribution strategy within one node needs to be picked. + * We pick the BinPacking strategy that tries to assign chunks in a + * balanced manner. Since our chunks have a small extent along + * dimension 0, use dimension 1 for slicing. + * * The assignment is partial since some nodes only have instances of + * the writing application. Those chunks remain unassigned. + */ + ByHostname byHostname( + std::make_unique(/* splitAlongDimension = */ 1)); + auto byHostnamePartialAssignment = + byHostname.assign(chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "HOSTNAME, ASSIGNED", + byHostnamePartialAssignment.assigned, + readingRanksHostnames); + printChunktable( + "HOSTNAME, LEFTOVER", + byHostnamePartialAssignment.notAssigned, + rankMetaIn); + + /* + * Assign chunks by hostnames, once more. + * This time, apply a secondary distribution strategy to assign + * leftovers. We pick BinPacking, once more. + * Notice that the BinPacking strategy does not (yet) take into account + * chunks that have been assigned by the first round. + * Balancing is calculated solely based on the leftover chunks from the + * first round. + */ + FromPartialStrategy fromPartialStrategy( + std::make_unique(std::move(byHostname)), + std::make_unique(/* splitAlongDimension = */ 1)); + auto fromPartialAssignment = fromPartialStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "HOSTNAME WITH SECOND PASS", + fromPartialAssignment, + readingRanksHostnames); + + /* + * Assign chunks by slicing the n-dimensional physical domain and + * intersecting those slices with the available chunks from the backend. + * Notice that this strategy only returns the chunks that the currently + * running rank is supposed to load, whereas the other strategies return + * a chunk table containing all chunks that all ranks will load. + * In principle, a chunk_assignment::Strategy only needs to return the + * chunks that the current rank should load, but is free to emplace the + * other chunks for other reading ranks as well. + * (Reasoning: In some strategies, calculating everything is necessary, + * in others such as this one, it's an unneeded overhead.) + */ + ByCuboidSlice cuboidSliceStrategy( + std::make_unique(1), + E_x.getExtent(), + mpi_rank, + mpi_size); + auto cuboidSliceAssignment = cuboidSliceStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "CUBOID SLICE", cuboidSliceAssignment, readingRanksHostnames); + } +} + +TEST_CASE("adios2_chunk_distribution", "[parallel][adios2]") +{ + adios2_chunk_distribution(); +} #endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI From 8d49ad83678ca493abdd075293bfb238bff16d8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 3 Mar 2023 15:09:57 +0100 Subject: [PATCH 05/11] Add DiscardingStrategy --- include/openPMD/ChunkInfo.hpp | 21 +++++++++++++++++++++ src/ChunkInfo.cpp | 13 +++++++++++++ src/binding/python/ChunkInfo.cpp | 3 +++ 3 files changed, 37 insertions(+) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index b44379b2aa..54454b4f13 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -322,6 +322,27 @@ namespace chunk_assignment virtual std::unique_ptr clone() const override; }; + + /** + * @brief Strategy that purposefully discards leftover chunk from + * the PartialAssignment. + * + * Useful as second phase in FromPartialStrategy when knowing that some + * chunks will go unassigned, but still wanting to communicate only within + * the same node. + * + */ + struct DiscardingStrategy : Strategy + { + explicit DiscardingStrategy(); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; } // namespace chunk_assignment namespace host_info diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 7b6c1e32ca..190bc8012c 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -552,6 +552,19 @@ namespace chunk_assignment { return std::make_unique(); } + + DiscardingStrategy::DiscardingStrategy() = default; + + Assignment DiscardingStrategy::assign( + PartialAssignment assignment, RankMeta const &, RankMeta const &) + { + return assignment.assigned; + } + + std::unique_ptr DiscardingStrategy::clone() const + { + return std::make_unique(); + } } // namespace chunk_assignment namespace host_info diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index cda8de6653..4391316f37 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -189,4 +189,7 @@ void init_Chunk(py::module &m) py::class_(m, "FailingStrategy") .def(py::init<>()); + + py::class_(m, "DiscardingStrategy") + .def(py::init<>()); } From dfd9850d642f191a1fa37739a427249dda1132e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 15 Mar 2023 14:56:50 +0100 Subject: [PATCH 06/11] Make Strategy class extensible from Python @todo Why do we need to increase the refcount twice?? --- .../openPMD/backend/BaseRecordComponent.hpp | 1 + src/binding/python/ChunkInfo.cpp | 158 +++++++++++++++++- .../python/openpmd_api/pipe/__main__.py | 33 ++++ 3 files changed, 189 insertions(+), 3 deletions(-) diff --git a/include/openPMD/backend/BaseRecordComponent.hpp b/include/openPMD/backend/BaseRecordComponent.hpp index fe4490830d..a871d67bcf 100644 --- a/include/openPMD/backend/BaseRecordComponent.hpp +++ b/include/openPMD/backend/BaseRecordComponent.hpp @@ -20,6 +20,7 @@ */ #pragma once +#include "openPMD/ChunkInfo.hpp" #include "openPMD/Dataset.hpp" #include "openPMD/Error.hpp" #include "openPMD/backend/Attributable.hpp" diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 4391316f37..6992d8fa0d 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -28,8 +28,139 @@ #include #include // std::move +/* + * PyStrategy and PyPartialStrategy are the C++ representations for objects + * created in Python. + * One challenge about these classes is that they cannot be easily copied or + * moved in memory, as the clone will lose the relation to the Python object. + * This class has a clone_impl() method that child classes can use for cloning + * the object and at the same time storing a reference to the original Python + * object. + * The template parameters ChildCpp and ChildPy implement a CRT-like pattern, + * split into a C++ class and a Python trampoline class as documented here: + * https://pybind11.readthedocs.io/en/stable/advanced/classes.html?highlight=trampoline#overriding-virtual-functions-in-python + * + * A typical child instantiation would look like: + * struct ChildPy : ChildCpp, ClonableTrampoline; + */ +template +struct ClonableTrampoline +{ + struct OriginalInstance + { + py::handle pythonObject; + + ~OriginalInstance() + { + pythonObject.dec_ref(); + } + }; + /* + * If the shared pointer is empty, this object is the original object owned + * by Python and the Python handle can be acquired by: + * py::cast(static_cast(this)) + * + * Copied instances will refer to the Python object handle via this member. + * By only storing this member in copied instances, but not in the original + * instance, we avoid a memory cycle and ensure clean destruction. + */ + std::shared_ptr m_originalInstance; + + [[nodiscard]] py::handle get_python_handle() const + { + if (m_originalInstance) + { + // std::cout << "Refcount " + // << m_originalInstance->pythonObject.ref_count() + // << std::endl; + return m_originalInstance->pythonObject; + } + else + { + auto self = static_cast(this); + return py::cast(self); + } + } + + template + Res call_virtual(std::string const &nameOfPythonMethod, Args &&...args) + { + py::gil_scoped_acquire gil; + auto ptr = get_python_handle().template cast(); + auto fun = py::get_override(ptr, nameOfPythonMethod.c_str()); + if (!fun) + { + throw std::runtime_error( + "Virtual method not found. Did you define '" + + nameOfPythonMethod + "' as method in Python?"); + } + auto res = fun(std::forward(args)...); + return py::detail::cast_safe(std::move(res)); + } + + [[nodiscard]] std::unique_ptr clone_impl() const + { + auto self = static_cast(this); + if (m_originalInstance) + { + return std::make_unique(*self); + } + else + { + OriginalInstance oi; + oi.pythonObject = py::cast(self); + // no idea why we would need this twice, but we do + oi.pythonObject.inc_ref(); + oi.pythonObject.inc_ref(); + auto res = std::make_unique(*self); + res->m_originalInstance = + std::make_shared(std::move(oi)); + return res; + } + } +}; + +struct PyStrategy + : chunk_assignment::Strategy + , ClonableTrampoline +{ + chunk_assignment::Assignment assign( + chunk_assignment::PartialAssignment assignment, + chunk_assignment::RankMeta const &in, + chunk_assignment::RankMeta const &out) override + { + return call_virtual( + "assign", std::move(assignment), in, out); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return clone_impl(); + } +}; + +struct PyPartialStrategy + : chunk_assignment::PartialStrategy + , ClonableTrampoline +{ + chunk_assignment::PartialAssignment assign( + chunk_assignment::PartialAssignment assignment, + chunk_assignment::RankMeta const &in, + chunk_assignment::RankMeta const &out) override + { + return call_virtual( + "assign", std::move(assignment), in, out); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return clone_impl(); + } +}; + void init_Chunk(py::module &m) { + py::class_(m, "ChunkInfo") .def(py::init(), py::arg("offset"), py::arg("extent")) .def( @@ -40,6 +171,8 @@ void init_Chunk(py::module &m) }) .def_readwrite("offset", &ChunkInfo::offset) .def_readwrite("extent", &ChunkInfo::extent); + py::bind_vector(m, "VectorChunkInfo"); + py::implicitly_convertible>(); py::class_(m, "WrittenChunkInfo") .def(py::init(), py::arg("offset"), py::arg("extent")) .def( @@ -103,12 +236,21 @@ void init_Chunk(py::module &m) }) .def("available", &host_info::methodAvailable); + py::bind_vector(m, "ChunkTable"); using namespace chunk_assignment; - (void)py::class_(m, "PartialStrategy"); + py::bind_map(m, "Assignment"); + + py::class_(m, "PartialAssignment") + .def(py::init<>()) + .def_readwrite("not_assigned", &PartialAssignment::notAssigned) + .def_readwrite("assigned", &PartialAssignment::assigned); + + py::bind_map(m, "RankMeta"); - py::class_(m, "PartialStrategy") + py::class_(m, "PartialStrategy") + .def(py::init<>()) .def( "assign", py::overload_cast( @@ -126,7 +268,8 @@ void init_Chunk(py::module &m) py::arg("rank_meta_in") = RankMeta(), py::arg("rank_meta_out") = RankMeta()); - py::class_(m, "Strategy") + py::class_(m, "Strategy") + .def(py::init<>()) .def( "assign", py::overload_cast( @@ -192,4 +335,13 @@ void init_Chunk(py::module &m) py::class_(m, "DiscardingStrategy") .def(py::init<>()); + + // implicit conversions + { + py::implicitly_convertible(); + py::implicitly_convertible(); + py::implicitly_convertible(); + py::implicitly_convertible(); + py::implicitly_convertible(); + } } diff --git a/src/binding/python/openpmd_api/pipe/__main__.py b/src/binding/python/openpmd_api/pipe/__main__.py index 96802530b0..40e616e3bd 100644 --- a/src/binding/python/openpmd_api/pipe/__main__.py +++ b/src/binding/python/openpmd_api/pipe/__main__.py @@ -114,6 +114,37 @@ def __init__(self, source, dynamicView, offset, extent): self.extent = extent +# Example how to implement a simple partial strategy in Python +class LoadOne(io.PartialStrategy): + def __init__(self, rank): + super().__init__() + self.rank = rank + + def assign(self, assignment, *_): + element = assignment.not_assigned.pop() + if self.rank not in assignment.assigned: + assignment.assigned[self.rank] = [element] + else: + assignment.assigned[self.rank].append(element) + return assignment + + +# Example how to implement a simple strategy in Python +class LoadAll(io.Strategy): + + def __init__(self, rank): + super().__init__() + self.rank = rank + + def assign(self, assignment, *_): + res = assignment.assigned + if self.rank not in res: + res[self.rank] = assignment.not_assigned + else: + res[self.rank].extend(assignment.not_assigned) + return res + + def distribution_strategy(dataset_extent, mpi_rank, mpi_size, @@ -136,6 +167,8 @@ def distribution_strategy(dataset_extent, mpi_size, strategy_identifier=match.group(2)) return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase) + elif strategy_identifier == 'all': + return io.FromPartialStrategy(LoadOne(mpi_rank), LoadAll(mpi_rank)) elif strategy_identifier == 'roundrobin': return io.RoundRobin() elif strategy_identifier == 'binpacking': From 9c5ece701c659178acf56d8d38b92ead15884194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 14 Mar 2023 17:02:20 +0100 Subject: [PATCH 07/11] Make mergeChunks function public --- include/openPMD/ChunkInfo.hpp | 3 + src/ChunkInfo.cpp | 110 ++++++++++++++++++++++++++++++ src/IO/JSON/JSONIOHandlerImpl.cpp | 103 +--------------------------- src/binding/python/ChunkInfo.cpp | 7 +- 4 files changed, 118 insertions(+), 105 deletions(-) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 54454b4f13..976fac955e 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -91,6 +91,9 @@ namespace chunk_assignment using Assignment = std::map>; + template + void mergeChunks(std::vector &); + struct PartialAssignment { ChunkTable notAssigned; diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 190bc8012c..d0e10a4e11 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #ifdef _WIN32 @@ -68,6 +69,115 @@ bool WrittenChunkInfo::operator==(WrittenChunkInfo const &other) const namespace chunk_assignment { + namespace + { + /* + * Check whether two chunks can be merged to form a large one + * and optionally return that larger chunk + */ + template + std::optional + mergeChunks(Chunk_t const &chunk1, Chunk_t const &chunk2) + { + /* + * Idea: + * If two chunks can be merged into one, they agree on offsets and + * extents in all but exactly one dimension dim. + * At dimension dim, the offset of chunk 2 is equal to the offset + * of chunk 1 plus its extent -- or vice versa. + */ + unsigned dimensionality = chunk1.extent.size(); + for (unsigned dim = 0; dim < dimensionality; ++dim) + { + Chunk_t const *c1(&chunk1), *c2(&chunk2); + // check if one chunk is the extension of the other at + // dimension dim + // first, let's put things in order + if (c1->offset[dim] > c2->offset[dim]) + { + std::swap(c1, c2); + } + // now, c1 begins at the lower of both offsets + // next check, that both chunks border one another exactly + if (c2->offset[dim] != c1->offset[dim] + c1->extent[dim]) + { + continue; + } + // we've got a candidate + // verify that all other dimensions have equal values + auto equalValues = [dimensionality, dim, c1, c2]() { + for (unsigned j = 0; j < dimensionality; ++j) + { + if (j == dim) + { + continue; + } + if (c1->offset[j] != c2->offset[j] || + c1->extent[j] != c2->extent[j]) + { + return false; + } + } + return true; + }; + if (!equalValues()) + { + continue; + } + // we can merge the chunks + Offset offset(c1->offset); + Extent extent(c1->extent); + extent[dim] += c2->extent[dim]; + return std::make_optional(Chunk_t(offset, extent)); + } + return std::optional(); + } + } // namespace + + /* + * Merge chunks in the chunktable until no chunks are left that can be + * merged. + */ + template + void mergeChunks(std::vector &table) + { + bool stillChanging; + do + { + stillChanging = false; + auto innerLoops = [&table]() { + /* + * Iterate over pairs of chunks in the table. + * When a pair that can be merged is found, merge it, + * delete the original two chunks from the table, + * put the new one in and return. + */ + for (auto i = table.begin(); i < table.end(); ++i) + { + for (auto j = i + 1; j < table.end(); ++j) + { + std::optional merged = mergeChunks(*i, *j); + if (merged) + { + // erase order is important due to iterator + // invalidation + table.erase(j); + table.erase(i); + table.emplace_back(std::move(merged.value())); + return true; + } + } + } + return false; + }; + stillChanging = innerLoops(); + } while (stillChanging); + } + + template void mergeChunks(std::vector &); + template void + mergeChunks(std::vector &); + namespace { std::map > diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index e06aa36ed8..397bb7ca5c 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -498,107 +498,6 @@ namespace } return res; } - - /* - * Check whether two chunks can be merged to form a large one - * and optionally return that larger chunk - */ - std::optional - mergeChunks(WrittenChunkInfo const &chunk1, WrittenChunkInfo const &chunk2) - { - /* - * Idea: - * If two chunks can be merged into one, they agree on offsets and - * extents in all but exactly one dimension dim. - * At dimension dim, the offset of chunk 2 is equal to the offset - * of chunk 1 plus its extent -- or vice versa. - */ - unsigned dimensionality = chunk1.extent.size(); - for (unsigned dim = 0; dim < dimensionality; ++dim) - { - WrittenChunkInfo const *c1(&chunk1), *c2(&chunk2); - // check if one chunk is the extension of the other at - // dimension dim - // first, let's put things in order - if (c1->offset[dim] > c2->offset[dim]) - { - std::swap(c1, c2); - } - // now, c1 begins at the lower of both offsets - // next check, that both chunks border one another exactly - if (c2->offset[dim] != c1->offset[dim] + c1->extent[dim]) - { - continue; - } - // we've got a candidate - // verify that all other dimensions have equal values - auto equalValues = [dimensionality, dim, c1, c2]() { - for (unsigned j = 0; j < dimensionality; ++j) - { - if (j == dim) - { - continue; - } - if (c1->offset[j] != c2->offset[j] || - c1->extent[j] != c2->extent[j]) - { - return false; - } - } - return true; - }; - if (!equalValues()) - { - continue; - } - // we can merge the chunks - Offset offset(c1->offset); - Extent extent(c1->extent); - extent[dim] += c2->extent[dim]; - return std::make_optional(WrittenChunkInfo(offset, extent)); - } - return std::optional(); - } - - /* - * Merge chunks in the chunktable until no chunks are left that can be - * merged. - */ - void mergeChunks(ChunkTable &table) - { - bool stillChanging; - do - { - stillChanging = false; - auto innerLoops = [&table]() { - /* - * Iterate over pairs of chunks in the table. - * When a pair that can be merged is found, merge it, - * delete the original two chunks from the table, - * put the new one in and return. - */ - for (auto i = table.begin(); i < table.end(); ++i) - { - for (auto j = i + 1; j < table.end(); ++j) - { - std::optional merged = - mergeChunks(*i, *j); - if (merged) - { - // erase order is important due to iterator - // invalidation - table.erase(j); - table.erase(i); - table.emplace_back(std::move(merged.value())); - return true; - } - } - } - return false; - }; - stillChanging = innerLoops(); - } while (stillChanging); - } } // namespace void JSONIOHandlerImpl::availableChunks( @@ -608,7 +507,7 @@ void JSONIOHandlerImpl::availableChunks( auto filePosition = setAndGetFilePosition(writable); auto &j = obtainJsonContents(writable)["data"]; *parameters.chunks = chunksInJSON(j); - mergeChunks(*parameters.chunks); + chunk_assignment::mergeChunks(*parameters.chunks); } void JSONIOHandlerImpl::openFile( diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 6992d8fa0d..550d5d69b2 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -171,8 +171,8 @@ void init_Chunk(py::module &m) }) .def_readwrite("offset", &ChunkInfo::offset) .def_readwrite("extent", &ChunkInfo::extent); - py::bind_vector(m, "VectorChunkInfo"); - py::implicitly_convertible>(); + py::bind_vector(m, "VectorChunkInfo") + .def("merge_chunks", &chunk_assignment::mergeChunks); py::class_(m, "WrittenChunkInfo") .def(py::init(), py::arg("offset"), py::arg("extent")) .def( @@ -236,7 +236,8 @@ void init_Chunk(py::module &m) }) .def("available", &host_info::methodAvailable); - py::bind_vector(m, "ChunkTable"); + py::bind_vector(m, "ChunkTable") + .def("merge_chunks", &chunk_assignment::mergeChunks); using namespace chunk_assignment; From fbd7cac8983a517a6400755a5f51b2c18e1341f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 9 Feb 2024 13:18:33 +0100 Subject: [PATCH 08/11] Add mergeChunksFromSameSourceID --- include/openPMD/ChunkInfo.hpp | 3 +++ src/ChunkInfo.cpp | 15 +++++++++++++++ src/binding/python/ChunkInfo.cpp | 5 ++++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 976fac955e..6787b49eb8 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -94,6 +94,9 @@ namespace chunk_assignment template void mergeChunks(std::vector &); + auto mergeChunksFromSameSourceID(std::vector const &) + -> std::map>; + struct PartialAssignment { ChunkTable notAssigned; diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index d0e10a4e11..0aa6fb4653 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -174,6 +174,21 @@ namespace chunk_assignment } while (stillChanging); } + auto mergeChunksFromSameSourceID(std::vector const &table) + -> std::map> + { + std::map> sortedBySourceID; + for (auto const &chunk : table) + { + sortedBySourceID[chunk.sourceID].emplace_back(chunk); + } + for (auto &pair : sortedBySourceID) + { + mergeChunks(pair.second); + } + return sortedBySourceID; + } + template void mergeChunks(std::vector &); template void mergeChunks(std::vector &); diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 550d5d69b2..187a6ab8f2 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -237,7 +237,10 @@ void init_Chunk(py::module &m) .def("available", &host_info::methodAvailable); py::bind_vector(m, "ChunkTable") - .def("merge_chunks", &chunk_assignment::mergeChunks); + .def("merge_chunks", &chunk_assignment::mergeChunks) + .def( + "merge_chunks_from_same_sourceID", + &chunk_assignment::mergeChunksFromSameSourceID); using namespace chunk_assignment; From 54747741cedb43526ce40c48eb29606e8540a096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 8 Feb 2024 17:02:58 +0100 Subject: [PATCH 09/11] Add RoundRobinOfSourceRanks strategy --- include/openPMD/ChunkInfo.hpp | 11 ++++++++ src/ChunkInfo.cpp | 47 ++++++++++++++++++++++++++++++-- src/binding/python/ChunkInfo.cpp | 2 ++ test/ParallelIOTest.cpp | 24 ++++++++++++++-- 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 6787b49eb8..2193a6c691 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -24,6 +24,7 @@ #include "openPMD/Dataset.hpp" // Offset, Extent #include "openPMD/benchmark/mpi/BlockSlicer.hpp" +#include #if openPMD_HAVE_MPI #include @@ -224,6 +225,16 @@ namespace chunk_assignment virtual std::unique_ptr clone() const override; }; + struct RoundRobinOfSourceRanks : Strategy + { + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; + /** * @brief Strategy that assigns chunks to be read by processes within * the same host that produced the chunk. diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 0aa6fb4653..6c5cff166f 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -24,10 +24,13 @@ #include "openPMD/auxiliary/Mpi.hpp" #include // std::sort +#include #include +#include #include #include #include +#include #include #ifdef _WIN32 @@ -195,10 +198,10 @@ namespace chunk_assignment namespace { - std::map > + std::map> ranksPerHost(RankMeta const &rankMeta) { - std::map > res; + std::map> res; for (auto const &pair : rankMeta) { auto &list = res[pair.second]; @@ -294,6 +297,44 @@ namespace chunk_assignment return std::unique_ptr(new RoundRobin); } + Assignment RoundRobinOfSourceRanks::assign( + PartialAssignment partialAssignment, + RankMeta const &, // ignored parameter + RankMeta const &out) + { + std::map> + sortSourceChunksBySourceRank; + for (auto &chunk : partialAssignment.notAssigned) + { + auto sourceID = chunk.sourceID; + sortSourceChunksBySourceRank[sourceID].push_back(std::move(chunk)); + } + partialAssignment.notAssigned.clear(); + auto source_it = sortSourceChunksBySourceRank.begin(); + auto sink_it = out.begin(); + for (; source_it != sortSourceChunksBySourceRank.end(); + ++source_it, ++sink_it) + { + if (sink_it == out.end()) + { + sink_it = out.begin(); + } + auto &chunks_go_here = partialAssignment.assigned[sink_it->first]; + chunks_go_here.reserve( + partialAssignment.assigned.size() + source_it->second.size()); + for (auto &chunk : source_it->second) + { + chunks_go_here.push_back(std::move(chunk)); + } + } + return partialAssignment.assigned; + } + + std::unique_ptr RoundRobinOfSourceRanks::clone() const + { + return std::unique_ptr(new RoundRobinOfSourceRanks); + } + ByHostname::ByHostname(std::unique_ptr withinNode) : m_withinNode(std::move(withinNode)) {} @@ -332,7 +373,7 @@ namespace chunk_assignment // the ranks are the source ranks // which ranks live on host in the sink? - std::map > ranksPerHostSink = + std::map> ranksPerHostSink = ranksPerHost(out); for (auto &chunkGroup : chunkGroups) { diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 187a6ab8f2..72df3807d9 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -298,6 +298,8 @@ void init_Chunk(py::module &m) })); py::class_(m, "RoundRobin").def(py::init<>()); + py::class_(m, "RoundRobinOfSourceRanks") + .def(py::init<>()); py::class_(m, "ByHostname") .def( diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 919134b64f..c9e1c761a7 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1,6 +1,7 @@ /* Running this test in parallel with MPI requires MPI::Init. * To guarantee a correct call to Init, launch the tests manually. */ +#include "openPMD/ChunkInfo.hpp" #include "openPMD/IO/ADIOS/macros.hpp" #include "openPMD/IO/Access.hpp" #include "openPMD/auxiliary/Environment.hpp" @@ -2333,11 +2334,13 @@ void adios2_chunk_distribution() series.setRankTable(writingRanksHostnames.at(mpi_rank)); auto E_x = series.iterations[0].meshes["E"]["x"]; - openPMD::Dataset ds(openPMD::Datatype::INT, {unsigned(mpi_size), 10}); + openPMD::Dataset ds( + openPMD::Datatype::INT, {unsigned(mpi_size * 2), 10}); E_x.resetDataset(ds); std::vector data(10, 0); std::iota(data.begin(), data.end(), 0); - E_x.storeChunk(data, {unsigned(mpi_rank), 0}, {1, 10}); + E_x.storeChunk(data, {unsigned(mpi_rank * 2), 0}, {1, 10}); + E_x.storeChunk(data, {unsigned(mpi_rank * 2 + 1), 0}, {1, 10}); series.flush(); } @@ -2396,6 +2399,23 @@ void adios2_chunk_distribution() byHostnamePartialAssignment.notAssigned, rankMetaIn); + /* + * Same as above, but use RoundRobinOfSourceRanks this time, a strategy + * which ensures that each source rank's data is uniquely mapped to one + * sink rank. Needed in some domains. + */ + ByHostname byHostname2(std::make_unique()); + auto byHostnamePartialAssignment2 = + byHostname2.assign(chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "HOSTNAME2, ASSIGNED", + byHostnamePartialAssignment2.assigned, + readingRanksHostnames); + printChunktable( + "HOSTNAME2, LEFTOVER", + byHostnamePartialAssignment2.notAssigned, + rankMetaIn); + /* * Assign chunks by hostnames, once more. * This time, apply a secondary distribution strategy to assign From bd227b72aa7c7259f848384c8a7d6c692d802cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 16 Aug 2024 12:05:39 +0200 Subject: [PATCH 10/11] Add Blocks distribution strategy --- include/openPMD/ChunkInfo.hpp | 16 ++++++ .../mpi/OneDimensionalBlockSlicer.hpp | 3 ++ src/ChunkInfo.cpp | 26 ++++++++++ .../mpi/OneDimensionalBlockSlicer.cpp | 52 +++++++++++-------- src/binding/python/ChunkInfo.cpp | 5 ++ test/ParallelIOTest.cpp | 5 ++ 6 files changed, 85 insertions(+), 22 deletions(-) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 2193a6c691..868478f8f6 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -235,6 +235,22 @@ namespace chunk_assignment virtual std::unique_ptr clone() const override; }; + struct Blocks : Strategy + { + private: + unsigned int mpi_size, mpi_rank; + + public: + Blocks(unsigned int mpi_rank, unsigned int mpi_size); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + [[nodiscard]] std::unique_ptr clone() const override; + }; + /** * @brief Strategy that assigns chunks to be read by processes within * the same host that produced the chunk. diff --git a/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp b/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp index cb12da9350..f0d943d972 100644 --- a/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp +++ b/include/openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp @@ -33,6 +33,9 @@ class OneDimensionalBlockSlicer : public BlockSlicer explicit OneDimensionalBlockSlicer(Extent::value_type dim = 0); + static std::pair + n_th_block_inside(size_t length, size_t rank, size_t size); + std::pair sliceBlock(Extent &totalExtent, int size, int rank) override; diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 6c5cff166f..ceb074b887 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -22,6 +22,7 @@ #include "openPMD/ChunkInfo_internal.hpp" #include "openPMD/auxiliary/Mpi.hpp" +#include "openPMD/benchmark/mpi/OneDimensionalBlockSlicer.hpp" #include // std::sort #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -335,6 +337,30 @@ namespace chunk_assignment return std::unique_ptr(new RoundRobinOfSourceRanks); } + Blocks::Blocks(unsigned int mpi_rank_in, unsigned int mpi_size_in) + : mpi_size(mpi_size_in), mpi_rank(mpi_rank_in) + {} + + Assignment + Blocks::assign(PartialAssignment pa, RankMeta const &, RankMeta const &) + { + auto [notAssigned, res] = std::move(pa); + auto [myChunksFrom, myChunksTo] = + OneDimensionalBlockSlicer::n_th_block_inside( + notAssigned.size(), mpi_rank, mpi_size); + std::transform( + notAssigned.begin() + myChunksFrom, + notAssigned.begin() + (myChunksFrom + myChunksTo), + std::back_inserter(res[mpi_rank]), + [](WrittenChunkInfo &chunk) { return std::move(chunk); }); + return res; + } + + std::unique_ptr Blocks::clone() const + { + return std::unique_ptr(new Blocks(*this)); + } + ByHostname::ByHostname(std::unique_ptr withinNode) : m_withinNode(std::move(withinNode)) {} diff --git a/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp b/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp index 7fbb734faa..bb71cc29db 100644 --- a/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp +++ b/src/benchmark/mpi/OneDimensionalBlockSlicer.cpp @@ -29,29 +29,23 @@ OneDimensionalBlockSlicer::OneDimensionalBlockSlicer(Extent::value_type dim) : m_dim{dim} {} -std::pair -OneDimensionalBlockSlicer::sliceBlock(Extent &totalExtent, int size, int rank) +std::pair OneDimensionalBlockSlicer::n_th_block_inside( + size_t length, size_t rank, size_t size) { - Offset offs(totalExtent.size(), 0); - if (rank >= size) { - Extent extent(totalExtent.size(), 0); - return std::make_pair(std::move(offs), std::move(extent)); + return {length, 0}; } - auto dim = this->m_dim; - // for more equal balancing, we want the start index // at the upper gaussian bracket of (N/n*rank) // where N the size of the dataset in dimension dim // and n the MPI size // for avoiding integer overflow, this is the same as: // (N div n)*rank + round((N%n)/n*rank) - auto f = [&totalExtent, size, dim](int threadRank) { - auto N = totalExtent[dim]; - auto res = (N / size) * threadRank; - auto padDivident = (N % size) * threadRank; + auto f = [length, size](size_t rank_lambda) { + auto res = (length / size) * rank_lambda; + auto padDivident = (length % size) * rank_lambda; auto pad = padDivident / size; if (pad * size < padDivident) { @@ -60,17 +54,31 @@ OneDimensionalBlockSlicer::sliceBlock(Extent &totalExtent, int size, int rank) return res + pad; }; - offs[dim] = f(rank); + size_t offset = f(rank); + size_t extent = [&]() { + if (rank >= size - 1) + { + return length - offset; + } + else + { + return f(rank + 1) - offset; + } + }(); + return {offset, extent}; +} + +std::pair +OneDimensionalBlockSlicer::sliceBlock(Extent &totalExtent, int size, int rank) +{ + Offset localOffset(totalExtent.size(), 0); Extent localExtent{totalExtent}; - if (rank >= size - 1) - { - localExtent[dim] -= offs[dim]; - } - else - { - localExtent[dim] = f(rank + 1) - offs[dim]; - } - return std::make_pair(std::move(offs), std::move(localExtent)); + + auto [offset_dim, extent_dim] = + n_th_block_inside(totalExtent.at(this->m_dim), rank, size); + localOffset[m_dim] = offset_dim; + localExtent[m_dim] = extent_dim; + return std::make_pair(std::move(localOffset), std::move(localExtent)); } std::unique_ptr OneDimensionalBlockSlicer::clone() const diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 72df3807d9..df0d8be629 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -300,6 +300,11 @@ void init_Chunk(py::module &m) py::class_(m, "RoundRobin").def(py::init<>()); py::class_(m, "RoundRobinOfSourceRanks") .def(py::init<>()); + py::class_(m, "Blocks") + .def( + py::init(), + py::arg("mpi_rank"), + py::arg("mpi_size")); py::class_(m, "ByHostname") .def( diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index c9e1c761a7..445e2b7614 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2456,6 +2456,11 @@ void adios2_chunk_distribution() chunkTable, rankMetaIn, readingRanksHostnames); printAssignment( "CUBOID SLICE", cuboidSliceAssignment, readingRanksHostnames); + + Blocks blocksStrategy(mpi_rank, mpi_size); + auto blocksAssignment = blocksStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment("BLOCKS", blocksAssignment, readingRanksHostnames); } } From 936882cb303608c9b1a6828bac5106c30854869a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 16 Aug 2024 14:15:20 +0200 Subject: [PATCH 11/11] Add BlocksOfSourceRanks strategy --- include/openPMD/ChunkInfo.hpp | 16 +++++++++++++ src/ChunkInfo.cpp | 41 ++++++++++++++++++++++++++++++++ src/binding/python/ChunkInfo.cpp | 5 ++++ test/ParallelIOTest.cpp | 8 +++++++ 4 files changed, 70 insertions(+) diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 868478f8f6..daa4cc2c5c 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -251,6 +251,22 @@ namespace chunk_assignment [[nodiscard]] std::unique_ptr clone() const override; }; + struct BlocksOfSourceRanks : Strategy + { + private: + unsigned int mpi_size, mpi_rank; + + public: + BlocksOfSourceRanks(unsigned int mpi_rank, unsigned int mpi_size); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + [[nodiscard]] std::unique_ptr clone() const override; + }; + /** * @brief Strategy that assigns chunks to be read by processes within * the same host that produced the chunk. diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index ceb074b887..6162a49f3a 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -361,6 +361,47 @@ namespace chunk_assignment return std::unique_ptr(new Blocks(*this)); } + BlocksOfSourceRanks::BlocksOfSourceRanks( + unsigned int mpi_rank_in, unsigned int mpi_size_in) + : mpi_size(mpi_size_in), mpi_rank(mpi_rank_in) + {} + + Assignment BlocksOfSourceRanks::assign( + PartialAssignment pa, RankMeta const &, RankMeta const &) + { + auto [notAssigned, res] = std::move(pa); + std::map> + sortSourceChunksBySourceRank; + for (auto &chunk : notAssigned) + { + auto sourceID = chunk.sourceID; + sortSourceChunksBySourceRank[sourceID].push_back(std::move(chunk)); + } + notAssigned.clear(); + auto [myChunksFrom, myChunksTo] = + OneDimensionalBlockSlicer::n_th_block_inside( + sortSourceChunksBySourceRank.size(), mpi_rank, mpi_size); + auto it = sortSourceChunksBySourceRank.begin(); + for (size_t i = 0; i < myChunksFrom; ++i) + { + ++it; + } + for (size_t i = 0; i < myChunksTo; ++i, ++it) + { + std::transform( + it->second.begin(), + it->second.end(), + std::back_inserter(res[mpi_rank]), + [](WrittenChunkInfo &chunk) { return std::move(chunk); }); + } + return res; + } + + std::unique_ptr BlocksOfSourceRanks::clone() const + { + return std::unique_ptr(new BlocksOfSourceRanks(*this)); + } + ByHostname::ByHostname(std::unique_ptr withinNode) : m_withinNode(std::move(withinNode)) {} diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index df0d8be629..626aa6fc8c 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -305,6 +305,11 @@ void init_Chunk(py::module &m) py::init(), py::arg("mpi_rank"), py::arg("mpi_size")); + py::class_(m, "BlocksOfSourceRanks") + .def( + py::init(), + py::arg("mpi_rank"), + py::arg("mpi_size")); py::class_(m, "ByHostname") .def( diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 445e2b7614..35f011b025 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2461,6 +2461,14 @@ void adios2_chunk_distribution() auto blocksAssignment = blocksStrategy.assign( chunkTable, rankMetaIn, readingRanksHostnames); printAssignment("BLOCKS", blocksAssignment, readingRanksHostnames); + + BlocksOfSourceRanks blocksOfSourceRanksStrategy(mpi_rank, mpi_size); + auto blocksOfSourceRanksAssignment = blocksOfSourceRanksStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "BLOCKS OF SOURCE RANKS", + blocksOfSourceRanksAssignment, + readingRanksHostnames); } }