Skip to content

Commit c63e417

Browse files
committed
Add comments
1 parent 1df9a75 commit c63e417

File tree

1 file changed

+57
-5
lines changed

1 file changed

+57
-5
lines changed

phlex/core/multilayer_join_node.hpp

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,33 @@ namespace phlex::experimental {
1616
template <typename Input>
1717
using multilayer_join_node_base_t = tbb::flow::composite_node<Input, std::tuple<Input>>;
1818

19+
// A multilayer_join_node is a TBB composite flow-graph node that collects one message
20+
// from each of its N input streams and forwards the complete tuple downstream once all
21+
// N messages for the same data unit have arrived.
22+
//
23+
// Each input stream is associated with a named hierarchy layer. When two or more
24+
// inputs belong to *different* layers a repeater_node is inserted in front of each
25+
// stream so that a product originating at a parent layer is repeated for every
26+
// child-layer data unit, allowing the underlying join_node to see a matching message on
27+
// every port. When all inputs share the same layer, repeaters are unnecessary and the
28+
// join_node is used directly.
29+
//
30+
// Schematic with N inputs spanning multiple distinct layers:
31+
//
32+
// data[0] ───┐
33+
// flush[0] ───┤ repeater[0] ──┐
34+
// index[0] ───┘ │
35+
// ⋮ ├──► join_node ──► message tuple
36+
// data[N-1] ───┐ │
37+
// flush[N-1] ───┤ repeater[N-1] ┘
38+
// index[N-1] ───┘
39+
//
40+
// When all inputs share the same layer the repeaters are omitted:
41+
//
42+
// data[0] ──┐
43+
// ├──► join_node ──► message tuple
44+
// data[N-1] ──┘
45+
1946
template <std::size_t n_inputs>
2047
requires(n_inputs > 1)
2148
class multilayer_join_node : public multilayer_join_node_base_t<message_tuple<n_inputs>> {
@@ -28,6 +55,10 @@ namespace phlex::experimental {
2855
template <std::size_t... Is>
2956
static auto make_join(tbb::flow::graph& g, std::index_sequence<Is...>)
3057
{
58+
// tag_matching causes TBB to group messages by the value returned by
59+
// message_matcher, which extracts the message ID. Messages with the same ID across
60+
// all ports are forwarded together as one tuple, ensuring that each output contains
61+
// exactly the messages that belong to the same data unit.
3162
return tbb::flow::join_node<args_t, tbb::flow::tag_matching>{
3263
g, type_t<message_matcher, Is>{}...};
3364
}
@@ -43,10 +74,12 @@ namespace phlex::experimental {
4374
{
4475
assert(n_inputs == layers_.size());
4576

46-
// Removes duplicates
77+
// Collapse to the set of distinct layer names. More than one distinct layer means
78+
// at least one input crosses a layer boundary and therefore every input stream
79+
// needs a repeater_node.
4780
std::set collapsed_layers{layers_.begin(), layers_.end()};
4881

49-
// Add repeaters only if there are non-duplicate layer specifications
82+
// Add repeaters only if the inputs span more than one distinct layer.
5083
if (collapsed_layers.size() > 1) {
5184
repeaters_.reserve(n_inputs);
5285
for (auto const& layer : layers_) {
@@ -69,12 +102,14 @@ namespace phlex::experimental {
69102
set_ports(std::make_index_sequence<n_inputs>{});
70103
}
71104

105+
// Returns one named_index_port per repeater so that the index router can deliver
106+
// flush and index messages to each repeater. Returns an empty list when no repeaters
107+
// were constructed (all inputs share the same layer).
72108
std::vector<named_index_port> index_ports()
73109
{
74-
// Returns an empty list if no repeaters are required
75110
std::vector<named_index_port> result;
76111
result.reserve(repeaters_.size());
77-
for (std::size_t i = 0; i != n_inputs; ++i) {
112+
for (std::size_t i = 0; i != repeaters_.size(); ++i) {
78113
result.emplace_back(layers_[i], &repeaters_[i]->flush_port(), &repeaters_[i]->index_port());
79114
}
80115
return result;
@@ -88,11 +123,14 @@ namespace phlex::experimental {
88123
};
89124

90125
namespace detail {
91-
// Stateless placeholder for cases where no join is needed
126+
// Stateless placeholder used instead of multilayer_join_node when a node has only a
127+
// single input (no joining is required).
92128
struct no_join {
93129
named_index_ports index_ports() const { return {}; }
94130
};
95131

132+
// Maps the number of inputs to the appropriate join type: a real multilayer_join_node
133+
// for N > 1, or the no_join sentinel for N == 1.
96134
template <std::size_t N>
97135
struct pre_node {
98136
using type = multilayer_join_node<N>;
@@ -104,9 +142,11 @@ namespace phlex::experimental {
104142
};
105143
}
106144

145+
// Resolves to multilayer_join_node<N> for N > 1 and to no_join for N == 1.
107146
template <std::size_t N>
108147
using join_or_none_t = typename detail::pre_node<N>::type;
109148

149+
// Constructs a multilayer_join_node when N > 1, or a no_join when N == 1.
110150
template <std::size_t N>
111151
join_or_none_t<N> make_join_or_none(tbb::flow::graph& g,
112152
std::string const& node_name,
@@ -119,6 +159,9 @@ namespace phlex::experimental {
119159
}
120160
}
121161

162+
// Translates a runtime port index into a reference to the corresponding compile-time
163+
// input port by recursively incrementing the compile-time parameter I until it matches
164+
// the runtime index.
122165
template <std::size_t I, std::size_t N>
123166
tbb::flow::receiver<message>& receiver_for(multilayer_join_node<N>& join, std::size_t const index)
124167
{
@@ -132,6 +175,8 @@ namespace phlex::experimental {
132175
}
133176

134177
namespace detail {
178+
// Returns pointers to all N input ports of the join node. Only valid for N > 1;
179+
// callers with a single input should use the node directly.
135180
template <std::size_t N>
136181
std::vector<tbb::flow::receiver<message>*> input_ports(join_or_none_t<N>& join)
137182
{
@@ -142,6 +187,8 @@ namespace phlex::experimental {
142187
}(std::make_index_sequence<N>{});
143188
}
144189

190+
// Looks up the port index for the given product label, then returns a reference to
191+
// the corresponding input port of the join node. Only valid for N > 1.
145192
template <std::size_t N>
146193
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
147194
product_queries const& product_labels,
@@ -153,6 +200,8 @@ namespace phlex::experimental {
153200
}
154201
}
155202

203+
// Returns all input-port pointers for a node. For N == 1 the node itself is the sole
204+
// receiver; for N > 1 each port of the join is returned.
156205
template <std::size_t N, typename Node>
157206
std::vector<tbb::flow::receiver<message>*> input_ports(join_or_none_t<N>& join, Node& node)
158207
{
@@ -163,6 +212,9 @@ namespace phlex::experimental {
163212
}
164213
}
165214

215+
// Returns the receiver for the input port that corresponds to the given product label. For
216+
// N == 1 there is only one port so the node itself is returned; for N > 1 the port is looked
217+
// up by label within the join.
166218
template <std::size_t N, typename Node>
167219
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
168220
product_queries const& product_labels,

0 commit comments

Comments
 (0)