Skip to content
73 changes: 39 additions & 34 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
class fold_node : public declared_fold, private count_stores {
using all_parameter_types = typename AlgorithmBits::input_parameter_types;
using input_parameter_types = skip_first_type<all_parameter_types>; // Skip fold object
static constexpr auto N = std::tuple_size_v<input_parameter_types>;
using R = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;
static constexpr auto num_inputs = std::tuple_size_v<input_parameter_types>;
using result_type = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;

static constexpr std::size_t M = 1; // hard-coded for now
static constexpr std::size_t num_outputs = 1; // hard-coded for now
using function_t = typename AlgorithmBits::bound_type;

public:
Expand All @@ -74,7 +74,8 @@
std::string partition) :
declared_fold{std::move(name), std::move(predicates), std::move(product_labels)},
initializer_{std::move(initializer)},
output_{to_product_specifications(full_name(), std::move(output), make_type_ids<R>())},
output_{
to_product_specifications(full_name(), std::move(output), make_type_ids<result_type>())},
partition_{std::move(partition)},
flush_receiver_{g,
tbb::flow::unlimited,
Expand All @@ -88,32 +89,33 @@
emit_and_evict_if_done(index);
return {};
}},
join_{make_join_or_none<N>(
join_{make_join_or_none<num_inputs>(
g, full_name(), layers())}, // FIXME: This should change to include result product!
fold_{
g, concurrency, [this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<num_inputs> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();

auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}
auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}

auto const& index_hash_for_counter = fold_index->hash();
auto index_hash_for_counter = fold_index->hash();

call(ft, messages, std::make_index_sequence<N>{});
++calls_;
call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;

counter_for(index_hash_for_counter).increment(index->layer_hash());
counter_for(index_hash_for_counter).increment(index->layer_hash());

emit_and_evict_if_done(fold_index);
}}
emit_and_evict_if_done(fold_index);
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, fold_);
}
}
Expand All @@ -123,7 +125,7 @@
{
if (auto counter = done_with(fold_index->hash())) {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(parent);
commit(parent);
++product_count_;
tbb::flow::output_port<0>(fold_).try_put(
{.store = parent, .id = counter->original_message_id()});
Expand All @@ -132,20 +134,22 @@

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, fold_);
return receiver_for<num_inputs>(join_, input(), product_label, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, fold_);
return input_ports<num_inputs>(join_, fold_);

Check warning on line 142 in phlex/core/declared_fold.hpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_fold.hpp#L142

Added line #L142 was not covered by tests
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
auto const parent_index = most_derived(messages).store->index()->parent(partition_);

Expand All @@ -160,7 +164,7 @@
.first;
}

if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -174,11 +178,11 @@
template <size_t... Is>
auto initialized_object(InitTuple&& tuple, std::index_sequence<Is...>) const
{
return std::unique_ptr<R>{
new R{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
return std::unique_ptr<result_type>{
new result_type{std::forward<std::tuple_element_t<Is, InitTuple>>(std::get<Is>(tuple))...}};
}

auto commit_(product_store_ptr& store)
auto commit(product_store_ptr& store)
{
auto& result = results_.at(store->index()->hash());
if constexpr (requires { send(*result); }) {
Expand All @@ -196,9 +200,10 @@
product_specifications output_;
std::string partition_;
tbb::flow::function_node<flush_message> flush_receiver_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<R>> results_;
join_or_none_t<num_inputs> join_;
tbb::flow::multifunction_node<messages_t<num_inputs>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<result_type>>
results_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
Expand Down
28 changes: 15 additions & 13 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ namespace phlex::experimental {

template <typename AlgorithmBits>
class observer_node : public declared_observer {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using input_args = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
static constexpr auto num_inputs = AlgorithmBits::number_inputs;

public:
static constexpr auto number_output_products = 0;
Expand All @@ -60,36 +60,38 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none<N>(g, full_name(), layers())},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
observer_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<N> const& messages) -> oneapi::tbb::flow::continue_msg {
call(ft, messages, std::make_index_sequence<N>{});
messages_t<num_inputs> const& messages) -> oneapi::tbb::flow::continue_msg {
call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;
return {};
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, observer_);
}
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, observer_);
return receiver_for<num_inputs>(join_, input(), product_label, observer_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, observer_);
return input_ports<num_inputs>(join_, observer_);
}

template <std::size_t... Is>
void call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -99,9 +101,9 @@ namespace phlex::experimental {
named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>> observer_;
input_retriever_types<input_args> input_{input_arguments<input_args>()};
join_or_none_t<num_inputs> join_;
tbb::flow::function_node<messages_t<num_inputs>> observer_;
std::atomic<std::size_t> calls_;
};
}
Expand Down
46 changes: 24 additions & 22 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ namespace phlex::experimental {

template <typename AlgorithmBits>
class predicate_node : public declared_predicate {
using InputArgs = typename AlgorithmBits::input_parameter_types;
using input_args = typename AlgorithmBits::input_parameter_types;
using function_t = typename AlgorithmBits::bound_type;
static constexpr auto N = AlgorithmBits::number_inputs;
static constexpr auto num_inputs = AlgorithmBits::number_inputs;

public:
static constexpr auto number_output_products = 0ull;
Expand All @@ -64,40 +64,42 @@ namespace phlex::experimental {
AlgorithmBits alg,
product_queries input_products) :
declared_predicate{std::move(name), std::move(predicates), std::move(input_products)},
join_{make_join_or_none<N>(g, full_name(), layers())},
predicate_{
g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<N> const& messages) -> predicate_result {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);

bool const rc = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
return {message_id, rc};
}}
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
predicate_{g,
concurrency,
[this, ft = alg.release_algorithm()](
messages_t<num_inputs> const& messages) -> predicate_result {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);

bool const rc = call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;
return {message_id, rc};
}}
{
if constexpr (N > 1ull) {
if constexpr (num_inputs > 1ull) {
make_edge(join_, predicate_);
}
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
{
return receiver_for<N>(join_, input(), product_label, predicate_);
return receiver_for<num_inputs>(join_, input(), product_label, predicate_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<N>(join_, predicate_);
return input_ports<num_inputs>(join_, predicate_);
}
tbb::flow::sender<predicate_result>& sender() override { return predicate_; }

template <std::size_t... Is>
bool call(function_t const& ft, messages_t<N> const& messages, std::index_sequence<Is...>)
bool call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
if constexpr (N == 1ull) {
if constexpr (num_inputs == 1ull) {
return std::invoke(ft, std::get<Is>(input_).retrieve(messages)...);
} else {
return std::invoke(ft, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
Expand All @@ -107,9 +109,9 @@ namespace phlex::experimental {
named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }

input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
join_or_none_t<N> join_;
tbb::flow::function_node<messages_t<N>, predicate_result> predicate_;
input_retriever_types<input_args> input_{input_arguments<input_args>()};
join_or_none_t<num_inputs> join_;
tbb::flow::function_node<messages_t<num_inputs>, predicate_result> predicate_;
std::atomic<std::size_t> calls_;
};

Expand Down
Loading
Loading