Add rank table for locality-aware streaming #1505

Merged · 28 commits · May 31, 2024

Commits
b0b6d6d
Python bindings: Extract MPI adaptor to Mpi.hpp
franzpoeschel Mar 2, 2023
cbe1172
Auxiliary: MPI helpers
franzpoeschel Feb 21, 2022
bb7f3bf
Remove unnecessary MPI helper
franzpoeschel Aug 17, 2023
8b69ae9
Some fixes in HDF5 createDataset
franzpoeschel Mar 2, 2023
7b77fec
Series.(get)mpiRanksMetaInfo, hosttable
franzpoeschel Mar 2, 2023
0ddc94e
Python bindings: mpi_ranks_meta_info
franzpoeschel Mar 2, 2023
d644cf6
Python bindings for HOST_INFO
franzpoeschel Aug 17, 2023
f4a0d6c
Test chunk table
franzpoeschel Aug 16, 2023
f6130e4
CI fixes: Windows
franzpoeschel Aug 17, 2023
ef5ef3b
SerialIOTests: Windows compatibility
franzpoeschel Aug 18, 2023
ddfff5f
CI fixes: NVIDIA
franzpoeschel Aug 17, 2023
49752f8
Add review suggestions
franzpoeschel Oct 16, 2023
2bf8949
Make hostname() implementation explicit to the user
franzpoeschel Oct 16, 2023
465cc58
Remove Winsocks functionality
franzpoeschel Oct 17, 2023
d78ef35
rank_table: "hostname": pick mpi impl if parallel series
franzpoeschel Jan 31, 2024
d5a9403
Extend verbose mode a bit
franzpoeschel Jan 31, 2024
8980d3a
Initialize rank_table in parallel for HDF5
franzpoeschel Jan 31, 2024
0cbc1a6
[wip] fix open/close file issue
franzpoeschel Jan 31, 2024
93344f7
Extended testing
franzpoeschel Jan 31, 2024
faad84d
Reduce support for file-based encoding
franzpoeschel Feb 2, 2024
747780e
Rename mpiRanksMetaInfo() -> rankTable()
franzpoeschel Feb 2, 2024
5a0f296
Fix unused parameter warning
franzpoeschel Feb 5, 2024
34cfd04
Documentation
franzpoeschel Feb 5, 2024
6993b9b
Add line in docs
franzpoeschel Feb 6, 2024
fbd933f
CI fix
franzpoeschel Feb 5, 2024
d993a39
Test writing of MPI rank table
franzpoeschel Oct 16, 2023
9508c50
Fixes after rebase
franzpoeschel Feb 29, 2024
981fb2e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 21, 2024
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -460,6 +460,7 @@ set(CORE_SOURCE
src/auxiliary/Date.cpp
src/auxiliary/Filesystem.cpp
src/auxiliary/JSON.cpp
src/auxiliary/Mpi.cpp
src/backend/Attributable.cpp
src/backend/BaseRecordComponent.cpp
src/backend/MeshRecordComponent.cpp
4 changes: 4 additions & 0 deletions docs/source/details/backendconfig.rst
@@ -77,6 +77,8 @@ For a consistent user interface, backends shall follow the following rules:
Backend-independent JSON configuration
--------------------------------------

.. _backend_independent_config:

The openPMD backend can be chosen via the JSON/TOML key ``backend`` which recognizes the alternatives ``["hdf5", "adios2", "json"]``.

The iteration encoding can be chosen via the JSON/TOML key ``iteration_encoding`` which recognizes the alternatives ``["file_based", "group_based", "variable_based"]``.
@@ -97,6 +99,8 @@ If it is set to ``{"resizable": true}``, this declares that it shall be allowed to
For HDF5, resizable Datasets come with a performance penalty.
For JSON and ADIOS2, all datasets are resizable, independent of this option.

The key ``rank_table`` allows requesting the creation of a **rank table**, used for tracking :ref:`chunk provenance, especially in streaming setups <rank_table>`; refer to the streaming documentation for details.

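As a minimal sketch (the file name and backend are placeholders), the key can be passed directly when creating a parallel ``Series``:

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <mpi.h>

   // Request automatic creation of the rank table, tagging each writing
   // MPI rank with its hostname.
   openPMD::Series series(
       "data_%T.bp",
       openPMD::Access::CREATE,
       MPI_COMM_WORLD,
       R"({"rank_table": "hostname"})");
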
Configuration Structure per Backend
-----------------------------------

8 changes: 5 additions & 3 deletions docs/source/details/mpi.rst
@@ -13,11 +13,13 @@ A **collective** operation needs to be executed by *all* MPI ranks of the MPI co
Contrarily, **independent** operations can also be called by a subset of these MPI ranks.
For more information, please see the `MPI standard documents <https://www.mpi-forum.org/docs/>`_, for example MPI-3.1 in `"Section 2.4 - Semantic Terms" <https://www.mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf>`_.

============================ ================== ===========================
============================ ================== ================================
Functionality Behavior Description
============================ ================== ===========================
============================ ================== ================================
``Series`` **collective** open and close
``::flush()`` **collective** read and write
``::setRankTable()`` **collective** write, performed at flush
``::rankTable()`` **coll**/indep. behavior specified by bool param
``Iteration`` [1]_ independent declare and open
``::open()`` [4]_ **collective** explicit open
``Mesh`` [1]_ independent declare, open, write
@@ -30,7 +32,7 @@ Functionality Behavior Description
``::storeChunk`` [1]_ independent write
``::loadChunk`` independent read
``::availableChunks`` [4]_ collective read, immediate result
============================ ================== ===========================
============================ ================== ================================

.. [1] Individual backends, i.e. :ref:`parallel HDF5 <backends-hdf5>`, will only support independent operations if the default, non-collective (aka independent) behavior is kept.
Otherwise these operations are collective.
27 changes: 27 additions & 0 deletions docs/source/usage/streaming.rst
@@ -95,3 +95,30 @@ This pays tribute to the fact that in streaming mode, an iteration is sent to th

.. literalinclude:: 10_streaming_write.py
:language: python3


Chunk provenance tracking using a rank table
--------------------------------------------

.. _rank_table:

In a large parallel streaming setup, it is important to take data locality into account when deciding which data to load from the producer.
The openPMD-api provides several mechanisms to help with this:

The API call ``BaseRecordComponent::availableChunks()``/``Base_Record_Component.available_chunks()`` returns the data chunks within a specific dataset that are available for loading; each chunk records the MPI rank of the *data producer* that wrote it in ``WrittenChunkInfo::sourceID``/``WrittenChunkInfo::source_ID``.

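For illustration, a minimal sketch of enumerating these chunks on the reading side (the ``Series`` handle as well as the iteration and record names are assumptions):

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <iostream>

   void list_chunks(openPMD::Series &series)
   {
       // hypothetical names, assuming an (at least) one-dimensional dataset
       auto component = series.iterations[100].meshes["E"]["x"];
       for (auto const &chunk : component.availableChunks())
       {
           std::cout << "Chunk written by producer rank " << chunk.sourceID
                     << " at offset " << chunk.offset[0] << " with extent "
                     << chunk.extent[0] << std::endl;
       }
   }
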
To correlate this information with the MPI ranks of the *data consumer*, a **rank table** can be used to transmit an additional tag for each of the producer's MPI ranks. On the data producer side, the rank table can be set either automatically or manually:


* **automatically:** Using the :ref:`JSON/TOML option <backend_independent_config>` ``rank_table``.
  The suggested specification is ``{"rank_table": "hostname"}``, although the explicit values ``"mpi_processor_name"`` and ``"posix_hostname"`` are also accepted.
  ``"hostname"`` resolves to the MPI processor name when the Series has been initialized with MPI, and to the POSIX hostname otherwise (if available).
* **manually:** Using the API call ``Series::setRankTable(std::string const &myRankInfo)``, which specifies the current rank's tag.
  This can be used to set custom tags, identifying e.g. NUMA nodes or groups of compute nodes. A sketch of both variants is shown below this list.

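A minimal write-side sketch of both variants (stream names and tags are illustrative):

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>

   #include <mpi.h>

   #include <string>

   void write_rank_table(MPI_Comm comm)
   {
       // Automatic variant: the rank table is filled with hostnames.
       openPMD::Series automatic(
           "stream_a.sst",
           openPMD::Access::CREATE,
           comm,
           R"({"rank_table": "hostname"})");

       // Manual variant: tag each rank yourself, e.g. by NUMA domain.
       openPMD::Series manual("stream_b.sst", openPMD::Access::CREATE, comm);
       int rank = 0;
       MPI_Comm_rank(comm, &rank);
       manual.setRankTable("numa_node_" + std::to_string(rank % 2));
       // The collective write of the table is deferred until flush time.
       manual.flush();
   }
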
The rank table takes the form of a 2-dimensional dataset, listing the tags as null-terminated strings line by line in order of the MPI ranks, and it can be loaded using ``Series::rankTable()``/``Series.get_rank_table()``.

Setting the rank table is **collective**, though the collective action is only performed upon flushing.
Reading the rank table requires specifying whether the read operation should be performed collectively (better for performance) or independently.

To retrieve the corresponding information on the **consumer side**, the function ``host_info::byMethod()``/``HostInfo.get()`` returns the local rank's information (e.g. its hostname); alternatively, ``host_info::byMethodCollective()``/``HostInfo.get_info()`` collects this information for all consumer ranks at once.
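
Putting the pieces together, a consumer-side sketch (file name, iteration and record names are assumptions) could look like this:

.. code-block:: cpp

   #include <openPMD/ChunkInfo.hpp>
   #include <openPMD/openPMD.hpp>

   #include <mpi.h>

   #include <iostream>
   #include <string>

   void prefer_local_chunks(MPI_Comm comm)
   {
       openPMD::Series series("data.bp", openPMD::Access::READ_ONLY, comm);

       // Producer rank -> tag (e.g. hostname), read collectively.
       auto producerTags = series.rankTable(/* collective = */ true);

       // This consumer rank's own tag, using the same method as the producer.
       std::string myTag = openPMD::host_info::byMethod(
           openPMD::host_info::Method::MPI_PROCESSOR_NAME);

       for (auto iteration : series.readIterations())
       {
           auto component = iteration.meshes["E"]["x"]; // hypothetical names
           for (auto const &chunk : component.availableChunks())
           {
               bool sameHost = producerTags.at(chunk.sourceID) == myTag;
               std::cout << "Chunk from producer rank " << chunk.sourceID
                         << (sameHost ? " (local)" : " (remote)") << '\n';
           }
           iteration.close();
       }
   }
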
60 changes: 60 additions & 0 deletions include/openPMD/ChunkInfo.hpp
@@ -20,8 +20,16 @@
*/
#pragma once

#include "openPMD/config.hpp"

#include "openPMD/Dataset.hpp" // Offset, Extent

#if openPMD_HAVE_MPI
#include <mpi.h>
#endif

#include <map>
#include <string>
#include <vector>

namespace openPMD
@@ -73,4 +81,56 @@ struct WrittenChunkInfo : ChunkInfo
};

using ChunkTable = std::vector<WrittenChunkInfo>;

namespace chunk_assignment
{
using RankMeta = std::map<unsigned int, std::string>;
} // namespace chunk_assignment

namespace host_info
{
/**
* Methods for retrieving hostname / processor identifiers that openPMD-api
* is aware of. These can be used for locality-aware chunk distribution
* schemes in streaming setups.
*/
enum class Method
{
POSIX_HOSTNAME,
MPI_PROCESSOR_NAME
};

/**
* @brief Is the method available on the current system?
*
* @return true If it is available.
* @return false Otherwise.
*/
bool methodAvailable(Method);

/**
* @brief Wrapper for the native hostname retrieval functions such as
* POSIX gethostname().
*
* @return std::string The hostname / processor name returned by the native
* function.
*/
std::string byMethod(Method);

#if openPMD_HAVE_MPI
/**
* @brief Retrieve the hostname information on all MPI ranks and distribute
* a map of "rank -> hostname" to all ranks.
*
* This call is MPI collective.
*
* @return chunk_assignment::RankMeta Hostname / processor name information
* for all MPI ranks known to the communicator.
* The result is returned on all ranks.
*/
chunk_assignment::RankMeta byMethodCollective(MPI_Comm, Method);
#endif
} // namespace host_info
} // namespace openPMD

#undef openPMD_POSIX_AVAILABLE
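
A brief usage sketch for these helpers (the communicator handling and the printed output are illustrative):

    #include <openPMD/ChunkInfo.hpp>

    #include <mpi.h>

    #include <iostream>

    void print_rank_hosts(MPI_Comm comm)
    {
        using namespace openPMD::host_info;

        // Prefer the MPI processor name, fall back to the POSIX hostname
        // where available.
        Method method = methodAvailable(Method::MPI_PROCESSOR_NAME)
            ? Method::MPI_PROCESSOR_NAME
            : Method::POSIX_HOSTNAME;

        // Collective call: every rank receives the map "rank -> hostname".
        auto rankToHost = byMethodCollective(comm, method);

        int rank = 0;
        MPI_Comm_rank(comm, &rank);
        if (rank == 0)
            for (auto const &[r, host] : rankToHost)
                std::cout << "rank " << r << " runs on " << host << '\n';
    }
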
67 changes: 67 additions & 0 deletions include/openPMD/ChunkInfo_internal.hpp
@@ -0,0 +1,67 @@
/* Copyright 2024 Franz Poeschel
*
* This file is part of openPMD-api.
*
* openPMD-api is free software: you can redistribute it and/or modify
* it under the terms of of either the GNU General Public License or
* the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* openPMD-api is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License and the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License
* and the GNU Lesser General Public License along with openPMD-api.
* If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once

#include "openPMD/ChunkInfo.hpp"
#include <string>

namespace openPMD::host_info
{

/**
* @brief This defines the method identifiers used
* in `{"rank_table": "hostname"}`
*
* Currently recognized are:
*
* * posix_hostname
* * mpi_processor_name
*
* For backwards compatibility reasons, "hostname" is also recognized as a
* deprecated alternative for "posix_hostname".
*
* @return Method enum identifier. The identifier is returned even if the
* method is not available on the system. This should be checked
* via methodAvailable().
* @throws std::out_of_range If an unknown string identifier is passed.
*/
Method methodFromStringDescription(std::string const &descr, bool consider_mpi);

/*
* The following block contains one wrapper for each native hostname
* retrieval method. The purpose is to have the same function pointer type
* for all of them.
*/

#ifdef _WIN32
#define openPMD_POSIX_AVAILABLE false
#else
#define openPMD_POSIX_AVAILABLE true
#endif

#if openPMD_POSIX_AVAILABLE
std::string posix_hostname();
#endif

#if openPMD_HAVE_MPI
std::string mpi_processor_name();
#endif
} // namespace openPMD::host_info
58 changes: 58 additions & 0 deletions include/openPMD/Series.hpp
@@ -49,6 +49,7 @@
#include <stdexcept>
#include <string>
#include <tuple>
#include <variant>

// expose private and protected members for invasive testing
#ifndef OPENPMD_private
@@ -201,6 +202,36 @@ namespace internal
m_deferred_initialization = std::nullopt;

void close();

#if openPMD_HAVE_MPI
/*
* @todo Once we have separate MPI headers, move this there.
*/
std::optional<MPI_Comm> m_communicator;
#endif

struct NoSourceSpecified
{};
struct SourceSpecifiedViaJSON
{
std::string value;
};
struct SourceSpecifiedManually
{
std::string value;
};

struct RankTableData
{
Attributable m_attributable;
std::variant<
NoSourceSpecified,
SourceSpecifiedViaJSON,
SourceSpecifiedManually>
m_rankTableSource;
std::optional<chunk_assignment::RankMeta> m_bufferedRead;
};
RankTableData m_rankTable;
}; // SeriesData

class SeriesInternal;
@@ -388,6 +419,32 @@ class Series : public Attributable
*/
Series &setMeshesPath(std::string const &meshesPath);

/**
* @throw no_such_attribute_error If optional attribute is not present.
* @param collective Run this read operation collectively.
There might be an enormous IO overhead if running this
operation non-collectively.
To make this explicit to users, there is no default parameter.
Parameter is ignored if compiling without MPI support (it is
present for the sake of a consistent API).
* @return Vector with a String per (writing) MPI rank, indicating user-
* defined meta information per rank. Example: host name.
*/
#if openPMD_HAVE_MPI
chunk_assignment::RankMeta rankTable(bool collective);
#else
chunk_assignment::RankMeta rankTable(bool collective = false);
#endif

/**
* @brief Set the Mpi Ranks Meta Info attribute, i.e. a Vector with
* a String per (writing) MPI rank, indicating user-
* defined meta information per rank. Example: host name.
*
* @return Reference to modified series.
*/
Series &setRankTable(std::string const &myRankInfo);

/**
* @throw no_such_attribute_error If optional attribute is not present.
* @return String representing the path to particle species, relative(!) to
Expand Down Expand Up @@ -745,6 +802,7 @@ OPENPMD_private
bool flushIOHandler = true);
void flushMeshesPath();
void flushParticlesPath();
void flushRankTable();
void readFileBased();
void readOneIterationFileBased(std::string const &filePath);
/**
50 changes: 50 additions & 0 deletions include/openPMD/auxiliary/Mpi.hpp
@@ -26,6 +26,9 @@

#if openPMD_HAVE_MPI
#include <mpi.h>

#include <string>
#include <vector>
#endif

#include <type_traits>
@@ -64,5 +67,52 @@
}
} // namespace

/**
* Multiple variable-length strings represented in one single buffer
* with a fixed line width.
* Strings smaller than the maximum width are padded with zeros.
* Each line is zero-terminated with at least one zero character.
* The length of char_buffer should be equal to the product of line_length
* and num_lines.
*/
struct StringMatrix
{
std::vector<char> char_buffer;
size_t line_length = 0;
size_t num_lines = 0;
};

/*
* These are mostly internal helper functions, so this defines only those that
* we need.
* Logically, these should be complemented by `collectStringsTo()` and
* `distributeStringsAsMatrixToAllRanks()`, but we don't need them (yet).
*/

/**
* @brief Collect multiple variable-length strings to one rank in MPI_Gatherv
* fashion. Uses two collective MPI calls, the first to gather the
* different string lengths, the second to gather the actual strings.
*
* @param communicator MPI communicator
* @param destRank Target rank for MPI_Gatherv
* @param thisRankString The current MPI rank's contribution to the data.
* @return StringMatrix See documentation of StringMatrix struct.
*/
StringMatrix collectStringsAsMatrixTo(
MPI_Comm communicator, int destRank, std::string const &thisRankString);

/**
* @brief Collect multiple variable-length strings to all ranks in
* MPI_Allgatherv fashion. Uses two collective MPI calls, the first to
* gather the different string lengths, the second to gather the actual
* strings.
*
* @param communicator MPI communicator
* @param thisRankString The current MPI rank's contribution to the data.
* @return std::vector<std::string> All ranks' strings, returned on all ranks.
*/
std::vector<std::string> distributeStringsToAllRanks(
MPI_Comm communicator, std::string const &thisRankString);
#endif
} // namespace openPMD::auxiliary
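
A short sketch of how the all-ranks variant could be used (these helpers are primarily internal; the communicator handling is an assumption):

    #include "openPMD/auxiliary/Mpi.hpp"

    #include <mpi.h>

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    void gather_hostnames(MPI_Comm comm)
    {
        char name[MPI_MAX_PROCESSOR_NAME];
        int len = 0;
        MPI_Get_processor_name(name, &len);

        // Every rank contributes one variable-length string; every rank
        // receives the full list, ordered by MPI rank.
        std::vector<std::string> all =
            openPMD::auxiliary::distributeStringsToAllRanks(
                comm, std::string(name, len));

        int rank = 0;
        MPI_Comm_rank(comm, &rank);
        if (rank == 0)
            for (std::size_t i = 0; i < all.size(); ++i)
                std::cout << "rank " << i << ": " << all[i] << '\n';
    }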