Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance: Flush IOHandler only once, not for each Iteration #1642

Merged
merged 8 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,13 @@ class AbstractIOHandlerImpl
* extent of parameters.extent. If possible, the new dataset should be
* extensible. If possible, the new dataset should be divided into chunks
* with size parameters.chunkSize. If possible, the new dataset should be
* compressed according to parameters.compression. This may be
* format-specific. If possible, the new dataset should be transformed
* accoring to parameters.transform. This may be format-specific. The
* Writables file position should correspond to the newly created dataset.
* The Writable should be marked written when the operation completes
* compressed/transformed according to the backend-specific configuration in
* parameters.options. The Writables file position should correspond to the
* newly created dataset. Any pre-existing file position should be ignored,
* the new file position will be based upon the parent object and the newly
* created path. (The old file position might still contain data due to
* reuse of Writable objects across files in file-based encoding.) The
* Writable should be marked written when the operation completes
* successfully.
*/
virtual void
Expand Down Expand Up @@ -400,6 +402,9 @@ class AbstractIOHandlerImpl
virtual void
touch(Writable *, Parameter<Operation::TOUCH> const &param) = 0;

virtual void
setWritten(Writable *, Parameter<Operation::SET_WRITTEN> const &param);

AbstractIOHandler *m_handler;
bool m_verboseIOTasks = false;

Expand Down
24 changes: 23 additions & 1 deletion include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){
ADVANCE,
AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset
DEREGISTER, //!< Inform the backend that an object has been deleted.
TOUCH //!< tell the backend that the file is to be considered active
TOUCH, //!< tell the backend that the file is to be considered active
SET_WRITTEN //!< tell backend to consider a file written / not written
}; // note: if you change the enum members here, please update
// docs/source/dev/design.rst

Expand Down Expand Up @@ -691,6 +692,27 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::TOUCH> : public AbstractParameter
}
};

template <>
struct OPENPMDAPI_EXPORT Parameter<Operation::SET_WRITTEN>
: public AbstractParameter
{
explicit Parameter() = default;

Parameter(Parameter const &) = default;
Parameter(Parameter &&) = default;

Parameter &operator=(Parameter const &) = default;
Parameter &operator=(Parameter &&) = default;

std::unique_ptr<AbstractParameter> to_heap() && override
{
return std::make_unique<Parameter<Operation::SET_WRITTEN>>(
std::move(*this));
}

bool target_status = false;
};

/** @brief Self-contained description of a single IO operation.
*
* Contained are
Expand Down
18 changes: 15 additions & 3 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,22 @@ OPENPMD_protected
{
return writable().written;
}
bool &written()
enum class EnqueueAsynchronously : bool
{
return writable().written;
}
Yes,
No
};
/*
* setWritten() will take effect immediately.
* But it might additionally be necessary in some situations to enqueue a
* SET_WRITTEN task to the backend:
* A single flush() operation might encompass different Iterations. In
* file-based Iteration encoding, some objects must be written to every
* single file, thus their `written` flag must be restored to `false` for
* each Iteration. When flushing multiple Iterations at once, this must
* happen as an asynchronous IO task.
*/
void setWritten(bool val, EnqueueAsynchronously);

private:
/**
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/backend/BaseRecord.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ auto BaseRecord<T_elem>::erase(key_type const &key) -> size_type

if (keyScalar)
{
this->written() = false;
this->setWritten(false, Attributable::EnqueueAsynchronously::No);
this->writable().abstractFilePosition.reset();
this->get().m_datasetDefined = false;
}
Expand Down
7 changes: 1 addition & 6 deletions src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1197,12 +1197,7 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode)

void ADIOS2File::drop()
{
if (!m_buffer.empty())
{
throw error::Internal(
"ADIOS2 backend: File data for '" + m_file +
"' dropped, but there were enqueued operations.");
}
assert(m_buffer.empty());
}

static std::vector<std::string> availableAttributesOrVariablesPrefixed(
Expand Down
16 changes: 9 additions & 7 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,16 +532,17 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams)
}
}

for (auto &p : m_fileData)
for (auto const &file : m_dirty)
{
if (m_dirty.find(p.first) != m_dirty.end())
auto file_data = m_fileData.find(file);
if (file_data == m_fileData.end())
{
p.second->flush(adios2FlushParams, /* writeLatePuts = */ false);
}
else
{
p.second->drop();
throw error::Internal(
"[ADIOS2 backend] No associated data found for file'" + *file +
"'.");
}
file_data->second->flush(
adios2FlushParams, /* writeLatePuts = */ false);
}
m_dirty.clear();
return res;
Expand Down Expand Up @@ -750,6 +751,7 @@ void ADIOS2IOHandlerImpl::createDataset(

auto const file =
refreshFileFromParent(writable, /* preferParentFile = */ true);
writable->abstractFilePosition.reset();
auto filePos = setAndGetFilePosition(writable, name);
filePos->gd = GroupOrDataset::DATASET;
auto const varName = nameOfVariable(writable);
Expand Down
16 changes: 15 additions & 1 deletion src/IO/AbstractIOHandlerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,18 @@ std::future<void> AbstractIOHandlerImpl::flush()
auto &parameter =
deref_dynamic_cast<Parameter<O::TOUCH>>(i.parameter.get());
writeToStderr(
"[", i.writable->parent, "->", i.writable, "] DEREGISTER");
"[", i.writable->parent, "->", i.writable, "] TOUCH");
touch(i.writable, parameter);
break;
}
case O::SET_WRITTEN: {
auto &parameter = deref_dynamic_cast<Parameter<O::SET_WRITTEN>>(
i.parameter.get());
writeToStderr(
"[", i.writable->parent, "->", i.writable, "] SET_WRITTEN");
setWritten(i.writable, parameter);
break;
}
}
}
catch (...)
Expand Down Expand Up @@ -476,4 +484,10 @@ std::future<void> AbstractIOHandlerImpl::flush()
}
return std::future<void>();
}

