Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions phlex/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ namespace phlex {
{
using detail::value_decorate_exception;
auto query_object = jv.as_object();
auto product = value_decorate_exception<std::string>(query_object, "product");
auto layer = value_decorate_exception<std::string>(query_object, "layer");
return product_query{experimental::product_specification::create(product), layer};
auto creator = value_decorate_exception<experimental::identifier>(query_object, "creator");
auto layer = value_decorate_exception<experimental::identifier>(query_object, "layer");
auto suffix = detail::value_if_exists(query_object, "suffix");
auto stage = detail::value_if_exists(query_object, "stage");
return product_query{
.creator = std::move(creator), .layer = std::move(layer), .suffix = suffix, .stage = stage};
}

experimental::identifier experimental::tag_invoke(boost::json::value_to_tag<identifier> const&,
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
return output_product_;
}

std::string const& declared_provider::layer() const noexcept { return output_product_.layer(); }
identifier const& declared_provider::layer() const noexcept { return output_product_.layer; }

Check warning on line 18 in phlex/core/declared_provider.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/declared_provider.cpp#L18

Added line #L18 was not covered by tests
}
2 changes: 1 addition & 1 deletion phlex/core/declared_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace phlex::experimental {

std::string full_name() const;
product_query const& output_product() const noexcept;
std::string const& layer() const noexcept;
identifier const& layer() const noexcept;

virtual tbb::flow::receiver<index_message>* input_port() = 0;
virtual tbb::flow::sender<message>& sender() = 0;
Expand Down
8 changes: 2 additions & 6 deletions phlex/core/detail/filter_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
#include <string>

namespace {
phlex::product_query const output_dummy{
phlex::experimental::product_specification{
phlex::experimental::algorithm_name{"for_output_only", ""},
"for_output_only",
phlex::experimental::type_id{}},
"dummy_layer"};
phlex::product_query const output_dummy = phlex::product_query{
.creator = "for_output_only"_id, .layer = "dummy_layer"_id, .suffix = "for_output_only"_id};
phlex::product_queries const for_output_only{output_dummy};
}

Expand Down
2 changes: 1 addition & 1 deletion phlex/core/detail/repeater_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace phlex::experimental::detail {

repeater_node::repeater_node(tbb::flow::graph& g, std::string node_name, std::string layer_name) :
repeater_node::repeater_node(tbb::flow::graph& g, std::string node_name, identifier layer_name) :
base_t{g},
indexer_{g},
repeater_{g,
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/detail/repeater_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace phlex::experimental::detail {

class repeater_node : public tbb::flow::composite_node<repeater_node_input, message_tuple<1>> {
public:
repeater_node(tbb::flow::graph& g, std::string node_name, std::string layer_name);
repeater_node(tbb::flow::graph& g, std::string node_name, identifier layer_name);

tbb::flow::receiver<message>& data_port();
tbb::flow::receiver<indexed_end_token>& flush_port();
Expand Down Expand Up @@ -55,7 +55,7 @@ namespace phlex::experimental::detail {
cache_t cached_products_;
std::atomic<bool> cache_enabled_{true};
std::string node_name_;
std::string layer_;
identifier layer_;
};
}

Expand Down
40 changes: 20 additions & 20 deletions phlex/core/edge_creation_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,55 @@
edge_creation_policy::named_output_port const* edge_creation_policy::find_producer(
product_query const& query) const
{
auto const& spec = query.spec();
auto [b, e] = producers_.equal_range(spec.name());
// TODO: Update later with correct querying
auto [b, e] = producers_.equal_range(query.suffix.value_or(""_id).trans_get_string());
if (b == e) {
spdlog::debug(
"Failed to find an algorithm that creates {} products. Assuming it comes from a provider",
spec.name());
query.suffix.value_or("\"\""_id));
return nullptr;
}
std::map<std::string, named_output_port const*> candidates;
for (auto const& [key, producer] : std::ranges::subrange{b, e}) {
if (producer.node.match(spec.qualifier())) {
if (spec.type() != producer.type) {
spdlog::debug("Matched {} ({}) from {} but types don't match (`{}` vs `{}`). Excluding "
// TODO: Definitely not right yet
if (producer.node.plugin() == std::string_view(identifier(query.creator)) ||
producer.node.algorithm() == std::string_view(identifier(query.creator))) {
if (query.type != producer.type) {
spdlog::debug("Matched ({}) from {} but types don't match (`{}` vs `{}`). Excluding "
"from candidate list.",
spec.full(),
query.to_string(),
producer.node.full(),
spec.type(),
query.type,
producer.type);
} else {
if (spec.type().exact_compare(producer.type)) {
spdlog::debug("Matched {} ({}) from {} and types match. Keeping in candidate list.",
spec.full(),
if (query.type.exact_compare(producer.type)) {
spdlog::debug("Matched ({}) from {} and types match. Keeping in candidate list.",
query.to_string(),
producer.node.full());
} else {
spdlog::warn("Matched {} ({}) from {} and types match, but not exactly (produce {} and "
spdlog::warn("Matched ({}) from {} and types match, but not exactly (produce {} and "
"consume {}). Keeping in candidate list!",
spec.full(),
query.to_string(),
producer.node.full(),
spec.type().exact_name(),
query.type.exact_name(),
producer.type.exact_name());
}
candidates.emplace(producer.node.full(), &producer);
}
} else {
spdlog::error(
"Creator name mismatch between ({}) and {}", query.to_string(), producer.node.full());
}
}

if (candidates.empty()) {
throw std::runtime_error("Cannot identify product matching the specified label " +
spec.full());
throw std::runtime_error("Cannot identify product matching the query " + query.to_string());

Check warning on line 54 in phlex/core/edge_creation_policy.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/edge_creation_policy.cpp#L54

Added line #L54 was not covered by tests
}

if (candidates.size() > 1ull) {
std::string msg =
fmt::format("More than one candidate matches the specification {}: \n - {}\n",
spec.full(),
fmt::join(std::views::keys(candidates), "\n - "));
std::string msg = fmt::format("More than one candidate matches the query {}: \n - {}\n",
query.to_string(),
fmt::join(std::views::keys(candidates), "\n - "));

Check warning on line 60 in phlex/core/edge_creation_policy.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/edge_creation_policy.cpp#L58-L60

Added lines #L58 - L60 were not covered by tests
throw std::runtime_error(msg);
}

Expand Down
5 changes: 2 additions & 3 deletions phlex/core/edge_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ namespace phlex::experimental {
bool found_match = false;
for (auto const& [_, p] : providers) {
auto& provider = *p;
if (port.product_label == provider.output_product()) {
auto it = result.find(provider.full_name());
if (it == result.cend()) {
if (port.product_label.match(provider.output_product())) {
if (!result.contains(provider.full_name())) {
result.try_emplace(provider.full_name(), port.product_label, provider.input_port());
}
spdlog::debug("Connecting provider {} to node {} (product: {})",
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ namespace phlex::experimental {
nodes_.unfolds,
nodes_.transforms);

std::map<std::string, flusher_t*> flushers_from_unfolds;
std::map<identifier, flusher_t*> flushers_from_unfolds;
for (auto const& n : nodes_.unfolds | std::views::values) {
flushers_from_unfolds.try_emplace(n->child_layer(), &n->flusher());
flushers_from_unfolds.try_emplace(identifier{n->child_layer()}, &n->flusher());
}

// Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers
Expand All @@ -154,7 +154,7 @@ namespace phlex::experimental {
std::set<flusher_t*> flushers;
// For providers
for (product_query const& pq : n->input()) {
if (auto it = unfold_flushers.find(pq.layer()); it != unfold_flushers.end()) {
if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) {
flushers.insert(it->second);
} else {
flushers.insert(&index_router_.flusher());
Expand Down
29 changes: 15 additions & 14 deletions phlex/core/index_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
using namespace phlex::experimental;

namespace {
auto delimited_layer_path(std::string layer_path)
std::string delimited_layer_path(std::string_view const layer_path)
{
if (not layer_path.starts_with("/")) {
return "/" + layer_path;
return fmt::format("/{}", layer_path);
}
return layer_path;
return std::string{layer_path};

Check warning on line 21 in phlex/core/index_router.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/index_router.cpp#L21

Added line #L21 was not covered by tests
}

void send_messages(phlex::data_cell_index_ptr const& index,
Expand Down Expand Up @@ -77,7 +77,7 @@
// multilayer_slot implementation

detail::multilayer_slot::multilayer_slot(tbb::flow::graph& g,
std::string layer,
identifier layer,
tbb::flow::receiver<indexed_end_token>* flush_port,
tbb::flow::receiver<index_message>* input_port) :
layer_{std::move(layer)}, broadcaster_{g}, flusher_{g}
Expand All @@ -89,15 +89,16 @@
void detail::multilayer_slot::put_message(data_cell_index_ptr const& index,
std::size_t message_id)
{
if (layer_ == index->layer_name()) {
auto const layer = static_cast<std::string_view>(layer_);
if (layer == index->layer_name()) {
broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false});
return;
}

// Flush values are only used for indices that are *not* the "lowest" in the branch
// of the hierarchy.
++counter_;
broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id});
broadcaster_.try_put({.index = index->parent(layer), .msg_id = message_id});
}

void detail::multilayer_slot::put_end_token(data_cell_index_ptr const& index)
Expand All @@ -113,12 +114,12 @@

bool detail::multilayer_slot::matches_exactly(std::string const& layer_path) const
{
return layer_path.ends_with(delimited_layer_path(layer_));
return layer_path.ends_with(delimited_layer_path(static_cast<std::string_view>(layer_)));
}

bool detail::multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const
{
return index->parent(layer_) != nullptr;
return index->parent(static_cast<std::string_view>(layer_)) != nullptr;
}

//========================================================================================
Expand All @@ -136,8 +137,8 @@

// Create the index-set broadcast nodes for providers
for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) {
auto [it, _] =
broadcasters_.try_emplace(pq.layer(), std::make_shared<detail::index_set_node>(g));
auto [it, _] = broadcasters_.try_emplace(static_cast<identifier const&>(pq.layer),
std::make_shared<detail::index_set_node>(g));
make_edge(*it->second, *provider_port);
}

Expand All @@ -150,7 +151,7 @@
auto entry = std::make_shared<detail::multilayer_slot>(g, layer, flush_port, input_port);
casters.push_back(entry);
}
multibroadcasters_.try_emplace(node_name, std::move(casters));
multibroadcasters_.try_emplace(identifier{node_name}, std::move(casters));
}
}

Expand Down Expand Up @@ -262,7 +263,7 @@

std::vector<decltype(broadcasters_.begin())> candidates;
for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) {
if (search_token.ends_with(delimited_layer_path(it->first))) {
if (search_token.ends_with(delimited_layer_path(static_cast<std::string_view>(it->first)))) {
candidates.push_back(it);
}
}
Expand All @@ -275,9 +276,9 @@
return nullptr;
}

std::string msg{"Multiple layers match specification " + layer_path + ":\n"};
std::string msg = fmt::format("Multiple layers match specification {}:\n", layer_path);

Check warning on line 279 in phlex/core/index_router.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/index_router.cpp#L279

Added line #L279 was not covered by tests
for (auto const& it : candidates) {
msg += "\n- " + it->first;
msg += fmt::format("\n- {}", it->first);

Check warning on line 281 in phlex/core/index_router.cpp

View check run for this annotation

Codecov / codecov/patch

phlex/core/index_router.cpp#L281

Added line #L281 was not covered by tests
}
throw std::runtime_error(msg);
}
Expand Down
9 changes: 5 additions & 4 deletions phlex/core/index_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "phlex/core/message.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/identifier.hpp"

#include "oneapi/tbb/flow_graph.h"

Expand Down Expand Up @@ -32,7 +33,7 @@ namespace phlex::experimental {
class multilayer_slot {
public:
multilayer_slot(tbb::flow::graph& g,
std::string layer,
identifier layer,
tbb::flow::receiver<indexed_end_token>* flush_port,
tbb::flow::receiver<index_message>* input_port);

Expand All @@ -43,7 +44,7 @@ namespace phlex::experimental {
bool is_parent_of(data_cell_index_ptr const& index) const;

private:
std::string layer_;
identifier layer_;
detail::index_set_node broadcaster_;
detail::flush_node flusher_;
int counter_ = 0;
Expand Down Expand Up @@ -116,7 +117,7 @@ namespace phlex::experimental {
// Routing to provider nodes
// The following maps are used to route data-cell indices to provider nodes.
// The first map is from layer name to the corresponding broadcaster node.
std::unordered_map<std::string, detail::index_set_node_ptr> broadcasters_;
std::unordered_map<identifier, detail::index_set_node_ptr> broadcasters_;
// The second map is a cache from a layer hash matched to a broadcaster node, to avoid
// repeated lookups for the same layer.
std::unordered_map<std::size_t, detail::index_set_node_ptr> matched_broadcasters_;
Expand All @@ -125,7 +126,7 @@ namespace phlex::experimental {
// Routing to multi-layer join nodes
// The first map is from the node name to the corresponding broadcaster nodes and flush
// nodes.
std::unordered_map<std::string, detail::multilayer_slots> multibroadcasters_;
std::unordered_map<identifier, detail::multilayer_slots> multibroadcasters_;
// The second map is a cache from a layer hash matched to a set of multilayer slots, to
// avoid repeated lookups for the same layer.
std::unordered_map<std::size_t, detail::multilayer_slots> matched_routing_entries_;
Expand Down
3 changes: 2 additions & 1 deletion phlex/core/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "phlex/core/product_query.hpp"
#include "phlex/model/fwd.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/identifier.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/sized_tuple.hpp"

Expand Down Expand Up @@ -54,7 +55,7 @@ namespace phlex::experimental {
using messages_t = std::conditional_t<N == 1ull, message, message_tuple<N>>;

struct named_index_port {
std::string layer;
identifier layer;
tbb::flow::receiver<indexed_end_token>* token_port;
tbb::flow::receiver<index_message>* index_port;
};
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/multilayer_join_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace phlex::experimental {
public:
multilayer_join_node(tbb::flow::graph& g,
std::string const& node_name,
std::vector<std::string> layer_names) :
std::vector<identifier> layer_names) :
base_t{g},
join_{make_join(g, std::make_index_sequence<n_inputs>{})},
name_{node_name},
Expand Down Expand Up @@ -119,7 +119,7 @@ namespace phlex::experimental {
std::vector<std::unique_ptr<detail::repeater_node>> repeaters_;
tbb::flow::join_node<args_t, tbb::flow::tag_matching> join_;
std::string const name_;
std::vector<std::string> const layers_;
std::vector<identifier> const layers_;
};

namespace detail {
Expand Down Expand Up @@ -150,7 +150,7 @@ namespace phlex::experimental {
template <std::size_t N>
join_or_none_t<N> make_join_or_none(tbb::flow::graph& g,
std::string const& node_name,
std::vector<std::string> const& layers)
std::vector<identifier> const& layers)
{
if constexpr (N > 1ull) {
return multilayer_join_node<N>{g, node_name, layers};
Expand Down
Loading
Loading