Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions form/form_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ namespace {
products.reserve(store.size());

// Iterate through all products in the store
for (auto const& [product_name, product_ptr] : store) {
// product_name: "tracks" (from the map key)
for (auto const& [product_spec, product_ptr] : store) {
// product_spec: "tracks" (from the map key)
// product_ptr: pointer to the actual product data
assert(product_ptr && "store should not contain null product_ptr");

std::cout << " Product: " << product_name.full() << "\n";
std::cout << " Product: " << product_spec.full() << "\n";

// Create FORM product with metadata
products.emplace_back(product_name.name().trans_get_string(), // label, from map key
products.emplace_back(product_spec.suffix().trans_get_string(), // label, from map key
product_ptr->address(), // data, from phlex product_base
&product_ptr->type() // type, from phlex product_base
);
Expand Down
8 changes: 4 additions & 4 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ namespace phlex::experimental {
tbb::flow::graph& g,
AlgorithmBits alg,
InitTuple initializer,
product_queries product_labels,
product_queries input_products,
std::vector<std::string> output,
std::string partition) :
declared_fold{std::move(name), std::move(predicates), std::move(product_labels)},
declared_fold{std::move(name), std::move(predicates), std::move(input_products)},
initializer_{std::move(initializer)},
output_{
to_product_specifications(full_name(), std::move(output), make_type_ids<result_type>())},
Expand Down Expand Up @@ -132,9 +132,9 @@ namespace phlex::experimental {
}
}

tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, fold_);
return receiver_for<num_inputs>(join_, input(), input_product, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, observer_);
return receiver_for<num_inputs>(join_, input(), input_product, observer_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, predicate_);
return receiver_for<num_inputs>(join_, input(), input_product, predicate_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, transform_);
return receiver_for<num_inputs>(join_, input(), input_product, transform_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace phlex::experimental {

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->index()->make_child(i, child_layer_name_);
auto child_index = parent_->index()->make_child(child_layer_name_, i);
++child_counts_[child_index->layer_hash()];
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}
Expand Down
14 changes: 7 additions & 7 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ namespace phlex::experimental {
tbb::flow::graph& g,
Predicate&& predicate,
Unfold&& unfold,
product_queries product_labels,
std::vector<std::string> output_products,
product_queries input_products,
std::vector<std::string> output_product_suffixes,
std::string child_layer_name) :
declared_unfold{std::move(name),
std::move(predicates),
std::move(product_labels),
std::move(input_products),
std::move(child_layer_name)},
output_{to_product_specifications(full_name(),
std::move(output_products),
std::move(output_product_suffixes),
make_type_ids<skip_first_type<return_type<Unfold>>>())},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
unfold_{g,
Expand All @@ -125,9 +125,9 @@ namespace phlex::experimental {
}

private:
tbb::flow::receiver<message>& port_for(product_query const& product_label) override
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), product_label, unfold_);
return receiver_for<num_inputs>(join_, input(), input_product, unfold_);
}
std::vector<tbb::flow::receiver<message>*> ports() override
{
Expand Down Expand Up @@ -165,7 +165,7 @@ namespace phlex::experimental {
auto running_value = obj.initial_value();
while (std::invoke(predicate, obj, running_value)) {
products new_products;
auto new_id = unfolded_id->make_child(counter, child_layer());
auto new_id = unfolded_id->make_child(child_layer(), counter);
if constexpr (requires { std::invoke(unfold, obj, running_value, *new_id); }) {
auto [next_value, prods] = std::invoke(unfold, obj, running_value, *new_id);
new_products.add_all(output_, std::move(prods));
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/detail/filter_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ namespace phlex::experimental {

void decision_map::erase(accessor& a) { results_.erase(a); }

data_map::data_map(product_queries const& product_names) :
product_names_{&product_names}, nargs_{product_names.size()}
data_map::data_map(product_queries const& input_products) :
input_products_{&input_products}, nargs_{input_products.size()}
{
assert(nargs_ > 0);
}
Expand All @@ -76,7 +76,7 @@ namespace phlex::experimental {

// Fill slots in the order of the input arguments to the downstream node.
for (std::size_t i = 0; i != nargs_; ++i) {
if (elem[i] or not resolve_in_store((*product_names_)[i], *store)) {
if (elem[i] or not resolve_in_store((*input_products_)[i], *store)) {
continue;
}
elem[i] = store;
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/detail/filter_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace phlex::experimental {

private:
stores_t stores_;
std::vector<product_query> const* product_names_;
std::vector<product_query> const* input_products_;
std::size_t nargs_;
};
}
Expand Down
18 changes: 9 additions & 9 deletions phlex/core/edge_creation_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include <string>

namespace phlex::experimental {
using product_name_t = identifier;
using product_suffix_t = identifier;

class edge_creation_policy {
public:
Expand All @@ -31,25 +31,25 @@ namespace phlex::experimental {

private:
template <typename T>
static std::multimap<product_name_t, named_output_port> producing_nodes(T& nodes);
static std::multimap<product_suffix_t, named_output_port> producing_nodes(T& nodes);

std::multimap<product_name_t, named_output_port> producers_;
std::multimap<product_suffix_t, named_output_port> producers_;
};

// =============================================================================
// Implementation
template <typename T>
std::multimap<product_name_t, edge_creation_policy::named_output_port>
std::multimap<product_suffix_t, edge_creation_policy::named_output_port>
edge_creation_policy::producing_nodes(T& nodes)
{
std::multimap<product_name_t, named_output_port> result;
std::multimap<product_suffix_t, named_output_port> result;
for (auto const& [node_name, node] : nodes) {
for (auto const& product_name : node->output()) {
if (product_name.name().empty())
for (auto const& product_spec : node->output()) {
if (product_spec.suffix().empty())
continue;

result.emplace(product_name.name(),
named_output_port{node_name, &node->output_port(), product_name.type()});
result.emplace(product_spec.suffix(),
named_output_port{node_name, &node->output_port(), product_spec.type()});
}
}
return result;
Expand Down
10 changes: 6 additions & 4 deletions phlex/core/edge_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include <cassert>

using namespace std::string_literals;

namespace phlex::experimental {
index_router::provider_input_ports_t make_provider_edges(index_router::head_ports_t head_ports,
declared_providers& providers)
Expand All @@ -19,22 +21,22 @@ namespace phlex::experimental {
bool found_match = false;
for (auto const& [_, p] : providers) {
auto& provider = *p;
if (port.product_label.match(provider.output_product())) {
if (port.input_product.match(provider.output_product())) {
if (!result.contains(provider.full_name())) {
result.try_emplace(provider.full_name(), port.product_label, provider.input_port());
result.try_emplace(provider.full_name(), port.input_product, provider.input_port());
}
spdlog::debug("Connecting provider {} to node {} (product: {})",
provider.full_name(),
node_name,
port.product_label.to_string());
port.input_product.to_string());
make_edge(provider.output_port(), *(port.port));
found_match = true;
break;
}
}
if (!found_match) {
throw std::runtime_error("No provider found for product: "s +
port.product_label.to_string());
port.input_product.to_string());
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions phlex/core/edge_maker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
#include <vector>

namespace phlex::experimental {
using namespace std::string_literals;

using product_name_t = identifier;

index_router::provider_input_ports_t make_provider_edges(index_router::head_ports_t head_ports,
declared_providers& providers);
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/index_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace phlex::experimental {
class index_router {
public:
struct named_input_port {
product_query product_label;
product_query input_product;
tbb::flow::receiver<message>* port;
};
using named_input_ports_t = std::vector<named_input_port>;
Expand All @@ -86,7 +86,7 @@ namespace phlex::experimental {
using head_ports_t = std::map<std::string, named_input_ports_t>;

struct provider_input_port_t {
product_query product_label;
product_query input_product;
tbb::flow::receiver<index_message>* port;
};
using provider_input_ports_t = std::map<std::string, provider_input_port_t>;
Expand Down
10 changes: 5 additions & 5 deletions phlex/core/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ namespace phlex::experimental {
return b;
}

std::size_t port_index_for(product_queries const& product_labels,
product_query const& product_label)
std::size_t port_index_for(product_queries const& input_products,
product_query const& input_product)
{
auto const [b, e] = std::tuple{cbegin(product_labels), cend(product_labels)};
auto it = std::find(b, e, product_label);
auto const [b, e] = std::tuple{cbegin(input_products), cend(input_products)};
auto it = std::find(b, e, input_product);
if (it == e) {
throw std::runtime_error(
fmt::format("Algorithm does not accept product '{}'.", product_label));
fmt::format("Algorithm does not accept product '{}'.", input_product));
}
return std::distance(b, it);
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace phlex::experimental {
// Non-template overload for single message case
inline message const& most_derived(message const& msg) { return msg; }

std::size_t port_index_for(product_queries const& product_labels,
product_query const& product_label);
std::size_t port_index_for(product_queries const& input_products,
product_query const& input_product);
}

#endif // PHLEX_CORE_MESSAGE_HPP
20 changes: 10 additions & 10 deletions phlex/core/multilayer_join_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ namespace phlex::experimental {
}(std::make_index_sequence<N>{});
}

// Looks up the port index for the given product label, then returns a reference to
// Looks up the port index for the given input product query, then returns a reference to
// the corresponding input port of the join node. Only valid for N > 1.
template <std::size_t N>
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
product_queries const& product_labels,
product_query const& product_label)
product_queries const& input_products,
product_query const& input_product)
{
static_assert(N > 1ull, "receiver_for should not be called for N=1");
auto const index = port_index_for(product_labels, product_label);
auto const index = port_index_for(input_products, input_product);
return receiver_for<0ull, N>(join, index);
}
}
Expand All @@ -212,19 +212,19 @@ namespace phlex::experimental {
}
}

// Returns the receiver for the input port that corresponds to the given product label. For
// N == 1 there is only one port so the node itself is returned; for N > 1 the port is looked
// up by label within the join.
// Returns the receiver for the input port that corresponds to the given input product query.
// For N == 1 there is only one port so the node itself is returned; for N > 1 the port is
// looked up by query within the join.
template <std::size_t N, typename Node>
tbb::flow::receiver<message>& receiver_for(join_or_none_t<N>& join,
product_queries const& product_labels,
product_query const& product_label,
product_queries const& input_products,
product_query const& input_product,
Node& node)
{
if constexpr (N == 1ull) {
return node;
} else {
return detail::receiver_for<N>(join, product_labels, product_label);
return detail::receiver_for<N>(join, input_products, input_product);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/product_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace phlex {
return false;
}
if (suffix) {
if (*suffix != spec.name()) {
if (*suffix != spec.suffix()) {
return false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/products_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ namespace phlex::experimental {

std::size_t products_consumer::num_inputs() const { return input().size(); }

tbb::flow::receiver<message>& products_consumer::port(product_query const& product_label)
tbb::flow::receiver<message>& products_consumer::port(product_query const& input_product)
{
return port_for(product_label);
return port_for(input_product);
}

product_queries const& products_consumer::input() const noexcept { return input_products_; }
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/products_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace phlex::experimental {

product_queries const& input() const noexcept;
std::vector<identifier> const& layers() const noexcept;
tbb::flow::receiver<message>& port(product_query const& product_label);
tbb::flow::receiver<message>& port(product_query const& input_product);

virtual named_index_ports index_ports() = 0;
virtual std::vector<tbb::flow::receiver<message>*> ports() = 0;
Expand All @@ -41,7 +41,7 @@ namespace phlex::experimental {
}

private:
virtual tbb::flow::receiver<message>& port_for(product_query const& product_label) = 0;
virtual tbb::flow::receiver<message>& port_for(product_query const& input_product) = 0;

product_queries input_products_;
std::vector<identifier> layers_;
Expand Down
Loading
Loading