Skip to content

Commit

Permalink
Move collective parts of setMpiRanksMetaInfo to flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Aug 16, 2023
1 parent 35f28c7 commit 4486c9a
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 66 deletions.
23 changes: 18 additions & 5 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,27 @@ namespace internal
*/
std::optional<MPI_Comm> m_communicator;
#endif

struct NoSourceSpecified
{};
struct SourceSpecifiedViaJSON
{
std::string value;
};
struct SourceSpecifiedManually
{
std::string value;
};

struct RankTableData
{
Attributable m_attributable;
Parameter<Operation::CREATE_DATASET> m_param;
std::optional<std::string> m_sourceAsSpecifiedViaJSON;
std::string thisRankStringForWriting;
std::vector<Parameter<Operation::WRITE_DATASET> > m_chunks;
// Parameter<Operation::CREATE_DATASET> m_param;
std::variant<
NoSourceSpecified,
SourceSpecifiedViaJSON,
SourceSpecifiedManually>
m_rankTableSource;
std::optional<chunk_assignment::RankMeta> m_bufferedRead;
};
RankTableData m_rankTable;
Expand Down Expand Up @@ -357,7 +371,6 @@ class Series : public Attributable
* @brief Set the Mpi Ranks Meta Info attribute, i.e. a Vector with
* a String per (writing) MPI rank, indicating user-
* defined meta information per rank. Example: host name.
* MPI collective.
* @todo make private, only expose non-collective access methods
*
* @return Reference to modified series.
Expand Down
134 changes: 73 additions & 61 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/Series.hpp"
#include "openPMD/ChunkInfo.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/AbstractIOHandlerHelper.hpp"
#include "openPMD/IO/Format.hpp"
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/ReadIterations.hpp"
#include "openPMD/auxiliary/Date.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
#include "openPMD/auxiliary/Mpi.hpp"
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/auxiliary/Variant.hpp"
#include "openPMD/version.hpp"

#include <algorithm>
Expand Down Expand Up @@ -329,27 +332,70 @@ Series::setMpiRanksMetaInfo( chunk_assignment::RankMeta rankMeta )
}
#endif

Series &Series::setMpiRanksMetaInfo(std::string const &myRankInfo)
Series &Series::setMpiRanksMetaInfo(const std::string &myRankInfo)
{
get().m_rankTable.m_rankTableSource =
internal::SeriesData::SourceSpecifiedManually{myRankInfo};
return *this;
}

void Series::flushRankTable()
{
auto &series = get();
auto &rankTable = series.m_rankTable;
auto maybeMyRankInfo = std::visit(
auxiliary::overloaded{
[](internal::SeriesData::NoSourceSpecified &)
-> std::optional<std::string> { return std::nullopt; },
[](internal::SeriesData::SourceSpecifiedViaJSON &viaJson)
-> std::optional<std::string> {
if (viaJson.value == "hostname")
{
return host_info::hostname();
}
else
{
throw error::WrongAPIUsage(
"[Series] Wrong value for JSON option 'rank_table': '" +
viaJson.value + "'.");
};
},
[](internal::SeriesData::SourceSpecifiedManually &manually)
-> std::optional<std::string> { return manually.value; }},
rankTable.m_rankTableSource);
if (!maybeMyRankInfo.has_value())
{
return;
}

auto myRankInfo = std::move(*maybeMyRankInfo);

unsigned long long mySize = myRankInfo.size() + 1; // null character
int rank{0}, size{1};
unsigned long long maxSize = mySize;

auto createRankTable = [&rankTable, &size, &maxSize]() {
rankTable.m_param.name = "rankTable";
rankTable.m_param.dtype = Datatype::CHAR;
rankTable.m_param.extent = {uint64_t(size), uint64_t(maxSize)};
auto createRankTable = [&size, &maxSize, &rankTable, this]() {
if (rankTable.m_attributable.written())
{
return;
}
Parameter<Operation::CREATE_DATASET> param;
param.name = "rankTable";
param.dtype = Datatype::CHAR;
param.extent = {uint64_t(size), uint64_t(maxSize)};
IOHandler()->enqueue(
IOTask(&rankTable.m_attributable, std::move(param)));
};
auto writeDataset = [&rankTable, &rank, &maxSize](

auto writeDataset = [&rank, &maxSize, this, &rankTable](
std::shared_ptr<char> put, size_t num_lines = 1) {
Parameter<Operation::WRITE_DATASET> paramWriteDataset;
paramWriteDataset.dtype = Datatype::CHAR;
paramWriteDataset.offset = {uint64_t(rank), 0};
paramWriteDataset.extent = {num_lines, maxSize};
paramWriteDataset.data = std::move(put);
rankTable.m_chunks.push_back(std::move(paramWriteDataset));
Parameter<Operation::WRITE_DATASET> chunk;
chunk.dtype = Datatype::CHAR;
chunk.offset = {uint64_t(rank), 0};
chunk.extent = {num_lines, maxSize};
chunk.data = std::move(put);
IOHandler()->enqueue(
IOTask(&rankTable.m_attributable, std::move(chunk)));
};

#if openPMD_HAVE_MPI
Expand Down Expand Up @@ -382,61 +428,19 @@ Series &Series::setMpiRanksMetaInfo(std::string const &myRankInfo)
[asRawPtr](char *) { delete asRawPtr; }};
writeDataset(std::move(put), /* num_lines = */ size);
}
return *this;
return;
}
#endif

