diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 81b0a0d816..10b0fd0c97 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -150,11 +150,13 @@ class AbstractIOHandlerImpl * extent of parameters.extent. If possible, the new dataset should be * extensible. If possible, the new dataset should be divided into chunks * with size parameters.chunkSize. If possible, the new dataset should be - * compressed according to parameters.compression. This may be - * format-specific. If possible, the new dataset should be transformed - * accoring to parameters.transform. This may be format-specific. The - * Writables file position should correspond to the newly created dataset. - * The Writable should be marked written when the operation completes + * compressed/transformed according to the backend-specific configuration in + * parameters.options. The Writables file position should correspond to the + * newly created dataset. Any pre-existing file position should be ignored, + * the new file position will be based upon the parent object and the newly + * created path. (The old file position might still contain data due to + * reuse of Writable objects across files in file-based encoding.) The + * Writable should be marked written when the operation completes * successfully. */ virtual void @@ -400,6 +402,9 @@ class AbstractIOHandlerImpl virtual void touch(Writable *, Parameter const ¶m) = 0; + virtual void + setWritten(Writable *, Parameter const ¶m); + AbstractIOHandler *m_handler; bool m_verboseIOTasks = false; diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 76839cd35b..731372f9e1 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -77,7 +77,8 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){ ADVANCE, AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset DEREGISTER, //!< Inform the backend that an object has been deleted. - TOUCH //!< tell the backend that the file is to be considered active + TOUCH, //!< tell the backend that the file is to be considered active + SET_WRITTEN //!< tell backend to consider a file written / not written }; // note: if you change the enum members here, please update // docs/source/dev/design.rst @@ -691,6 +692,27 @@ struct OPENPMDAPI_EXPORT Parameter : public AbstractParameter } }; +template <> +struct OPENPMDAPI_EXPORT Parameter + : public AbstractParameter +{ + explicit Parameter() = default; + + Parameter(Parameter const &) = default; + Parameter(Parameter &&) = default; + + Parameter &operator=(Parameter const &) = default; + Parameter &operator=(Parameter &&) = default; + + std::unique_ptr to_heap() && override + { + return std::make_unique>( + std::move(*this)); + } + + bool target_status = false; +}; + /** @brief Self-contained description of a single IO operation. * * Contained are diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 6dcb9dc918..d8255b6e7b 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -481,10 +481,22 @@ OPENPMD_protected { return writable().written; } - bool &written() + enum class EnqueueAsynchronously : bool { - return writable().written; - } + Yes, + No + }; + /* + * setWritten() will take effect immediately. + * But it might additionally be necessary in some situations to enqueue a + * SET_WRITTEN task to the backend: + * A single flush() operation might encompass different Iterations. In + * file-based Iteration encoding, some objects must be written to every + * single file, thus their `written` flag must be restored to `false` for + * each Iteration. When flushing multiple Iterations at once, this must + * happen as an asynchronous IO task. + */ + void setWritten(bool val, EnqueueAsynchronously); private: /** diff --git a/include/openPMD/backend/BaseRecord.hpp b/include/openPMD/backend/BaseRecord.hpp index db69e03835..6d17ae4eae 100644 --- a/include/openPMD/backend/BaseRecord.hpp +++ b/include/openPMD/backend/BaseRecord.hpp @@ -645,7 +645,7 @@ auto BaseRecord::erase(key_type const &key) -> size_type if (keyScalar) { - this->written() = false; + this->setWritten(false, Attributable::EnqueueAsynchronously::No); this->writable().abstractFilePosition.reset(); this->get().m_datasetDefined = false; } diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index e19698465b..447f358f48 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -1197,12 +1197,7 @@ AdvanceStatus ADIOS2File::advance(AdvanceMode mode) void ADIOS2File::drop() { - if (!m_buffer.empty()) - { - throw error::Internal( - "ADIOS2 backend: File data for '" + m_file + - "' dropped, but there were enqueued operations."); - } + assert(m_buffer.empty()); } static std::vector availableAttributesOrVariablesPrefixed( diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 0dc128340b..72adfded17 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -532,16 +532,17 @@ ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams) } } - for (auto &p : m_fileData) + for (auto const &file : m_dirty) { - if (m_dirty.find(p.first) != m_dirty.end()) + auto file_data = m_fileData.find(file); + if (file_data == m_fileData.end()) { - p.second->flush(adios2FlushParams, /* writeLatePuts = */ false); - } - else - { - p.second->drop(); + throw error::Internal( + "[ADIOS2 backend] No associated data found for file'" + *file + + "'."); } + file_data->second->flush( + adios2FlushParams, /* writeLatePuts = */ false); } m_dirty.clear(); return res; @@ -750,6 +751,7 @@ void ADIOS2IOHandlerImpl::createDataset( auto const file = refreshFileFromParent(writable, /* preferParentFile = */ true); + writable->abstractFilePosition.reset(); auto filePos = setAndGetFilePosition(writable, name); filePos->gd = GroupOrDataset::DATASET; auto const varName = nameOfVariable(writable); diff --git a/src/IO/AbstractIOHandlerImpl.cpp b/src/IO/AbstractIOHandlerImpl.cpp index fe01fea649..8993816f48 100644 --- a/src/IO/AbstractIOHandlerImpl.cpp +++ b/src/IO/AbstractIOHandlerImpl.cpp @@ -425,10 +425,18 @@ std::future AbstractIOHandlerImpl::flush() auto ¶meter = deref_dynamic_cast>(i.parameter.get()); writeToStderr( - "[", i.writable->parent, "->", i.writable, "] DEREGISTER"); + "[", i.writable->parent, "->", i.writable, "] TOUCH"); touch(i.writable, parameter); break; } + case O::SET_WRITTEN: { + auto ¶meter = deref_dynamic_cast>( + i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] SET_WRITTEN"); + setWritten(i.writable, parameter); + break; + } } } catch (...) @@ -476,4 +484,10 @@ std::future AbstractIOHandlerImpl::flush() } return std::future(); } + +void AbstractIOHandlerImpl::setWritten( + Writable *w, Parameter const ¶m) +{ + w->written = param.target_status; +} } // namespace openPMD diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index a091fd004f..a2b804d564 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -617,6 +617,7 @@ void HDF5IOHandlerImpl::createDataset( } #endif + writable->abstractFilePosition.reset(); /* Open H5Object to write into */ File file{}; if (auto opt = getFile(writable->parent); opt.has_value()) diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 61b732ed0a..a92d068415 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -292,6 +292,7 @@ void JSONIOHandlerImpl::createDataset( std::string name = removeSlashes(parameter.name); auto file = refreshFileFromParent(writable); + writable->abstractFilePosition.reset(); setAndGetFilePosition(writable); auto &jsonVal = obtainJsonContents(writable); // be sure to have a JSON object, not a list diff --git a/src/Iteration.cpp b/src/Iteration.cpp index 7268ae7153..366fea0de1 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -213,7 +213,8 @@ void Iteration::flushFileBased( /* * If it was written before, then in the context of another iteration. */ - s.get().m_rankTable.m_attributable.written() = false; + auto &attr = s.get().m_rankTable.m_attributable; + attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes); s.get() .m_rankTable.m_attributable.get() .m_writable.abstractFilePosition.reset(); @@ -630,9 +631,9 @@ void Iteration::readMeshes(std::string const &meshesPath) MeshRecordComponent &mrc = m; IOHandler()->enqueue(IOTask(&mrc, dOpen)); IOHandler()->flush(internal::defaultFlushParams); - mrc.written() = false; + mrc.setWritten(false, Attributable::EnqueueAsynchronously::No); mrc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - mrc.written() = true; + mrc.setWritten(true, Attributable::EnqueueAsynchronously::No); try { m.read(); @@ -754,7 +755,8 @@ auto Iteration::beginStep( access::read(series.IOHandler()->m_frontendAccess)) { bool previous = series.iterations.written(); - series.iterations.written() = false; + series.iterations.setWritten( + false, Attributable::EnqueueAsynchronously::Yes); auto oldStatus = IOHandl->m_seriesStatus; IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing; try @@ -770,7 +772,8 @@ auto Iteration::beginStep( throw; } IOHandl->m_seriesStatus = oldStatus; - series.iterations.written() = previous; + series.iterations.setWritten( + previous, Attributable::EnqueueAsynchronously::Yes); } res.stepStatus = status; diff --git a/src/Mesh.cpp b/src/Mesh.cpp index 2730916ab1..f977bbe905 100644 --- a/src/Mesh.cpp +++ b/src/Mesh.cpp @@ -438,9 +438,9 @@ void Mesh::read() dOpen.name = component; IOHandler()->enqueue(IOTask(&rc, dOpen)); IOHandler()->flush(internal::defaultFlushParams); - rc.written() = false; + rc.setWritten(false, Attributable::EnqueueAsynchronously::No); rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - rc.written() = true; + rc.setWritten(true, Attributable::EnqueueAsynchronously::No); try { rc.read(); diff --git a/src/ParticlePatches.cpp b/src/ParticlePatches.cpp index 9b110e48a4..491add8be7 100644 --- a/src/ParticlePatches.cpp +++ b/src/ParticlePatches.cpp @@ -93,9 +93,9 @@ void ParticlePatches::read() datatypeToString(*dOpen.dtype) + ")"); /* allow all attributes to be set */ - prc.written() = false; + prc.setWritten(false, Attributable::EnqueueAsynchronously::No); prc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - prc.written() = true; + prc.setWritten(true, Attributable::EnqueueAsynchronously::No); pr.setDirty(false); try diff --git a/src/ParticleSpecies.cpp b/src/ParticleSpecies.cpp index 37644a14e1..4006cc82ba 100644 --- a/src/ParticleSpecies.cpp +++ b/src/ParticleSpecies.cpp @@ -124,9 +124,9 @@ void ParticleSpecies::read() RecordComponent &rc = r; IOHandler()->enqueue(IOTask(&rc, dOpen)); IOHandler()->flush(internal::defaultFlushParams); - rc.written() = false; + rc.setWritten(false, Attributable::EnqueueAsynchronously::No); rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - rc.written() = true; + rc.setWritten(true, Attributable::EnqueueAsynchronously::No); r.read(); } catch (error::ReadError const &err) diff --git a/src/Record.cpp b/src/Record.cpp index ee377960de..3bcac4d7e1 100644 --- a/src/Record.cpp +++ b/src/Record.cpp @@ -150,9 +150,9 @@ void Record::read() dOpen.name = component; IOHandler()->enqueue(IOTask(&rc, dOpen)); IOHandler()->flush(internal::defaultFlushParams); - rc.written() = false; + rc.setWritten(false, Attributable::EnqueueAsynchronously::No); rc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - rc.written() = true; + rc.setWritten(true, Attributable::EnqueueAsynchronously::No); try { rc.read(/* require_unit_si = */ true); diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp index 0011363bc4..0387268514 100644 --- a/src/RecordComponent.cpp +++ b/src/RecordComponent.cpp @@ -399,9 +399,9 @@ void RecordComponent::readBase(bool require_unit_si) Attribute a(*aRead.resource); DT dtype = *aRead.dtype; - written() = false; + setWritten(false, Attributable::EnqueueAsynchronously::No); switchNonVectorType(dtype, *this, a); - written() = true; + setWritten(true, Attributable::EnqueueAsynchronously::No); aRead.name = "shape"; IOHandler()->enqueue(IOTask(this, aRead)); @@ -426,9 +426,9 @@ void RecordComponent::readBase(bool require_unit_si) oss.str()); } - written() = false; + setWritten(false, Attributable::EnqueueAsynchronously::No); resetDataset(Dataset(dtype, e)); - written() = true; + setWritten(true, Attributable::EnqueueAsynchronously::No); } readAttributes(ReadMode::FullyReread); diff --git a/src/Series.cpp b/src/Series.cpp index 09921566bf..fbd42d6eb4 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1138,12 +1138,12 @@ Given file pattern: ')END" { /* Access::READ_WRITE can be used to create a new Series * allow setting attributes in that case */ - written() = false; + setWritten(false, Attributable::EnqueueAsynchronously::No); initDefaults(input->iterationEncoding); setIterationEncoding(input->iterationEncoding); - written() = true; + setWritten(true, Attributable::EnqueueAsynchronously::No); } } catch (...) @@ -1321,12 +1321,12 @@ void Series::flushFileBased( it->second.get().m_closed = internal::CloseStatus::ClosedInBackend; } + } - // Phase 3 - if (flushIOHandler) - { - IOHandler()->flush(flushParams); - } + // Phase 3 + if (flushIOHandler) + { + IOHandler()->flush(flushParams); } break; case Access::READ_WRITE: @@ -1344,10 +1344,12 @@ void Series::flushFileBased( * emulate the file belonging to each iteration as not yet * written, even if the iteration itself is already written * (to ensure that the Series gets reassociated with the - * current iteration) + * current iteration by the backend) */ - written() = false; - series.iterations.written() = false; + this->setWritten( + false, Attributable::EnqueueAsynchronously::Yes); + series.iterations.setWritten( + false, Attributable::EnqueueAsynchronously::Yes); setDirty(dirty() || it->second.dirty()); std::string filename = iterationFilename(it->first); @@ -1379,18 +1381,18 @@ void Series::flushFileBased( it->second.get().m_closed = internal::CloseStatus::ClosedInBackend; } - - // Phase 3 - if (flushIOHandler) - { - IOHandler()->flush(flushParams); - } /* reset the dirty bit for every iteration (i.e. file) * otherwise only the first iteration will have updates attributes */ setDirty(allDirty); } setDirty(false); + + // Phase 3 + if (flushIOHandler) + { + IOHandler()->flush(flushParams); + } break; } } @@ -1431,14 +1433,14 @@ void Series::flushGorVBased( it->second.get().m_closed = internal::CloseStatus::ClosedInBackend; } + } - // Phase 3 - Parameter touch; - IOHandler()->enqueue(IOTask(&writable(), touch)); - if (flushIOHandler) - { - IOHandler()->flush(flushParams); - } + // Phase 3 + Parameter touch; + IOHandler()->enqueue(IOTask(&writable(), touch)); + if (flushIOHandler) + { + IOHandler()->flush(flushParams); } } else @@ -1798,9 +1800,9 @@ void Series::readOneIterationFileBased(std::string const &filePath) IOHandler()->flush(internal::defaultFlushParams); if (*aRead.dtype == DT::STRING) { - written() = false; + setWritten(false, Attributable::EnqueueAsynchronously::No); setIterationFormat(Attribute(*aRead.resource).get()); - written() = true; + setWritten(true, Attributable::EnqueueAsynchronously::No); } else throw error::ReadError( @@ -1949,9 +1951,9 @@ creating new iterations. IOHandler()->flush(internal::defaultFlushParams); if (*aRead.dtype == DT::STRING) { - written() = false; + setWritten(false, Attributable::EnqueueAsynchronously::No); setIterationFormat(Attribute(*aRead.resource).get()); - written() = true; + setWritten(true, Attributable::EnqueueAsynchronously::No); } else throw error::ReadError( @@ -2216,12 +2218,14 @@ void Series::readBase() { /* allow setting the meshes path after completed IO */ for (auto &it : series.iterations) - it.second.meshes.written() = false; + it.second.meshes.setWritten( + false, Attributable::EnqueueAsynchronously::No); setMeshesPath(val.value()); for (auto &it : series.iterations) - it.second.meshes.written() = true; + it.second.meshes.setWritten( + true, Attributable::EnqueueAsynchronously::No); } else throw error::ReadError( @@ -2246,12 +2250,14 @@ void Series::readBase() { /* allow setting the meshes path after completed IO */ for (auto &it : series.iterations) - it.second.particles.written() = false; + it.second.particles.setWritten( + false, Attributable::EnqueueAsynchronously::No); setParticlesPath(val.value()); for (auto &it : series.iterations) - it.second.particles.written() = true; + it.second.particles.setWritten( + true, Attributable::EnqueueAsynchronously::No); } else throw error::ReadError( diff --git a/src/auxiliary/JSON.cpp b/src/auxiliary/JSON.cpp index 1472e96558..e089c2dc22 100644 --- a/src/auxiliary/JSON.cpp +++ b/src/auxiliary/JSON.cpp @@ -26,6 +26,7 @@ #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/auxiliary/StringManip.hpp" +#include #include #include @@ -293,9 +294,22 @@ namespace { ParsedConfig parseInlineOptions(std::string const &options) { + // speed up default options + ParsedConfig res; + if (options.empty()) + { + res.originallySpecifiedAs = SupportedLanguages::TOML; + res.config = nlohmann::json::object(); + return res; + } + else if (options == "{}") + { + res.originallySpecifiedAs = SupportedLanguages::JSON; + res.config = nlohmann::json::object(); + return res; + } std::string trimmed = auxiliary::trim(options, [](char c) { return std::isspace(c); }); - ParsedConfig res; if (trimmed.empty()) { return res; diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index ed24308950..9e4d1b4fd3 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -481,6 +481,23 @@ void Attributable::readAttributes(ReadMode mode) setDirty(false); } +void Attributable::setWritten(bool val, EnqueueAsynchronously ea) +{ + switch (ea) + { + + case EnqueueAsynchronously::Yes: { + Parameter param; + param.target_status = val; + IOHandler()->enqueue(IOTask(this, param)); + } + break; + case EnqueueAsynchronously::No: + break; + } + writable().written = val; +} + void Attributable::linkHierarchy(Writable &w) { auto handler = w.IOHandler; diff --git a/src/backend/PatchRecord.cpp b/src/backend/PatchRecord.cpp index 9b87835194..5d2b38d50f 100644 --- a/src/backend/PatchRecord.cpp +++ b/src/backend/PatchRecord.cpp @@ -90,9 +90,9 @@ void PatchRecord::read() IOHandler()->enqueue(IOTask(&prc, dOpen)); IOHandler()->flush(internal::defaultFlushParams); /* allow all attributes to be set */ - prc.written() = false; + prc.setWritten(false, Attributable::EnqueueAsynchronously::No); prc.resetDataset(Dataset(*dOpen.dtype, *dOpen.extent)); - prc.written() = true; + prc.setWritten(true, Attributable::EnqueueAsynchronously::No); try { prc.read(/* require_unit_si = */ false); diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index ae2a88fc98..e5e0879981 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -2071,6 +2071,7 @@ inline void fileBased_write_test(const std::string &backend) .makeConstant(1.0); o.iterations[overlong_it].setTime(static_cast(overlong_it)); + o.flush(); REQUIRE(o.iterations.size() == 7); } REQUIRE(