Skip to content

Commit

Permalink
Add flush_entire_series parameter to Attributable::seriesFlush
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Aug 6, 2024
1 parent ff21bbc commit c125299
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 19 deletions.
1 change: 1 addition & 0 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class Iteration : public Attributable
friend class internal::AttributableData;
template <typename T>
friend T &internal::makeOwning(T &self, Series);
friend class Writable;

public:
Iteration(Iteration const &) = default;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush({FlushLevel::SkeletonOnly});
seriesFlush({FlushLevel::SkeletonOnly}, /* flush_entire_series = */ false);

size_t size = 1;
for (auto ext : e)
Expand Down
10 changes: 8 additions & 2 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,14 @@ class Attributable
* implement this flush call.
* Must be provided in-line, configuration is not read
* from files.
* @param flush_entire_series By default, this method is just a shortcut
* for Series::flush() when you don't currently have a Series handle
* at hand. If however the current object is an Iteration or
* contained within an Iteration, flushing can be restricted to that
* specific Iteration by setting this flag to false.
*/
void seriesFlush(std::string backendConfig = "{}");
void seriesFlush(
std::string backendConfig = "{}", bool flush_entire_series = true);

/** String serialization to describe an Attributable
*
Expand Down Expand Up @@ -330,7 +336,7 @@ OPENPMD_protected
internal::SeriesData *>;
/** @} */

void seriesFlush(internal::FlushParams const &);
void seriesFlush(internal::FlushParams const &, bool flush_entire_series);

void flushAttributes(internal::FlushParams const &);

Expand Down
5 changes: 3 additions & 2 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ class Writable final
* an object that has no parent, which is the Series object, and flush()-es
* it.
*/
void seriesFlush(std::string backendConfig = "{}");
void seriesFlush(
std::string backendConfig = "{}", bool flush_entire_series = true);

// clang-format off
OPENPMD_private
// clang-format on

void seriesFlush(internal::FlushParams const &);
void seriesFlush(internal::FlushParams const &, bool flush_entire_series);
/*
* These members need to be shared pointers since distinct instances of
* Writable may share them.
Expand Down
10 changes: 6 additions & 4 deletions src/backend/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ Attributable &Attributable::setComment(std::string const &c)
return *this;
}

void Attributable::seriesFlush(std::string backendConfig)
void Attributable::seriesFlush(
std::string backendConfig, bool flush_entire_series)
{
writable().seriesFlush(std::move(backendConfig));
writable().seriesFlush(std::move(backendConfig), flush_entire_series);
}

Series Attributable::retrieveSeries() const
Expand Down Expand Up @@ -240,9 +241,10 @@ auto Attributable::myPath() const -> MyPath
return res;
}

void Attributable::seriesFlush(internal::FlushParams const &flushParams)
void Attributable::seriesFlush(
internal::FlushParams const &flushParams, bool flush_entire_series)
{
writable().seriesFlush(flushParams);
writable().seriesFlush(flushParams, flush_entire_series);
}

void Attributable::flushAttributes(internal::FlushParams const &flushParams)
Expand Down
39 changes: 34 additions & 5 deletions src/backend/Writable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/backend/Writable.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include <stdexcept>

namespace openPMD
{
Expand All @@ -42,12 +44,14 @@ Writable::~Writable()
IOTask(this, Parameter<Operation::DEREGISTER>(parent)));
}

void Writable::seriesFlush(std::string backendConfig)
void Writable::seriesFlush(std::string backendConfig, bool flush_entire_series)
{
seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)});
seriesFlush(
{FlushLevel::UserFlush, std::move(backendConfig)}, flush_entire_series);
}

void Writable::seriesFlush(internal::FlushParams const &flushParams)
void Writable::seriesFlush(
internal::FlushParams const &flushParams, bool flush_entire_series)
{
Attributable impl;
impl.setData({attributable, [](auto const *) {}});
Expand All @@ -59,8 +63,33 @@ void Writable::seriesFlush(internal::FlushParams const &flushParams)
.setDirtyRecursive(true);
}
auto series = series_internal->asInternalCopyOf<Series>();
series.flush_impl(
series.iterations.begin(), series.iterations.end(), flushParams);
auto [begin, end] = [&, &iteration_internal_lambda = iteration_internal]()
-> std::pair<Series::iterations_iterator, Series::iterations_iterator> {
if (flush_entire_series && iteration_internal_lambda)
{
auto it = series.iterations.begin();
auto end_lambda = series.iterations.end();
for (; it != end_lambda; ++it)
{
if (&it->second.Iteration::get() == *iteration_internal_lambda)
{
auto next = it;
++next;
return {it, next};
}
}
throw std::runtime_error(
"[Writable::seriesFlush()] Found a containing Iteration that "
"seems to not be part of the containing Series?? You might try "
"running this with `flushing_entire_series=false` as a "
"workaround, but something is still wrong.");
}
else
{
return {series.iterations.begin(), series.iterations.end()};
}
}();
series.flush_impl(begin, end, flushParams);
}

} // namespace openPMD
5 changes: 3 additions & 2 deletions src/binding/python/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,9 @@ void init_Attributable(py::module &m)
})
.def(
"series_flush",
py::overload_cast<std::string>(&Attributable::seriesFlush),
py::arg("backend_config") = "{}")
py::overload_cast<std::string, bool>(&Attributable::seriesFlush),
py::arg("backend_config") = "{}",
py::arg("flush_entire_series") = true)

.def_property_readonly(
"attributes",
Expand Down
6 changes: 3 additions & 3 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,11 +1182,11 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]")
* conflict with the default buffer target that will run in the destructor,
* unless the flush in the next line really is collective.
*/
std::cout << "ENTER" << std::endl;
MPI_Barrier(MPI_COMM_WORLD);
iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\"");
iteration.seriesFlush(
"adios2.engine.preferred_flush_target = \"disk\"",
/* flush_entire_series = */ false);
MPI_Barrier(MPI_COMM_WORLD);
std::cout << "LEAVE" << std::endl;
}
#endif

Expand Down

0 comments on commit c125299

Please sign in to comment.