diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 6787b49eb8..2193a6c691 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -24,6 +24,7 @@ #include "openPMD/Dataset.hpp" // Offset, Extent #include "openPMD/benchmark/mpi/BlockSlicer.hpp" +#include #if openPMD_HAVE_MPI #include @@ -224,6 +225,16 @@ namespace chunk_assignment virtual std::unique_ptr clone() const override; }; + struct RoundRobinOfSourceRanks : Strategy + { + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + virtual std::unique_ptr clone() const override; + }; + /** * @brief Strategy that assigns chunks to be read by processes within * the same host that produced the chunk. diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 0aa6fb4653..6c5cff166f 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -24,10 +24,13 @@ #include "openPMD/auxiliary/Mpi.hpp" #include // std::sort +#include #include +#include #include #include #include +#include #include #ifdef _WIN32 @@ -195,10 +198,10 @@ namespace chunk_assignment namespace { - std::map > + std::map> ranksPerHost(RankMeta const &rankMeta) { - std::map > res; + std::map> res; for (auto const &pair : rankMeta) { auto &list = res[pair.second]; @@ -294,6 +297,44 @@ namespace chunk_assignment return std::unique_ptr(new RoundRobin); } + Assignment RoundRobinOfSourceRanks::assign( + PartialAssignment partialAssignment, + RankMeta const &, // ignored parameter + RankMeta const &out) + { + std::map> + sortSourceChunksBySourceRank; + for (auto &chunk : partialAssignment.notAssigned) + { + auto sourceID = chunk.sourceID; + sortSourceChunksBySourceRank[sourceID].push_back(std::move(chunk)); + } + partialAssignment.notAssigned.clear(); + auto source_it = sortSourceChunksBySourceRank.begin(); + auto sink_it = out.begin(); + for (; source_it != sortSourceChunksBySourceRank.end(); + ++source_it, ++sink_it) + { + if (sink_it == out.end()) + { + sink_it = out.begin(); + } + auto &chunks_go_here = partialAssignment.assigned[sink_it->first]; + chunks_go_here.reserve( + partialAssignment.assigned.size() + source_it->second.size()); + for (auto &chunk : source_it->second) + { + chunks_go_here.push_back(std::move(chunk)); + } + } + return partialAssignment.assigned; + } + + std::unique_ptr RoundRobinOfSourceRanks::clone() const + { + return std::unique_ptr(new RoundRobinOfSourceRanks); + } + ByHostname::ByHostname(std::unique_ptr withinNode) : m_withinNode(std::move(withinNode)) {} @@ -332,7 +373,7 @@ namespace chunk_assignment // the ranks are the source ranks // which ranks live on host in the sink? - std::map > ranksPerHostSink = + std::map> ranksPerHostSink = ranksPerHost(out); for (auto &chunkGroup : chunkGroups) { diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 187a6ab8f2..72df3807d9 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -298,6 +298,8 @@ void init_Chunk(py::module &m) })); py::class_(m, "RoundRobin").def(py::init<>()); + py::class_(m, "RoundRobinOfSourceRanks") + .def(py::init<>()); py::class_(m, "ByHostname") .def( diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 8bf0ca348d..77b3aa8b96 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1,6 +1,7 @@ /* Running this test in parallel with MPI requires MPI::Init. * To guarantee a correct call to Init, launch the tests manually. */ +#include "openPMD/ChunkInfo.hpp" #include "openPMD/IO/ADIOS/macros.hpp" #include "openPMD/IO/Access.hpp" #include "openPMD/auxiliary/Environment.hpp" @@ -2213,11 +2214,13 @@ void adios2_chunk_distribution() series.setRankTable(writingRanksHostnames.at(mpi_rank)); auto E_x = series.iterations[0].meshes["E"]["x"]; - openPMD::Dataset ds(openPMD::Datatype::INT, {unsigned(mpi_size), 10}); + openPMD::Dataset ds( + openPMD::Datatype::INT, {unsigned(mpi_size * 2), 10}); E_x.resetDataset(ds); std::vector data(10, 0); std::iota(data.begin(), data.end(), 0); - E_x.storeChunk(data, {unsigned(mpi_rank), 0}, {1, 10}); + E_x.storeChunk(data, {unsigned(mpi_rank * 2), 0}, {1, 10}); + E_x.storeChunk(data, {unsigned(mpi_rank * 2 + 1), 0}, {1, 10}); series.flush(); } @@ -2276,6 +2279,23 @@ void adios2_chunk_distribution() byHostnamePartialAssignment.notAssigned, rankMetaIn); + /* + * Same as above, but use RoundRobinOfSourceRanks this time, a strategy + * which ensures that each source rank's data is uniquely mapped to one + * sink rank. Needed in some domains. + */ + ByHostname byHostname2(std::make_unique()); + auto byHostnamePartialAssignment2 = + byHostname2.assign(chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "HOSTNAME2, ASSIGNED", + byHostnamePartialAssignment2.assigned, + readingRanksHostnames); + printChunktable( + "HOSTNAME2, LEFTOVER", + byHostnamePartialAssignment2.notAssigned, + rankMetaIn); + /* * Assign chunks by hostnames, once more. * This time, apply a secondary distribution strategy to assign