Skip to content

Commit

Permalink
Workaround for independent writes to Iterations in parallel, better d…
Browse files Browse the repository at this point in the history
…etection of BP5 which in turn uncovers more instances of the first issue (#1619)

* retrieveIteration: return both Series and Iteration

* Optimize implementation

* seriesFlush(): Mark containing Iteration as dirty

* Add failing test

* Backend implementation

* Add documentation

* Add ADIOS2 v2.10 define and use that for BP5 check

* Ask the engine if it is BP5 for BP5-specific features

* write_test_zero_extent: require flush to buffer

Somehow PerformDataWrite() leads to trouble with this pattern.

* Revert "write_test_zero_extent: require flush to buffer"

This reverts commit 36597bd.
No longer needed after rebasing on fix-iteration-flush

* Fix hipace_like_write test

It used Series::flush non-collectively

* Also ensure all ranks flush in group/variable encoding
  • Loading branch information
franzpoeschel authored Jun 11, 2024
1 parent 51caab3 commit 5d9fb34
Show file tree
Hide file tree
Showing 23 changed files with 285 additions and 61 deletions.
6 changes: 6 additions & 0 deletions docs/source/details/mpi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Functionality Behavior Description
.. [4] We usually open iterations delayed on first access. This first access is usually the ``flush()`` call after a ``storeChunk``/``loadChunk`` operation. If the first access is non-collective, an explicit, collective ``Iteration::open()`` can be used to have the files already open.
Alternatively, iterations might be accessed for the first time by immediate operations such as ``::availableChunks()``.
.. warning::

The openPMD-api will by default flush only those Iterations which are dirty, i.e. have been written to.
This is somewhat unfortunate in parallel setups since only the dirty status of the current MPI rank can be considered.
As a workaround, use ``Attributable::seriesFlush()`` on an Iteration (or an object contained within an Iteration) to force flush that Iteration regardless of its dirty status.

.. tip::

Just because an operation is independent does not mean it is allowed to be inconsistent.
Expand Down
2 changes: 1 addition & 1 deletion docs/source/dev/design.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Therefore, enabling users to handle hierarchical, self-describing file formats w

.. literalinclude:: IOTask.hpp
:language: cpp
:lines: 48-78
:lines: 50-81

Every task is designed to be a fully self-contained description of one such atomic operation. By describing a required minimal step of work (without any side-effect), these operations are the foundation of the unified handling mechanism across suitable file formats.
The actual low-level exchange of data is implemented in ``IOHandlers``, one per file format (possibly two if handlingi MPI-parallel work is possible and requires different behaviour).
Expand Down
2 changes: 2 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class ADIOS2IOHandlerImpl
void
deregister(Writable *, Parameter<Operation::DEREGISTER> const &) override;

void touch(Writable *, Parameter<Operation::TOUCH> const &) override;

/**
* @brief The ADIOS2 access type to chose for Engines opened
* within this instance.
Expand Down
5 changes: 4 additions & 1 deletion include/openPMD/IO/ADIOS/macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
#define openPMD_HAS_ADIOS_2_9 \
(ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 209)

#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_9
#define openPMD_HAS_ADIOS_2_10 \
(ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 210)

#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_10
// ADIOS2 v2.10 no longer defines this
#define openPMD_HAVE_ADIOS2_BP5 1
#else
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ class AbstractIOHandlerImpl
virtual void
deregister(Writable *, Parameter<Operation::DEREGISTER> const &param) = 0;

/** Treat this writable's file as open/active/dirty.
*/
virtual void
touch(Writable *, Parameter<Operation::TOUCH> const &param) = 0;

AbstractIOHandler *m_handler;
bool m_verboseIOTasks = false;

Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class HDF5IOHandlerImpl : public AbstractIOHandlerImpl
void listAttributes(Writable *, Parameter<Operation::LIST_ATTS> &) override;
void
deregister(Writable *, Parameter<Operation::DEREGISTER> const &) override;
void touch(Writable *, Parameter<Operation::TOUCH> const &) override;

std::unordered_map<Writable *, std::string> m_fileNames;
std::unordered_map<std::string, hid_t> m_fileNamesWithID;
Expand Down
45 changes: 39 additions & 6 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,36 @@ Writable *getWritable(Attributable *);
/** Type of IO operation between logical and persistent data.
*/
OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){
CREATE_FILE, CHECK_FILE, OPEN_FILE, CLOSE_FILE,
CREATE_FILE,
CHECK_FILE,
OPEN_FILE,
CLOSE_FILE,
DELETE_FILE,

CREATE_PATH, CLOSE_PATH, OPEN_PATH, DELETE_PATH,
CREATE_PATH,
CLOSE_PATH,
OPEN_PATH,
DELETE_PATH,
LIST_PATHS,

CREATE_DATASET, EXTEND_DATASET, OPEN_DATASET, DELETE_DATASET,
WRITE_DATASET, READ_DATASET, LIST_DATASETS, GET_BUFFER_VIEW,
CREATE_DATASET,
EXTEND_DATASET,
OPEN_DATASET,
DELETE_DATASET,
WRITE_DATASET,
READ_DATASET,
LIST_DATASETS,
GET_BUFFER_VIEW,

DELETE_ATT, WRITE_ATT, READ_ATT, LIST_ATTS,
DELETE_ATT,
WRITE_ATT,
READ_ATT,
LIST_ATTS,

ADVANCE,
AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset
DEREGISTER //!< Inform the backend that an object has been deleted.
DEREGISTER, //!< Inform the backend that an object has been deleted.
TOUCH //!< tell the backend that the file is to be considered active
}; // note: if you change the enum members here, please update
// docs/source/dev/design.rst

