Skip to content

Commit 2fa1a15

Browse files
committed
Caching for nodes that require data products from different data layers.
1 parent 3328368 commit 2fa1a15

File tree

9 files changed

+831
-0
lines changed

9 files changed

+831
-0
lines changed

plugins/layer_generator.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ namespace phlex::experimental {
5454
// Runs the generator on `driver` by calling execute() with data_cell_index::base_ptr() as the index.
void operator()(framework_driver& driver) { execute(driver, data_cell_index::base_ptr()); }
5555

5656
std::size_t emitted_cells(std::string layer_path = {}) const;
57+
// Read-only access to the layer_paths_ member; returns a reference, so no copy is made.
std::vector<std::string> const& layer_paths() const noexcept { return layer_paths_; }
5758

5859
private:
5960
void execute(framework_driver& driver, data_cell_index_ptr index, bool recurse = true);

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ add_subdirectory(utilities)
237237
add_subdirectory(mock-workflow)
238238
add_subdirectory(demo-giantdata)
239239
add_subdirectory(python)
240+
add_subdirectory(repeater)
240241

241242
if(PHLEX_USE_FORM)
242243
add_subdirectory(form)

test/repeater/CMakeLists.txt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Shared helper library that holds the index_router implementation used by the
# repeater test.
add_library(index_router SHARED index_router.cpp)

# TBB and phlex::model are linked PUBLIC because index_router.hpp itself includes
# their headers: every target that consumes index_router needs the same include
# directories and link dependencies, not only index_router.cpp.
target_link_libraries(index_router PUBLIC TBB::tbb phlex::model)

# The repeater test, built from repeater.cpp (USE_CATCH2_MAIN is requested from
# cet_test) and linked against the helper library above.
cet_test(
  repeater
  USE_CATCH2_MAIN
  SOURCE
  repeater.cpp
  LIBRARIES
  phlex::core
  index_router
  layer_generator
)

test/repeater/index_router.cpp

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
#include "index_router.hpp"
2+
3+
#include "fmt/format.h"
4+
#include "fmt/ranges.h"
5+
#include "spdlog/spdlog.h"
6+
7+
#include <cassert>
8+
#include <stdexcept>
9+
#include <utility>
10+
11+
using namespace tbb;
12+
using namespace phlex;
13+
14+
// Builds the routing tables.
//
//   g            flow graph in which every broadcast node is created
//   layers       layer paths; one index broadcaster is registered per path
//   multilayers  for each node name, the (layer, flush port, index port) triples
//                to which that node's per-layer broadcasters are connected
test::index_router::index_router(flow::graph& g,
                                 std::vector<std::string> layers,
                                 std::map<std::string, named_index_ports> multilayers)
{
  // Single-layer routing: one broadcaster per known layer path.
  for (auto const& layer_path : layers) {
    broadcasters_.try_emplace(layer_path, g);
  }

  // Multi-layer routing: one (index broadcaster, flush broadcaster) pair per
  // layer that a multi-layer node consumes.
  for (auto const& [consumer, ports] : multilayers) {
    spdlog::trace("Making multilayer caster for {}", consumer);

    // Capacity is reserved up front, so no reallocation happens while edges to
    // the freshly created elements are being made.
    multibroadcaster_entries per_layer;
    per_layer.reserve(ports.size());

    for (auto const& [layer, flush_port, input_port] : ports) {
      auto& added = per_layer.emplace_back(layer, index_set_node{g}, flush_node{g});
      // Index messages go to the node's index port ...
      make_edge(added.broadcaster, *input_port);
      // ... and end tokens go to its flush port.
      make_edge(added.flusher, *flush_port);
    }

    multibroadcasters_.try_emplace(consumer, std::move(per_layer));
  }
}
33+
34+
void test::index_router::shutdown()
35+
{
36+
backout_to(data_cell_index::base_ptr());
37+
last_index_ = nullptr;
38+
}
39+
40+
// Routes one index: first backs out of the layers left behind by the previously
// routed index, then delivers `index` to the single-layer and the multi-layer
// broadcasters, and finally records it as the last routed index.
void test::index_router::route(data_cell_index_ptr const& index)
{
  backout_to(index);

  // Post-increment hands out the current counter value; the same id is used for
  // the single-layer and the multi-layer delivery of this index.
  auto const msg_id = counter_++;
  send(index, msg_id);
  multisend(index, msg_id);

  last_index_ = index;
}
48+
49+
// Emits end tokens for the layers that are left behind when routing moves from
// the previously routed index (last_index_) to `index`.
//
// Nothing is emitted when
//   - there is no previous index (first call),
//   - `index` and the previous index have the same parent, or
//   - `index` has a parent in the previous index's layer (descending).
//
// Otherwise the parent chain is walked upwards, starting at the previous index.
// For every visited index, put_end_token() is called on each cached sender
// whose layer equals the visited index's layer name.  The walk stops after the
// first visited index whose layer hash equals that of `index`.
//
// `index` must be non-null (checked by assert).
void test::index_router::backout_to(data_cell_index_ptr const& index)
{
  assert(index);

  if (!last_index_) {
    // This happens when we encounter the first index
    return;
  }

  if (index->parent() == last_index_->parent()) {
    // At the same level in the hierarchy
    return;
  }

  if (index->parent(last_index_->layer_name())) {
    // Descending further into the hierarchy
    return;
  }

  // What's left is situations where we need to go up the hierarchy chain.

  auto flush_layer = [this](data_cell_index_ptr const& finished) {
    // FIXME: This lookup should be fixed
    for (auto& [_, senders] : cached_multicasters_) {
      for (auto& sender : senders) {
        if (sender.layer() == finished->layer_name()) {
          sender.put_end_token(finished);
        }
      }
    }
  };

  for (auto current = last_index_; current; current = current->parent()) {
    flush_layer(current);
    if (current->layer_hash() == index->layer_hash()) {
      // Reached the layer of the new index: everything below it has been flushed.
      return;
    }
  }

  // The walk ran past the top of the hierarchy without meeting the layer of
  // `index`.  That is a logic error, reported in debug builds; in release builds
  // we simply return instead of dereferencing a null index.
  assert(false && "backout_to: layer of the new index is not an ancestor of the last index");
}
89+
90+
// Returns the index broadcaster whose registered layer path ends with
// "/" + layer.
//
// Throws std::runtime_error when no registered path matches, or when more than
// one does (the message then lists every matching path).
auto test::index_router::index_node_for(std::string const& layer) -> index_set_node&
{
  // Built once instead of once per registered path.
  auto const suffix = "/" + layer;

  std::vector<broadcasters_t::iterator> candidates;
  for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) {
    if (it->first.ends_with(suffix)) {
      candidates.push_back(it);
    }
  }

  if (candidates.size() == 1ull) {
    return candidates.front()->second;
  }

  if (candidates.empty()) {
    // A space separates the fixed text from the layer name.
    throw std::runtime_error("No broadcaster found for layer specification " + layer);
  }

  std::string msg{"Multiple layers match specification " + layer + ":\n"};
  for (auto const& candidate : candidates) {
    msg += "\n- " + candidate->first;
  }
  throw std::runtime_error(msg);
}
113+
114+
// Delivers `index` to the broadcaster registered under the index's layer path.
// A broadcaster for that path is expected to exist; this is checked by assert
// only.
void test::index_router::send(data_cell_index_ptr const& index, std::size_t message_id)
{
  auto const entry = broadcasters_.find(index->layer_path());
  assert(entry != broadcasters_.end());

  auto& broadcaster = entry->second;
  broadcaster.try_put({.msg_id = message_id, .index = index});
}
120+
121+
// Delivers `index` to every multi-layer sender that applies to the index's
// layer.
//
// The applicable senders are computed the first time a layer hash is seen and
// cached in cached_multicasters_ under that hash.  A multi-layer node
// contributes its senders only when
//   - one of its layers is the layer of `index` itself, and
//   - each of its other layers yields a non-null index->parent(layer).
void test::index_router::multisend(data_cell_index_ptr const& index, std::size_t message_id)
{
  // try_emplace hands back the existing entry when this layer hash is already
  // cached; only a freshly inserted (still empty) entry has to be filled.
  auto const [cache_entry, is_new] = cached_multicasters_.try_emplace(index->layer_hash());
  auto& targets = cache_entry->second;

  if (is_new) {
    for (auto& [node_name, entries] : multibroadcasters_) {
      std::vector<multilayer_sender> matched;
      matched.reserve(entries.size());

      bool has_own_layer = false;
      for (auto& [layer, caster, flusher] : entries) {
        bool const is_own_layer = (layer == index->layer_name());
        if (is_own_layer or index->parent(layer)) {
          matched.emplace_back(layer, &caster, &flusher);
        }
        has_own_layer = has_own_layer or is_own_layer;
      }

      // All of the node's layers must be matched, and one of them must be the
      // index's own layer.
      if (has_own_layer and matched.size() == entries.size()) {
        targets.insert(targets.end(),
                       std::make_move_iterator(matched.begin()),
                       std::make_move_iterator(matched.end()));
      }
    }
  }

  for (auto& sender : targets) {
    sender.put_message(index, message_id);
  }
}
170+
171+
// Stores the layer name together with non-owning pointers to the index
// broadcaster and the flush broadcaster used for that layer.
test::index_router::multilayer_sender::multilayer_sender(std::string const& layer,
                                                         index_set_node* broadcaster,
                                                         flush_node* flusher) :
  layer_(layer), broadcaster_(broadcaster), flusher_(flusher)
{
}
177+
178+
// Sends an index message through this sender's broadcaster.
//
// When `index` belongs to this sender's own layer, the index itself is sent
// with `cache` set to false.  Otherwise the index's parent in this sender's
// layer is sent (with the default `cache` value) and the send is counted.
void test::index_router::multilayer_sender::put_message(data_cell_index_ptr const& index,
                                                        std::size_t message_id)
{
  bool const is_own_layer = (layer_ == index->layer_name());
  if (not is_own_layer) {
    // Flush values are only needed for indices that are *not* the "lowest" in
    // the branch of the hierarchy, so only these sends are counted.
    ++counter_;
    broadcaster_->try_put({.msg_id = message_id, .index = index->parent(layer_)});
    return;
  }

  broadcaster_->try_put({.msg_id = message_id, .index = index, .cache = false});
}
191+
192+
// Sends an end token for `index` carrying the number of sends counted by
// put_message() since the previous end token, and resets that count to zero.
// Nothing is sent when the count is zero.
void test::index_router::multilayer_sender::put_end_token(data_cell_index_ptr const& index)
{
  // Read and reset the counter in one step.
  if (auto const pending = std::exchange(counter_, 0); pending != 0) {
    flusher_->try_put({.index = index, .count = pending});
  }
}

