Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion form/form_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace {
std::string creator = store.source();

// Extract segment ID (partition) - extract once for entire store
std::string segment_id = store.id()->to_string();
std::string segment_id = store.index()->to_string();

std::cout << "\n=== FormOutputModule::save_data_products ===\n";
std::cout << "Creator: " << creator << "\n";
Expand Down
12 changes: 6 additions & 6 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,28 @@ namespace phlex::experimental {
auto const& msg = most_derived(messages);
auto const& [store, original_message_id] = std::tie(msg.store, msg.original_id);

if (not store->is_flush() and not store->id()->parent(partition_)) {
if (not store->is_flush() and not store->index()->parent(partition_)) {
return;
}

if (store->is_flush()) {
// Downstream nodes always get the flush.
get<0>(outputs).try_put(msg);
if (store->id()->layer_name() != partition_) {
if (store->index()->layer_name() != partition_) {
return;
}
}

auto const& fold_index =
store->is_flush() ? store->id() : store->id()->parent(partition_);
store->is_flush() ? store->index() : store->index()->parent(partition_);
assert(fold_index);
auto const& id_hash_for_counter = fold_index->hash();

if (store->is_flush()) {
counter_for(id_hash_for_counter).set_flush_value(store, original_message_id);
} else {
call(ft, messages, std::make_index_sequence<N>{});
counter_for(id_hash_for_counter).increment(store->id()->layer_hash());
counter_for(id_hash_for_counter).increment(store->index()->layer_hash());
}

if (auto counter = done_with(id_hash_for_counter)) {
Expand Down Expand Up @@ -136,7 +136,7 @@ namespace phlex::experimental {
template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
{
auto const& parent_id = *most_derived(messages).store->id()->parent(partition_);
auto const& parent_id = *most_derived(messages).store->index()->parent(partition_);
// FIXME: Not the safest approach!
auto it = results_.find(parent_id);
if (it == results_.end()) {
Expand All @@ -163,7 +163,7 @@ namespace phlex::experimental {

void commit_(product_store& store)
{
auto& result = results_.at(*store.id());
auto& result = results_.at(*store.index());
if constexpr (requires { send(*result); }) {
store.add_product(output()[0].name(), send(*result));
} else {
Expand Down
10 changes: 5 additions & 5 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ namespace phlex::experimental {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(message_id);
flag_for(store->index()->hash()).flush_received(message_id);
} else if (accessor a; needs_new(store, a)) {
call(ft, messages, std::make_index_sequence<N>{});
a->second = true;
flag_for(store->id()->hash()).mark_as_processed();
flag_for(store->index()->hash()).mark_as_processed();
}

if (done_with(store)) {
cached_hashes_.erase(store->id()->hash());
cached_hashes_.erase(store->index()->hash());
}
return {};
}}
Expand All @@ -102,10 +102,10 @@ namespace phlex::experimental {

bool needs_new(product_store_const_ptr const& store, accessor& a)
{
if (cached_hashes_.count(store->id()->hash()) > 0ull) {
if (cached_hashes_.count(store->index()->hash()) > 0ull) {
return false;
}
return cached_hashes_.insert(a, store->id()->hash());
return cached_hashes_.insert(a, store->index()->hash());
}

template <std::size_t... Is>
Expand Down
10 changes: 5 additions & 5 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ namespace phlex::experimental {
auto const& [store, message_id] = std::tie(msg.store, msg.id);
predicate_result result{};
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(message_id);
} else if (const_accessor a; results_.find(a, store->id()->hash())) {
flag_for(store->index()->hash()).flush_received(message_id);
} else if (const_accessor a; results_.find(a, store->index()->hash())) {
result = {msg.eom, message_id, a->second.result};
} else if (accessor a; results_.insert(a, store->id()->hash())) {
} else if (accessor a; results_.insert(a, store->index()->hash())) {
bool const rc = call(ft, messages, std::make_index_sequence<N>{});
result = a->second = {msg.eom, message_id, rc};
flag_for(store->id()->hash()).mark_as_processed();
flag_for(store->index()->hash()).mark_as_processed();
}

if (done_with(store)) {
results_.erase(store->id()->hash());
results_.erase(store->index()->hash());
}
return result;
}}
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
spdlog::warn("Store with hash {} is null!", hash);
continue;
}
spdlog::debug(" => ID: {} (hash: {})", store->id()->to_string(), hash);
spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash);

Check warning on line 28 in phlex/core/declared_provider.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_provider.cpp#L28

Added line #L28 was not covered by tests
}
}
}
12 changes: 6 additions & 6 deletions phlex/core/declared_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ namespace phlex::experimental {
auto& [stay_in_graph, to_output] = output;

if (msg.store->is_flush()) {
flag_for(msg.store->id()->hash()).flush_received(msg.original_id);
flag_for(msg.store->index()->hash()).flush_received(msg.original_id);
stay_in_graph.try_put(msg);
} else {
// Check cache first
auto index_hash = msg.store->id()->hash();
auto index_hash = msg.store->index()->hash();
if (const_accessor ca; cache_.find(ca, index_hash)) {
// Cache hit - reuse the cached store
message const new_msg{ca->second, msg.eom, msg.id};
Expand All @@ -87,25 +87,25 @@ namespace phlex::experimental {
}

// Cache miss - compute the result
auto result = std::invoke(ft, *msg.store->id());
auto result = std::invoke(ft, *msg.store->index());
++calls_;

products new_products;
new_products.add(output_.name(), std::move(result));
auto store = std::make_shared<product_store>(
msg.store->id(), this->full_name(), std::move(new_products));
msg.store->index(), this->full_name(), std::move(new_products));

// Store in cache
cache_.emplace(index_hash, store);

message const new_msg{store, msg.eom, msg.id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(msg.store->id()->hash()).mark_as_processed();
flag_for(msg.store->index()->hash()).mark_as_processed();
}

if (done_with(msg.store)) {
cache_.erase(msg.store->id()->hash());
cache_.erase(msg.store->index()->hash());
}
}}
{
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_transform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace phlex::experimental {
spdlog::warn("Store with hash {} is null!", hash);
continue;
}
spdlog::debug(" => ID: {} (hash: {})", store->id()->to_string(), hash);
spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash);
}
}
}
12 changes: 6 additions & 6 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,31 @@ namespace phlex::experimental {
std::tie(msg.store, msg.eom, msg.id);
auto& [stay_in_graph, to_output] = output;
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(msg.original_id);
flag_for(store->index()->hash()).flush_received(msg.original_id);
stay_in_graph.try_put(msg);
to_output.try_put(msg);
} else {
accessor a;
if (stores_.insert(a, store->id()->hash())) {
if (stores_.insert(a, store->index()->hash())) {
auto result = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
++product_count_[store->id()->layer_hash()];
++product_count_[store->index()->layer_hash()];
products new_products;
new_products.add_all(output_, std::move(result));
a->second = std::make_shared<product_store>(
store->id(), this->full_name(), std::move(new_products));
store->index(), this->full_name(), std::move(new_products));

message const new_msg{a->second, msg.eom, message_id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(store->id()->hash()).mark_as_processed();
flag_for(store->index()->hash()).mark_as_processed();
} else {
stay_in_graph.try_put({a->second, msg.eom, message_id});
}
}

if (done_with(store)) {
stores_.erase(store->id()->hash());
stores_.erase(store->index()->hash());
}
}}
{
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->id()->make_child(i, child_layer_name_);
auto child_index = parent_->index()->make_child(i, child_layer_name_);
++child_counts_[child_index->layer_hash()];
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}
Expand Down Expand Up @@ -52,7 +52,7 @@
spdlog::warn("Store with hash {} is null!", hash);
continue;
}
spdlog::debug(" => ID: {} (hash: {})", store->id()->to_string(), hash);
spdlog::debug(" => ID: {} (hash: {})", store->index()->to_string(), hash);

