Skip to content

Commit

Permalink
Flush target newstep (#1632)
Browse files Browse the repository at this point in the history
* Main implementation and test

* Test reading

* Test and fixes for READ_LINEAR mode

* Cleanup and documentation

* Update test/SerialIOTest.cpp

* Run test only when BP5 exists
  • Loading branch information
franzpoeschel committed Jul 16, 2024
1 parent bda3544 commit 5a24100
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 51 deletions.
3 changes: 2 additions & 1 deletion docs/source/details/backendconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ Explanation of the single keys:

* If ``"disk"``, data will be moved to disk on every flush.
* If ``"buffer"``, then only upon ending an IO step or closing an engine.
* If ``new_step``, then a new step will be created. This should be used in combination with the ADIOS2 option ``adios2.engine.parameters.FlattenSteps = "on"``.

This behavior can be overridden on a per-flush basis by specifying this JSON/TOML key as an optional parameter to the ``Series::flush()`` or ``Attributable::seriesFlush()`` methods.

Additionally, specifying ``"disk_override"`` or ``"buffer_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order.
Additionally, specifying ``"disk_override"``, ``"buffer_override"`` or ``"new_step_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order.
This way, a data producing code can hardcode the preferred flush target per ``flush()`` call, but users can e.g. still entirely deactivate flushing to disk in the ``Series`` constructor by specifying ``preferred_flush_target = buffer_override``.
This is useful when applying the asynchronous IO capabilities of the BP5 engine.
* ``adios2.dataset.operators``: This key contains a list of ADIOS2 `operators <https://adios2.readthedocs.io/en/latest/components/components.html#operator>`_, used to enable compression or dataset transformations.
Expand Down
4 changes: 3 additions & 1 deletion include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ namespace adios_defs
Buffer,
Buffer_Override,
Disk,
Disk_Override
Disk_Override,
NewStep,
NewStep_Override
};

using FlushTarget = adios_defs::FlushTarget;
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/IO/ADIOS/macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#define openPMD_HAS_ADIOS_2_10 \
(ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 210)

#define openPMD_HAS_ADIOS_2_10_1 \
(ADIOS2_VERSION_MAJOR * 1000 + ADIOS2_VERSION_MINOR * 10 + \
ADIOS2_VERSION_PATCH >= \
2101)