// sic! no else
// if the Series was initialized without a communicator, then this code will
// run as well
createRankTable();

std::shared_ptr<char> put{
new char[maxSize]{}, [](char const *ptr) { delete[] ptr; }};
std::copy_n(myRankInfo.c_str(), mySize, put.get());

writeDataset(std::move(put));

return *this;
}

void Series::flushRankTable()
{
auto &series = get();
auto &rankTable = series.m_rankTable;
if (rankTable.m_sourceAsSpecifiedViaJSON.has_value())
{
if (!rankTable.m_chunks.empty())
{
throw error::WrongAPIUsage(
"[Series] Writing the rank table manually (via the API call) "
"and automatically (via the JSON option) is mutually "
"exclusive.");
}
if (*rankTable.m_sourceAsSpecifiedViaJSON == "hostname")
{
setMpiRanksMetaInfo(host_info::hostname());
}
else
{
throw error::WrongAPIUsage(
"[Series] Wrong value for JSON option 'rank_table': '" +
*rankTable.m_sourceAsSpecifiedViaJSON + "'.");
}
}
if (rankTable.m_chunks.empty())
{
return;
}

if (!rankTable.m_attributable.written())
{
IOHandler()->enqueue(
IOTask(&rankTable.m_attributable, rankTable.m_param));
}
for (auto &writeDataset : rankTable.m_chunks)
{
IOHandler()->enqueue(
IOTask(&rankTable.m_attributable, std::move(writeDataset)));
}
rankTable.m_chunks.clear();
}

std::string Series::particlesPath() const
Expand Down Expand Up @@ -2409,7 +2413,7 @@ namespace
* The string is converted to lower case.
*/
template <typename Dest = std::string>
void getJsonOptionLowerCase(
bool getJsonOptionLowerCase(
json::TracingJSON &config, std::string const &key, Dest &dest)
{
if (config.json().contains(key))
Expand All @@ -2425,6 +2429,11 @@ namespace
throw error::BackendConfigSchema(
{key}, "Must be convertible to string type.");
}
return true;
}
else
{
return false;
}
}
} // namespace
Expand All @@ -2435,8 +2444,11 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input)
auto &series = get();
getJsonOption<bool>(
options, "defer_iteration_parsing", series.m_parseLazily);
getJsonOptionLowerCase(
options, "rank_table", series.m_rankTable.m_sourceAsSpecifiedViaJSON);
internal::SeriesData::SourceSpecifiedViaJSON rankTableSource;
if (getJsonOptionLowerCase(options, "rank_table", rankTableSource.value))
{
series.m_rankTable.m_rankTableSource = std::move(rankTableSource);
}
// backend key
{
std::map<std::string, Format> const backendDescriptors{
Expand Down

0 comments on commit 4486c9a

Please sign in to comment.