Skip to content
Merged
8 changes: 7 additions & 1 deletion phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,15 @@ namespace phlex::experimental {
}

void framework_graph::execute()
{
try {
finalize();
run();
} catch (std::exception const& e) {
spdlog::error(e.what());
throw;
} catch (...) {
spdlog::error("Unknown exception during graph execution");
throw;
}

void framework_graph::run()
Expand Down
20 changes: 7 additions & 13 deletions phlex/core/product_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
#include <tuple>

namespace phlex::experimental {
product_query product_query::operator()(std::string layer) &&
product_query product_tag::operator()(std::string data_layer) &&
{
return {std::move(name), std::move(layer)};
if (data_layer.empty()) {
throw std::runtime_error("Cannot specify the empty string as a data layer.");
}
return {std::move(name), std::move(data_layer)};
}

std::string product_query::to_string() const
Expand All @@ -20,12 +23,12 @@ namespace phlex::experimental {
return fmt::format("{} ϵ {}", name.full(), layer);
}

product_query operator""_in(char const* name, std::size_t length)
product_tag operator""_in(char const* product_name, std::size_t length)
{
if (length == 0ull) {
throw std::runtime_error("Cannot specify product with empty name.");
}
return product_query::create(name);
return {product_specification::create(product_name)};
}

bool operator==(product_query const& a, product_query const& b)
Expand All @@ -45,13 +48,4 @@ namespace phlex::experimental {
os << label.to_string();
return os;
}

product_query product_query::create(char const* c) { return create(std::string{c}); }

product_query product_query::create(std::string const& s)
{
return {product_specification::create(s)};
}

product_query product_query::create(product_query l) { return l; }
}
24 changes: 5 additions & 19 deletions phlex/core/product_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,25 @@ namespace phlex::experimental {
struct product_query {
product_specification name;
std::string layer;
product_query operator()(std::string layer) &&;
std::string to_string() const;
};

static product_query create(char const* c);
static product_query create(std::string const& s);
static product_query create(product_query l);
struct product_tag {
product_specification name;
product_query operator()(std::string layer) &&;
};

using product_queries = std::vector<product_query>;

inline auto& to_name(product_query const& label) { return label.name.name(); }
inline auto& to_layer(product_query& label) { return label.layer; }

product_query operator""_in(char const* str, std::size_t);
product_tag operator""_in(char const* str, std::size_t);
bool operator==(product_query const& a, product_query const& b);
bool operator!=(product_query const& a, product_query const& b);
bool operator<(product_query const& a, product_query const& b);
std::ostream& operator<<(std::ostream& os, product_query const& label);

template <typename T>
concept label_compatible = requires(T t) {
{ product_query::create(t) };
};

template <label_compatible T, std::size_t N>
auto to_labels(std::array<T, N> const& like_labels)
{
std::array<product_query, N> labels;
std::ranges::transform(
like_labels, labels.begin(), [](T const& t) { return product_query::create(t); });
return labels;
}

namespace detail {
// C is a container of product_queries
template <typename C, typename T>
Expand Down
29 changes: 9 additions & 20 deletions phlex/core/registration_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace phlex::experimental {
auto input_family(std::array<product_query, N> input_args)
{
populate_types<input_parameter_types>(input_args);

if constexpr (M == 0ull) {
registrar_.set_creator(
[this, inputs = std::move(input_args)](auto predicates, auto /* output_products */) {
Expand All @@ -75,19 +76,12 @@ namespace phlex::experimental {
return upstream_predicates<NodePtr, M>{std::move(registrar_), config_};
}

template <label_compatible L>
auto input_family(std::array<L, N> input_args)
{
return input_family(to_labels(input_args));
}

auto input_family(label_compatible auto... input_args)
auto input_family(std::same_as<product_query> auto... input_args)
{
static_assert(N == sizeof...(input_args),
"The number of function parameters is not the same as the number of specified "
"input arguments.");
return input_family(
{product_query::create(std::forward<decltype(input_args)>(input_args))...});
return input_family({std::move(input_args)...});
}

private:
Expand Down Expand Up @@ -147,6 +141,7 @@ namespace phlex::experimental {
auto input_family(std::array<product_query, N - 1> input_args)
{
populate_types<input_parameter_types>(input_args);

registrar_.set_creator(
[this, inputs = std::move(input_args)](auto predicates, auto output_products) {
return std::make_unique<fold_node<AlgorithmBits, InitTuple>>(
Expand All @@ -163,19 +158,12 @@ namespace phlex::experimental {
return upstream_predicates<declared_fold_ptr, M>{std::move(registrar_), config_};
}

template <label_compatible L>
auto input_family(std::array<L, N> input_args)
{
return input_family(to_labels(input_args));
}

auto input_family(label_compatible auto... input_args)
auto input_family(std::same_as<product_query> auto... input_args)
{
static_assert(N - 1 == sizeof...(input_args),
"The number of function parameters is not the same as the number of specified "
"input arguments.");
return input_family(
{product_query::create(std::forward<decltype(input_args)>(input_args))...});
return input_family({std::move(input_args)...});
}

private:
Expand Down Expand Up @@ -229,6 +217,7 @@ namespace phlex::experimental {
auto input_family(std::array<product_query, N> input_args)
{
populate_types<input_parameter_types>(input_args);

registrar_.set_creator(
[this, inputs = std::move(input_args)](auto upstream_predicates, auto output_products) {
return std::make_unique<unfold_node<Object, Predicate, Unfold>>(
Expand All @@ -245,12 +234,12 @@ namespace phlex::experimental {
return upstream_predicates<declared_unfold_ptr, M>{std::move(registrar_), config_};
}

auto input_family(label_compatible auto... input_args)
auto input_family(std::same_as<product_query> auto... input_args)
{
static_assert(N == sizeof...(input_args),
"The number of function parameters is not the same as the number of specified "
"input arguments.");
return input_family({product_query{std::forward<decltype(input_args)>(input_args)}...});
return input_family({std::move(input_args)...});
}

private:
Expand Down
19 changes: 14 additions & 5 deletions phlex/utilities/async_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <mutex>
#include <optional>
Expand All @@ -22,13 +23,15 @@ namespace phlex::experimental {
}
async_driver(void (*ft)(async_driver<RT>&)) : driver_{ft} {}

~async_driver() { thread_.join(); }

std::optional<RT> operator()()
{
if (gear_ == states::off) {
thread_ = std::thread{[this] {
driver_(*this);
thread_ = std::jthread{[this] {
try {
driver_(*this);
} catch (...) {
cached_exception_ = std::current_exception();
}
gear_ = states::park;
cv_.notify_one();
}};
Expand All @@ -39,6 +42,11 @@ namespace phlex::experimental {

std::unique_lock lock{mutex_};
cv_.wait(lock, [&] { return current_.has_value() or gear_ == states::park; });

if (cached_exception_) {
std::rethrow_exception(cached_exception_);
}

return std::exchange(current_, std::nullopt);
}

Expand All @@ -54,9 +62,10 @@ namespace phlex::experimental {
std::function<void(async_driver&)> driver_;
std::optional<RT> current_;
std::atomic<states> gear_ = states::off;
std::thread thread_;
std::jthread thread_;
std::mutex mutex_;
std::condition_variable cv_;
std::exception_ptr cached_exception_;
};
}

Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_catch_test(class_registration LIBRARIES phlex::core Boost::json)
add_catch_test(different_hierarchies LIBRARIES phlex::core)
add_catch_test(filter_impl LIBRARIES phlex::core)
add_catch_test(filter LIBRARIES phlex::core Boost::json)
add_catch_test(framework_graph LIBRARIES phlex::core)
add_catch_test(function_registration LIBRARIES phlex::core Boost::json)
add_catch_test(hierarchical_nodes LIBRARIES Boost::json TBB::tbb phlex::core)
add_catch_test(multiple_function_registration LIBRARIES Boost::json phlex::core)
Expand Down
7 changes: 4 additions & 3 deletions test/benchmarks/accept_even_ids.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
using namespace phlex::experimental;
m.predicate(
"accept_even_ids",
[](phlex::experimental::data_cell_index const& id) { return id.number() % 2 == 0; },
phlex::experimental::concurrency::unlimited)
.input_family(config.get<std::string>("product_name"));
[](data_cell_index const& id) { return id.number() % 2 == 0; },
concurrency::unlimited)
.input_family(product_query{config.get<std::string>("product_name"), "event"});
}
7 changes: 3 additions & 4 deletions test/benchmarks/accept_even_numbers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
using namespace phlex::experimental;
m.predicate(
"accept_even_numbers",
[](int i) { return i % 2 == 0; },
phlex::experimental::concurrency::unlimited)
.input_family(config.get<std::string>("consumes"));
"accept_even_numbers", [](int i) { return i % 2 == 0; }, concurrency::unlimited)
.input_family(product_query{config.get<std::string>("consumes"), "event"});
}
6 changes: 3 additions & 3 deletions test/benchmarks/accept_fibonacci_numbers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
using namespace phlex::experimental;
m.make<test::fibonacci_numbers>(config.get<int>("max_number"))
.predicate(
"accept", &test::fibonacci_numbers::accept, phlex::experimental::concurrency::unlimited)
.input_family(config.get<std::string>("consumes"));
.predicate("accept", &test::fibonacci_numbers::accept, concurrency::unlimited)
.input_family(product_query{config.get<std::string>("consumes"), "event"});
}
2 changes: 1 addition & 1 deletion test/benchmarks/last_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ namespace {
PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
m.transform("last_index", last_index, concurrency::unlimited)
.input_family("id")
.input_family("id"_in("event"))
.output_products(config.get<std::string>("produces", "a"));
}
4 changes: 3 additions & 1 deletion test/benchmarks/plus_101.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ namespace {

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m)
{
m.transform("plus_101", plus_101, concurrency::unlimited).input_family("a").output_products("c");
m.transform("plus_101", plus_101, concurrency::unlimited)
.input_family("a"_in("event"))
.output_products("c");
}
4 changes: 3 additions & 1 deletion test/benchmarks/plus_one.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ namespace {

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m)
{
m.transform("plus_one", plus_one, concurrency::unlimited).input_family("a").output_products("b");
m.transform("plus_one", plus_one, concurrency::unlimited)
.input_family("a"_in("event"))
.output_products("b");
}
7 changes: 5 additions & 2 deletions test/benchmarks/read_id.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "phlex/model/data_cell_index.hpp"
#include "phlex/module.hpp"

using namespace phlex::experimental;

namespace {
void read_id(phlex::experimental::data_cell_index const&) {}
void read_id(data_cell_index const&) {}
}

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m)
{
m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited).input_family("id");
m.observe("read_id", read_id, phlex::experimental::concurrency::unlimited)
.input_family("id"_in("event"));
}
3 changes: 2 additions & 1 deletion test/benchmarks/read_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace {

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
using namespace phlex::experimental;
m.observe("read_index", read_index, phlex::experimental::concurrency::unlimited)
.input_family(config.get<std::string>("consumes"));
.input_family(product_query{config.get<std::string>("consumes"), "event"});
}
3 changes: 2 additions & 1 deletion test/benchmarks/verify_difference.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
"verify_difference",
[expected = config.get<int>("expected", 100)](int i, int j) { assert(j - i == expected); },
concurrency::unlimited)
.input_family(config.get<std::string>("i", "b"), config.get<std::string>("j", "c"));
.input_family(product_query{config.get<std::string>("i", "b"), "event"},
product_query{config.get<std::string>("j", "c"), "event"});
}
3 changes: 2 additions & 1 deletion test/benchmarks/verify_even_fibonacci_numbers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ namespace test {

PHLEX_EXPERIMENTAL_REGISTER_ALGORITHMS(m, config)
{
using namespace phlex::experimental;
using namespace test;
m.make<even_fibonacci_numbers>(config.get<int>("max_number"))
.observe(
"only_even", &even_fibonacci_numbers::only_even, phlex::experimental::concurrency::unlimited)
.input_family(config.get<std::string>("consumes"));
.input_family(product_query{config.get<std::string>("consumes"), "event"});
}
3 changes: 1 addition & 2 deletions test/class_registration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ namespace {

TEST_CASE("Call non-framework functions", "[programming model]")
{
std::array const product_names{
product_query{"number"}, product_query{"temperature"}, product_query{"name"}};
std::array const product_names{"number"_in("job"), "temperature"_in("job"), "name"_in("job")};
std::array const oproduct_names{"onumber"s, "otemperature"s, "oname"s};

auto store = product_store::base();
Expand Down
Loading
Loading