Commit
Add particle patches to streaming example, using local_value
franzpoeschel committed Mar 12, 2024
1 parent 8a7ec62 commit ee17a33
Showing 2 changed files with 98 additions and 5 deletions.
19 changes: 19 additions & 0 deletions examples/10_streaming_read.cpp
@@ -1,3 +1,4 @@
#include "openPMD/auxiliary/StringManip.hpp"
#include <openPMD/openPMD.hpp>

#include <algorithm>
@@ -52,6 +53,24 @@ int main()
extents[i] = rc.getExtent();
}

auto e_patches = iteration.particles["e"].particlePatches;
for (auto key :
{"numParticles", "numParticlesOffset", "offset", "extent"})
{
for (auto &rc : e_patches[key])
{
std::cout << "Chunks for '" << rc.second.myPath().openPMDPath()
<< "':";
for (auto const &chunk : rc.second.availableChunks())
{
std::cout << "\n\tRank " << chunk.sourceID << "\t"
<< auxiliary::format_vec(chunk.offset) << "\t"
<< auxiliary::format_vec(chunk.extent);
}
std::cout << std::endl;
}
}

// The iteration can be closed in order to help free up resources.
// The iteration's content will be flushed automatically.
// An iteration once closed cannot (yet) be reopened.
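Not part of this commit, but to complete the picture on the reader side: after listing the chunks, the patch data itself could be loaded as well. A minimal sketch, assuming the PatchRecordComponent::load<T>() and Attributable::seriesFlush() calls of current openPMD-api and reusing the `iteration` object from the hunk above:

    // Sketch only: load the per-rank numParticlesOffset values that the
    // writer stored as a `local_value` dataset (one entry per writer rank).
    auto npo = iteration.particles["e"].particlePatches["numParticlesOffset"];
    for (auto &rc : npo)
    {
        std::shared_ptr<unsigned long> data = rc.second.load<unsigned long>();
        // Flush so that the buffer behind `data` is actually filled.
        iteration.seriesFlush();
        auto n_writers = rc.second.getExtent()[0];
        for (size_t rank = 0; rank < n_writers; ++rank)
        {
            std::cout << "numParticlesOffset from writer rank " << rank
                      << ": " << data.get()[rank] << '\n';
        }
    }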
84 changes: 79 additions & 5 deletions examples/10_streaming_write.cpp
@@ -1,3 +1,4 @@
#include <mpi.h>
#include <openPMD/openPMD.hpp>

#include <algorithm>
@@ -19,8 +20,22 @@ int main()
return 0;
}

int mpi_rank{0}, mpi_size{1};

#if openPMD_HAVE_MPI
MPI_Init(nullptr, nullptr);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
#endif

// open file for writing
Series series = Series("electrons.sst", Access::CREATE, R"(
Series series = Series(
"electrons.sst",
Access::CREATE,
#if openPMD_HAVE_MPI
MPI_COMM_WORLD,
#endif
R"(
{
"adios2": {
"engine": {
@@ -29,11 +44,13 @@ int main()
}
}
}
})");
})"

);

Datatype datatype = determineDatatype<position_t>();
constexpr unsigned long length = 10ul;
Extent global_extent = {length};
Extent global_extent = {mpi_size * length};
Dataset dataset = Dataset(datatype, global_extent);
std::shared_ptr<position_t> local_data(
new position_t[length], [](position_t const *ptr) { delete[] ptr; });
@@ -49,13 +66,66 @@ int main()
Iteration iteration = iterations[i];
Record electronPositions = iteration.particles["e"]["position"];

std::iota(local_data.get(), local_data.get() + length, i * length);
std::iota(
local_data.get(),
local_data.get() + length,
i * length * mpi_size + mpi_rank * length);
for (auto const &dim : {"x", "y", "z"})
{
RecordComponent pos = electronPositions[dim];
pos.resetDataset(dataset);
pos.storeChunk(local_data, Offset{0}, global_extent);
pos.storeChunk(local_data, Offset{length * mpi_rank}, {length});
}

// Use the `local_value` ADIOS2 dataset shape to send a dataset not via
// the data plane, but via the control plane of ADIOS2 SST. This is
// advisable for datasets where each rank contributes only a single item,
// since the control plane aggregates the data and thus avoids a fully
// interconnected communication mesh for data that every reader needs.
// A local value dataset can hold only a single item per MPI rank,
// forming an array whose length equals the MPI size.
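// On the reader side, such a dataset appears as a plain one-dimensional
// array with one chunk per writer rank, which is what the updated
// 10_streaming_read.cpp lists via availableChunks().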

auto e_patches = iteration.particles["e"].particlePatches;
auto numParticles = e_patches["numParticles"];
auto numParticlesOffset = e_patches["numParticlesOffset"];
for (auto rc : {&numParticles, &numParticlesOffset})
{
rc->resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
}
numParticles.storeChunk(
std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
numParticlesOffset.storeChunk(
std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
{size_t(mpi_rank)},
{1});
auto offset = e_patches["offset"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = offset[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>((unsigned long)mpi_rank),
{size_t(mpi_rank)},
{1});
}
auto extent = e_patches["extent"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = extent[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
}

iteration.close();
}

@@ -67,6 +137,10 @@ int main()
*/
series.close();

#if openPMD_HAVE_MPI
MPI_Finalize();
#endif

return 0;
#else
std::cout << "The streaming example requires that openPMD has been built "