Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions form/form_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ namespace {
products.reserve(store.size());

// Iterate through all products in the store
for (auto const& [product_name, product_ptr] : store) {
// product_name: "tracks" (from the map key)
for (auto const& [product_spec, product_ptr] : store) {
// product_spec: "tracks" (from the map key)
// product_ptr: pointer to the actual product data
assert(product_ptr && "store should not contain null product_ptr");

std::cout << " Product: " << product_name.full() << "\n";
std::cout << " Product: " << product_spec.full() << "\n";

// Create FORM product with metadata
products.emplace_back(product_name.name().trans_get_string(), // label, from map key
products.emplace_back(product_spec.suffix().trans_get_string(), // label, from map key
product_ptr->address(), // data, from phlex product_base
&product_ptr->type() // type, from phlex product_base
);
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ namespace phlex::experimental {
tbb::flow::graph& g,
AlgorithmBits alg,
InitTuple initializer,
product_queries product_labels,
product_queries input_products,
std::vector<std::string> output,
std::string partition) :
declared_fold{std::move(name), std::move(predicates), std::move(product_labels)},
declared_fold{std::move(name), std::move(predicates), std::move(input_products)},
initializer_{std::move(initializer)},
output_{
to_product_specifications(full_name(), std::move(output), make_type_ids<result_type>())},
Expand Down Expand Up @@ -132,9 +132,9 @@ namespace phlex::experimental {
}
}

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, fold_);
return receiver_for<num_inputs>(join_, input(), input_product, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, observer_);
return receiver_for<num_inputs>(join_, input(), input_product, observer_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, predicate_);
return receiver_for<num_inputs>(join_, input(), input_product, predicate_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, transform_);
return receiver_for<num_inputs>(join_, input(), input_product, transform_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace phlex::experimental {

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->index()->make_child(i, child_layer_name_);
auto child_index = parent_->index()->make_child(child_layer_name_, i);
++child_counts_[child_index->layer_hash()];
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}
Expand Down
14 changes: 7 additions & 7 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ namespace phlex::experimental {
tbb::flow::graph& g,
Predicate&& predicate,
Unfold&& unfold,
product_queries product_labels,
std::vector<std::string> output_products,
product_queries input_products,
std::vector<std::string> output_product_suffixes,
std::string child_layer_name) :
declared_unfold{std::move(name),
std::move(predicates),
std::move(product_labels),
std::move(input_products),
std::move(child_layer_name)},
output_{to_product_specifications(full_name(),
std::move(output_products),
std::move(output_product_suffixes),
make_type_ids<skip_first_type<return_type<Unfold>>>())},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
unfold_{g,
Expand All @@ -125,9 +125,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, unfold_);
return receiver_for<num_inputs>(join_, input(), input_product, unfold_);
}
std::vector<tbb::flow::receiver<message>*> ports() override
{
Expand Down Expand Up @@ -165,7 +165,7 @@ namespace phlex::experimental {
auto running_value = obj.initial_value();
while (std::invoke(predicate, obj, running_value)) {
products new_products;
auto new_id = unfolded_id->make_child(counter, child_layer());
auto new_id = unfolded_id->make_child(child_layer(), counter);
if constexpr (requires { std::invoke(unfold, obj, running_value, *new_id); }) {
auto [next_value, prods] = std::invoke(unfold, obj, running_value, *new_id);
new_products.add_all(output_, std::move(prods));
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/detail/filter_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ namespace phlex::experimental {

void decision_map::erase(accessor& a) { results_.erase(a); }

data_map::data_map(product_queries const& product_names) :
product_names_{&product_names}, nargs_{product_names.size()}
data_map::data_map(product_queries const& input_products) :
input_products_{&input_products}, nargs_{input_products.size()}
{
assert(nargs_ > 0);
}
Expand All @@ -76,7 +76,7 @@ namespace phlex::experimental {

// Fill slots in the order of the input arguments to the downstream node.
for (std::size_t i = 0; i != nargs_; ++i) {
if (elem[i] or not resolve_in_store((*product_names_)[i], *store)) {
if (elem[i] or not resolve_in_store((*input_products_)[i], *store)) {
continue;
}
elem[i] = store;
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/detail/filter_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace phlex::experimental {

private:
stores_t stores_;
std::vector<product_query> const* product_names_;
std::vector<product_query> const* input_products_;
std::size_t nargs_;
};
}
Expand Down
18 changes: 9 additions & 9 deletions phlex/core/edge_creation_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <string>

namespace phlex::experimental {
using product_name_t = identifier;
using product_suffix_t = identifier;

class edge_creation_policy {
public:
Expand All @@ -31,25 +31,25 @@ namespace phlex::experimental {

private:
template <typename T>
static std::multimap<product_name_t, named_output_port> producing_nodes(T& nodes);
static std::multimap<product_suffix_t, named_output_port> producing_nodes(T& nodes);

std::multimap<product_name_t, named_output_port> producers_;
std::multimap<product_suffix_t, named_output_port> producers_;
};

// =============================================================================
// Implementation
template <typename T>
std::multimap<product_name_t, edge_creation_policy::named_output_port>
std::multimap<product_suffix_t, edge_creation_policy::named_output_port>
edge_creation_policy::producing_nodes(T& nodes)
{
std::multimap<product_name_t, named_output_port> result;
std::multimap<product_suffix_t, named_output_port> result;
for (auto const& [node_name, node] : nodes) {
for (auto const& product_name : node->output()) {
if (product_name.name().empty())
for (auto const& product_spec : node->output()) {
if (product_spec.suffix().empty())
continue;

result.emplace(product_name.name(),
named_output_port{node_name, &node->output_port(), product_name.type()});
result.emplace(product_spec.suffix(),
named_output_port{node_name, &node->output_port(), product_spec.type()});
}
}
return result;
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/edge_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
bool found_match = false;
for (auto const& [_, p] : providers) {
auto& provider = *p;
if (port.product_label.match(provider.output_product())) {
if (port.input_product.match(provider.output_product())) {
if (!result.contains(provider.full_name())) {
result.try_emplace(provider.full_name(), port.product_label, provider.input_port());
result.try_emplace(provider.full_name(), port.input_product, provider.input_port());
}
spdlog::debug("Connecting provider {} to node {} (product: {})",
provider.full_name(),
node_name,
port.product_label.to_string());
port.input_product.to_string());
make_edge(provider.output_port(), *(port.port));
found_match = true;
break;
}
}
if (!found_match) {
throw std::runtime_error("No provider found for product: "s +
port.product_label.to_string());
port.input_product.to_string());

Check warning on line 37 in phlex/core/edge_maker.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/edge_maker.cpp#L37

Added line #L37 was not covered by tests
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/index_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace phlex::experimental {
class index_router {
public:
struct named_input_port {
product_query product_label;
product_query input_product;
tbb::flow::receiver<message>* port;
};
using named_input_ports_t = std::vector<named_input_port>;
Expand All @@ -86,7 +86,7 @@ namespace phlex::experimental {
using head_ports_t = std::map<std::string, named_input_ports_t>;

struct provider_input_port_t {
product_query product_label;
product_query input_product;
tbb::flow::receiver<index_message>* port;
};
using provider_input_ports_t = std::map<std::string, provider_input_port_t>;
Expand Down
10 changes: 5 additions & 5 deletions phlex/core/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
return b;
}

std::size_t port_index_for(product_queries const& product_labels,
product_query const& product_label)
std::size_t port_index_for(product_queries const& input_products,
product_query const& input_product)
{
auto const [b, e] = std::tuple{cbegin(product_labels), cend(product_labels)};
auto it = std::find(b, e, product_label);
auto const [b, e] = std::tuple{cbegin(input_products), cend(input_products)};
auto it = std::find(b, e, input_product);
if (it == e) {
throw std::runtime_error(
fmt::format("Algorithm does not accept product '{}'.", product_label));
fmt::format("Algorithm does not accept product '{}'.", input_product));

Check warning on line 32 in phlex/core/message.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/message.cpp#L32

Added line #L32 was not covered by tests
}
return std::distance(b, it);
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace phlex::experimental {
// Non-template overload for single message case
inline message const& most_derived(message const& msg) { return msg; }

std::size_t port_index_for(product_queries const& product_labels,
product_query const& product_label);
std::size_t port_index_for(product_queries const& input_products,
product_query const& input_product);
}

#endif // PHLEX_CORE_MESSAGE_HPP
20 changes: 10 additions & 10 deletions phlex/core/multilayer_join_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ namespace phlex::experimental {
}(std::make_index_sequence<N>{});
}

// Looks up the port index for the given product label, then returns a reference to
// Looks up the port index for the given input product query, then returns a reference to
// the corresponding input port of the join node. Only valid for N > 1.
template <std::size_t N>
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
product_queries const& product_labels,
product_query const& product_label)
product_queries const& input_products,
product_query const& input_product)
{
static_assert(N > 1ull, "receiver_for should not be called for N=1");
auto const index = port_index_for(product_labels, product_label);
auto const index = port_index_for(input_products, input_product);
return receiver_for<0ull, N>(join, index);
}
}
Expand All @@ -212,19 +212,19 @@ namespace phlex::experimental {
}
}

// Returns the receiver for the input port that corresponds to the given product label. For
// N == 1 there is only one port so the node itself is returned; for N > 1 the port is looked
// up by label within the join.
// Returns the receiver for the input port that corresponds to the given input product query.
// For N == 1 there is only one port so the node itself is returned; for N > 1 the port is
// looked up by query within the join.
template <std::size_t N, typename Node>
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
product_queries const& product_labels,
product_query const& product_label,
product_queries const& input_products,
product_query const& input_product,
Node& node)
{
if constexpr (N == 1ull) {
return node;
} else {
return detail::receiver_for<N>(join, product_labels, product_label);
return detail::receiver_for<N>(join, input_products, input_product);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/product_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace phlex {
return false;
}
if (suffix) {
if (*suffix != spec.name()) {
if (*suffix != spec.suffix()) {
return false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/products_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ namespace phlex::experimental {

std::size_t products_consumer::num_inputs() const { return input().size(); }

tbb::flow::receiver<message>& products_consumer::port(product_query const& product_label)
tbb::flow::receiver<message>& products_consumer::port(product_query const& input_product)
{
return port_for(product_label);
return port_for(input_product);
}

product_queries const& products_consumer::input() const noexcept { return input_products_; }
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/products_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace phlex::experimental {

product_queries const& input() const noexcept;
std::vector<identifier> const& layers() const noexcept;
tbb::flow::receiver<message>& port(product_query const& product_label);
tbb::flow::receiver<message>& port(product_query const& input_product);

virtual named_index_ports index_ports() = 0;
virtual std::vector<tbb::flow::receiver<message>*> ports() = 0;
Expand All @@ -41,7 +41,7 @@ namespace phlex::experimental {
}

private:
virtual tbb::flow::receiver<message>& port_for(product_query const& product_label) = 0;
virtual tbb::flow::receiver<message>& port_for(product_query const& input_product) = 0;

product_queries input_products_;
std::vector<identifier> layers_;
Expand Down
Loading
Loading