Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions phlex/core/cached_product_stores.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
// FIXME: only intended to be used in a single-threaded context as std::map is not
// thread-safe.

#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/fwd.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_store.hpp"

namespace phlex::experimental {

class cached_product_stores {
public:
product_store_ptr get_store(level_id_ptr id = level_id::base_ptr())
product_store_ptr get_store(data_cell_index_ptr id = data_cell_index::base_ptr())
{
auto it = product_stores_.find(id->hash());
if (it != cend(product_stores_)) {
return it->second;
}
if (id == level_id::base_ptr()) {
if (id == data_cell_index::base_ptr()) {
return new_store(product_store::base());
}
return new_store(
get_store(id->parent())
->make_child(id->number(), id->level_name(), source_name_, stage::process));
->make_child(id->number(), id->layer_name(), source_name_, stage::process));
}

private:
Expand All @@ -33,7 +33,7 @@ namespace phlex::experimental {
}

std::string const source_name_{"Source"};
std::map<level_id::hash_type, product_store_ptr> product_stores_{};
std::map<data_cell_index::hash_type, product_store_ptr> product_stores_{};
};

}
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"
Expand Down Expand Up @@ -92,7 +92,7 @@ namespace phlex::experimental {
if (store->is_flush()) {
// Downstream nodes always get the flush.
get<0>(outputs).try_put(msg);
if (store->id()->level_name() != partition_) {
if (store->id()->layer_name() != partition_) {
return;
}
}
Expand All @@ -105,7 +105,7 @@ namespace phlex::experimental {
counter_for(id_hash_for_counter).set_flush_value(store, original_message_id);
} else {
call(ft, messages, std::make_index_sequence<N>{});
counter_for(id_hash_for_counter).increment(store->id()->level_hash());
counter_for(id_hash_for_counter).increment(store->id()->layer_hash());
}

if (auto counter = done_with(id_hash_for_counter)) {
Expand Down Expand Up @@ -179,7 +179,7 @@ namespace phlex::experimental {
std::string partition_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<1>> fold_;
tbb::concurrent_unordered_map<level_id, std::unique_ptr<R>> results_;
tbb::concurrent_unordered_map<data_cell_index, std::unique_ptr<R>> results_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace phlex::experimental {
declared_observer::~declared_observer() = default;

void declared_observer::report_cached_hashes(
tbb::concurrent_hash_map<level_id::hash_type, bool> const& hashes) const
tbb::concurrent_hash_map<data_cell_index::hash_type, bool> const& hashes) const
{
if (hashes.size() > 0ull) {
spdlog::warn("Monitor {} has {} cached hashes.", full_name(), hashes.size());
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#include "phlex/core/store_counters.hpp"
#include "phlex/metaprogramming/type_deduction.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"
Expand All @@ -38,7 +38,7 @@ namespace phlex::experimental {
virtual ~declared_observer();

protected:
using hashes_t = tbb::concurrent_hash_map<level_id::hash_type, bool>;
using hashes_t = tbb::concurrent_hash_map<data_cell_index::hash_type, bool>;
using accessor = hashes_t::accessor;

void report_cached_hashes(hashes_t const& hashes) const;
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
#include "phlex/core/store_counters.hpp"
#include "phlex/metaprogramming/type_deduction.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

Expand Down Expand Up @@ -43,7 +43,7 @@ namespace phlex::experimental {
virtual tbb::flow::sender<predicate_result>& sender() = 0;

protected:
using results_t = tbb::concurrent_hash_map<level_id::hash_type, predicate_result>;
using results_t = tbb::concurrent_hash_map<data_cell_index::hash_type, predicate_result>;
using accessor = results_t::accessor;
using const_accessor = results_t::const_accessor;

Expand Down
6 changes: 3 additions & 3 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include "phlex/core/store_counters.hpp"
#include "phlex/metaprogramming/type_deduction.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"
Expand Down Expand Up @@ -50,7 +50,7 @@ namespace phlex::experimental {
virtual std::size_t product_count() const = 0;

protected:
using stores_t = tbb::concurrent_hash_map<level_id::hash_type, product_store_ptr>;
using stores_t = tbb::concurrent_hash_map<data_cell_index::hash_type, product_store_ptr>;
using accessor = stores_t::accessor;
using const_accessor = stores_t::const_accessor;

Expand Down Expand Up @@ -101,7 +101,7 @@ namespace phlex::experimental {
if (stores_.insert(a, store->id()->hash())) {
auto result = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
++product_count_[store->id()->level_hash()];
++product_count_[store->id()->layer_hash()];
products new_products;
new_products.add_all(output_, std::move(result));
a->second =
Expand Down
10 changes: 5 additions & 5 deletions phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "phlex/core/declared_unfold.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_counter.hpp"

#include "fmt/std.h"
#include "spdlog/spdlog.h"
Expand All @@ -9,17 +9,17 @@ namespace phlex::experimental {

generator::generator(product_store_const_ptr const& parent,
std::string node_name,
std::string const& new_level_name) :
std::string const& new_layer_name) :
parent_{std::const_pointer_cast<product_store>(parent)},
node_name_{std::move(node_name)},
new_level_name_{new_level_name}
new_layer_name_{new_layer_name}
{
}

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child = parent_->make_child(i, new_level_name_, node_name_, std::move(new_products));
++child_counts_[child->id()->level_hash()];
auto child = parent_->make_child(i, new_layer_name_, node_name_, std::move(new_products));
++child_counts_[child->id()->layer_hash()];
return child;
}

Expand Down
24 changes: 12 additions & 12 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/level_id.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"
Expand All @@ -37,7 +37,7 @@ namespace phlex::experimental {
public:
explicit generator(product_store_const_ptr const& parent,
std::string node_name,
std::string const& new_level_name);
std::string const& new_layer_name);
product_store_const_ptr flush_store() const;

product_store_const_ptr make_child_for(std::size_t const level_number, products new_products)
Expand All @@ -49,8 +49,8 @@ namespace phlex::experimental {
product_store_const_ptr make_child(std::size_t i, products new_products);
product_store_ptr parent_;
std::string node_name_;
std::string const& new_level_name_;
std::map<level_id::hash_type, std::size_t> child_counts_;
std::string const& new_layer_name_;
std::map<data_cell_index::hash_type, std::size_t> child_counts_;
};

class declared_unfold : public products_consumer {
Expand All @@ -66,7 +66,7 @@ namespace phlex::experimental {
virtual std::size_t product_count() const = 0;

protected:
using stores_t = tbb::concurrent_hash_map<level_id::hash_type, product_store_ptr>;
using stores_t = tbb::concurrent_hash_map<data_cell_index::hash_type, product_store_ptr>;
using accessor = stores_t::accessor;
using const_accessor = stores_t::const_accessor;

Expand All @@ -93,12 +93,12 @@ namespace phlex::experimental {
Unfold&& unfold,
product_queries product_labels,
std::vector<std::string> output_products,
std::string new_level_name) :
std::string new_layer_name) :
declared_unfold{std::move(name), std::move(predicates), std::move(product_labels)},
output_{to_product_specifications(full_name(),
std::move(output_products),
make_type_ids<skip_first_type<return_type<Unfold>>>())},
new_level_name_{std::move(new_level_name)},
new_layer_name_{std::move(new_layer_name)},
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
unfold_{
g,
Expand All @@ -112,7 +112,7 @@ namespace phlex::experimental {
std::get<0>(output).try_put(msg);
} else if (accessor a; stores_.insert(a, store->id()->hash())) {
std::size_t const original_message_id{msg_counter_};
generator g{msg.store, this->full_name(), new_level_name_};
generator g{msg.store, this->full_name(), new_layer_name_};
call(p, ufold, msg.store->id(), g, msg.eom, messages, std::make_index_sequence<N>{});

message const flush_msg{g.flush_store(), msg.eom, ++msg_counter_, original_message_id};
Expand Down Expand Up @@ -144,7 +144,7 @@ namespace phlex::experimental {
template <std::size_t... Is>
void call(Predicate const& predicate,
Unfold const& unfold,
level_id_ptr const& unfolded_id,
data_cell_index_ptr const& unfolded_id,
generator& g,
end_of_message_ptr const& eom,
messages_t<N> const& messages,
Expand All @@ -156,7 +156,7 @@ namespace phlex::experimental {
auto running_value = obj.initial_value();
while (std::invoke(predicate, obj, running_value)) {
products new_products;
auto new_id = unfolded_id->make_child(counter, new_level_name_);
auto new_id = unfolded_id->make_child(counter, new_layer_name_);
if constexpr (requires { std::invoke(unfold, obj, running_value, *new_id); }) {
auto [next_value, prods] = std::invoke(unfold, obj, running_value, *new_id);
new_products.add_all(output_, std::move(prods));
Expand All @@ -178,10 +178,10 @@ namespace phlex::experimental {

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
product_specifications output_;
std::string new_level_name_;
std::string new_layer_name_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, messages_t<1u>> unfold_;
tbb::concurrent_hash_map<level_id::hash_type, product_store_ptr> stores_;
tbb::concurrent_hash_map<data_cell_index::hash_type, product_store_ptr> stores_;
std::atomic<std::size_t> msg_counter_{}; // Is this sufficient? Probably not.
std::atomic<std::size_t> calls_{};
std::atomic<std::size_t> product_count_{};
Expand Down
11 changes: 6 additions & 5 deletions phlex/core/end_of_message.cpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
#include "phlex/core/end_of_message.hpp"
#include "phlex/model/level_hierarchy.hpp"
#include "phlex/model/data_layer_hierarchy.hpp"

namespace phlex::experimental {

end_of_message::end_of_message(end_of_message_ptr parent,
level_hierarchy* hierarchy,
level_id_ptr id) :
data_layer_hierarchy* hierarchy,
data_cell_index_ptr id) :
parent_{parent}, hierarchy_{hierarchy}, id_{id}
{
}

end_of_message_ptr end_of_message::make_base(level_hierarchy* hierarchy, level_id_ptr id)
end_of_message_ptr end_of_message::make_base(data_layer_hierarchy* hierarchy,
data_cell_index_ptr id)
{
return end_of_message_ptr{new end_of_message{nullptr, hierarchy, id}};
}

end_of_message_ptr end_of_message::make_child(level_id_ptr id)
end_of_message_ptr end_of_message::make_child(data_cell_index_ptr id)
{
return end_of_message_ptr{new end_of_message{shared_from_this(), hierarchy_, id}};
}
Expand Down
12 changes: 7 additions & 5 deletions phlex/core/end_of_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ namespace phlex::experimental {

class end_of_message : public std::enable_shared_from_this<end_of_message> {
public:
static end_of_message_ptr make_base(level_hierarchy* hierarchy, level_id_ptr id);
end_of_message_ptr make_child(level_id_ptr id);
static end_of_message_ptr make_base(data_layer_hierarchy* hierarchy, data_cell_index_ptr id);
end_of_message_ptr make_child(data_cell_index_ptr id);
~end_of_message();

private:
end_of_message(end_of_message_ptr parent, level_hierarchy* hierarchy, level_id_ptr id);
end_of_message(end_of_message_ptr parent,
data_layer_hierarchy* hierarchy,
data_cell_index_ptr id);

end_of_message_ptr parent_;
level_hierarchy* hierarchy_;
level_id_ptr id_;
data_layer_hierarchy* hierarchy_;
data_cell_index_ptr id_;
};

}
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "phlex/concurrency.hpp"
#include "phlex/core/edge_maker.hpp"
#include "phlex/model/level_counter.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/product_store.hpp"

#include "fmt/std.h"
Expand All @@ -13,15 +13,15 @@
#include <iostream>

namespace phlex::experimental {
level_sentry::level_sentry(flush_counters& counters,
layer_sentry::layer_sentry(flush_counters& counters,
message_sender& sender,
product_store_ptr store) :
counters_{counters}, sender_{sender}, store_{store}, depth_{store_->id()->depth()}
{
counters_.update(store_->id());
}

level_sentry::~level_sentry()
layer_sentry::~layer_sentry()
{
auto flush_result = counters_.extract(store_->id());
auto flush_store = store_->make_flush();
Expand All @@ -32,7 +32,7 @@ namespace phlex::experimental {
sender_.send_flush(std::move(flush_store));
}

std::size_t level_sentry::depth() const noexcept { return depth_; }
std::size_t layer_sentry::depth() const noexcept { return depth_; }

framework_graph::framework_graph(product_store_ptr store, int const max_parallelism) :
framework_graph{[store](framework_driver& driver) { driver.yield(store); }, max_parallelism}
Expand Down
Loading
Loading