Skip to content

Commit

Permalink
Add RoundRobinOfSourceRanks strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Aug 5, 2024
1 parent 08cf43d commit 3c93be4
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 5 deletions.
11 changes: 11 additions & 0 deletions include/openPMD/ChunkInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "openPMD/Dataset.hpp" // Offset, Extent
#include "openPMD/benchmark/mpi/BlockSlicer.hpp"
#include <memory>

#if openPMD_HAVE_MPI
#include <mpi.h>
Expand Down Expand Up @@ -224,6 +225,16 @@ namespace chunk_assignment
virtual std::unique_ptr<Strategy> clone() const override;
};

struct RoundRobinOfSourceRanks : Strategy
{
Assignment assign(
PartialAssignment,
RankMeta const &in,
RankMeta const &out) override;

virtual std::unique_ptr<Strategy> clone() const override;
};

/**
* @brief Strategy that assigns chunks to be read by processes within
* the same host that produced the chunk.
Expand Down
47 changes: 44 additions & 3 deletions src/ChunkInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
#include "openPMD/auxiliary/Mpi.hpp"

#include <algorithm> // std::sort
#include <deque>
#include <iostream>
#include <iterator>
#include <list>
#include <map>
#include <optional>
#include <set>
#include <utility>

#ifdef _WIN32
Expand Down Expand Up @@ -195,10 +198,10 @@ namespace chunk_assignment

namespace
{
std::map<std::string, std::list<unsigned int> >
std::map<std::string, std::list<unsigned int>>
ranksPerHost(RankMeta const &rankMeta)
{
std::map<std::string, std::list<unsigned int> > res;
std::map<std::string, std::list<unsigned int>> res;
for (auto const &pair : rankMeta)
{
auto &list = res[pair.second];
Expand Down Expand Up @@ -294,6 +297,44 @@ namespace chunk_assignment
return std::unique_ptr<Strategy>(new RoundRobin);
}

Assignment RoundRobinOfSourceRanks::assign(
PartialAssignment partialAssignment,
RankMeta const &, // ignored parameter
RankMeta const &out)
{
std::map<unsigned int, std::deque<WrittenChunkInfo>>
sortSourceChunksBySourceRank;
for (auto &chunk : partialAssignment.notAssigned)
{
auto sourceID = chunk.sourceID;
sortSourceChunksBySourceRank[sourceID].push_back(std::move(chunk));
}
partialAssignment.notAssigned.clear();
auto source_it = sortSourceChunksBySourceRank.begin();
auto sink_it = out.begin();
for (; source_it != sortSourceChunksBySourceRank.end();
++source_it, ++sink_it)
{
if (sink_it == out.end())
{
sink_it = out.begin();
}
auto &chunks_go_here = partialAssignment.assigned[sink_it->first];
chunks_go_here.reserve(
partialAssignment.assigned.size() + source_it->second.size());
for (auto &chunk : source_it->second)
{
chunks_go_here.push_back(std::move(chunk));
}
}
return partialAssignment.assigned;
}

std::unique_ptr<Strategy> RoundRobinOfSourceRanks::clone() const
{
return std::unique_ptr<Strategy>(new RoundRobinOfSourceRanks);
}

ByHostname::ByHostname(std::unique_ptr<Strategy> withinNode)
: m_withinNode(std::move(withinNode))
{}
Expand Down Expand Up @@ -332,7 +373,7 @@ namespace chunk_assignment
// the ranks are the source ranks

// which ranks live on host <string> in the sink?
std::map<std::string, std::list<unsigned int> > ranksPerHostSink =
std::map<std::string, std::list<unsigned int>> ranksPerHostSink =
ranksPerHost(out);
for (auto &chunkGroup : chunkGroups)
{
Expand Down
2 changes: 2 additions & 0 deletions src/binding/python/ChunkInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ void init_Chunk(py::module &m)
}));

py::class_<RoundRobin, Strategy>(m, "RoundRobin").def(py::init<>());
py::class_<RoundRobinOfSourceRanks, Strategy>(m, "RoundRobinOfSourceRanks")
.def(py::init<>());

py::class_<ByHostname, PartialStrategy>(m, "ByHostname")
.def(
Expand Down
24 changes: 22 additions & 2 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Running this test in parallel with MPI requires MPI::Init.
* To guarantee a correct call to Init, launch the tests manually.
*/
#include "openPMD/ChunkInfo.hpp"
#include "openPMD/IO/ADIOS/macros.hpp"
#include "openPMD/IO/Access.hpp"
#include "openPMD/auxiliary/Environment.hpp"
Expand Down Expand Up @@ -2333,11 +2334,13 @@ void adios2_chunk_distribution()
series.setRankTable(writingRanksHostnames.at(mpi_rank));

auto E_x = series.iterations[0].meshes["E"]["x"];
openPMD::Dataset ds(openPMD::Datatype::INT, {unsigned(mpi_size), 10});
openPMD::Dataset ds(
openPMD::Datatype::INT, {unsigned(mpi_size * 2), 10});
E_x.resetDataset(ds);
std::vector<int> data(10, 0);
std::iota(data.begin(), data.end(), 0);
E_x.storeChunk(data, {unsigned(mpi_rank), 0}, {1, 10});
E_x.storeChunk(data, {unsigned(mpi_rank * 2), 0}, {1, 10});
E_x.storeChunk(data, {unsigned(mpi_rank * 2 + 1), 0}, {1, 10});
series.flush();
}

Expand Down Expand Up @@ -2396,6 +2399,23 @@ void adios2_chunk_distribution()
byHostnamePartialAssignment.notAssigned,
rankMetaIn);

/*
* Same as above, but use RoundRobinOfSourceRanks this time, a strategy
* which ensures that each source rank's data is uniquely mapped to one
* sink rank. Needed in some domains.
*/
ByHostname byHostname2(std::make_unique<RoundRobinOfSourceRanks>());
auto byHostnamePartialAssignment2 =
byHostname2.assign(chunkTable, rankMetaIn, readingRanksHostnames);
printAssignment(
"HOSTNAME2, ASSIGNED",
byHostnamePartialAssignment2.assigned,
readingRanksHostnames);
printChunktable(
"HOSTNAME2, LEFTOVER",
byHostnamePartialAssignment2.notAssigned,
rankMetaIn);

/*
* Assign chunks by hostnames, once more.
* This time, apply a secondary distribution strategy to assign
Expand Down

0 comments on commit 3c93be4

Please sign in to comment.