diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index bf281e8df..46194b3b6 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -86,9 +86,15 @@ namespace phlex::experimental { } void framework_graph::execute() - { + try { finalize(); run(); + } catch (std::exception const& e) { + spdlog::error(e.what()); + throw; + } catch (...) { + spdlog::error("Unknown exception during graph execution"); + throw; } void framework_graph::run() diff --git a/phlex/core/product_query.cpp b/phlex/core/product_query.cpp index e3c425030..08162e531 100644 --- a/phlex/core/product_query.cpp +++ b/phlex/core/product_query.cpp @@ -7,9 +7,12 @@ #include namespace phlex::experimental { - product_query product_query::operator()(std::string layer) && + product_query product_tag::operator()(std::string data_layer) && { - return {std::move(name), std::move(layer)}; + if (data_layer.empty()) { + throw std::runtime_error("Cannot specify the empty string as a data layer."); + } + return {std::move(name), std::move(data_layer)}; } std::string product_query::to_string() const @@ -20,12 +23,12 @@ namespace phlex::experimental { return fmt::format("{} ϵ {}", name.full(), layer); } - product_query operator""_in(char const* name, std::size_t length) + product_tag operator""_in(char const* product_name, std::size_t length) { if (length == 0ull) { throw std::runtime_error("Cannot specify product with empty name."); } - return product_query::create(name); + return {product_specification::create(product_name)}; } bool operator==(product_query const& a, product_query const& b) @@ -45,13 +48,4 @@ namespace phlex::experimental { os << label.to_string(); return os; } - - product_query product_query::create(char const* c) { return create(std::string{c}); } - - product_query product_query::create(std::string const& s) - { - return {product_specification::create(s)}; - } - - product_query product_query::create(product_query l) { return l; } } diff --git a/phlex/core/product_query.hpp b/phlex/core/product_query.hpp index cce79e8f5..b781c334f 100644 --- a/phlex/core/product_query.hpp +++ b/phlex/core/product_query.hpp @@ -13,12 +13,12 @@ namespace phlex::experimental { struct product_query { product_specification name; std::string layer; - product_query operator()(std::string layer) &&; std::string to_string() const; + }; - static product_query create(char const* c); - static product_query create(std::string const& s); - static product_query create(product_query l); + struct product_tag { + product_specification name; + product_query operator()(std::string layer) &&; }; using product_queries = std::vector; @@ -26,26 +26,12 @@ namespace phlex::experimental { inline auto& to_name(product_query const& label) { return label.name.name(); } inline auto& to_layer(product_query& label) { return label.layer; } - product_query operator""_in(char const* str, std::size_t); + product_tag operator""_in(char const* str, std::size_t); bool operator==(product_query const& a, product_query const& b); bool operator!=(product_query const& a, product_query const& b); bool operator<(product_query const& a, product_query const& b); std::ostream& operator<<(std::ostream& os, product_query const& label); - template - concept label_compatible = requires(T t) { - { product_query::create(t) }; - }; - - template - auto to_labels(std::array const& like_labels) - { - std::array labels; - std::ranges::transform( - like_labels, labels.begin(), [](T const& t) { return product_query::create(t); }); - return labels; - } - namespace detail { // C is a container of product_queries template diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index 2b36a7ad3..c331e4c72 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -50,6 +50,7 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + if constexpr (M == 0ull) { registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto /* output_products */) { @@ -75,19 +76,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - template - auto input_family(std::array input_args) - { - return input_family(to_labels(input_args)); - } - - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family( - {product_query::create(std::forward(input_args))...}); + return input_family({std::move(input_args)...}); } private: @@ -147,6 +141,7 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto output_products) { return std::make_unique>( @@ -163,19 +158,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - template - auto input_family(std::array input_args) - { - return input_family(to_labels(input_args)); - } - - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N - 1 == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family( - {product_query::create(std::forward(input_args))...}); + return input_family({std::move(input_args)...}); } private: @@ -229,6 +217,7 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + registrar_.set_creator( [this, inputs = std::move(input_args)](auto upstream_predicates, auto output_products) { return std::make_unique>( @@ -245,12 +234,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family({product_query{std::forward(input_args)}...}); + return input_family({std::move(input_args)...}); } private: diff --git a/phlex/utilities/async_driver.hpp b/phlex/utilities/async_driver.hpp index 31c29a1d9..435f90faf 100644 --- a/phlex/utilities/async_driver.hpp +++ b/phlex/utilities/async_driver.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -22,13 +23,15 @@ namespace phlex::experimental { } async_driver(void (*ft)(async_driver&)) : driver_{ft} {} - ~async_driver() { thread_.join(); } - std::optional operator()() { if (gear_ == states::off) { - thread_ = std::thread{[this] { - driver_(*this); + thread_ = std::jthread{[this] { + try { + driver_(*this); + } catch (...) { + cached_exception_ = std::current_exception(); + } gear_ = states::park; cv_.notify_one(); }}; @@ -39,6 +42,11 @@ namespace phlex::experimental { std::unique_lock lock{mutex_}; cv_.wait(lock, [&] { return current_.has_value() or gear_ == states::park; }); + + if (cached_exception_) { + std::rethrow_exception(cached_exception_); + } + return std::exchange(current_, std::nullopt); } @@ -54,9 +62,10 @@ namespace phlex::experimental { std::function driver_; std::optional current_; std::atomic gear_ = states::off; - std::thread thread_; + std::jthread thread_; std::mutex mutex_; std::condition_variable cv_; + std::exception_ptr cached_exception_; }; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d67473cc1..68659d70e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,6 +40,7 @@ add_catch_test(class_registration LIBRARIES phlex::core Boost::json) add_catch_test(different_hierarchies LIBRARIES phlex::core) add_catch_test(filter_impl LIBRARIES phlex::core) add_catch_test(filter LIBRARIES phlex::core Boost::json) +add_catch_test(framework_graph LIBRARIES phlex::core) add_catch_test(function_registration LIBRARIES phlex::core Boost::json) add_catch_test(hierarchical_nodes LIBRARIES Boost::json TBB::tbb phlex::core) add_catch_test(multiple_function_registration LIBRARIES Boost::json phlex::core) diff --git a/test/benchmarks/accept_even_ids.cpp b/test/benchmarks/accept_even_ids.cpp index b98b5ddd4..a023b03de 100644 --- a/test/benchmarks/accept_even_ids.cpp +++ b/test/benchmarks/accept_even_ids.cpp @@ -5,9 +5,10 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.predicate( "accept_even_ids", - [](phlex::experimental::data_cell_index const& id) { return id.number() % 2 == 0; }, - phlex::experimental::concurrency::unlimited) - .input_family(config.get("product_name")); + [](data_cell_index const& id) { return id.number() % 2 == 0; }, + concurrency::unlimited) + .input_family(product_query{config.get("product_name"), "event"}); } diff --git a/test/benchmarks/accept_even_numbers.cpp b/test/benchmarks/accept_even_numbers.cpp index 33982374c..b4bd4d3bb 100644 --- a/test/benchmarks/accept_even_numbers.cpp +++ b/test/benchmarks/accept_even_numbers.cpp @@ -4,9 +4,8 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.predicate( - "accept_even_numbers", - [](int i) { return i % 2 == 0; }, - phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + "accept_even_numbers", [](int i) { return i % 2 == 0; }, concurrency::unlimited) + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/accept_fibonacci_numbers.cpp b/test/benchmarks/accept_fibonacci_numbers.cpp index e1438f20b..e7bf13d82 100644 --- a/test/benchmarks/accept_fibonacci_numbers.cpp +++ b/test/benchmarks/accept_fibonacci_numbers.cpp @@ -5,8 +5,8 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.make(config.get("max_number")) - .predicate( - "accept", &test::fibonacci_numbers::accept, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .predicate("accept", &test::fibonacci_numbers::accept, concurrency::unlimited) + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/last_index.cpp b/test/benchmarks/last_index.cpp index 41d0a45f1..b9add99e6 100644 --- a/test/benchmarks/last_index.cpp +++ b/test/benchmarks/last_index.cpp @@ -10,6 +10,6 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.transform("last_index", last_index, concurrency::unlimited) - .input_family("id") + .input_family("id"_in("event")) .output_products(config.get("produces", "a")); } diff --git a/test/benchmarks/plus_101.cpp b/test/benchmarks/plus_101.cpp index 0d7b0ed5e..8cc987fd2 100644 --- a/test/benchmarks/plus_101.cpp +++ b/test/benchmarks/plus_101.cpp @@ -8,5 +8,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_101", plus_101, concurrency::unlimited).input_family("a").output_products("c"); + m.transform("plus_101", plus_101, concurrency::unlimited) + .input_family("a"_in("event")) + .output_products("c"); } diff --git a/test/benchmarks/plus_one.cpp b/test/benchmarks/plus_one.cpp index fef141cca..391aaf0f6 100644 --- a/test/benchmarks/plus_one.cpp +++ b/test/benchmarks/plus_one.cpp @@ -8,5 +8,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_one", plus_one, concurrency::unlimited).input_family("a").output_products("b"); + m.transform("plus_one", plus_one, concurrency::unlimited) + .input_family("a"_in("event")) + .output_products("b"); } diff --git a/test/benchmarks/read_id.cpp b/test/benchmarks/read_id.cpp index 02d6117ed..ae2dd59d5 100644 --- a/test/benchmarks/read_id.cpp +++ b/test/benchmarks/read_id.cpp @@ -1,11 +1,14 @@ #include "phlex/model/data_cell_index.hpp" #include "phlex/module.hpp" +using namespace phlex::experimental; + namespace { - void read_id(phlex::experimental::data_cell_index const&) {} + void read_id(data_cell_index const&) {} } PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited).input_family("id"); + m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited) + .input_family("id"_in("event")); } diff --git a/test/benchmarks/read_index.cpp b/test/benchmarks/read_index.cpp index ebc8d3677..3ca2ca405 100644 --- a/test/benchmarks/read_index.cpp +++ b/test/benchmarks/read_index.cpp @@ -7,6 +7,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.observe("read_index", read_index, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/verify_difference.cpp b/test/benchmarks/verify_difference.cpp index 9294650af..fe1af4263 100644 --- a/test/benchmarks/verify_difference.cpp +++ b/test/benchmarks/verify_difference.cpp @@ -10,5 +10,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) "verify_difference", [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }, concurrency::unlimited) - .input_family(config.get("i", "b"), config.get("j", "c")); + .input_family(product_query{config.get("i", "b"), "event"}, + product_query{config.get("j", "c"), "event"}); } diff --git a/test/benchmarks/verify_even_fibonacci_numbers.cpp b/test/benchmarks/verify_even_fibonacci_numbers.cpp index 3da779f2d..ee69524c3 100644 --- a/test/benchmarks/verify_even_fibonacci_numbers.cpp +++ b/test/benchmarks/verify_even_fibonacci_numbers.cpp @@ -16,9 +16,10 @@ namespace test { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; using namespace test; m.make(config.get("max_number")) .observe( "only_even", &even_fibonacci_numbers::only_even, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/class_registration.cpp b/test/class_registration.cpp index cd87a68b6..a5c2d3342 100644 --- a/test/class_registration.cpp +++ b/test/class_registration.cpp @@ -49,8 +49,7 @@ namespace { TEST_CASE("Call non-framework functions", "[programming model]") { - std::array const product_names{ - product_query{"number"}, product_query{"temperature"}, product_query{"name"}}; + std::array const product_names{"number"_in("job"), "temperature"_in("job"), "name"_in("job")}; std::array const oproduct_names{"onumber"s, "otemperature"s, "oname"s}; auto store = product_store::base(); diff --git a/test/different_hierarchies.cpp b/test/different_hierarchies.cpp index 4dc4301d7..f7ce4a9d3 100644 --- a/test/different_hierarchies.cpp +++ b/test/different_hierarchies.cpp @@ -16,14 +16,18 @@ // // job // │ -// ├ trigger primitive +// ├ event // │ // └ run // │ // └ event // -// As the run_add node performs folds only over "runs", any "trigger primitive" +// As the run_add node performs folds only over "runs", any top-level "events" // stores are excluded from the fold result. +// +// N.B. The multiplexer sends data products to nodes based on the name of the lowest +// layer. For example, the top-level "event" and the nested "run/event" are both +// candidates for the "job" fold. // ======================================================================================= #include "phlex/core/framework_graph.hpp" @@ -47,8 +51,8 @@ namespace { constexpr auto index_limit = 2u; constexpr auto number_limit = 5u; - // job -> trigger primitive layers - constexpr auto primitive_limit = 10u; + // job -> event levels + constexpr auto top_level_event_limit = 10u; void cells_to_process(framework_driver& driver) { @@ -66,9 +70,9 @@ namespace { } } - // job -> trigger primitive layers - for (unsigned i : std::views::iota(0u, primitive_limit)) { - auto tp_store = job_store->make_child(i, "trigger primitive"); + // job -> event layers + for (unsigned i : std::views::iota(0u, top_level_event_limit)) { + auto tp_store = job_store->make_child(i, "event"); tp_store->add_product("number", i); driver.yield(tp_store); } @@ -80,22 +84,24 @@ TEST_CASE("Different hierarchies used with fold", "[graph]") framework_graph g{cells_to_process}; g.fold("run_add", add, concurrency::unlimited, "run", 0u) - .input_family("number") + .input_family("number"_in("event")) .output_products("run_sum"); - g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); + g.fold("job_add", add, concurrency::unlimited) + .input_family("number"_in("event")) + .output_products("job_sum"); g.observe("verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }) - .input_family("run_sum"); + .input_family("run_sum"_in("run")); g.observe("verify_job_sum", [](unsigned int actual) { - CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives + CHECK(actual == 20u + 45u); // 20u from nested events, 45u from top-level events }) - .input_family("job_sum"); + .input_family("job_sum"_in("job")); g.execute(); CHECK(g.execution_counts("run_add") == index_limit * number_limit); - CHECK(g.execution_counts("job_add") == index_limit * number_limit + primitive_limit); + CHECK(g.execution_counts("job_add") == index_limit * number_limit + top_level_event_limit); CHECK(g.execution_counts("verify_run_sum") == index_limit); CHECK(g.execution_counts("verify_job_sum") == 1); } diff --git a/test/filter.cpp b/test/filter.cpp index 5e12f236f..9e1efacc2 100644 --- a/test/filter.cpp +++ b/test/filter.cpp @@ -105,13 +105,13 @@ TEST_CASE("Two predicates", "[filtering]") TEST_CASE("Two predicates in series", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); g.predicate("odds_only", odds_only, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("evens_only"); g.make(0u) .observe("add", &sum_numbers::add, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("odds_only"); g.execute(); @@ -122,11 +122,11 @@ TEST_CASE("Two predicates in series", "[filtering]") TEST_CASE("Two predicates in parallel", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); g.make(0u) .observe("add", &sum_numbers::add, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("odds_only", "evens_only"); g.execute(); @@ -149,7 +149,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]") for (auto const& [name, b, e] : configs) { g.make(b, e) .predicate(name, ¬_in_range::eval, concurrency::unlimited) - .input_family("num"); + .input_family("num"_in("event")); } std::vector const predicate_names{ @@ -157,7 +157,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]") auto const expected_numbers = {4u, 5u, 7u}; g.make(expected_numbers) .observe("collect", &collect_numbers::collect, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when(predicate_names); g.execute(); @@ -168,16 +168,16 @@ TEST_CASE("Three predicates in parallel", "[filtering]") TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); g.make(5 * 100) .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::unlimited) - .input_family("num", "other_num") // <= Note input order + .input_family("num"_in("event"), "other_num"_in("event")) // <= Note input order .when("evens_only"); g.make(-5 * 100) .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::unlimited) - .input_family("other_num", "num") // <= Note input order + .input_family("other_num"_in("event"), "num"_in("event")) // <= Note input order .when("odds_only"); g.execute(); diff --git a/test/fold.cpp b/test/fold.cpp index 328486164..2cc17521b 100644 --- a/test/fold.cpp +++ b/test/fold.cpp @@ -65,25 +65,27 @@ TEST_CASE("Different data layers of fold", "[graph]") framework_graph g{cells_to_process}; g.fold("run_add", add, concurrency::unlimited, "run") - .input_family("number") + .input_family("number"_in("event")) .output_products("run_sum"); - g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); + g.fold("job_add", add, concurrency::unlimited) + .input_family("number"_in("event")) + .output_products("job_sum"); g.fold("two_layer_job_add", add, concurrency::unlimited) - .input_family("run_sum") + .input_family("run_sum"_in("run")) .output_products("two_layer_job_sum"); g.observe( "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::unlimited) - .input_family("run_sum"); + .input_family("run_sum"_in("run")); g.observe( "verify_two_layer_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) - .input_family("two_layer_job_sum"); + .input_family("two_layer_job_sum"_in("job")); g.observe( "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) - .input_family("job_sum"); + .input_family("job_sum"_in("job")); g.execute(); diff --git a/test/framework_graph.cpp b/test/framework_graph.cpp new file mode 100644 index 000000000..2ed3e30e3 --- /dev/null +++ b/test/framework_graph.cpp @@ -0,0 +1,19 @@ +#include "phlex/core/framework_graph.hpp" + +#include "catch2/catch_test_macros.hpp" + +#include + +using namespace phlex::experimental; + +TEST_CASE("Catch STL exceptions", "[graph]") +{ + framework_graph g{[](framework_driver&) { throw std::runtime_error("STL error"); }}; + CHECK_THROWS_AS(g.execute(), std::exception); +} + +TEST_CASE("Catch other exceptions", "[graph]") +{ + framework_graph g{[](framework_driver&) { throw 2.5; }}; + CHECK_THROWS_AS(g.execute(), double); +} diff --git a/test/function_registration.cpp b/test/function_registration.cpp index d76c19ecd..5a0a0d294 100644 --- a/test/function_registration.cpp +++ b/test/function_registration.cpp @@ -48,9 +48,8 @@ namespace { TEST_CASE("Call non-framework functions", "[programming model]") { - std::array const product_names{ - product_query{"number"}, product_query{"temperature"}, product_query{"name"}}; - std::array const oproduct_names = {"number"s, "temperature"s, "name"s}; + std::array const product_names{"number"_in("job"), "temperature"_in("job"), "name"_in("job")}; + std::array const oproduct_names = {"onumber"s, "otemperature"s, "oname"s}; std::array const result{"result"s}; auto store = product_store::base(); diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index 0cc36f8b6..773c042e2 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -103,22 +103,23 @@ TEST_CASE("Hierarchical nodes", "[graph]") framework_graph g{cells_to_process}; g.transform("get_the_time", strtime, concurrency::unlimited) - .input_family("time") + .input_family("time"_in("run")) .when() .output_products("strtime"); g.transform("square", square, concurrency::unlimited) - .input_family("number") + .input_family("number"_in("event")) .output_products("squared_number"); g.fold("add", add, concurrency::unlimited, "run", 15u) - .input_family("squared_number") + .input_family("squared_number"_in("event")) .when() .output_products("added_data"); g.transform("scale", scale, concurrency::unlimited) - .input_family("added_data") + .input_family("added_data"_in("run")) .output_products("result"); - g.observe("print_result", print_result, concurrency::unlimited).input_family("result", "strtime"); + g.observe("print_result", print_result, concurrency::unlimited) + .input_family("result"_in("run"), "strtime"_in("run")); g.make().output("save", &test::products_for_output::save).when(); diff --git a/test/max-parallelism/check_parallelism.cpp b/test/max-parallelism/check_parallelism.cpp index 526b67277..4f14b0f3d 100644 --- a/test/max-parallelism/check_parallelism.cpp +++ b/test/max-parallelism/check_parallelism.cpp @@ -36,5 +36,5 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) [expected = config.get("expected_parallelism")](std::size_t actual) { assert(actual == expected); }) - .input_family("max_parallelism"); + .input_family("max_parallelism"_in("job")); } diff --git a/test/memory-checks/many_events.cpp b/test/memory-checks/many_events.cpp index 763d93859..7a8703f8e 100644 --- a/test/memory-checks/many_events.cpp +++ b/test/memory-checks/many_events.cpp @@ -26,7 +26,7 @@ int main() framework_graph g{cells_to_process}; g.transform("pass_on", pass_on, concurrency::unlimited) - .input_family("number") + .input_family("number"_in("event")) .output_products("different"); g.execute(); } diff --git a/test/mock-workflow/algorithm.hpp b/test/mock-workflow/algorithm.hpp index c16d963eb..7692438d8 100644 --- a/test/mock-workflow/algorithm.hpp +++ b/test/mock-workflow/algorithm.hpp @@ -8,6 +8,7 @@ #include "fmt/std.h" #include "spdlog/spdlog.h" +#include #include #include #include @@ -68,10 +69,20 @@ namespace phlex::experimental::test { using inputs_t = ensure_tuple; using algorithm_t = algorithm; concurrency const j{c.get("concurrency", concurrency::unlimited.value)}; + + // Convert product strings to product queries. + // FIXME: There should be a c.get(...) specialization + auto input_product_strings = c.get("inputs"); + std::array> queries{}; + std::ranges::transform( + input_product_strings, queries.begin(), [](std::string const& product_string) { + return product_query{product_string, "event"}; + }); + m.template make(c.get("module_label"), c.get("duration_usec")) .transform("execute", &algorithm_t::execute, j) - .input_family(c.get("inputs")) + .input_family(std::move(queries)) .output_products(c.get("outputs")); } } diff --git a/test/multiple_function_registration.cpp b/test/multiple_function_registration.cpp index 876fba44a..7f106c523 100644 --- a/test/multiple_function_registration.cpp +++ b/test/multiple_function_registration.cpp @@ -49,33 +49,34 @@ TEST_CASE("Call multiple functions", "[programming model]") SECTION("All free functions") { g.transform("square_numbers", square_numbers, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("job")) .output_products("squared_numbers"); g.transform("sum_numbers", sum_numbers, concurrency::unlimited) - .input_family("squared_numbers") + .input_family("squared_numbers"_in("job")) .output_products("summed_numbers"); g.transform("sqrt_sum_numbers", sqrt_sum_numbers, concurrency::unlimited) - .input_family("summed_numbers", "offset") + .input_family("summed_numbers"_in("job"), "offset"_in("job")) .output_products("result"); } SECTION("Transforms, one from a class") { g.transform("square_numbers", square_numbers, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("job")) .output_products("squared_numbers"); g.transform("sum_numbers", sum_numbers, concurrency::unlimited) - .input_family("squared_numbers") + .input_family("squared_numbers"_in("job")) .output_products("summed_numbers"); g.make() .transform("sqrt_sum", &A::sqrt_sum, concurrency::unlimited) - .input_family("summed_numbers", "offset") + .input_family("summed_numbers"_in("job"), "offset"_in("job")) .output_products("result"); } // The following is invoked for *each* section above - g.observe("verify_result", [](double actual) { assert(actual == 6.); }).input_family("result"); + g.observe("verify_result", [](double actual) { assert(actual == 6.); }) + .input_family("result"_in("job")); g.execute(); } diff --git a/test/plugins/module.cpp b/test/plugins/module.cpp index de8741183..2ed2a27bc 100644 --- a/test/plugins/module.cpp +++ b/test/plugins/module.cpp @@ -10,9 +10,9 @@ using namespace phlex::experimental; PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { m.transform("add", test::add, concurrency::unlimited) - .input_family("i", "j") + .input_family("i"_in("event"), "j"_in("event")) .output_products("sum"); m.observe( "verify", [](int actual) { assert(actual == 0); }, concurrency::unlimited) - .input_family("sum"); + .input_family("sum"_in("event")); } diff --git a/test/product_query.cpp b/test/product_query.cpp index 63e54cb33..c99bab1d1 100644 --- a/test/product_query.cpp +++ b/test/product_query.cpp @@ -1,26 +1,20 @@ #include "phlex/core/product_query.hpp" #include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" using namespace phlex::experimental; -TEST_CASE("Empty label", "[data model]") +TEST_CASE("Empty specifications", "[data model]") { - product_query empty{}; - CHECK_THROWS(""_in); - CHECK_THROWS(""_in("")); + CHECK_THROWS_WITH(""_in, + Catch::Matchers::ContainsSubstring("Cannot specify product with empty name.")); + CHECK_THROWS_WITH( + "product"_in(""), + Catch::Matchers::ContainsSubstring("Cannot specify the empty string as a data layer.")); } -TEST_CASE("Only name in label", "[data model]") -{ - product_query label{"product"}; - CHECK(label == "product"_in); - - // Empty layer string is interpreted as a wildcard--i.e. any layer. - CHECK(label == "product"_in("")); -} - -TEST_CASE("Label with layer", "[data model]") +TEST_CASE("Product name with data layer", "[data model]") { product_query label{"product", {"event"}}; CHECK(label == "product"_in("event")); diff --git a/test/type_distinction.cpp b/test/type_distinction.cpp index 3784188bb..e9330623a 100644 --- a/test/type_distinction.cpp +++ b/test/type_distinction.cpp @@ -53,31 +53,32 @@ TEST_CASE("Distinguish products with same name and different types", "[programmi SECTION("Duplicate product name but differ in producer name") { - g.observe("starter", [](int num) { spdlog::info("Recieved {}", num); }).input_family("numbers"); + g.observe("starter", [](int num) { spdlog::info("Received {}", num); }) + .input_family("numbers"_in("event")); g.transform("triple_numbers", triple, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("event")) .output_products("tripled"); spdlog::info("Registered tripled"); g.transform("expand_orig", expand, concurrency::unlimited) - .input_family("numbers", "length") + .input_family("numbers"_in("event"), "length"_in("event")) .output_products("expanded_one"); spdlog::info("Registered expanded_one"); g.transform("expand_triples", expand, concurrency::unlimited) - .input_family("tripled", "length") + .input_family("tripled"_in("event"), "length"_in("event")) .output_products("expanded_three"); spdlog::info("Registered expanded_three"); g.transform("add_nums", add_numbers, concurrency::unlimited) - .input_family("numbers", "tripled") + .input_family("numbers"_in("event"), "tripled"_in("event")) .output_products("sums"); spdlog::info("Registered sums"); g.transform("add_vect", add_vectors, concurrency::unlimited) - .input_family("expanded_one", "expanded_three") + .input_family("expanded_one"_in("event"), "expanded_three"_in("event")) .output_products("sums"); g.transform("test_add_num", triple, concurrency::unlimited) - .input_family("sums") + .input_family("sums"_in("event")) .output_products("result"); spdlog::info("Registered result"); } @@ -85,16 +86,16 @@ TEST_CASE("Distinguish products with same name and different types", "[programmi SECTION("Duplicate product name and producer, differ only in type") { g.transform("square", square, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("event")) .output_products("square_result", "square_result"); g.transform("extract_result", id, concurrency::unlimited) - .input_family("square_result") + .input_family("square_result"_in("event")) .output_products("result"); } g.observe("print_result", [](int res) { spdlog::info("Result: {}", res); }) - .input_family("result"); + .input_family("result"_in("event")); spdlog::info("Registered observe"); g.execute(); spdlog::info("Executed"); diff --git a/test/unfold.cpp b/test/unfold.cpp index 348fda919..899239871 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -98,24 +98,25 @@ TEST_CASE("Splitting the processing", "[graph]") framework_graph g{cells_to_process}; g.unfold("iota", &iota::predicate, &iota::unfold, concurrency::unlimited, "lower1") - .input_family("max_number") + .input_family("max_number"_in("event")) .output_products("new_number"); g.fold("add", add, concurrency::unlimited, "event") - .input_family("new_number") + .input_family("new_number"_in("lower1")) .output_products("sum1"); - g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"); + g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"_in("event")); g.unfold("iterate_through", &iterate_through::predicate, &iterate_through::unfold, concurrency::unlimited, "lower2") - .input_family("ten_numbers") + .input_family("ten_numbers"_in("event")) .output_products("each_number"); g.fold("add_numbers", add_numbers, concurrency::unlimited, "event") - .input_family("each_number") + .input_family("each_number"_in("lower2")) .output_products("sum2"); - g.observe("check_sum_same", check_sum_same, concurrency::unlimited).input_family("sum2"); + g.observe("check_sum_same", check_sum_same, concurrency::unlimited) + .input_family("sum2"_in("event")); g.make().output( "save", &test::products_for_output::save, concurrency::serial); diff --git a/test/vector_of_abstract_types.cpp b/test/vector_of_abstract_types.cpp index c55bd4414..8588beba8 100644 --- a/test/vector_of_abstract_types.cpp +++ b/test/vector_of_abstract_types.cpp @@ -60,10 +60,10 @@ namespace { TEST_CASE("Test vector of abstract types") { framework_graph g{source{1u}}; - g.transform("read_thing", read_abstract).input_family("thing").output_products("sum"); + g.transform("read_thing", read_abstract).input_family("thing"_in("event")).output_products("sum"); g.observe( "verify_sum", [](int sum) { CHECK(sum == 3); }, concurrency::serial) - .input_family("sum"); + .input_family("sum"_in("event")); g.execute(); CHECK(g.execution_counts("read_thing") == 1);