From d8595c18150a8385641dd8af295073ba49067a68 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 3 Dec 2025 09:16:44 -0600 Subject: [PATCH 01/10] Print exception message before terminating program --- phlex/core/framework_graph.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index bf281e8df..e07d55eb6 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -86,9 +86,12 @@ namespace phlex::experimental { } void framework_graph::execute() - { + try { finalize(); run(); + } catch (std::exception const& e) { + spdlog::error(e.what()); + throw; } void framework_graph::run() From f60ef84c33edd87ac45dda9fa53fe88ed2874842 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 3 Dec 2025 10:20:47 -0600 Subject: [PATCH 02/10] Require specification of data layers for input data products --- phlex/core/registrar.cpp | 23 +++++++++++++++++++++++ phlex/core/registrar.hpp | 11 +++++++++++ phlex/core/registration_api.hpp | 6 ++++++ 3 files changed, 40 insertions(+) diff --git a/phlex/core/registrar.cpp b/phlex/core/registrar.cpp index a9878349e..e72ff1b4c 100644 --- a/phlex/core/registrar.cpp +++ b/phlex/core/registrar.cpp @@ -3,6 +3,29 @@ #include "fmt/format.h" namespace phlex::experimental::detail { + + void verify_layers(std::vector& errors, + algorithm_name const& name, + std::span queries) + { + std::vector malformed_queries; + std::string err_msg; + for (auto const& query : queries) { + if (query.family.empty()) { + malformed_queries.push_back(query.to_string()); + } + } + + if (malformed_queries.empty()) { + return; + } + + errors.push_back(fmt::format("Node '{}' has one or more product specifications that do not " + "include a data layer: {}", + name.full(), + malformed_queries)); + } + void add_to_error_messages(std::vector& errors, std::string const& name) { errors.push_back(fmt::format("Node with name '{}' already exists", name)); diff --git a/phlex/core/registrar.hpp b/phlex/core/registrar.hpp index 92ef9e55b..9b72e3038 100644 --- a/phlex/core/registrar.hpp +++ b/phlex/core/registrar.hpp @@ -47,17 +47,23 @@ // // ======================================================================================= +#include "phlex/core/product_query.hpp" +#include "phlex/model/algorithm_name.hpp" #include "phlex/utilities/simple_ptr_map.hpp" #include #include #include +#include #include #include namespace phlex::experimental { namespace detail { + void verify_layers(std::vector& errors, + algorithm_name const& name, + std::span queries); void add_to_error_messages(std::vector& errors, std::string const& name); } @@ -78,6 +84,11 @@ namespace phlex::experimental { registrar(registrar&&) = default; registrar& operator=(registrar&&) = default; + void verify_layers(algorithm_name const& name, std::span queries) + { + detail::verify_layers(*errors_, name, queries); + } + bool has_predicates() const { return predicates_.has_value(); } void set_creator(node_creator creator) { creator_ = std::move(creator); } diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index 2b36a7ad3..98092f1d9 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -50,6 +50,8 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + + registrar_.verify_layers(name_, input_args); if constexpr (M == 0ull) { registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto /* output_products */) { @@ -147,6 +149,8 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + + registrar_.verify_layers(name_, input_args); registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto output_products) { return std::make_unique>( @@ -229,6 +233,8 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + + registrar_.verify_layers(name_, input_args); registrar_.set_creator( [this, inputs = std::move(input_args)](auto upstream_predicates, auto output_products) { return std::make_unique>( From 2ef72a75589329d30ee78643a1ac11bcaabf1369 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 3 Dec 2025 10:31:12 -0600 Subject: [PATCH 03/10] Update tests to include data layers in product specifications --- test/benchmarks/accept_even_ids.cpp | 7 ++-- test/benchmarks/accept_even_numbers.cpp | 7 ++-- test/benchmarks/accept_fibonacci_numbers.cpp | 6 ++-- test/benchmarks/last_index.cpp | 2 +- test/benchmarks/plus_101.cpp | 4 ++- test/benchmarks/plus_one.cpp | 4 ++- test/benchmarks/read_id.cpp | 7 ++-- test/benchmarks/read_index.cpp | 3 +- test/benchmarks/verify_difference.cpp | 3 +- .../verify_even_fibonacci_numbers.cpp | 3 +- test/class_registration.cpp | 3 +- test/different_hierarchies.cpp | 32 +++++++++++-------- test/filter.cpp | 24 +++++++------- test/fold.cpp | 14 ++++---- test/function_registration.cpp | 5 ++- test/hierarchical_nodes.cpp | 11 ++++--- test/max-parallelism/check_parallelism.cpp | 2 +- test/memory-checks/many_events.cpp | 2 +- test/mock-workflow/algorithm.hpp | 13 +++++++- test/multiple_function_registration.cpp | 15 +++++---- test/plugins/module.cpp | 4 +-- test/type_distinction.cpp | 21 ++++++------ test/unfold.cpp | 13 ++++---- test/vector_of_abstract_types.cpp | 4 +-- 24 files changed, 120 insertions(+), 89 deletions(-) diff --git a/test/benchmarks/accept_even_ids.cpp b/test/benchmarks/accept_even_ids.cpp index b98b5ddd4..a023b03de 100644 --- a/test/benchmarks/accept_even_ids.cpp +++ b/test/benchmarks/accept_even_ids.cpp @@ -5,9 +5,10 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.predicate( "accept_even_ids", - [](phlex::experimental::data_cell_index const& id) { return id.number() % 2 == 0; }, - phlex::experimental::concurrency::unlimited) - .input_family(config.get("product_name")); + [](data_cell_index const& id) { return id.number() % 2 == 0; }, + concurrency::unlimited) + .input_family(product_query{config.get("product_name"), "event"}); } diff --git a/test/benchmarks/accept_even_numbers.cpp b/test/benchmarks/accept_even_numbers.cpp index 33982374c..b4bd4d3bb 100644 --- a/test/benchmarks/accept_even_numbers.cpp +++ b/test/benchmarks/accept_even_numbers.cpp @@ -4,9 +4,8 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.predicate( - "accept_even_numbers", - [](int i) { return i % 2 == 0; }, - phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + "accept_even_numbers", [](int i) { return i % 2 == 0; }, concurrency::unlimited) + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/accept_fibonacci_numbers.cpp b/test/benchmarks/accept_fibonacci_numbers.cpp index e1438f20b..e7bf13d82 100644 --- a/test/benchmarks/accept_fibonacci_numbers.cpp +++ b/test/benchmarks/accept_fibonacci_numbers.cpp @@ -5,8 +5,8 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.make(config.get("max_number")) - .predicate( - "accept", &test::fibonacci_numbers::accept, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .predicate("accept", &test::fibonacci_numbers::accept, concurrency::unlimited) + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/last_index.cpp b/test/benchmarks/last_index.cpp index 41d0a45f1..b9add99e6 100644 --- a/test/benchmarks/last_index.cpp +++ b/test/benchmarks/last_index.cpp @@ -10,6 +10,6 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { m.transform("last_index", last_index, concurrency::unlimited) - .input_family("id") + .input_family("id"_in("event")) .output_products(config.get("produces", "a")); } diff --git a/test/benchmarks/plus_101.cpp b/test/benchmarks/plus_101.cpp index 0d7b0ed5e..8cc987fd2 100644 --- a/test/benchmarks/plus_101.cpp +++ b/test/benchmarks/plus_101.cpp @@ -8,5 +8,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_101", plus_101, concurrency::unlimited).input_family("a").output_products("c"); + m.transform("plus_101", plus_101, concurrency::unlimited) + .input_family("a"_in("event")) + .output_products("c"); } diff --git a/test/benchmarks/plus_one.cpp b/test/benchmarks/plus_one.cpp index fef141cca..391aaf0f6 100644 --- a/test/benchmarks/plus_one.cpp +++ b/test/benchmarks/plus_one.cpp @@ -8,5 +8,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.transform("plus_one", plus_one, concurrency::unlimited).input_family("a").output_products("b"); + m.transform("plus_one", plus_one, concurrency::unlimited) + .input_family("a"_in("event")) + .output_products("b"); } diff --git a/test/benchmarks/read_id.cpp b/test/benchmarks/read_id.cpp index 02d6117ed..ae2dd59d5 100644 --- a/test/benchmarks/read_id.cpp +++ b/test/benchmarks/read_id.cpp @@ -1,11 +1,14 @@ #include "phlex/model/data_cell_index.hpp" #include "phlex/module.hpp" +using namespace phlex::experimental; + namespace { - void read_id(phlex::experimental::data_cell_index const&) {} + void read_id(data_cell_index const&) {} } PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { - m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited).input_family("id"); + m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited) + .input_family("id"_in("event")); } diff --git a/test/benchmarks/read_index.cpp b/test/benchmarks/read_index.cpp index ebc8d3677..3ca2ca405 100644 --- a/test/benchmarks/read_index.cpp +++ b/test/benchmarks/read_index.cpp @@ -7,6 +7,7 @@ namespace { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; m.observe("read_index", read_index, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/benchmarks/verify_difference.cpp b/test/benchmarks/verify_difference.cpp index 9294650af..fe1af4263 100644 --- a/test/benchmarks/verify_difference.cpp +++ b/test/benchmarks/verify_difference.cpp @@ -10,5 +10,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) "verify_difference", [expected = config.get("expected", 100)](int i, int j) { assert(j - i == expected); }, concurrency::unlimited) - .input_family(config.get("i", "b"), config.get("j", "c")); + .input_family(product_query{config.get("i", "b"), "event"}, + product_query{config.get("j", "c"), "event"}); } diff --git a/test/benchmarks/verify_even_fibonacci_numbers.cpp b/test/benchmarks/verify_even_fibonacci_numbers.cpp index 3da779f2d..ee69524c3 100644 --- a/test/benchmarks/verify_even_fibonacci_numbers.cpp +++ b/test/benchmarks/verify_even_fibonacci_numbers.cpp @@ -16,9 +16,10 @@ namespace test { PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) { + using namespace phlex::experimental; using namespace test; m.make(config.get("max_number")) .observe( "only_even", &even_fibonacci_numbers::only_even, phlex::experimental::concurrency::unlimited) - .input_family(config.get("consumes")); + .input_family(product_query{config.get("consumes"), "event"}); } diff --git a/test/class_registration.cpp b/test/class_registration.cpp index cd87a68b6..a5c2d3342 100644 --- a/test/class_registration.cpp +++ b/test/class_registration.cpp @@ -49,8 +49,7 @@ namespace { TEST_CASE("Call non-framework functions", "[programming model]") { - std::array const product_names{ - product_query{"number"}, product_query{"temperature"}, product_query{"name"}}; + std::array const product_names{"number"_in("job"), "temperature"_in("job"), "name"_in("job")}; std::array const oproduct_names{"onumber"s, "otemperature"s, "oname"s}; auto store = product_store::base(); diff --git a/test/different_hierarchies.cpp b/test/different_hierarchies.cpp index 4dc4301d7..f7ce4a9d3 100644 --- a/test/different_hierarchies.cpp +++ b/test/different_hierarchies.cpp @@ -16,14 +16,18 @@ // // job // │ -// ├ trigger primitive +// ├ event // │ // └ run // │ // └ event // -// As the run_add node performs folds only over "runs", any "trigger primitive" +// As the run_add node performs folds only over "runs", any top-level "events" // stores are excluded from the fold result. +// +// N.B. The multiplexer sends data products to nodes based on the name of the lowest +// layer. For example, the top-level "event" and the nested "run/event" are both +// candidates for the "job" fold. // ======================================================================================= #include "phlex/core/framework_graph.hpp" @@ -47,8 +51,8 @@ namespace { constexpr auto index_limit = 2u; constexpr auto number_limit = 5u; - // job -> trigger primitive layers - constexpr auto primitive_limit = 10u; + // job -> event levels + constexpr auto top_level_event_limit = 10u; void cells_to_process(framework_driver& driver) { @@ -66,9 +70,9 @@ namespace { } } - // job -> trigger primitive layers - for (unsigned i : std::views::iota(0u, primitive_limit)) { - auto tp_store = job_store->make_child(i, "trigger primitive"); + // job -> event layers + for (unsigned i : std::views::iota(0u, top_level_event_limit)) { + auto tp_store = job_store->make_child(i, "event"); tp_store->add_product("number", i); driver.yield(tp_store); } @@ -80,22 +84,24 @@ TEST_CASE("Different hierarchies used with fold", "[graph]") framework_graph g{cells_to_process}; g.fold("run_add", add, concurrency::unlimited, "run", 0u) - .input_family("number") + .input_family("number"_in("event")) .output_products("run_sum"); - g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); + g.fold("job_add", add, concurrency::unlimited) + .input_family("number"_in("event")) + .output_products("job_sum"); g.observe("verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }) - .input_family("run_sum"); + .input_family("run_sum"_in("run")); g.observe("verify_job_sum", [](unsigned int actual) { - CHECK(actual == 20u + 45u); // 20u from events, 45u from trigger primitives + CHECK(actual == 20u + 45u); // 20u from nested events, 45u from top-level events }) - .input_family("job_sum"); + .input_family("job_sum"_in("job")); g.execute(); CHECK(g.execution_counts("run_add") == index_limit * number_limit); - CHECK(g.execution_counts("job_add") == index_limit * number_limit + primitive_limit); + CHECK(g.execution_counts("job_add") == index_limit * number_limit + top_level_event_limit); CHECK(g.execution_counts("verify_run_sum") == index_limit); CHECK(g.execution_counts("verify_job_sum") == 1); } diff --git a/test/filter.cpp b/test/filter.cpp index 5e12f236f..9e1efacc2 100644 --- a/test/filter.cpp +++ b/test/filter.cpp @@ -105,13 +105,13 @@ TEST_CASE("Two predicates", "[filtering]") TEST_CASE("Two predicates in series", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); g.predicate("odds_only", odds_only, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("evens_only"); g.make(0u) .observe("add", &sum_numbers::add, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("odds_only"); g.execute(); @@ -122,11 +122,11 @@ TEST_CASE("Two predicates in series", "[filtering]") TEST_CASE("Two predicates in parallel", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); g.make(0u) .observe("add", &sum_numbers::add, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when("odds_only", "evens_only"); g.execute(); @@ -149,7 +149,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]") for (auto const& [name, b, e] : configs) { g.make(b, e) .predicate(name, ¬_in_range::eval, concurrency::unlimited) - .input_family("num"); + .input_family("num"_in("event")); } std::vector const predicate_names{ @@ -157,7 +157,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]") auto const expected_numbers = {4u, 5u, 7u}; g.make(expected_numbers) .observe("collect", &collect_numbers::collect, concurrency::unlimited) - .input_family("num") + .input_family("num"_in("event")) .when(predicate_names); g.execute(); @@ -168,16 +168,16 @@ TEST_CASE("Three predicates in parallel", "[filtering]") TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filtering]") { framework_graph g{source{10u}}; - g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"); - g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"); + g.predicate("evens_only", evens_only, concurrency::unlimited).input_family("num"_in("event")); + g.predicate("odds_only", odds_only, concurrency::unlimited).input_family("num"_in("event")); g.make(5 * 100) .observe("check_evens", &check_multiple_numbers::add_difference, concurrency::unlimited) - .input_family("num", "other_num") // <= Note input order + .input_family("num"_in("event"), "other_num"_in("event")) // <= Note input order .when("evens_only"); g.make(-5 * 100) .observe("check_odds", &check_multiple_numbers::add_difference, concurrency::unlimited) - .input_family("other_num", "num") // <= Note input order + .input_family("other_num"_in("event"), "num"_in("event")) // <= Note input order .when("odds_only"); g.execute(); diff --git a/test/fold.cpp b/test/fold.cpp index 328486164..2cc17521b 100644 --- a/test/fold.cpp +++ b/test/fold.cpp @@ -65,25 +65,27 @@ TEST_CASE("Different data layers of fold", "[graph]") framework_graph g{cells_to_process}; g.fold("run_add", add, concurrency::unlimited, "run") - .input_family("number") + .input_family("number"_in("event")) .output_products("run_sum"); - g.fold("job_add", add, concurrency::unlimited).input_family("number").output_products("job_sum"); + g.fold("job_add", add, concurrency::unlimited) + .input_family("number"_in("event")) + .output_products("job_sum"); g.fold("two_layer_job_add", add, concurrency::unlimited) - .input_family("run_sum") + .input_family("run_sum"_in("run")) .output_products("two_layer_job_sum"); g.observe( "verify_run_sum", [](unsigned int actual) { CHECK(actual == 10u); }, concurrency::unlimited) - .input_family("run_sum"); + .input_family("run_sum"_in("run")); g.observe( "verify_two_layer_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) - .input_family("two_layer_job_sum"); + .input_family("two_layer_job_sum"_in("job")); g.observe( "verify_job_sum", [](unsigned int actual) { CHECK(actual == 20u); }, concurrency::unlimited) - .input_family("job_sum"); + .input_family("job_sum"_in("job")); g.execute(); diff --git a/test/function_registration.cpp b/test/function_registration.cpp index d76c19ecd..5a0a0d294 100644 --- a/test/function_registration.cpp +++ b/test/function_registration.cpp @@ -48,9 +48,8 @@ namespace { TEST_CASE("Call non-framework functions", "[programming model]") { - std::array const product_names{ - product_query{"number"}, product_query{"temperature"}, product_query{"name"}}; - std::array const oproduct_names = {"number"s, "temperature"s, "name"s}; + std::array const product_names{"number"_in("job"), "temperature"_in("job"), "name"_in("job")}; + std::array const oproduct_names = {"onumber"s, "otemperature"s, "oname"s}; std::array const result{"result"s}; auto store = product_store::base(); diff --git a/test/hierarchical_nodes.cpp b/test/hierarchical_nodes.cpp index 0cc36f8b6..773c042e2 100644 --- a/test/hierarchical_nodes.cpp +++ b/test/hierarchical_nodes.cpp @@ -103,22 +103,23 @@ TEST_CASE("Hierarchical nodes", "[graph]") framework_graph g{cells_to_process}; g.transform("get_the_time", strtime, concurrency::unlimited) - .input_family("time") + .input_family("time"_in("run")) .when() .output_products("strtime"); g.transform("square", square, concurrency::unlimited) - .input_family("number") + .input_family("number"_in("event")) .output_products("squared_number"); g.fold("add", add, concurrency::unlimited, "run", 15u) - .input_family("squared_number") + .input_family("squared_number"_in("event")) .when() .output_products("added_data"); g.transform("scale", scale, concurrency::unlimited) - .input_family("added_data") + .input_family("added_data"_in("run")) .output_products("result"); - g.observe("print_result", print_result, concurrency::unlimited).input_family("result", "strtime"); + g.observe("print_result", print_result, concurrency::unlimited) + .input_family("result"_in("run"), "strtime"_in("run")); g.make().output("save", &test::products_for_output::save).when(); diff --git a/test/max-parallelism/check_parallelism.cpp b/test/max-parallelism/check_parallelism.cpp index 526b67277..4f14b0f3d 100644 --- a/test/max-parallelism/check_parallelism.cpp +++ b/test/max-parallelism/check_parallelism.cpp @@ -36,5 +36,5 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config) [expected = config.get("expected_parallelism")](std::size_t actual) { assert(actual == expected); }) - .input_family("max_parallelism"); + .input_family("max_parallelism"_in("job")); } diff --git a/test/memory-checks/many_events.cpp b/test/memory-checks/many_events.cpp index 763d93859..7a8703f8e 100644 --- a/test/memory-checks/many_events.cpp +++ b/test/memory-checks/many_events.cpp @@ -26,7 +26,7 @@ int main() framework_graph g{cells_to_process}; g.transform("pass_on", pass_on, concurrency::unlimited) - .input_family("number") + .input_family("number"_in("event")) .output_products("different"); g.execute(); } diff --git a/test/mock-workflow/algorithm.hpp b/test/mock-workflow/algorithm.hpp index c16d963eb..7692438d8 100644 --- a/test/mock-workflow/algorithm.hpp +++ b/test/mock-workflow/algorithm.hpp @@ -8,6 +8,7 @@ #include "fmt/std.h" #include "spdlog/spdlog.h" +#include #include #include #include @@ -68,10 +69,20 @@ namespace phlex::experimental::test { using inputs_t = ensure_tuple; using algorithm_t = algorithm; concurrency const j{c.get("concurrency", concurrency::unlimited.value)}; + + // Convert product strings to product queries. + // FIXME: There should be a c.get(...) specialization + auto input_product_strings = c.get("inputs"); + std::array> queries{}; + std::ranges::transform( + input_product_strings, queries.begin(), [](std::string const& product_string) { + return product_query{product_string, "event"}; + }); + m.template make(c.get("module_label"), c.get("duration_usec")) .transform("execute", &algorithm_t::execute, j) - .input_family(c.get("inputs")) + .input_family(std::move(queries)) .output_products(c.get("outputs")); } } diff --git a/test/multiple_function_registration.cpp b/test/multiple_function_registration.cpp index 876fba44a..7f106c523 100644 --- a/test/multiple_function_registration.cpp +++ b/test/multiple_function_registration.cpp @@ -49,33 +49,34 @@ TEST_CASE("Call multiple functions", "[programming model]") SECTION("All free functions") { g.transform("square_numbers", square_numbers, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("job")) .output_products("squared_numbers"); g.transform("sum_numbers", sum_numbers, concurrency::unlimited) - .input_family("squared_numbers") + .input_family("squared_numbers"_in("job")) .output_products("summed_numbers"); g.transform("sqrt_sum_numbers", sqrt_sum_numbers, concurrency::unlimited) - .input_family("summed_numbers", "offset") + .input_family("summed_numbers"_in("job"), "offset"_in("job")) .output_products("result"); } SECTION("Transforms, one from a class") { g.transform("square_numbers", square_numbers, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("job")) .output_products("squared_numbers"); g.transform("sum_numbers", sum_numbers, concurrency::unlimited) - .input_family("squared_numbers") + .input_family("squared_numbers"_in("job")) .output_products("summed_numbers"); g.make() .transform("sqrt_sum", &A::sqrt_sum, concurrency::unlimited) - .input_family("summed_numbers", "offset") + .input_family("summed_numbers"_in("job"), "offset"_in("job")) .output_products("result"); } // The following is invoked for *each* section above - g.observe("verify_result", [](double actual) { assert(actual == 6.); }).input_family("result"); + g.observe("verify_result", [](double actual) { assert(actual == 6.); }) + .input_family("result"_in("job")); g.execute(); } diff --git a/test/plugins/module.cpp b/test/plugins/module.cpp index de8741183..2ed2a27bc 100644 --- a/test/plugins/module.cpp +++ b/test/plugins/module.cpp @@ -10,9 +10,9 @@ using namespace phlex::experimental; PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m) { m.transform("add", test::add, concurrency::unlimited) - .input_family("i", "j") + .input_family("i"_in("event"), "j"_in("event")) .output_products("sum"); m.observe( "verify", [](int actual) { assert(actual == 0); }, concurrency::unlimited) - .input_family("sum"); + .input_family("sum"_in("event")); } diff --git a/test/type_distinction.cpp b/test/type_distinction.cpp index 3784188bb..e9330623a 100644 --- a/test/type_distinction.cpp +++ b/test/type_distinction.cpp @@ -53,31 +53,32 @@ TEST_CASE("Distinguish products with same name and different types", "[programmi SECTION("Duplicate product name but differ in producer name") { - g.observe("starter", [](int num) { spdlog::info("Recieved {}", num); }).input_family("numbers"); + g.observe("starter", [](int num) { spdlog::info("Received {}", num); }) + .input_family("numbers"_in("event")); g.transform("triple_numbers", triple, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("event")) .output_products("tripled"); spdlog::info("Registered tripled"); g.transform("expand_orig", expand, concurrency::unlimited) - .input_family("numbers", "length") + .input_family("numbers"_in("event"), "length"_in("event")) .output_products("expanded_one"); spdlog::info("Registered expanded_one"); g.transform("expand_triples", expand, concurrency::unlimited) - .input_family("tripled", "length") + .input_family("tripled"_in("event"), "length"_in("event")) .output_products("expanded_three"); spdlog::info("Registered expanded_three"); g.transform("add_nums", add_numbers, concurrency::unlimited) - .input_family("numbers", "tripled") + .input_family("numbers"_in("event"), "tripled"_in("event")) .output_products("sums"); spdlog::info("Registered sums"); g.transform("add_vect", add_vectors, concurrency::unlimited) - .input_family("expanded_one", "expanded_three") + .input_family("expanded_one"_in("event"), "expanded_three"_in("event")) .output_products("sums"); g.transform("test_add_num", triple, concurrency::unlimited) - .input_family("sums") + .input_family("sums"_in("event")) .output_products("result"); spdlog::info("Registered result"); } @@ -85,16 +86,16 @@ TEST_CASE("Distinguish products with same name and different types", "[programmi SECTION("Duplicate product name and producer, differ only in type") { g.transform("square", square, concurrency::unlimited) - .input_family("numbers") + .input_family("numbers"_in("event")) .output_products("square_result", "square_result"); g.transform("extract_result", id, concurrency::unlimited) - .input_family("square_result") + .input_family("square_result"_in("event")) .output_products("result"); } g.observe("print_result", [](int res) { spdlog::info("Result: {}", res); }) - .input_family("result"); + .input_family("result"_in("event")); spdlog::info("Registered observe"); g.execute(); spdlog::info("Executed"); diff --git a/test/unfold.cpp b/test/unfold.cpp index 348fda919..899239871 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -98,24 +98,25 @@ TEST_CASE("Splitting the processing", "[graph]") framework_graph g{cells_to_process}; g.unfold("iota", &iota::predicate, &iota::unfold, concurrency::unlimited, "lower1") - .input_family("max_number") + .input_family("max_number"_in("event")) .output_products("new_number"); g.fold("add", add, concurrency::unlimited, "event") - .input_family("new_number") + .input_family("new_number"_in("lower1")) .output_products("sum1"); - g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"); + g.observe("check_sum", check_sum, concurrency::unlimited).input_family("sum1"_in("event")); g.unfold("iterate_through", &iterate_through::predicate, &iterate_through::unfold, concurrency::unlimited, "lower2") - .input_family("ten_numbers") + .input_family("ten_numbers"_in("event")) .output_products("each_number"); g.fold("add_numbers", add_numbers, concurrency::unlimited, "event") - .input_family("each_number") + .input_family("each_number"_in("lower2")) .output_products("sum2"); - g.observe("check_sum_same", check_sum_same, concurrency::unlimited).input_family("sum2"); + g.observe("check_sum_same", check_sum_same, concurrency::unlimited) + .input_family("sum2"_in("event")); g.make().output( "save", &test::products_for_output::save, concurrency::serial); diff --git a/test/vector_of_abstract_types.cpp b/test/vector_of_abstract_types.cpp index c55bd4414..8588beba8 100644 --- a/test/vector_of_abstract_types.cpp +++ b/test/vector_of_abstract_types.cpp @@ -60,10 +60,10 @@ namespace { TEST_CASE("Test vector of abstract types") { framework_graph g{source{1u}}; - g.transform("read_thing", read_abstract).input_family("thing").output_products("sum"); + g.transform("read_thing", read_abstract).input_family("thing"_in("event")).output_products("sum"); g.observe( "verify_sum", [](int sum) { CHECK(sum == 3); }, concurrency::serial) - .input_family("sum"); + .input_family("sum"_in("event")); g.execute(); CHECK(g.execution_counts("read_thing") == 1); From 6e004e9dda4e03058d17501b31b90c0422f72e9d Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 3 Dec 2025 12:58:20 -0600 Subject: [PATCH 04/10] Remove unneeded concept --- phlex/core/product_query.hpp | 14 -------------- phlex/core/registration_api.hpp | 26 ++++++-------------------- 2 files changed, 6 insertions(+), 34 deletions(-) diff --git a/phlex/core/product_query.hpp b/phlex/core/product_query.hpp index cce79e8f5..563e407fd 100644 --- a/phlex/core/product_query.hpp +++ b/phlex/core/product_query.hpp @@ -32,20 +32,6 @@ namespace phlex::experimental { bool operator<(product_query const& a, product_query const& b); std::ostream& operator<<(std::ostream& os, product_query const& label); - template - concept label_compatible = requires(T t) { - { product_query::create(t) }; - }; - - template - auto to_labels(std::array const& like_labels) - { - std::array labels; - std::ranges::transform( - like_labels, labels.begin(), [](T const& t) { return product_query::create(t); }); - return labels; - } - namespace detail { // C is a container of product_queries template diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index 98092f1d9..d281ba494 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -77,19 +77,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - template - auto input_family(std::array input_args) - { - return input_family(to_labels(input_args)); - } - - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family( - {product_query::create(std::forward(input_args))...}); + return input_family({std::move(input_args)...}); } private: @@ -167,19 +160,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - template - auto input_family(std::array input_args) - { - return input_family(to_labels(input_args)); - } - - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N - 1 == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family( - {product_query::create(std::forward(input_args))...}); + return input_family({std::move(input_args)...}); } private: @@ -251,12 +237,12 @@ namespace phlex::experimental { return upstream_predicates{std::move(registrar_), config_}; } - auto input_family(label_compatible auto... input_args) + auto input_family(std::same_as auto... input_args) { static_assert(N == sizeof...(input_args), "The number of function parameters is not the same as the number of specified " "input arguments."); - return input_family({product_query{std::forward(input_args)}...}); + return input_family({std::move(input_args)...}); } private: From f69a95aa3d7f1515bf37433bfa65c09947e748b9 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 3 Dec 2025 13:32:30 -0600 Subject: [PATCH 05/10] Turn run-time checks into compile-time requirements --- phlex/core/product_query.cpp | 20 +++++++------------- phlex/core/product_query.hpp | 10 +++++----- phlex/core/registrar.cpp | 23 ----------------------- phlex/core/registrar.hpp | 11 ----------- phlex/core/registration_api.hpp | 3 --- test/product_query.cpp | 22 ++++++++-------------- 6 files changed, 20 insertions(+), 69 deletions(-) diff --git a/phlex/core/product_query.cpp b/phlex/core/product_query.cpp index e3c425030..69b6226c9 100644 --- a/phlex/core/product_query.cpp +++ b/phlex/core/product_query.cpp @@ -7,9 +7,12 @@ #include namespace phlex::experimental { - product_query product_query::operator()(std::string layer) && + product_query product_query_prefix::operator()(std::string data_layer) && { - return {std::move(name), std::move(layer)}; + if (data_layer.empty()) { + throw std::runtime_error("Cannot specify the empty string as a data layer."); + } + return {std::move(name), std::move(data_layer)}; } std::string product_query::to_string() const @@ -20,12 +23,12 @@ namespace phlex::experimental { return fmt::format("{} ϵ {}", name.full(), layer); } - product_query operator""_in(char const* name, std::size_t length) + product_query_prefix operator""_in(char const* product_name, std::size_t length) { if (length == 0ull) { throw std::runtime_error("Cannot specify product with empty name."); } - return product_query::create(name); + return {product_specification::create(product_name)}; } bool operator==(product_query const& a, product_query const& b) @@ -45,13 +48,4 @@ namespace phlex::experimental { os << label.to_string(); return os; } - - product_query product_query::create(char const* c) { return create(std::string{c}); } - - product_query product_query::create(std::string const& s) - { - return {product_specification::create(s)}; - } - - product_query product_query::create(product_query l) { return l; } } diff --git a/phlex/core/product_query.hpp b/phlex/core/product_query.hpp index 563e407fd..de0215be5 100644 --- a/phlex/core/product_query.hpp +++ b/phlex/core/product_query.hpp @@ -13,12 +13,12 @@ namespace phlex::experimental { struct product_query { product_specification name; std::string layer; - product_query operator()(std::string layer) &&; std::string to_string() const; + }; - static product_query create(char const* c); - static product_query create(std::string const& s); - static product_query create(product_query l); + struct product_query_prefix { + product_specification name; + product_query operator()(std::string layer) &&; }; using product_queries = std::vector; @@ -26,7 +26,7 @@ namespace phlex::experimental { inline auto& to_name(product_query const& label) { return label.name.name(); } inline auto& to_layer(product_query& label) { return label.layer; } - product_query operator""_in(char const* str, std::size_t); + product_query_prefix operator""_in(char const* str, std::size_t); bool operator==(product_query const& a, product_query const& b); bool operator!=(product_query const& a, product_query const& b); bool operator<(product_query const& a, product_query const& b); diff --git a/phlex/core/registrar.cpp b/phlex/core/registrar.cpp index e72ff1b4c..a9878349e 100644 --- a/phlex/core/registrar.cpp +++ b/phlex/core/registrar.cpp @@ -3,29 +3,6 @@ #include "fmt/format.h" namespace phlex::experimental::detail { - - void verify_layers(std::vector& errors, - algorithm_name const& name, - std::span queries) - { - std::vector malformed_queries; - std::string err_msg; - for (auto const& query : queries) { - if (query.family.empty()) { - malformed_queries.push_back(query.to_string()); - } - } - - if (malformed_queries.empty()) { - return; - } - - errors.push_back(fmt::format("Node '{}' has one or more product specifications that do not " - "include a data layer: {}", - name.full(), - malformed_queries)); - } - void add_to_error_messages(std::vector& errors, std::string const& name) { errors.push_back(fmt::format("Node with name '{}' already exists", name)); diff --git a/phlex/core/registrar.hpp b/phlex/core/registrar.hpp index 9b72e3038..92ef9e55b 100644 --- a/phlex/core/registrar.hpp +++ b/phlex/core/registrar.hpp @@ -47,23 +47,17 @@ // // ======================================================================================= -#include "phlex/core/product_query.hpp" -#include "phlex/model/algorithm_name.hpp" #include "phlex/utilities/simple_ptr_map.hpp" #include #include #include -#include #include #include namespace phlex::experimental { namespace detail { - void verify_layers(std::vector& errors, - algorithm_name const& name, - std::span queries); void add_to_error_messages(std::vector& errors, std::string const& name); } @@ -84,11 +78,6 @@ namespace phlex::experimental { registrar(registrar&&) = default; registrar& operator=(registrar&&) = default; - void verify_layers(algorithm_name const& name, std::span queries) - { - detail::verify_layers(*errors_, name, queries); - } - bool has_predicates() const { return predicates_.has_value(); } void set_creator(node_creator creator) { creator_ = std::move(creator); } diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index d281ba494..c331e4c72 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -51,7 +51,6 @@ namespace phlex::experimental { { populate_types(input_args); - registrar_.verify_layers(name_, input_args); if constexpr (M == 0ull) { registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto /* output_products */) { @@ -143,7 +142,6 @@ namespace phlex::experimental { { populate_types(input_args); - registrar_.verify_layers(name_, input_args); registrar_.set_creator( [this, inputs = std::move(input_args)](auto predicates, auto output_products) { return std::make_unique>( @@ -220,7 +218,6 @@ namespace phlex::experimental { { populate_types(input_args); - registrar_.verify_layers(name_, input_args); registrar_.set_creator( [this, inputs = std::move(input_args)](auto upstream_predicates, auto output_products) { return std::make_unique>( diff --git a/test/product_query.cpp b/test/product_query.cpp index 63e54cb33..c99bab1d1 100644 --- a/test/product_query.cpp +++ b/test/product_query.cpp @@ -1,26 +1,20 @@ #include "phlex/core/product_query.hpp" #include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" using namespace phlex::experimental; -TEST_CASE("Empty label", "[data model]") +TEST_CASE("Empty specifications", "[data model]") { - product_query empty{}; - CHECK_THROWS(""_in); - CHECK_THROWS(""_in("")); + CHECK_THROWS_WITH(""_in, + Catch::Matchers::ContainsSubstring("Cannot specify product with empty name.")); + CHECK_THROWS_WITH( + "product"_in(""), + Catch::Matchers::ContainsSubstring("Cannot specify the empty string as a data layer.")); } -TEST_CASE("Only name in label", "[data model]") -{ - product_query label{"product"}; - CHECK(label == "product"_in); - - // Empty layer string is interpreted as a wildcard--i.e. any layer. - CHECK(label == "product"_in("")); -} - -TEST_CASE("Label with layer", "[data model]") +TEST_CASE("Product name with data layer", "[data model]") { product_query label{"product", {"event"}}; CHECK(label == "product"_in("event")); From b989fda5ea538b5aec7640986ca576022e41a3a1 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 4 Dec 2025 09:16:02 -0600 Subject: [PATCH 06/10] Rename product_query_prefix to product_tag --- phlex/core/product_query.cpp | 4 ++-- phlex/core/product_query.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/phlex/core/product_query.cpp b/phlex/core/product_query.cpp index 69b6226c9..08162e531 100644 --- a/phlex/core/product_query.cpp +++ b/phlex/core/product_query.cpp @@ -7,7 +7,7 @@ #include namespace phlex::experimental { - product_query product_query_prefix::operator()(std::string data_layer) && + product_query product_tag::operator()(std::string data_layer) && { if (data_layer.empty()) { throw std::runtime_error("Cannot specify the empty string as a data layer."); @@ -23,7 +23,7 @@ namespace phlex::experimental { return fmt::format("{} ϵ {}", name.full(), layer); } - product_query_prefix operator""_in(char const* product_name, std::size_t length) + product_tag operator""_in(char const* product_name, std::size_t length) { if (length == 0ull) { throw std::runtime_error("Cannot specify product with empty name."); diff --git a/phlex/core/product_query.hpp b/phlex/core/product_query.hpp index de0215be5..b781c334f 100644 --- a/phlex/core/product_query.hpp +++ b/phlex/core/product_query.hpp @@ -16,7 +16,7 @@ namespace phlex::experimental { std::string to_string() const; }; - struct product_query_prefix { + struct product_tag { product_specification name; product_query operator()(std::string layer) &&; }; @@ -26,7 +26,7 @@ namespace phlex::experimental { inline auto& to_name(product_query const& label) { return label.name.name(); } inline auto& to_layer(product_query& label) { return label.layer; } - product_query_prefix operator""_in(char const* str, std::size_t); + product_tag operator""_in(char const* str, std::size_t); bool operator==(product_query const& a, product_query const& b); bool operator!=(product_query const& a, product_query const& b); bool operator<(product_query const& a, product_query const& b); From 8de6b14f3be68d5c47be8c6a2a76fc8925aee5ab Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 4 Dec 2025 10:27:21 -0600 Subject: [PATCH 07/10] Catch unspecified exceptions --- phlex/core/framework_graph.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index e07d55eb6..46194b3b6 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -92,6 +92,9 @@ namespace phlex::experimental { } catch (std::exception const& e) { spdlog::error(e.what()); throw; + } catch (...) { + spdlog::error("Unknown exception during graph execution"); + throw; } void framework_graph::run() From ef72f55c23f3b72b9400942bc583c5b7826e590b Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 4 Dec 2025 11:41:04 -0600 Subject: [PATCH 08/10] Catch and rethrow exceptions from driver thread --- phlex/utilities/async_driver.hpp | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/phlex/utilities/async_driver.hpp b/phlex/utilities/async_driver.hpp index 31c29a1d9..2ea9bf336 100644 --- a/phlex/utilities/async_driver.hpp +++ b/phlex/utilities/async_driver.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -22,13 +23,22 @@ namespace phlex::experimental { } async_driver(void (*ft)(async_driver&)) : driver_{ft} {} - ~async_driver() { thread_.join(); } + ~async_driver() + { + if (thread_.joinable()) { + thread_.join(); + } + } std::optional operator()() { if (gear_ == states::off) { thread_ = std::thread{[this] { - driver_(*this); + try { + driver_(*this); + } catch (...) { + cached_exception_ = std::current_exception(); + } gear_ = states::park; cv_.notify_one(); }}; @@ -39,6 +49,11 @@ namespace phlex::experimental { std::unique_lock lock{mutex_}; cv_.wait(lock, [&] { return current_.has_value() or gear_ == states::park; }); + + if (cached_exception_) { + std::rethrow_exception(cached_exception_); + } + return std::exchange(current_, std::nullopt); } @@ -57,6 +72,7 @@ namespace phlex::experimental { std::thread thread_; std::mutex mutex_; std::condition_variable cv_; + std::exception_ptr cached_exception_; }; } From dea27a119b7a931bc29256b2b77cd9494fddf70c Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 4 Dec 2025 11:41:33 -0600 Subject: [PATCH 09/10] Check that exceptions are caught and reported --- test/CMakeLists.txt | 1 + test/framework_graph.cpp | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 test/framework_graph.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d67473cc1..68659d70e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,6 +40,7 @@ add_catch_test(class_registration LIBRARIES phlex::core Boost::json) add_catch_test(different_hierarchies LIBRARIES phlex::core) add_catch_test(filter_impl LIBRARIES phlex::core) add_catch_test(filter LIBRARIES phlex::core Boost::json) +add_catch_test(framework_graph LIBRARIES phlex::core) add_catch_test(function_registration LIBRARIES phlex::core Boost::json) add_catch_test(hierarchical_nodes LIBRARIES Boost::json TBB::tbb phlex::core) add_catch_test(multiple_function_registration LIBRARIES Boost::json phlex::core) diff --git a/test/framework_graph.cpp b/test/framework_graph.cpp new file mode 100644 index 000000000..2ed3e30e3 --- /dev/null +++ b/test/framework_graph.cpp @@ -0,0 +1,19 @@ +#include "phlex/core/framework_graph.hpp" + +#include "catch2/catch_test_macros.hpp" + +#include + +using namespace phlex::experimental; + +TEST_CASE("Catch STL exceptions", "[graph]") +{ + framework_graph g{[](framework_driver&) { throw std::runtime_error("STL error"); }}; + CHECK_THROWS_AS(g.execute(), std::exception); +} + +TEST_CASE("Catch other exceptions", "[graph]") +{ + framework_graph g{[](framework_driver&) { throw 2.5; }}; + CHECK_THROWS_AS(g.execute(), double); +} From 65b587a27952d0e09c5d2fc3ff5338b8709b7620 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Thu, 4 Dec 2025 11:57:01 -0600 Subject: [PATCH 10/10] Use std::jthread to automatically join on driver-thread destruction --- phlex/utilities/async_driver.hpp | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/phlex/utilities/async_driver.hpp b/phlex/utilities/async_driver.hpp index 2ea9bf336..435f90faf 100644 --- a/phlex/utilities/async_driver.hpp +++ b/phlex/utilities/async_driver.hpp @@ -23,17 +23,10 @@ namespace phlex::experimental { } async_driver(void (*ft)(async_driver&)) : driver_{ft} {} - ~async_driver() - { - if (thread_.joinable()) { - thread_.join(); - } - } - std::optional operator()() { if (gear_ == states::off) { - thread_ = std::thread{[this] { + thread_ = std::jthread{[this] { try { driver_(*this); } catch (...) { @@ -69,7 +62,7 @@ namespace phlex::experimental { std::function driver_; std::optional current_; std::atomic gear_ = states::off; - std::thread thread_; + std::jthread thread_; std::mutex mutex_; std::condition_variable cv_; std::exception_ptr cached_exception_;