diff --git a/phlex/core/edge_creation_policy.cpp b/phlex/core/edge_creation_policy.cpp index 36abd0d1..fcdb2d86 100644 --- a/phlex/core/edge_creation_policy.cpp +++ b/phlex/core/edge_creation_policy.cpp @@ -9,12 +9,18 @@ namespace phlex::experimental { edge_creation_policy::named_output_port const* edge_creation_policy::find_producer( product_query const& query) const { - // TODO: Update later with correct querying - auto [b, e] = producers_.equal_range(query.suffix.value_or(""_id)); + if (producers_.empty()) { + spdlog::debug("No producers found. Skipping and assuming {} comes from a provider.", + query.to_string()); + return nullptr; + } + // Now the only way b == e is if we have a suffix and nothing creates matching products + auto [b, e] = query.suffix.has_value() ? producers_.equal_range(*query.suffix) + : std::pair{producers_.begin(), producers_.end()}; if (b == e) { spdlog::debug( "Failed to find an algorithm that creates {} products. Assuming it comes from a provider", - query.suffix.value_or("\"\""_id)); + query.suffix.value_or("*"_id)); return nullptr; } std::map candidates; @@ -45,13 +51,16 @@ namespace phlex::experimental { candidates.emplace(producer.node.full(), &producer); } } else { - spdlog::error( + spdlog::debug( "Creator name mismatch between ({}) and {}", query.to_string(), producer.node.full()); } } if (candidates.empty()) { - throw std::runtime_error("Cannot identify product matching the query " + query.to_string()); + spdlog::debug( + "Cannot identify product matching the query {}. Assuming it comes from a provider.", + query.to_string()); + return nullptr; } if (candidates.size() > 1ull) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b1a29f02..e1ddcd58 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -198,7 +198,17 @@ cet_test( phlex::utilities spdlog::spdlog ) - +cet_test( + product_querying + USE_CATCH2_MAIN + SOURCE + product_querying.cpp + LIBRARIES + phlex::core + layer_generator + Boost::json + fmt::fmt +) cet_test(product_query USE_CATCH2_MAIN SOURCE product_query.cpp LIBRARIES phlex::core ) diff --git a/test/product_querying.cpp b/test/product_querying.cpp new file mode 100644 index 00000000..ae40a3c7 --- /dev/null +++ b/test/product_querying.cpp @@ -0,0 +1,93 @@ +#include "phlex/core/framework_graph.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/product_store.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "plugins/layer_generator.hpp" + +#include "fmt/format.h" + +#include +#include +#include + +using namespace phlex; +using namespace std::string_literals; + +namespace { + // Provider functions + int provide_idx(data_cell_index const& dci) { return int(dci.number()); } + int provide_number(data_cell_index const&) { return 3; } + double provide_temperature(data_cell_index const& dci) { return double(dci.number()) * 100.0; } + std::string provide_name(data_cell_index const& dci) + { + return fmt::format("John the {}th", dci.number()); + } +} + +TEST_CASE("Querying products in different ways", "[graph]") +{ + constexpr int num_events = 25; + experimental::layer_generator gen; + gen.add_layer("event", {.parent_layer_name = "job", .total_per_parent_data_cell = num_events}); + experimental::framework_graph g{driver_for_test(gen)}; + + // Register providers + g.provide("provide_number_in_job", provide_number, concurrency::unlimited) + .output_product(product_query{.creator = "input", .layer = "job", .suffix = "number"}); + g.provide("provide_number_in_event", provide_idx, concurrency::unlimited) + .output_product(product_query{.creator = "input", .layer = "event", .suffix = "evt_number"}); + g.provide("provide_temperature_in_event", provide_temperature, concurrency::unlimited) + .output_product(product_query{.creator = "input", .layer = "event", .suffix = "temperature"}); + g.provide("provide_temperature_in_event_again", provide_temperature, concurrency::unlimited) + .output_product( + product_query{.creator = "thermometer", .layer = "event", .suffix = "temperature"}); + g.provide("provide_name_in_event", provide_name, concurrency::unlimited) + .output_product(product_query{.creator = "give_name", .layer = "event", .suffix = "name"}); + + // Duplicate with transform + g.transform("duplicate_temperature", [](double const& t) { return t; }) + .input_family(product_query{.creator = "input", .layer = "event", .suffix = "temperature"}) + .output_products("temperature"); + + SECTION("All fields") + { + g.transform("all_fields", [](int const& i) { return i + 1; }) + .input_family(product_query{.creator = "input", .layer = "job", .suffix = "number"}) + .output_products("job_number"); + g.execute(); + CHECK(g.execution_count("all_fields") == 1); + } + + SECTION("Creator and Layer, using creator (and using type alone)") + { + g.transform("creator_and_layer_by_creator", [](std::string const& str) { return str; }) + .input_family(product_query{.creator = "give_name", .layer = "event"}) + .output_products("event_name"); + g.observe( + "verify_name", + [](std::string const& str, int const& n) { CHECK(str == fmt::format("John the {}th", n)); }) + .input_family(product_query{.creator = "give_name", .layer = "event"}, + product_query{.creator = "input", .layer = "event"}); + g.execute(); + CHECK(g.execution_count("creator_and_layer_by_creator") == num_events); + } + + SECTION("Creator and Layer, using layer") + { + g.transform("creator_and_layer_by_layer", [](double const& d) { return d; }) + .input_family(product_query{.creator = "input", .layer = "event"}) + .output_products("event_temp"); + g.execute(); + CHECK(g.execution_count("creator_and_layer_by_layer") == num_events); + } + + SECTION("Creator and Layer only, after transform") + { + g.transform("creator_and_layer_after_transform", [](double const& d) { return d; }) + .input_family(product_query{.creator = "duplicate_temperature", .layer = "event"}) + .output_products("event_temp"); + g.execute(); + CHECK(g.execution_count("creator_and_layer_after_transform") == num_events); + } +}