Skip to content

Commit

Permalink
ADIOS2: Override access mode (#1638)
Browse files Browse the repository at this point in the history
* Main implementation

* Add test

* Documentation

* Guard against ADIOS2 v2.7

* Extend the test to parallel
  • Loading branch information
franzpoeschel committed Jul 30, 2024
1 parent 74fdc47 commit 4f83a7e
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/source/details/backendconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ Explanation of the single keys:

* ``adios2.engine.type``: A string that is passed directly to ``adios2::IO:::SetEngine`` for choosing the ADIOS2 engine to be used.
Please refer to the `official ADIOS2 documentation <https://adios2.readthedocs.io/en/latest/engines/engines.html>`_ for a list of available engines.
* ``adios2.engine.access_mode``: One of ``"Write", "Read", "Append", "ReadRandomAccess"``.
Only needed in specific use cases, the access mode is usually determined from the specified ``openPMD::Access``.
Useful for finetuning the backend-specific behavior of ADIOS2 when overwriting existing Iterations in file-based Append mode.
* ``adios2.engine.parameters``: An associative array of string-formatted engine parameters, passed directly through to ``adios2::IO::SetParameters``.
Please refer to the `official ADIOS2 documentation <https://adios2.readthedocs.io/en/latest/engines/engines.html>`_ for the available engine parameters.
The openPMD-api does not interpret these values and instead simply forwards them to ADIOS2.
Expand Down
2 changes: 2 additions & 0 deletions docs/source/usage/workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ The openPMD-api distinguishes between a number of different access modes:
We suggest to fully define iterations when using Append mode (i.e. as if using Create mode) to avoid implementation-specific behavior.
Appending to an openPMD Series is only supported on a per-iteration level.

**Tip:** Use the ``adios2.engine.access_mode`` :ref:`backend key <backendconfig>` of the :ref:`ADIOS2 backend <backends-adios2>` to finetune the backend-specific behavior of Append mode for niche use cases.

**Warning:** There is no reading involved in using Append mode.
It is a user's responsibility to ensure that the appended dataset and the appended-to dataset are compatible with each other.
The results of using incompatible backend configurations are undefined.
Expand Down
42 changes: 42 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
#include "openPMD/auxiliary/Mpi.hpp"
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/auxiliary/TypeTraits.hpp"
Expand All @@ -40,6 +41,7 @@
#include <iterator>
#include <memory>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <type_traits>
Expand Down Expand Up @@ -1496,6 +1498,46 @@ void ADIOS2IOHandlerImpl::touch(

adios2::Mode ADIOS2IOHandlerImpl::adios2AccessMode(std::string const &fullPath)
{
if (m_config.json().contains("engine") &&
m_config["engine"].json().contains("access_mode"))
{
auto const &access_mode_json = m_config["engine"]["access_mode"].json();
auto maybe_access_mode_string =
json::asLowerCaseStringDynamic(access_mode_json);
if (!maybe_access_mode_string.has_value())
{
throw error::BackendConfigSchema(
{"adios2", "engine", "access_mode"}, "Must be of string type.");
}
auto access_mode_string = *maybe_access_mode_string;
using pair_t = std::pair<char const *, adios2::Mode>;
constexpr std::array<pair_t, 4> modeNames{
pair_t{"write", adios2::Mode::Write},
pair_t{"read", adios2::Mode::Read},
pair_t{"append", adios2::Mode::Append}
#if openPMD_HAS_ADIOS_2_8
,
pair_t{"readrandomaccess", adios2::Mode::ReadRandomAccess}
#endif
};
for (auto const &[name, mode] : modeNames)
{
if (name == access_mode_string)
{
return mode;
}
}
std::stringstream error;
error << "Unsupported value '" << access_mode_string
<< "'. Must be one of:";
for (auto const &pair : modeNames)
{
error << " '" << pair.first << "'";
}
error << '.';
throw error::BackendConfigSchema(
{"adios2", "engine", "access_mode"}, error.str());
}
switch (m_handler->m_backendAccess)
{
case Access::CREATE:
Expand Down
115 changes: 115 additions & 0 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2082,4 +2082,119 @@ TEST_CASE("joined_dim", "[parallel]")
}
}

#if openPMD_HAVE_ADIOS2_BP5
// Parallel version of the same test from SerialIOTest.cpp
TEST_CASE("adios2_flush_via_step")
{
int size_i(0), rank_i(0);
MPI_Comm_rank(MPI_COMM_WORLD, &rank_i);
MPI_Comm_size(MPI_COMM_WORLD, &size_i);
Extent::value_type const size(size_i), rank(rank_i);

Series write(
"../samples/adios2_flush_via_step_parallel/simData_%T.bp5",
Access::CREATE,
MPI_COMM_WORLD,
R"(adios2.engine.parameters.FlattenSteps = "on")");
std::vector<float> data(10);
for (Iteration::IterationIndex_t i = 0; i < 5; ++i)
{
Iteration it = write.writeIterations()[i];
auto E_x = it.meshes["E"]["x"];
E_x.resetDataset({Datatype::FLOAT, {size, 10, 10}});
for (Extent::value_type j = 0; j < 10; ++j)
{
std::iota(
data.begin(), data.end(), i * 100 * size + rank * 100 + j * 10);
E_x.storeChunk(data, {rank, j, 0}, {1, 1, 10});
write.flush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
it.close();
}

#if openPMD_HAS_ADIOS_2_10_1
for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR})
{
Series read(
"../samples/adios2_flush_via_step_parallel/simData_%T.%E",
access,
MPI_COMM_WORLD);
std::vector<float> load_data(100 * size);
data.resize(100 * size);
for (auto iteration : read.readIterations())
{
std::iota(
data.begin(),
data.end(),
iteration.iterationIndex * size * 100);
iteration.meshes["E"]["x"].loadChunkRaw(
load_data.data(), {0, 0, 0}, {size, 10, 10});
iteration.close();
REQUIRE(load_data == data);
}
}
#endif

/*
* Now emulate restarting from a checkpoint after a crash and continuing to
* write to the output Series. The semantics of openPMD::Access::APPEND
* don't fully fit here since that mode is for adding new Iterations to an
* existing Series. What we truly want to do is to continue writing to an
* Iteration without replacing it with a new one. So we must use the option
* adios2.engine.access_mode = "append" to tell the ADIOS2 backend that new
* steps should be added to an existing Iteration file.
*/

write = Series(
"../samples/adios2_flush_via_step_parallel/simData_%T.bp5",
Access::APPEND,
MPI_COMM_WORLD,
R"(
[adios2.engine]
access_mode = "append"
parameters.FlattenSteps = "on"
)");
for (Iteration::IterationIndex_t i = 0; i < 5; ++i)
{
Iteration it = write.writeIterations()[i];
auto E_x = it.meshes["E"]["y"];
E_x.resetDataset({Datatype::FLOAT, {size, 10, 10}});
for (Extent::value_type j = 0; j < 10; ++j)
{
std::iota(
data.begin(), data.end(), i * 100 * size + rank * 100 + j * 10);
E_x.storeChunk(data, {rank, j, 0}, {1, 1, 10});
write.flush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
it.close();
}

#if openPMD_HAS_ADIOS_2_10_1
for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR})
{
Series read(
"../samples/adios2_flush_via_step_parallel/simData_%T.%E",
access,
MPI_COMM_WORLD);
std::vector<float> load_data(100 * size);
data.resize(100 * size);
for (auto iteration : read.readIterations())
{
std::iota(
data.begin(),
data.end(),
iteration.iterationIndex * size * 100);
iteration.meshes["E"]["x"].loadChunkRaw(
load_data.data(), {0, 0, 0}, {size, 10, 10});
iteration.meshes["E"]["y"].loadChunkRaw(
load_data.data(), {0, 0, 0}, {size, 10, 10});
iteration.close();
REQUIRE(load_data == data);
REQUIRE(load_data == data);
}
}
#endif
}
#endif

#endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI
52 changes: 52 additions & 0 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4475,6 +4475,58 @@ TEST_CASE("adios2_flush_via_step")
}
}
#endif

/*
* Now emulate restarting from a checkpoint after a crash and continuing to
* write to the output Series. The semantics of openPMD::Access::APPEND
* don't fully fit here since that mode is for adding new Iterations to an
* existing Series. What we truly want to do is to continue writing to an
* Iteration without replacing it with a new one. So we must use the option
* adios2.engine.access_mode = "append" to tell the ADIOS2 backend that new
* steps should be added to an existing Iteration file.
*/

write = Series(
"../samples/adios2_flush_via_step/simData_%T.bp5",
Access::APPEND,
R"(
[adios2.engine]
access_mode = "append"
parameters.FlattenSteps = "on"
)");
for (Iteration::IterationIndex_t i = 0; i < 5; ++i)
{
Iteration it = write.writeIterations()[i];
auto E_x = it.meshes["E"]["y"];
E_x.resetDataset({Datatype::FLOAT, {10, 10}});
for (Extent::value_type j = 0; j < 10; ++j)
{
std::iota(data.begin(), data.end(), i * 100 + j * 10);
E_x.storeChunk(data, {j, 0}, {1, 10});
write.flush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
it.close();
}

#if openPMD_HAS_ADIOS_2_10_1
for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR})
{
Series read("../samples/adios2_flush_via_step/simData_%T.%E", access);
std::vector<float> load_data(100);
data.resize(100);
for (auto iteration : read.readIterations())
{
std::iota(data.begin(), data.end(), iteration.iterationIndex * 100);
iteration.meshes["E"]["x"].loadChunkRaw(
load_data.data(), {0, 0}, {10, 10});
iteration.meshes["E"]["y"].loadChunkRaw(
load_data.data(), {0, 0}, {10, 10});
iteration.close();
REQUIRE(load_data == data);
REQUIRE(load_data == data);
}
}
#endif
}
#endif

Expand Down

0 comments on commit 4f83a7e

Please sign in to comment.