test/repeater/index_router.hpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#ifndef TEST_INDEX_ROUTER_HPP
2+
#define TEST_INDEX_ROUTER_HPP
3+
4+
#include "message_types.hpp"
#include "phlex/model/data_cell_index.hpp"

#include "oneapi/tbb/flow_graph.h"

#include <atomic>
#include <cstddef>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>
12+
13+
namespace phlex::test {
14+
using layers_t = std::vector<std::string>;
15+
16+
class index_router {
17+
public:
18+
using flush_node = tbb::flow::broadcast_node<indexed_end_token>;
19+
using index_set_node = tbb::flow::broadcast_node<index_message>;
20+
21+
explicit index_router(tbb::flow::graph& g,
22+
std::vector<std::string> layers,
23+
std::map<std::string, named_index_ports> multilayers);
24+
auto index_node_for(std::string const& layer) -> index_set_node&;
25+
26+
void shutdown();
27+
void route(data_cell_index_ptr const& index);
28+
29+
private:
30+
void send(data_cell_index_ptr const& index, std::size_t message_id);
31+
void multisend(data_cell_index_ptr const& index, std::size_t message_id);
32+
void backout_to(data_cell_index_ptr const& index);
33+
34+
using broadcasters_t = std::map<std::string, index_set_node>;
35+
broadcasters_t broadcasters_;
36+
37+
struct multibroadcaster_entry {
38+
std::string layer;
39+
index_set_node broadcaster;
40+
flush_node flusher;
41+
};
42+
using multibroadcaster_entries = std::vector<multibroadcaster_entry>;
43+
44+
using multibroadcasters_t = std::unordered_map<std::string, multibroadcaster_entries>;
45+
multibroadcasters_t multibroadcasters_;
46+
47+
class multilayer_sender {
48+
public:
49+
multilayer_sender(std::string const& layer, index_set_node* broadcaster, flush_node* flusher);
50+
51+
auto const& layer() const noexcept { return layer_; }
52+
void put_message(data_cell_index_ptr const& index, std::size_t message_id);
53+
void put_end_token(data_cell_index_ptr const& index);
54+
55+
private:
56+
std::string layer_;
57+
index_set_node* broadcaster_;
58+
flush_node* flusher_;
59+
int counter_ = 0;
60+
};
61+
62+
using cached_casters_t = std::unordered_map<std::size_t, std::vector<multilayer_sender>>;
63+
cached_casters_t cached_multicasters_;
64+
std::atomic<unsigned> counter_;
65+
data_cell_index_ptr last_index_;
66+
};
67+
}
68+
69+
#endif // TEST_INDEX_ROUTER_HPP