void AbstractIOHandlerImpl::setWritten(
Writable *w, Parameter<Operation::SET_WRITTEN> const &param)
{
w->written = param.target_status;
}
} // namespace openPMD
1 change: 1 addition & 0 deletions src/IO/HDF5/HDF5IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ void HDF5IOHandlerImpl::createDataset(
}
#endif

writable->abstractFilePosition.reset();
/* Open H5Object to write into */
File file{};
if (auto opt = getFile(writable->parent); opt.has_value())
Expand Down
1 change: 1 addition & 0 deletions src/IO/JSON/JSONIOHandlerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ void JSONIOHandlerImpl::createDataset(
std::string name = removeSlashes(parameter.name);

auto file = refreshFileFromParent(writable);
writable->abstractFilePosition.reset();
setAndGetFilePosition(writable);
auto &jsonVal = obtainJsonContents(writable);
// be sure to have a JSON object, not a list
Expand Down
13 changes: 8 additions & 5 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ void Iteration::flushFileBased(
/*
* If it was written before, then in the context of another iteration.
*/
s.get().m_rankTable.m_attributable.written() = false;
auto &attr = s.get().m_rankTable.m_attributable;
attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes);
s.get()
.m_rankTable.m_attributable.get()
.m_writable.abstractFilePosition.reset();
Expand Down Expand Up @@ -630,9 +631,9 @@ void Iteration::readMeshes(std::string const &meshesPath)
MeshRecordComponent &mrc = m;
IOHandler()->enqueue(IOTask(&mrc, dOpen));
IOHandler()->flush(internal::defaultFlushParams);
mrc.written() = false;
mrc.setWritten(false, Attributable::EnqueueAsynchronously::No);
mrc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
mrc.written() = true;
mrc.setWritten(true, Attributable::EnqueueAsynchronously::No);
try
{
m.read();
Expand Down Expand Up @@ -754,7 +755,8 @@ auto Iteration::beginStep(
access::read(series.IOHandler()->m_frontendAccess))
{
bool previous = series.iterations.written();
series.iterations.written() = false;
series.iterations.setWritten(
false, Attributable::EnqueueAsynchronously::Yes);
auto oldStatus = IOHandl->m_seriesStatus;
IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing;
try
Expand All @@ -770,7 +772,8 @@ auto Iteration::beginStep(
throw;
}
IOHandl->m_seriesStatus = oldStatus;
series.iterations.written() = previous;
series.iterations.setWritten(
previous, Attributable::EnqueueAsynchronously::Yes);
}

res.stepStatus = status;
Expand Down
4 changes: 2 additions & 2 deletions src/Mesh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ void Mesh::read()
dOpen.name = component;
IOHandler()->enqueue(IOTask(&rc, dOpen));
IOHandler()->flush(internal::defaultFlushParams);
rc.written() = false;
rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
rc.written() = true;
rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
try
{
rc.read();
Expand Down
4 changes: 2 additions & 2 deletions src/ParticlePatches.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ void ParticlePatches::read()
datatypeToString(*dOpen.dtype) + ")");

/* allow all attributes to be set */
prc.written() = false;
prc.setWritten(false, Attributable::EnqueueAsynchronously::No);
prc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
prc.written() = true;
prc.setWritten(true, Attributable::EnqueueAsynchronously::No);

pr.setDirty(false);
try
Expand Down
4 changes: 2 additions & 2 deletions src/ParticleSpecies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ void ParticleSpecies::read()
RecordComponent &rc = r;
IOHandler()->enqueue(IOTask(&rc, dOpen));
IOHandler()->flush(internal::defaultFlushParams);
rc.written() = false;
rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
rc.written() = true;
rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
r.read();
}
catch (error::ReadError const &err)
Expand Down
4 changes: 2 additions & 2 deletions src/Record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ void Record::read()
dOpen.name = component;
IOHandler()->enqueue(IOTask(&rc, dOpen));
IOHandler()->flush(internal::defaultFlushParams);
rc.written() = false;
rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
rc.written() = true;
rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
try
{
rc.read(/* require_unit_si = */ true);
Expand Down
8 changes: 4 additions & 4 deletions src/RecordComponent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ void RecordComponent::readBase(bool require_unit_si)

Attribute a(*aRead.resource);
DT dtype = *aRead.dtype;
written() = false;
setWritten(false, Attributable::EnqueueAsynchronously::No);
switchNonVectorType<MakeConstant>(dtype, *this, a);
written() = true;
setWritten(true, Attributable::EnqueueAsynchronously::No);

aRead.name = "shape";
IOHandler()->enqueue(IOTask(this, aRead));
Expand All @@ -426,9 +426,9 @@ void RecordComponent::readBase(bool require_unit_si)
oss.str());
}

written() = false;
setWritten(false, Attributable::EnqueueAsynchronously::No);
resetDataset(Dataset(dtype, e));
written() = true;
setWritten(true, Attributable::EnqueueAsynchronously::No);
}

readAttributes(ReadMode::FullyReread);
Expand Down
Loading
Loading