Performance: Flush IOHandler only once, not for each Iteration (#1642)
* Some fixes for flushing many Iterations

SerialIOTests fileBased_write_test still failing.

* [wip] Try fixing bugs

* wip

* wip

* is this really the best solution for that..?

* Distinguish more clearly when setWritten needs to run async

* Improve performance of ADIOS2IOHandlerImpl::flush() for many files

* Better documentation for these changes
franzpoeschel authored Jul 11, 2024
1 parent 9184c2e commit bda3544
Showing 20 changed files with 167 additions and 74 deletions.
15 changes: 10 additions & 5 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
@@ -150,11 +150,13 @@ class AbstractIOHandlerImpl
  * extent of parameters.extent. If possible, the new dataset should be
  * extensible. If possible, the new dataset should be divided into chunks
  * with size parameters.chunkSize. If possible, the new dataset should be
- * compressed according to parameters.compression. This may be
- * format-specific. If possible, the new dataset should be transformed
- * accoring to parameters.transform. This may be format-specific. The
- * Writables file position should correspond to the newly created dataset.
- * The Writable should be marked written when the operation completes
+ * compressed/transformed according to the backend-specific configuration in
+ * parameters.options. The Writables file position should correspond to the
+ * newly created dataset. Any pre-existing file position should be ignored,
+ * the new file position will be based upon the parent object and the newly
+ * created path. (The old file position might still contain data due to
+ * reuse of Writable objects across files in file-based encoding.) The
+ * Writable should be marked written when the operation completes
  * successfully.
  */
  virtual void
@@ -400,6 +402,9 @@ class AbstractIOHandlerImpl
  virtual void
  touch(Writable *, Parameter<Operation::TOUCH> const &param) = 0;

+ virtual void
+ setWritten(Writable *, Parameter<Operation::SET_WRITTEN> const &param);
+
  AbstractIOHandler *m_handler;
  bool m_verboseIOTasks = false;

24 changes: 23 additions & 1 deletion include/openPMD/IO/IOTask.hpp
@@ -77,7 +77,8 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){
  ADVANCE,
  AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset
  DEREGISTER, //!< Inform the backend that an object has been deleted.
- TOUCH //!< tell the backend that the file is to be considered active
+ TOUCH, //!< tell the backend that the file is to be considered active
+ SET_WRITTEN //!< tell backend to consider a file written / not written
  }; // note: if you change the enum members here, please update
  // docs/source/dev/design.rst

@@ -691,6 +692,27 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::TOUCH> : public AbstractParameter
  }
  };

+ template <>
+ struct OPENPMDAPI_EXPORT Parameter<Operation::SET_WRITTEN>
+ : public AbstractParameter
+ {
+ explicit Parameter() = default;
+
+ Parameter(Parameter const &) = default;
+ Parameter(Parameter &&) = default;
+
+ Parameter &operator=(Parameter const &) = default;
+ Parameter &operator=(Parameter &&) = default;
+
+ std::unique_ptr<AbstractParameter> to_heap() && override
+ {
+ return std::make_unique<Parameter<Operation::SET_WRITTEN>>(
+ std::move(*this));
+ }
+
+ bool target_status = false;
+ };
+
  /** @brief Self-contained description of a single IO operation.
  *
  * Contained are
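
Note on the hunk above: the new `Parameter<Operation::SET_WRITTEN>` has the same shape as the other parameter structs, a payload (`target_status`) plus a `to_heap()` that moves the object behind an `AbstractParameter` pointer so it can be stored in the task queue. A minimal sketch of that pattern follows. `ToyParameter`, `ToySetWritten`, `readTargetStatus` and `roundTrip` are hypothetical stand-ins rather than openPMD types, and the plain `dynamic_cast` only approximates what `deref_dynamic_cast` does in the dispatcher.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for AbstractParameter.
struct ToyParameter
{
    virtual ~ToyParameter() = default;
    // Rvalue-qualified: the caller gives up the stack object.
    virtual std::unique_ptr<ToyParameter> to_heap() && = 0;
};

// Hypothetical stand-in for Parameter<Operation::SET_WRITTEN>.
struct ToySetWritten : ToyParameter
{
    bool target_status = false;

    std::unique_ptr<ToyParameter> to_heap() && override
    {
        // Move the concrete object to the heap, return it type-erased.
        return std::make_unique<ToySetWritten>(std::move(*this));
    }
};

// Recover the concrete type from the type-erased reference.
bool readTargetStatus(ToyParameter const &param)
{
    return dynamic_cast<ToySetWritten const &>(param).target_status;
}

// Usage: build on the stack, move to the heap, read back later.
bool roundTrip(bool status)
{
    ToySetWritten onStack;
    onStack.target_status = status;
    std::unique_ptr<ToyParameter> queued = std::move(onStack).to_heap();
    return readTargetStatus(*queued);
}
```

The rvalue qualifier on `to_heap()` makes the hand-over explicit at the call site: the caller has to write `std::move(param).to_heap()`.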
18 changes: 15 additions & 3 deletions include/openPMD/backend/Attributable.hpp
@@ -481,10 +481,22 @@ OPENPMD_protected
  {
  return writable().written;
  }
- bool &written()
+ enum class EnqueueAsynchronously : bool
  {
- return writable().written;
- }
+ Yes,
+ No
+ };
+ /*
+ * setWritten() will take effect immediately.
+ * But it might additionally be necessary in some situations to enqueue a
+ * SET_WRITTEN task to the backend:
+ * A single flush() operation might encompass different Iterations. In
+ * file-based Iteration encoding, some objects must be written to every
+ * single file, thus their `written` flag must be restored to `false` for
+ * each Iteration. When flushing multiple Iterations at once, this must
+ * happen as an asynchronous IO task.
+ */
+ void setWritten(bool val, EnqueueAsynchronously);

  private:
  /**
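
Note on the hunk above: the new comment explains why `setWritten()` sometimes has to be enqueued in addition to taking effect immediately. The toy model below may make the ordering problem concrete. It is not openPMD code: `ToyWritable`, `ToyBackend` and `writeToAllFiles` are hypothetical names, and the model assumes a backend that skips creation for objects already marked written.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for openPMD's Writable; only the flag is modeled.
struct ToyWritable
{
    bool written = false;
};

// Hypothetical toy backend: tasks are queued and only run on flush().
struct ToyBackend
{
    std::queue<std::function<void()>> tasks;
    std::vector<std::string> log; // files that actually received the object

    void enqueueCreate(ToyWritable *w, std::string file)
    {
        tasks.push([this, w, file = std::move(file)]() {
            if (!w->written) // assumption: written objects are skipped
            {
                log.push_back(file);
                w->written = true;
            }
        });
    }

    void enqueueSetWritten(ToyWritable *w, bool target)
    {
        tasks.push([w, target]() { w->written = target; });
    }

    void flush()
    {
        for (; !tasks.empty(); tasks.pop())
            tasks.front()();
    }
};

// Enqueue the same object for `numFiles` files, then flush only once.
std::vector<std::string> writeToAllFiles(int numFiles, bool enqueueReset)
{
    ToyWritable shared;
    ToyBackend backend;
    for (int i = 0; i < numFiles; ++i)
    {
        shared.written = false; // immediate effect, frontend side
        if (enqueueReset)
            backend.enqueueSetWritten(&shared, false); // deferred effect
        backend.enqueueCreate(&shared, "file_" + std::to_string(i));
    }
    backend.flush();
    return backend.log;
}
```

With three files and a single flush, `writeToAllFiles(3, false)` records only `file_0`, because the first creation task marks the shared object written before the later tasks run. `writeToAllFiles(3, true)` records all three files.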
2 changes: 1 addition & 1 deletion include/openPMD/backend/BaseRecord.hpp
@@ -645,7 +645,7 @@ auto BaseRecord<T_elem>::erase(key_type const &key) -> size_type

  if (keyScalar)
  {
- this->written() = false;
+ this->setWritten(false, Attributable::EnqueueAsynchronously::No);
  this->writable().abstractFilePosition.reset();
  this->get().m_datasetDefined = false;
  }
7 changes: 1 addition & 6 deletions src/IO/ADIOS/ADIOS2File.cpp
@@ -1197,12 +1197,7 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode)

  void ADIOS2File::drop()
  {
- if (!m_buffer.empty())
- {
- throw error::Internal(
- "ADIOS2 backend: File data for '" + m_file +
- "' dropped, but there were enqueued operations.");
- }
+ assert(m_buffer.empty());
  }

  static std::vector<std::string> availableAttributesOrVariablesPrefixed(
16 changes: 9 additions & 7 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
@@ -532,16 +532,17 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams)
  }
  }

- for (auto &p : m_fileData)
+ for (auto const &file : m_dirty)
  {
- if (m_dirty.find(p.first) != m_dirty.end())
+ auto file_data = m_fileData.find(file);
+ if (file_data == m_fileData.end())
  {
- p.second->flush(adios2FlushParams, /* writeLatePuts = */ false);
- }
- else
- {
- p.second->drop();
+ throw error::Internal(
+ "[ADIOS2 backend] No associated data found for file'" + *file +
+ "'.");
  }
+ file_data->second->flush(
+ adios2FlushParams, /* writeLatePuts = */ false);
  }
  m_dirty.clear();
  return res;
@@ -750,6 +751,7 @@ void ADIOS2IOHandlerImpl::createDataset(

  auto const file =
  refreshFileFromParent(writable, /* preferParentFile = */ true);
+ writable->abstractFilePosition.reset();
  auto filePos = setAndGetFilePosition(writable, name);
  filePos->gd = GroupOrDataset::DATASET;
  auto const varName = nameOfVariable(writable);
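
Note on the `ADIOS2IOHandlerImpl::flush()` hunk in this file: the loop now walks only the dirty files and looks each one up, instead of walking every known file and testing whether it is dirty. The sketch below counts loop iterations for both shapes. It uses plain `std::map` and `std::set` of strings as hypothetical stand-ins for `m_fileData` and `m_dirty`, whose real element types are not shown in this diff, and it does not model the `drop()` call that the old loop made for non-dirty files.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for m_fileData and m_dirty.
using FileTable = std::map<std::string, int>;
using DirtySet = std::set<std::string>;

struct LoopStats
{
    std::size_t visited = 0; // loop iterations
    std::size_t flushed = 0; // files that would be flushed
};

FileTable makeFiles(std::size_t n)
{
    FileTable files;
    for (std::size_t i = 0; i < n; ++i)
        files.emplace("file_" + std::to_string(i), 0);
    return files;
}

// Old shape: visit every known file, test membership in the dirty set.
LoopStats flushByScanningAllFiles(
    FileTable const &files, DirtySet const &dirty)
{
    LoopStats stats;
    for (auto const &entry : files)
    {
        ++stats.visited;
        if (dirty.find(entry.first) != dirty.end())
            ++stats.flushed;
    }
    return stats;
}

// New shape: visit only dirty files, look each one up in the table.
LoopStats flushDirtyOnly(FileTable const &files, DirtySet const &dirty)
{
    LoopStats stats;
    for (auto const &name : dirty)
    {
        ++stats.visited;
        if (files.find(name) == files.end())
            throw std::logic_error(
                "No associated data found for file '" + name + "'.");
        ++stats.flushed;
    }
    return stats;
}
```

In file-based encoding with many open files and few dirty ones, the second shape scales with the number of dirty files rather than the total number of files.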
14 changes: 14 additions & 0 deletions src/IO/AbstractIOHandlerImpl.cpp
@@ -429,6 +429,14 @@ std::future<void> AbstractIOHandlerImpl::flush()
  touch(i.writable, parameter);
  break;
  }
+ case O::SET_WRITTEN: {
+ auto &parameter = deref_dynamic_cast<Parameter<O::SET_WRITTEN>>(
+ i.parameter.get());
+ writeToStderr(
+ "[", i.writable->parent, "->", i.writable, "] SET_WRITTEN");
+ setWritten(i.writable, parameter);
+ break;
+ }
  }
  }
  catch (...)
@@ -476,4 +484,10 @@ std::future<void> AbstractIOHandlerImpl::flush()
  }
  return std::future<void>();
  }
+
+ void AbstractIOHandlerImpl::setWritten(
+ Writable *w, Parameter<Operation::SET_WRITTEN> const &param)
+ {
+ w->written = param.target_status;
+ }
  } // namespace openPMD
1 change: 1 addition & 0 deletions src/IO/HDF5/HDF5IOHandler.cpp
@@ -617,6 +617,7 @@ void HDF5IOHandlerImpl::createDataset(
  }
  #endif

+ writable->abstractFilePosition.reset();
  /* Open H5Object to write into */
  File file{};
  if (auto opt = getFile(writable->parent); opt.has_value())
1 change: 1 addition & 0 deletions src/IO/JSON/JSONIOHandlerImpl.cpp
@@ -292,6 +292,7 @@ void JSONIOHandlerImpl::createDataset(
  std::string name = removeSlashes(parameter.name);

  auto file = refreshFileFromParent(writable);
+ writable->abstractFilePosition.reset();
  setAndGetFilePosition(writable);
  auto &jsonVal = obtainJsonContents(writable);
  // be sure to have a JSON object, not a list
13 changes: 8 additions & 5 deletions src/Iteration.cpp
@@ -213,7 +213,8 @@ void Iteration::flushFileBased(
  /*
  * If it was written before, then in the context of another iteration.
  */
- s.get().m_rankTable.m_attributable.written() = false;
+ auto &attr = s.get().m_rankTable.m_attributable;
+ attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes);
  s.get()
  .m_rankTable.m_attributable.get()
  .m_writable.abstractFilePosition.reset();
@@ -630,9 +631,9 @@ void Iteration::readMeshes(std::string const &meshesPath)
  MeshRecordComponent &mrc = m;
  IOHandler()->enqueue(IOTask(&mrc, dOpen));
  IOHandler()->flush(internal::defaultFlushParams);
- mrc.written() = false;
+ mrc.setWritten(false, Attributable::EnqueueAsynchronously::No);
  mrc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
- mrc.written() = true;
+ mrc.setWritten(true, Attributable::EnqueueAsynchronously::No);
  try
  {
  m.read();
@@ -754,7 +755,8 @@ auto Iteration::beginStep(
  access::read(series.IOHandler()->m_frontendAccess))
  {
  bool previous = series.iterations.written();
- series.iterations.written() = false;
+ series.iterations.setWritten(
+ false, Attributable::EnqueueAsynchronously::Yes);
  auto oldStatus = IOHandl->m_seriesStatus;
  IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing;
  try
@@ -770,7 +772,8 @@ auto Iteration::beginStep(
  throw;
  }
  IOHandl->m_seriesStatus = oldStatus;
- series.iterations.written() = previous;
+ series.iterations.setWritten(
+ previous, Attributable::EnqueueAsynchronously::Yes);
  }

  res.stepStatus = status;
4 changes: 2 additions & 2 deletions src/Mesh.cpp
@@ -438,9 +438,9 @@ void Mesh::read()
  dOpen.name = component;
  IOHandler()->enqueue(IOTask(&rc, dOpen));
  IOHandler()->flush(internal::defaultFlushParams);
- rc.written() = false;
+ rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
  rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
- rc.written() = true;
+ rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
  try
  {
  rc.read();
4 changes: 2 additions & 2 deletions src/ParticlePatches.cpp
@@ -93,9 +93,9 @@ void ParticlePatches::read()
  datatypeToString(*dOpen.dtype) + ")");

  /* allow all attributes to be set */
- prc.written() = false;
+ prc.setWritten(false, Attributable::EnqueueAsynchronously::No);
  prc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
- prc.written() = true;
+ prc.setWritten(true, Attributable::EnqueueAsynchronously::No);

  pr.setDirty(false);
  try
4 changes: 2 additions & 2 deletions src/ParticleSpecies.cpp
@@ -124,9 +124,9 @@ void ParticleSpecies::read()
  RecordComponent &rc = r;
  IOHandler()->enqueue(IOTask(&rc, dOpen));
  IOHandler()->flush(internal::defaultFlushParams);
- rc.written() = false;
+ rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
  rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
- rc.written() = true;
+ rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
  r.read();
  }
  catch (error::ReadError const &err)
4 changes: 2 additions & 2 deletions src/Record.cpp
@@ -150,9 +150,9 @@ void Record::read()
  dOpen.name = component;
  IOHandler()->enqueue(IOTask(&rc, dOpen));
  IOHandler()->flush(internal::defaultFlushParams);
- rc.written() = false;
+ rc.setWritten(false, Attributable::EnqueueAsynchronously::No);
  rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent));
- rc.written() = true;
+ rc.setWritten(true, Attributable::EnqueueAsynchronously::No);
  try
  {
  rc.read(/* require_unit_si = */ true);
8 changes: 4 additions & 4 deletions src/RecordComponent.cpp
@@ -399,9 +399,9 @@ void RecordComponent::readBase(bool require_unit_si)

  Attribute a(*aRead.resource);
  DT dtype = *aRead.dtype;
- written() = false;
+ setWritten(false, Attributable::EnqueueAsynchronously::No);
  switchNonVectorType<MakeConstant>(dtype, *this, a);
- written() = true;
+ setWritten(true, Attributable::EnqueueAsynchronously::No);

  aRead.name = "shape";
  IOHandler()->enqueue(IOTask(this, aRead));
@@ -426,9 +426,9 @@ void RecordComponent::readBase(bool require_unit_si)
  oss.str());
  }

- written() = false;
+ setWritten(false, Attributable::EnqueueAsynchronously::No);
  resetDataset(Dataset(dtype, e));
- written() = true;
+ setWritten(true, Attributable::EnqueueAsynchronously::No);
  }

  readAttributes(ReadMode::FullyReread);