Skip to content

Commit

Permalink
Have only one instance of SeriesIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jul 26, 2023
1 parent 0fd6a5a commit d5b2d12
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 37 deletions.
21 changes: 18 additions & 3 deletions include/openPMD/ReadIterations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,20 @@ class SeriesIterator
std::set<Iteration::IterationIndex_t> ignoreIterations;
};

std::shared_ptr<SharedData> m_data;
/*
* The shared data is never empty, emptiness is indicated by std::optional
*/
std::shared_ptr<std::optional<SharedData>> m_data =
std::make_shared<std::optional<SharedData>>(std::nullopt);

SharedData &get()
{
return m_data->value();
}
SharedData const &get() const
{
return m_data->value();
}

public:
//! construct the end() iterator
Expand All @@ -79,7 +92,7 @@ class SeriesIterator
private:
inline bool setCurrentIteration()
{
auto &data = *m_data;
auto &data = get();
if (data.iterationsInCurrentStep.empty())
{
std::cerr << "[ReadIterations] Encountered a step without "
Expand All @@ -94,7 +107,7 @@ class SeriesIterator

inline std::optional<uint64_t> peekCurrentIteration()
{
auto &data = *m_data;
auto &data = get();
if (data.iterationsInCurrentStep.empty())
{
return std::nullopt;
Expand Down Expand Up @@ -124,6 +137,8 @@ class SeriesIterator
void deactivateDeadIteration(iteration_index_t);

void initSeriesInLinearReadMode();

void close();
};

/**
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ class Series : public Attributable
/**
* @brief Parse the Series.
*
* Only necessary in linear read mode.
* In linear read mode, the Series constructor does not do any IO accesses.
* This call effectively triggers the side effects of
* Series::readIterations(), for use cases where data needs to be accessed
Expand Down
55 changes: 32 additions & 23 deletions src/ReadIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "openPMD/Series.hpp"

#include <iostream>
#include <optional>

namespace openPMD
{
Expand Down Expand Up @@ -58,7 +59,7 @@ SeriesIterator::SeriesIterator() = default;

void SeriesIterator::initSeriesInLinearReadMode()
{
auto &data = *m_data;
auto &data = get();
auto &series = *data.series;
series.IOHandler()->m_seriesStatus = internal::SeriesStatus::Parsing;
try
Expand Down Expand Up @@ -103,11 +104,16 @@ void SeriesIterator::initSeriesInLinearReadMode()
series.IOHandler()->m_seriesStatus = internal::SeriesStatus::Default;
}

void SeriesIterator::close()
{
*m_data = std::nullopt; // turn this into end iterator
}

SeriesIterator::SeriesIterator(
Series series_in, std::optional<internal::ParsePreference> parsePreference)
: m_data{std::make_shared<SharedData>()}
: m_data{std::make_shared<std::optional<SharedData>>(std::in_place)}
{
auto &data = *m_data;
auto &data = get();
data.parsePreference = std::move(parsePreference);
data.series = std::move(series_in);
auto &series = data.series.value();
Expand All @@ -120,7 +126,7 @@ SeriesIterator::SeriesIterator(
auto it = series.get().iterations.begin();
if (it == series.get().iterations.end())
{
*this = end();
this->close();
return;
}
else if (
Expand Down Expand Up @@ -212,12 +218,12 @@ SeriesIterator::SeriesIterator(

if (status == AdvanceStatus::OVER)
{
*this = end();
this->close();
return;
}
if (!setCurrentIteration())
{
*this = end();
this->close();
return;
}
it->second.setStepStatus(StepStatus::DuringStep);
Expand All @@ -226,7 +232,7 @@ SeriesIterator::SeriesIterator(

std::optional<SeriesIterator *> SeriesIterator::nextIterationInStep()
{
auto &data = *m_data;
auto &data = get();
using ret_t = std::optional<SeriesIterator *>;

if (data.iterationsInCurrentStep.empty())
Expand Down Expand Up @@ -298,7 +304,7 @@ std::optional<SeriesIterator *> SeriesIterator::nextIterationInStep()

std::optional<SeriesIterator *> SeriesIterator::nextStep(size_t recursion_depth)
{
auto &data = *m_data;
auto &data = get();
// since we are in group-based iteration layout, it does not
// matter which iteration we begin a step upon
AdvanceStatus status{};
Expand Down Expand Up @@ -339,7 +345,7 @@ std::optional<SeriesIterator *> SeriesIterator::nextStep(size_t recursion_depth)
if (status == AdvanceStatus::RANDOMACCESS ||
status == AdvanceStatus::OVER)
{
*this = end();
this->close();
return {this};
}
else
Expand All @@ -366,7 +372,7 @@ std::optional<SeriesIterator *> SeriesIterator::nextStep(size_t recursion_depth)
if (status == AdvanceStatus::RANDOMACCESS ||
status == AdvanceStatus::OVER)
{
*this = end();
this->close();
return {this};
}
else
Expand All @@ -390,7 +396,7 @@ std::optional<SeriesIterator *> SeriesIterator::nextStep(size_t recursion_depth)

if (status == AdvanceStatus::OVER)
{
*this = end();
this->close();
return {this};
}

Expand All @@ -399,7 +405,7 @@ std::optional<SeriesIterator *> SeriesIterator::nextStep(size_t recursion_depth)

std::optional<SeriesIterator *> SeriesIterator::loopBody()
{
auto &data = *m_data;
auto &data = get();
Series &series = data.series.value();
auto &iterations = series.iterations;

Expand Down Expand Up @@ -485,7 +491,7 @@ std::optional<SeriesIterator *> SeriesIterator::loopBody()
if (series.iterationEncoding() == IterationEncoding::fileBased)
{
// this one is handled above, stream is over once it proceeds to here
*this = end();
this->close();
return {this};
}

Expand All @@ -495,7 +501,7 @@ std::optional<SeriesIterator *> SeriesIterator::loopBody()

void SeriesIterator::deactivateDeadIteration(iteration_index_t index)
{
auto &data = *m_data;
auto &data = get();
switch (data.series->iterationEncoding())
{
case IterationEncoding::fileBased: {
Expand All @@ -520,10 +526,10 @@ void SeriesIterator::deactivateDeadIteration(iteration_index_t index)

SeriesIterator &SeriesIterator::operator++()
{
auto &data = *m_data;
auto &data = get();
if (!data.series.has_value())
{
*this = end();
this->close();
return *this;
}
auto oldIterationIndex = data.currentIteration;
Expand Down Expand Up @@ -570,18 +576,20 @@ SeriesIterator &SeriesIterator::operator++()

IndexedIteration SeriesIterator::operator*()
{
auto &data = *m_data;
auto &data = get();
return IndexedIteration(
data.series.value().iterations[data.currentIteration],
data.currentIteration);
}

bool SeriesIterator::operator==(SeriesIterator const &other) const
{
return (this->m_data.operator bool() && other.m_data.operator bool() &&
(this->m_data->currentIteration ==
other.m_data->currentIteration)) ||
(!this->m_data.operator bool() && !other.m_data.operator bool());
return
// either both iterators are filled
(this->m_data->has_value() && other.m_data->has_value() &&
(this->get().currentIteration == other.get().currentIteration)) ||
// or both are empty
(!this->m_data->has_value() && !other.m_data->has_value());
}

bool SeriesIterator::operator!=(SeriesIterator const &other) const
Expand All @@ -600,10 +608,11 @@ ReadIterations::ReadIterations(
std::optional<internal::ParsePreference> parsePreference)
: m_series(std::move(series)), m_parsePreference(std::move(parsePreference))
{
if (access == Access::READ_LINEAR)
auto &data = m_series.get();
if (access == Access::READ_LINEAR && !data.m_sharedStatefulIterator)
{
// Open the iterator now already, so that metadata may already be read
m_series.get().m_sharedStatefulIterator =
data.m_sharedStatefulIterator =
std::make_unique<iterator_t>(m_series, m_parsePreference);
}
}
Expand Down
11 changes: 6 additions & 5 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1533,8 +1533,9 @@ void append_mode(
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already drained
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
// listSeries will not see any iterations since they have already
// been read
helper::listSeries(read);
}
break;
case ParseMode::AheadOfTimeWithoutSnapshot: {
Expand Down Expand Up @@ -1663,9 +1664,9 @@ void append_mode(
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already
// drained
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
// listSeries will not see any iterations since they have already
// been read
helper::listSeries(read);
}
}
#endif
Expand Down
15 changes: 9 additions & 6 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6901,8 +6901,9 @@ void append_mode(
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already drained
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
// listSeries will not see any iterations since they have already
// been read
helper::listSeries(read);
}
break;
case ParseMode::AheadOfTimeWithoutSnapshot: {
Expand Down Expand Up @@ -6937,7 +6938,9 @@ void append_mode(
* should see both instances when reading.
* Final goal: Read only the last instance.
*/
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
// listSeries will not see any iterations since they have already
// been read
helper::listSeries(read);
}
break;
}
Expand Down Expand Up @@ -7037,9 +7040,9 @@ void append_mode(
++counter;
}
REQUIRE(counter == 8);
// Cannot do listSeries here because the Series is already
// drained
REQUIRE_THROWS_AS(helper::listSeries(read), error::WrongAPIUsage);
// listSeries will not see any iterations since they have already
// been read
helper::listSeries(read);
}
}
#endif
Expand Down

0 comments on commit d5b2d12

Please sign in to comment.