#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_10
// ADIOS2 v2.10 no longer defines this
#define openPMD_HAVE_ADIOS2_BP5 1
Expand Down
25 changes: 20 additions & 5 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,28 @@ class AbstractIOHandler
{
/*
* In file-based iteration encoding, the APPEND mode is handled entirely
* by the frontend, the backend should just treat it as CREATE mode
* by the frontend, the backend should just treat it as CREATE mode.
* Similar for READ_LINEAR which should be treated as READ_RANDOM_ACCESS
* in the backend.
*/
if (encoding == IterationEncoding::fileBased &&
m_backendAccess == Access::APPEND)
if (encoding == IterationEncoding::fileBased)
{
// do we really want to have those as const members..?
*const_cast<Access *>(&m_backendAccess) = Access::CREATE;
switch (m_backendAccess)
{

case Access::READ_LINEAR:
// do we really want to have those as const members..?
*const_cast<Access *>(&m_backendAccess) =
Access::READ_RANDOM_ACCESS;
break;
case Access::APPEND:
*const_cast<Access *>(&m_backendAccess) = Access::CREATE;
break;
case Access::READ_RANDOM_ACCESS:
case Access::READ_WRITE:
case Access::CREATE:
break;
}
}

m_encoding = encoding;
Expand Down
10 changes: 9 additions & 1 deletion src/IO/ADIOS/ADIOS2Auxiliary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,19 @@ FlushTarget flushTargetFromString(std::string const &str)
{
return FlushTarget::Disk_Override;
}
else if (str == "new_step")
{
return FlushTarget::NewStep;
}
else if (str == "new_step_override")
{
return FlushTarget::NewStep_Override;
}
else
{
throw error::BackendConfigSchema(
{"adios2", "engine", adios_defaults::str_flushtarget},
"Flush target must be either 'disk' or 'buffer', but "
"Flush target must be either 'disk', 'buffer' or 'new_step', but "
"was " +
str + ".");
}
Expand Down
124 changes: 81 additions & 43 deletions src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#include "openPMD/IO/ADIOS/ADIOS2File.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/StringManip.hpp"

#include <stdexcept>

#if openPMD_USE_VERIFY
#define VERIFY(CONDITION, TEXT) \
{ \
Expand Down Expand Up @@ -1028,60 +1031,95 @@ void ADIOS2File::flush_impl(

void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts)
{
auto decideFlushAPICall =
[this, flushTarget = flushParams.flushTarget](adios2::Engine &engine) {
auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget](
adios2::Engine &engine) {
#if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
2701001223
bool performDataWrite{};
switch (flushTarget)
enum class CleanedFlushTarget
{
Buffer,
Disk,
Step
};

CleanedFlushTarget target{};
switch (flushTarget)
{
case FlushTarget::Disk:
case FlushTarget::Disk_Override:
if (m_engineType == "bp5" ||
/* this second check should be sufficient, but we leave the
first check in as a safeguard against renamings in
ADIOS2. Also do a lowerCase transform since the docstring
of `Engine::Type()` claims that the return value is in
lowercase, but for BP5 this does not seem true. */
auxiliary::lowerCase(engine.Type()) == "bp5writer")
{
case FlushTarget::Disk:
case FlushTarget::Disk_Override:
performDataWrite = true;
break;
case FlushTarget::Buffer:
case FlushTarget::Buffer_Override:
performDataWrite = false;
break;
target = CleanedFlushTarget::Disk;
}
performDataWrite = performDataWrite &&
(m_engineType == "bp5" ||
/* this second check should be sufficient, but we leave the
first check in as a safeguard against renamings in ADIOS2.
Also do a lowerCase transform since the docstring of
`Engine::Type()` claims that the return value is in
lowercase, but for BP5 this does not seem true. */
auxiliary::lowerCase(engine.Type()) == "bp5writer");

if (performDataWrite)
else
{
/*
* Deliberately don't write buffered attributes now since
* readers won't be able to see them before EndStep anyway,
* so there's no use. In fact, writing them now is harmful
* because they can't be overwritten after this anymore in the
* current step.
* Draining the uniquePtrPuts now is good however, since we
* should use this chance to free the memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
entry.run(*this);
}
engine.PerformDataWrite();
m_uniquePtrPuts.clear();
target = CleanedFlushTarget::Buffer;
}
else
break;
case FlushTarget::Buffer:
case FlushTarget::Buffer_Override:
target = CleanedFlushTarget::Buffer;
break;
case FlushTarget::NewStep:
case FlushTarget::NewStep_Override:
target = CleanedFlushTarget::Step;
break;
}

switch (target)
{
case CleanedFlushTarget::Disk:
/*
* Draining the uniquePtrPuts now to use this chance to free the
* memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
engine.PerformPuts();
entry.run(*this);
}
#else
(void)this;
(void)flushTarget;
engine.PerformDataWrite();
m_uniquePtrPuts.clear();
m_updateSpans.clear();
break;
case CleanedFlushTarget::Buffer:
engine.PerformPuts();
break;
case CleanedFlushTarget::Step:
if (streamStatus != StreamStatus::DuringStep)
{
throw error::OperationUnsupportedInBackend(
"ADIOS2",
"Trying to flush to a new step while no step is active");
}
/*
* Draining the uniquePtrPuts now to use this chance to free the
* memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
entry.run(*this);
}
engine.EndStep();
engine.BeginStep();
// ++m_currentStep; // think we should keep this as the logical step
m_uniquePtrPuts.clear();
uncommittedAttributes.clear();
m_updateSpans.clear();
break;
}
#else
(void)this;
(void)flushTarget;
engine.PerformPuts();
#endif
};
};

flush_impl(
flushParams,
Expand Down
2 changes: 2 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,11 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val)
{
case FlushTarget::Buffer:
case FlushTarget::Disk:
case FlushTarget::NewStep:
return true;
case FlushTarget::Buffer_Override:
case FlushTarget::Disk_Override:
case FlushTarget::NewStep_Override:
return false;
}
return true;
Expand Down
7 changes: 7 additions & 0 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,13 @@ void Series::readFileBased()
Parameter<Operation::OPEN_FILE> fOpen;
Parameter<Operation::READ_ATT> aRead;

// Tell the backend that we are parsing file-based iteration encoding.
// This especially means that READ_RANDOM_ACCESS will be used instead of
// READ_LINEAR, as READ_LINEAR is implemented in the frontend for file-based
// encoding. Don't set the iteration encoding in the frontend yet, will be
// set after reading the iteration encoding attribute from the opened file.
IOHandler()->setIterationEncoding(IterationEncoding::fileBased);

if (!auxiliary::directory_exists(IOHandler()->directory))
throw error::ReadError(
error::AffectedObject::File,
Expand Down
41 changes: 41 additions & 0 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4437,6 +4437,47 @@ BufferChunkSize = 2147483646 # 2^31 - 2
}
#endif

#if openPMD_HAVE_ADIOS2_BP5
TEST_CASE("adios2_flush_via_step")
{
Series write(
"../samples/adios2_flush_via_step/simData_%T.bp5",
Access::CREATE,
R"(adios2.engine.parameters.FlattenSteps = "on")");
std::vector<float> data(10);
for (Iteration::IterationIndex_t i = 0; i < 5; ++i)
{
Iteration it = write.writeIterations()[i];
auto E_x = it.meshes["E"]["x"];
E_x.resetDataset({Datatype::FLOAT, {10, 10}});
for (Extent::value_type j = 0; j < 10; ++j)
{
std::iota(data.begin(), data.end(), i * 100 + j * 10);
E_x.storeChunk(data, {j, 0}, {1, 10});
write.flush(R"(adios2.engine.preferred_flush_target = "new_step")");
}
it.close();
}

#if openPMD_HAS_ADIOS_2_10_1
for (auto access : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR})
{
Series read("../samples/adios2_flush_via_step/simData_%T.%E", access);
std::vector<float> load_data(100);
data.resize(100);
for (auto iteration : read.readIterations())
{
std::iota(data.begin(), data.end(), iteration.iterationIndex * 100);
iteration.meshes["E"]["x"].loadChunkRaw(
load_data.data(), {0, 0}, {10, 10});
iteration.close();
REQUIRE(load_data == data);
}
}
#endif
}
#endif

TEST_CASE("adios2_engines_and_file_endings")
{
size_t filenameCounter = 0;
Expand Down

0 comments on commit 5a24100

Please sign in to comment.