test/repeater/message_types.hpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#ifndef TEST_MESSAGE_TYPES_HPP
2+
#define TEST_MESSAGE_TYPES_HPP
3+
4+
#include "oneapi/tbb/flow_graph.h"
5+
#include "phlex/model/data_cell_index.hpp"
6+
#include "spdlog/spdlog.h"
7+
8+
#include <cstddef>
9+
#include <string>
10+
#include <tuple>
11+
#include <variant>
12+
#include <vector>
13+
14+
namespace phlex::test {
15+
struct index_message {
16+
std::size_t msg_id;
17+
data_cell_index_ptr index;
18+
bool cache{true};
19+
};
20+
21+
template <typename T>
22+
struct indexed_message {
23+
std::size_t msg_id;
24+
data_cell_index_ptr index;
25+
T data;
26+
};
27+
28+
struct indexed_end_token {
29+
data_cell_index_ptr index;
30+
int count;
31+
};
32+
33+
struct named_index_port {
34+
std::string layer;
35+
tbb::flow::receiver<indexed_end_token>* token_port;
36+
tbb::flow::receiver<index_message>* index_port;
37+
};
38+
using named_index_ports = std::vector<named_index_port>;
39+
40+
template <typename... Ts>
41+
using indexed_message_tuple = std::tuple<indexed_message<Ts>...>;
42+
43+
struct no_more_indices {};
44+
45+
using message_from_input = std::variant<data_cell_index_ptr, no_more_indices>;
46+
47+
struct index_message_matcher {
48+
std::size_t operator()(index_message const& msg) const noexcept { return msg.msg_id; }
49+
};
50+
51+
template <typename T>
52+
struct indexed_message_matcher {
53+
std::size_t operator()(indexed_message<T> const& msg) const noexcept { return msg.msg_id; }
54+
};
55+
}
56+
57+
#endif // TEST_MESSAGE_TYPES_HPP

0 commit comments

Comments
 (0)