Expand Down Expand Up @@ -658,6 +674,23 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::DEREGISTER>
void const *former_parent = nullptr;
};

template <>
struct OPENPMDAPI_EXPORT Parameter<Operation::TOUCH> : public AbstractParameter
{
explicit Parameter() = default;

Parameter(Parameter const &) = default;
Parameter(Parameter &&) = default;

Parameter &operator=(Parameter const &) = default;
Parameter &operator=(Parameter &&) = default;

std::unique_ptr<AbstractParameter> to_heap() && override
{
return std::make_unique<Parameter<Operation::TOUCH>>(std::move(*this));
}
};

/** @brief Self-contained description of a single IO operation.
*
* Contained are
Expand Down
2 changes: 2 additions & 0 deletions include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl
void
deregister(Writable *, Parameter<Operation::DEREGISTER> const &) override;

void touch(Writable *, Parameter<Operation::TOUCH> const &) override;

std::future<void> flush();

private:
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class Iteration : public Attributable
friend class Series;
friend class WriteIterations;
friend class SeriesIterator;
friend class internal::AttributableData;

public:
Iteration(Iteration const &) = default;
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class Series : public Attributable
friend class ReadIterations;
friend class SeriesIterator;
friend class internal::SeriesData;
friend class internal::AttributableData;
friend class WriteIterations;

public:
Expand Down
50 changes: 48 additions & 2 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class Series;

namespace internal
{
class IterationData;
class SeriesData;

class AttributableData
{
friend class openPMD::Attributable;
Expand All @@ -74,6 +77,42 @@ namespace internal
*/
Writable m_writable;

template <typename T>
T asInternalCopyOf()
{
auto *self = dynamic_cast<typename T::Data_t *>(this);
if (!self)
{
if constexpr (std::is_same_v<Series, T>)
{
throw std::runtime_error(
"[Attributable::retrieveSeries] Error when trying to "
"retrieve the Series object. Note: An instance of the "
"Series object must still exist when flushing. A "
"common cause for this error is using a flush call on "
"a handle (e.g. `Iteration::seriesFlush()`) when the "
"original Series object has already gone out of "
"scope.");
}
else
{
throw std::runtime_error(

"[AttributableData::asInternalCopyOf<T>] Error when "
"trying to retrieve a containing object. Note: An "
"instance of the Series object must still exist when "
"flushing. A common cause for this error is using a "
"flush call on a handle (e.g. "
"`Iteration::seriesFlush()`) when the original Series "
"object has already gone out of scope.");
}
}
T res;
res.setData(
std::shared_ptr<typename T::Data_t>(self, [](auto const *) {}));
return res;
}

private:
/**
* The attributes defined by this Attributable.
Expand Down Expand Up @@ -207,6 +246,8 @@ class Attributable
* of parents. This method will walk up the parent list until it reaches
* an object that has no parent, which is the Series object, and flush()-es
* it.
* If the Attributable is an Iteration or any object contained in an
* Iteration, that Iteration will be flushed regardless of its dirty status.
*
* @param backendConfig Further backend-specific instructions on how to
* implement this flush call.
Expand Down Expand Up @@ -263,8 +304,13 @@ OPENPMD_protected
* Throws an error otherwise, e.g., for Series objects.
* @{
*/
Iteration const &containingIteration() const;
Iteration &containingIteration();
[[nodiscard]] auto containingIteration() const
-> std::pair<
std::optional<internal::IterationData const *>,
internal::SeriesData const *>;
auto containingIteration() -> std::pair<
std::optional<internal::IterationData *>,
internal::SeriesData *>;
/** @} */

void seriesFlush(internal::FlushParams const &);
Expand Down
10 changes: 9 additions & 1 deletion src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "openPMD/Error.hpp"
#include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/StringManip.hpp"