Check warning on line 55 in phlex/core/declared_unfold.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_unfold.cpp#L55

Added line #L55 was not covered by tests
}
}
}
12 changes: 6 additions & 6 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ namespace phlex::experimental {
auto const& msg = most_derived(messages);
auto const& store = msg.store;
if (store->is_flush()) {
flag_for(store->id()->hash()).flush_received(msg.id);
flag_for(store->index()->hash()).flush_received(msg.id);
std::get<0>(output).try_put(msg);
} else if (accessor a; stores_.insert(a, store->id()->hash())) {
} else if (accessor a; stores_.insert(a, store->index()->hash())) {
std::size_t const original_message_id{msg_counter_};
generator g{msg.store, this->full_name(), child_layer_name_};
call(p, ufold, msg.store->id(), g, msg.eom, messages, std::make_index_sequence<N>{});
call(p, ufold, msg.store->index(), g, msg.eom, messages, std::make_index_sequence<N>{});

message const flush_msg{g.flush_store(), msg.eom, ++msg_counter_, original_message_id};
std::get<0>(output).try_put(flush_msg);
flag_for(store->id()->hash()).mark_as_processed();
flag_for(store->index()->hash()).mark_as_processed();
}

if (done_with(store)) {
stores_.erase(store->id()->hash());
stores_.erase(store->index()->hash());
}
}}
{
Expand Down Expand Up @@ -169,7 +169,7 @@ namespace phlex::experimental {
}
++product_count_;
auto child = g.make_child_for(counter++, std::move(new_products));
message const child_msg{child, eom->make_child(child->id()), ++msg_counter_};
message const child_msg{child, eom->make_child(child->index()), ++msg_counter_};
output_port<0>(unfold_).try_put(child_msg);

// Every data cell needs a flush (for now)
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ namespace phlex::experimental {
layer_sentry::layer_sentry(flush_counters& counters,
message_sender& sender,
product_store_ptr store) :
counters_{counters}, sender_{sender}, store_{store}, depth_{store_->id()->depth()}
counters_{counters}, sender_{sender}, store_{store}, depth_{store_->index()->depth()}
{
counters_.update(store_->id());
counters_.update(store_->index());
}

layer_sentry::~layer_sentry()
{
// To consider: We may want to skip the following logic if the framework prematurely
// needs to shut down. Keeping it enabled allows in-flight folds to
// complete. However, in some cases it may not be desirable to do this.
auto flush_result = counters_.extract(store_->id());
auto flush_result = counters_.extract(store_->index());
auto flush_store = store_->make_flush();
if (not flush_result.empty()) {
flush_store->add_product("[flush]",
Expand Down Expand Up @@ -172,7 +172,7 @@ namespace phlex::experimental {
product_store_ptr framework_graph::accept(product_store_ptr store)
{
assert(store);
auto const new_depth = store->id()->depth();
auto const new_depth = store->index()->depth();
while (not empty(layers_) and new_depth <= layers_.top().depth()) {
layers_.pop();
eoms_.pop();
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace phlex::experimental {
{
assert(a.store);
assert(b.store);
if (a.store->id()->depth() > b.store->id()->depth()) {
if (a.store->index()->depth() > b.store->index()->depth()) {
return a;
}
return b;
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/message_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ namespace phlex::experimental {
assert(store);
assert(not store->is_flush());
auto const message_id = ++calls_;
original_message_ids_.try_emplace(store->id(), message_id);
original_message_ids_.try_emplace(store->index(), message_id);
auto parent_eom = eoms_.top();
end_of_message_ptr current_eom{};
if (parent_eom == nullptr) {
current_eom = eoms_.emplace(end_of_message::make_base(&hierarchy_, store->id()));
current_eom = eoms_.emplace(end_of_message::make_base(&hierarchy_, store->index()));
} else {
current_eom = eoms_.emplace(parent_eom->make_child(store->id()));
current_eom = eoms_.emplace(parent_eom->make_child(store->index()));
}
return {store, current_eom, message_id, -1ull};
}
Expand All @@ -43,7 +43,7 @@ namespace phlex::experimental {
assert(store);
assert(store->is_flush());

auto h = original_message_ids_.extract(store->id());
auto h = original_message_ids_.extract(store->index());
assert(h);
return h.mapped();
}
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/multiplexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
product_store_const_ptr store_for(product_store_const_ptr store,
std::string const& port_product_layer)
{
if (store->id()->layer_name() == port_product_layer) {
if (store->index()->layer_name() == port_product_layer) {
// This store's layer matches what is expected by the port
return store;
}

if (auto index = store->id()->parent(port_product_layer)) {
if (auto index = store->index()->parent(port_product_layer)) {
// This store has a parent layer that matches what is expected by the port
return std::make_shared<product_store>(index, store->source());
}
Expand Down Expand Up @@ -51,7 +51,7 @@
auto const& [store, eom, message_id, _] = msg;
if (debug_) {
spdlog::debug("Multiplexing {} with ID {} (is flush: {})",
store->id()->to_string(),
store->index()->to_string(),

Check warning on line 54 in phlex/core/multiplexer.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/multiplexer.cpp#L54

Added line #L54 was not covered by tests
message_id,
store->is_flush());
}
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/store_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace phlex::experimental {

bool detect_flush_flag::done_with(product_store_const_ptr const& store)
{
auto const h = store->id()->hash();
auto const h = store->index()->hash();
if (const_flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) {
flags_.erase(fa);
return true;
Expand Down
4 changes: 2 additions & 2 deletions phlex/model/product_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace phlex::experimental {

std::string const& product_store::layer_name() const noexcept { return id_->layer_name(); }
std::string const& product_store::source() const noexcept { return source_; }
data_cell_index_ptr const& product_store::id() const noexcept { return id_; }
data_cell_index_ptr const& product_store::index() const noexcept { return id_; }
bool product_store::is_flush() const noexcept { return stage_ == stage::flush; }

bool product_store::contains_product(std::string const& product_name) const
Expand All @@ -41,7 +41,7 @@ namespace phlex::experimental {

product_store_ptr const& more_derived(product_store_ptr const& a, product_store_ptr const& b)
{
if (a->id()->depth() > b->id()->depth()) {
if (a->index()->depth() > b->index()->depth()) {
return a;
}
return b;
Expand Down
2 changes: 1 addition & 1 deletion phlex/model/product_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace phlex::experimental {
std::string const& layer_name() const noexcept;
std::string const& source() const noexcept;
product_store_ptr make_flush() const;
data_cell_index_ptr const& id() const noexcept;
data_cell_index_ptr const& index() const noexcept;
bool is_flush() const noexcept;

// Product interface
Expand Down
Loading
Loading