#if openPMD_USE_VERIFY
#define VERIFY(CONDITION, TEXT) \
Expand Down Expand Up @@ -1044,7 +1045,14 @@ void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts)
performDataWrite = false;
break;
}
performDataWrite = performDataWrite && m_engineType == "bp5";
performDataWrite = performDataWrite &&
(m_engineType == "bp5" ||
/* this second check should be sufficient, but we leave the
first check in as a safeguard against renamings in ADIOS2.
Also do a lowerCase transform since the docstring of
`Engine::Type()` claims that the return value is in
lowercase, but for BP5 this does not seem true. */
auxiliary::lowerCase(engine.Type()) == "bp5writer");

if (performDataWrite)
{
Expand Down
8 changes: 8 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ void ADIOS2IOHandlerImpl::openFile(
// lazy opening is deathly in parallel situations
auto &fileData = getFileData(file, IfFileNotOpen::OpenImplicitly);
*parameters.out_parsePreference = fileData.parsePreference;
m_dirty.emplace(std::move(file));
}

void ADIOS2IOHandlerImpl::closeFile(
Expand Down Expand Up @@ -1482,6 +1483,13 @@ void ADIOS2IOHandlerImpl::deregister(
m_files.erase(writable);
}

void ADIOS2IOHandlerImpl::touch(
Writable *writable, Parameter<Operation::TOUCH> const &)
{
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
m_dirty.emplace(std::move(file));
}

adios2::Mode ADIOS2IOHandlerImpl::adios2AccessMode(std::string const &fullPath)
{
switch (m_handler->m_backendAccess)
Expand Down
8 changes: 8 additions & 0 deletions src/IO/AbstractIOHandlerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,14 @@ std::future<void> AbstractIOHandlerImpl::flush()
deregister(i.writable, parameter);
break;
}
case O::TOUCH: {
auto &parameter =
deref_dynamic_cast<Parameter<O::TOUCH>>(i.parameter.get());
writeToStderr(
"[", i.writable->parent, "->", i.writable, "] DEREGISTER");
touch(i.writable, parameter);
break;
}
}
}
catch (...)
Expand Down
5 changes: 5 additions & 0 deletions src/IO/HDF5/HDF5IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2910,6 +2910,11 @@ void HDF5IOHandlerImpl::deregister(
m_fileNames.erase(writable);
}

void HDF5IOHandlerImpl::touch(Writable *, Parameter<Operation::TOUCH> const &)
{
// no-op
}

std::optional<HDF5IOHandlerImpl::File>
HDF5IOHandlerImpl::getFile(Writable *writable)
{
Expand Down
7 changes: 7 additions & 0 deletions src/IO/JSON/JSONIOHandlerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,13 @@ void JSONIOHandlerImpl::deregister(
m_files.erase(writable);
}

void JSONIOHandlerImpl::touch(
Writable *writable, Parameter<Operation::TOUCH> const &)
{
auto file = refreshFileFromParent(writable);
m_dirty.emplace(std::move(file));
}

auto JSONIOHandlerImpl::getFilehandle(File const &fileName, Access access)
-> std::tuple<std::unique_ptr<FILEHANDLE>, std::istream *, std::ostream *>
{
Expand Down
3 changes: 3 additions & 0 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "openPMD/Dataset.hpp"
#include "openPMD/Datatype.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
Expand Down Expand Up @@ -315,6 +316,8 @@ void Iteration::flushVariableBased(

void Iteration::flush(internal::FlushParams const &flushParams)
{
Parameter<Operation::TOUCH> touch;
IOHandler()->enqueue(IOTask(&writable(), touch));
if (access::readOnly(IOHandler()->m_frontendAccess))
{
for (auto &m : meshes)
Expand Down
5 changes: 5 additions & 0 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,7 @@ void Series::flushGorVBased(
bool flushIOHandler)
{
auto &series = get();

if (access::readOnly(IOHandler()->m_frontendAccess))
{
for (auto it = begin; it != end; ++it)
Expand Down Expand Up @@ -1432,6 +1433,8 @@ void Series::flushGorVBased(
}

// Phase 3
Parameter<Operation::TOUCH> touch;
IOHandler()->enqueue(IOTask(&writable(), touch));
if (flushIOHandler)
{
IOHandler()->flush(flushParams);
Expand Down Expand Up @@ -1510,6 +1513,8 @@ void Series::flushGorVBased(
}

flushAttributes(flushParams);
Parameter<Operation::TOUCH> touch;
IOHandler()->enqueue(IOTask(&writable(), touch));
if (flushIOHandler)
{
IOHandler()->flush(flushParams);
Expand Down
Loading

0 comments on commit 5d9fb34

Please sign in to comment.