Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ namespace phlex::experimental {
}
}

std::size_t framework_graph::execution_counts(std::string const& node_name) const
std::size_t framework_graph::seen_cell_count(std::string const& layer_name,
bool const missing_ok) const
{
return nodes_.execution_counts(node_name);
return hierarchy_.count_for(layer_name, missing_ok);
}

std::size_t framework_graph::product_counts(std::string const& node_name) const
std::size_t framework_graph::execution_count(std::string const& node_name) const
{
return nodes_.product_counts(node_name);
return nodes_.execution_count(node_name);
}

void framework_graph::execute()
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/framework_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ namespace phlex::experimental {

void execute();

std::size_t execution_counts(std::string const& node_name) const;
std::size_t product_counts(std::string const& node_name) const;
std::size_t seen_cell_count(std::string const& layer_name, bool missing_ok = false) const;
std::size_t execution_count(std::string const& node_name) const;

module_graph_proxy<void_tag> module_proxy(configuration const& config)
{
Expand Down
18 changes: 1 addition & 17 deletions phlex/core/node_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using namespace std::string_literals;

namespace phlex::experimental {
std::size_t node_catalog::execution_counts(std::string const& node_name) const
std::size_t node_catalog::execution_count(std::string const& node_name) const
{
// FIXME: Yuck!
if (auto node = predicates.get(node_name)) {
Expand All @@ -28,20 +28,4 @@ namespace phlex::experimental {
}
throw std::runtime_error("Unknown node type with name: "s + node_name);
}

std::size_t node_catalog::product_counts(std::string const& node_name) const
{
// FIXME: Yuck!
if (auto node = folds.get(node_name)) {
return node->product_count();
}
if (auto node = unfolds.get(node_name)) {
return node->product_count();
}
if (auto node = transforms.get(node_name)) {
return node->product_count();
}
return -1u;
}

}
3 changes: 1 addition & 2 deletions phlex/core/node_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ namespace phlex::experimental {
return registrar{boost::pfr::get<simple_ptr_map<Ptr>>(*this), errors};
}

std::size_t execution_counts(std::string const& node_name) const;
std::size_t product_counts(std::string const& node_name) const;
std::size_t execution_count(std::string const& node_name) const;

simple_ptr_map<declared_predicate_ptr> predicates{};
simple_ptr_map<declared_observer_ptr> observers{};
Expand Down
44 changes: 34 additions & 10 deletions phlex/model/data_layer_hierarchy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,42 @@ namespace phlex::experimental {
// guard against overwriting the value of "count", we use the returned iterator "it",
// which will either refer to the new node in the map, or to the already-emplaced
// node. We then increment the count.
auto [it, _] = layers_.emplace(id->layer_hash(),
std::make_shared<layer_entry>(id->layer_name(), parent_hash));
auto [it, _] = layers_.emplace(
id->layer_hash(),
std::make_shared<layer_entry>(id->layer_name(), id->layer_path(), parent_hash));
++it->second->count;
}

std::size_t data_layer_hierarchy::count_for(std::string const& layer_name) const
std::size_t data_layer_hierarchy::count_for(std::string const& layer, bool const missing_ok) const
{
auto it = find_if(begin(layers_), end(layers_), [&layer_name](auto const& layer) {
return layer.second->name == layer_name;
});
return it != cend(layers_) ? it->second->count.load() : 0;
// The assumption is that specified layer is the component of a layer path
std::string search_token = layer;
if (not layer.starts_with("/")) {
search_token = '/' + layer;
}

std::vector<layer_entry const*> candidates;
for (auto const& [_, entry] : layers_) {
if (entry->layer_path.ends_with(search_token)) {
candidates.push_back(entry.get());
}
}

if (candidates.empty()) {
return missing_ok ? 0ull
: throw std::runtime_error("No layers match the specification " + layer);
}

if (candidates.size() > 1ull) {
std::string msg{"The following data layers match the specification " + layer + ":\n"};
for (auto const* entry : candidates) {
msg += "\n- " + entry->layer_path;
}
msg += "\n\nPlease specify the full layer path to disambiguate between them.";
throw std::runtime_error(msg);
}

return candidates[0]->count.load();
}

void data_layer_hierarchy::print() const { spdlog::info("{}", graph_layout()); }
Expand Down Expand Up @@ -84,8 +109,7 @@ namespace phlex::experimental {
}

auto const initial_indent = " ";
return fmt::format("\nProcessed layers:\n\n{}job{}\n",
initial_indent,
pretty_recurse(tree, "job", initial_indent));
return fmt::format(
"\n\nSeen layers:\n\n{}job{}\n", initial_indent, pretty_recurse(tree, "job", initial_indent));
}
}
6 changes: 4 additions & 2 deletions phlex/model/data_layer_hierarchy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace phlex::experimental {
public:
~data_layer_hierarchy();
void increment_count(data_cell_index_ptr const& id);
std::size_t count_for(std::string const& layer_name) const;
std::size_t count_for(std::string const& layer, bool missing_ok = false) const;

void print() const;

Expand All @@ -31,11 +31,13 @@ namespace phlex::experimental {
std::string indent = {}) const;

struct layer_entry {
layer_entry(std::string n, std::size_t par_hash) : name{std::move(n)}, parent_hash{par_hash}
layer_entry(std::string n, std::string path, std::size_t par_hash) :
name{std::move(n)}, layer_path{std::move(path)}, parent_hash{par_hash}
{
}

std::string name;
std::string layer_path;
std::size_t parent_hash;
std::atomic<std::size_t> count{};
};
Expand Down
2 changes: 1 addition & 1 deletion plugins/layer_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace phlex::experimental {
emitted_cells_["/job"] = 0ull;
}

std::size_t layer_generator::emitted_cells(std::string layer_path) const
std::size_t layer_generator::emitted_cell_count(std::string layer_path) const
{
// Check if the count of all emitted cells is requested
if (layer_path.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/layer_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace phlex::experimental {

void operator()(framework_driver& driver) { execute(driver, data_cell_index::base_ptr()); }

std::size_t emitted_cells(std::string layer_path = {}) const;
std::size_t emitted_cell_count(std::string layer_path = {}) const;

private:
void execute(framework_driver& driver, data_cell_index_ptr index, bool recurse = true);
Expand Down
12 changes: 6 additions & 6 deletions test/allowed_families.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ TEST_CASE("Testing families", "[data model]")
.input_family("id"_in("run"), "id"_in("subrun"), "id"_in("event"));
g.execute();

CHECK(g.execution_counts("se") == 1ull);
CHECK(g.execution_counts("rs") == 1ull);
CHECK(g.execution_counts("rse") == 1ull);
CHECK(g.execution_count("se") == 1ull);
CHECK(g.execution_count("rs") == 1ull);
CHECK(g.execution_count("rse") == 1ull);

// FIXME: Need to improve the synchronization to supply strict equality
CHECK(g.execution_counts("run_id_provider") >= 1ull);
CHECK(g.execution_counts("subrun_id_provider") >= 1ull);
CHECK(g.execution_counts("event_id_provider") == 1ull);
CHECK(g.execution_count("run_id_provider") >= 1ull);
CHECK(g.execution_count("subrun_id_provider") >= 1ull);
CHECK(g.execution_count("event_id_provider") == 1ull);
}
12 changes: 6 additions & 6 deletions test/cached_execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ TEST_CASE("Cached function calls", "[data model]")
g.execute();

// FIXME: Need to improve the synchronization to supply strict equality
CHECK(g.execution_counts("A1") >= n_runs);
CHECK(g.execution_counts("A2") >= n_runs);
CHECK(g.execution_counts("A3") >= n_runs);
CHECK(g.execution_count("A1") >= n_runs);
CHECK(g.execution_count("A2") >= n_runs);
CHECK(g.execution_count("A3") >= n_runs);

CHECK(g.execution_counts("B1") >= n_runs * n_subruns);
CHECK(g.execution_counts("B2") >= n_runs * n_subruns);
CHECK(g.execution_count("B1") >= n_runs * n_subruns);
CHECK(g.execution_count("B2") >= n_runs * n_subruns);

CHECK(g.execution_counts("C") == n_runs * n_subruns * n_events);
CHECK(g.execution_count("C") == n_runs * n_subruns * n_events);
}
37 changes: 32 additions & 5 deletions test/data_cell_counting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "catch2/catch_test_macros.hpp"

using namespace phlex::experimental;
using phlex::data_cell_index;

namespace {
auto const job_hash_value = hash("job");
Expand All @@ -27,6 +28,33 @@ TEST_CASE("Counter one layer deep", "[data model]")
CHECK(job_counter.result().count_for(event_hash_value) == 10);
}

TEST_CASE("Data layer hierarchy with ambiguous layer names", "[data model]")
{
data_layer_hierarchy h;
CHECK_THROWS(h.count_for("/job"));
CHECK(h.count_for("/job", true) == 0);

auto job_index = data_cell_index::base_ptr();
h.increment_count(job_index);
CHECK(h.count_for("/job") == 1);

auto spill_index = job_index->make_child(0, "spill");
h.increment_count(spill_index);

auto run_index = job_index->make_child(0, "run");
h.increment_count(run_index);
CHECK(h.count_for("/job/run") == 1);
CHECK(h.count_for("run") == 1);

// Nested spill indices
h.increment_count(run_index->make_child(0, "spill"));
h.increment_count(run_index->make_child(1, "spill"));

CHECK_THROWS(h.count_for("spill"));
CHECK(h.count_for("/job/spill") == 1);
CHECK(h.count_for("/job/run/spill") == 2);
}

TEST_CASE("Counter multiple layers deep", "[data model]")
{
constexpr std::size_t nruns{2ull};
Expand All @@ -43,17 +71,16 @@ TEST_CASE("Counter multiple layers deep", "[data model]")

// Notice the wholesale capture by reference--generally a lazy way of doing things.
auto check_all_processed = [&] {
CHECK(h.count_for("job") == processed_jobs);
CHECK(h.count_for("run") == processed_runs);
CHECK(h.count_for("subrun") == processed_subruns);
CHECK(h.count_for("event") == processed_events);
CHECK(h.count_for("job", true) == processed_jobs);
CHECK(h.count_for("run", true) == processed_runs);
CHECK(h.count_for("subrun", true) == processed_subruns);
CHECK(h.count_for("event", true) == processed_events);
};

auto const run_hash_value = hash(job_hash_value, "run");
auto const subrun_hash_value = hash(run_hash_value, "subrun");
auto const event_hash_value = hash(subrun_hash_value, "event");

using phlex::data_cell_index;
auto job_index = data_cell_index::base_ptr();
counters.update(job_index);
for (std::size_t i = 0; i != nruns; ++i) {
Expand Down
8 changes: 4 additions & 4 deletions test/different_hierarchies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ TEST_CASE("Different hierarchies used with fold", "[graph]")

g.execute();

CHECK(g.execution_counts("run_add") == index_limit * number_limit);
CHECK(g.execution_counts("job_add") == index_limit * number_limit + top_level_event_limit);
CHECK(g.execution_counts("verify_run_sum") == index_limit);
CHECK(g.execution_counts("verify_job_sum") == 1);
CHECK(g.execution_count("run_add") == index_limit * number_limit);
CHECK(g.execution_count("job_add") == index_limit * number_limit + top_level_event_limit);
CHECK(g.execution_count("verify_run_sum") == index_limit);
CHECK(g.execution_count("verify_job_sum") == 1);
}
14 changes: 7 additions & 7 deletions test/filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ TEST_CASE("Two predicates", "[filtering]")

g.execute();

CHECK(g.execution_counts("add_evens") == 5);
CHECK(g.execution_counts("add_odds") == 5);
CHECK(g.execution_count("add_evens") == 5);
CHECK(g.execution_count("add_odds") == 5);
}

TEST_CASE("Two predicates in series", "[filtering]")
Expand All @@ -122,7 +122,7 @@ TEST_CASE("Two predicates in series", "[filtering]")

g.execute();

CHECK(g.execution_counts("add") == 0);
CHECK(g.execution_count("add") == 0);
}

TEST_CASE("Two predicates in parallel", "[filtering]")
Expand All @@ -138,7 +138,7 @@ TEST_CASE("Two predicates in parallel", "[filtering]")

g.execute();

CHECK(g.execution_counts("add") == 0);
CHECK(g.execution_count("add") == 0);
}

TEST_CASE("Three predicates in parallel", "[filtering]")
Expand Down Expand Up @@ -170,7 +170,7 @@ TEST_CASE("Three predicates in parallel", "[filtering]")

g.execute();

CHECK(g.execution_counts("collect") == 3);
CHECK(g.execution_count("collect") == 3);
}

TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filtering]")
Expand All @@ -193,6 +193,6 @@ TEST_CASE("Two predicates in parallel (each with multiple arguments)", "[filteri

g.execute();

CHECK(g.execution_counts("check_odds") == 5);
CHECK(g.execution_counts("check_evens") == 5);
CHECK(g.execution_count("check_odds") == 5);
CHECK(g.execution_count("check_evens") == 5);
}
12 changes: 6 additions & 6 deletions test/fold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ TEST_CASE("Different data layers of fold", "[graph]")

g.execute();

CHECK(g.execution_counts("run_add") == index_limit * number_limit);
CHECK(g.execution_counts("job_add") == index_limit * number_limit);
CHECK(g.execution_counts("two_layer_job_add") == index_limit);
CHECK(g.execution_counts("verify_run_sum") == index_limit);
CHECK(g.execution_counts("verify_two_layer_job_sum") == 1);
CHECK(g.execution_counts("verify_job_sum") == 1);
CHECK(g.execution_count("run_add") == index_limit * number_limit);
CHECK(g.execution_count("job_add") == index_limit * number_limit);
CHECK(g.execution_count("two_layer_job_add") == index_limit);
CHECK(g.execution_count("verify_run_sum") == index_limit);
CHECK(g.execution_count("verify_two_layer_job_sum") == 1);
CHECK(g.execution_count("verify_job_sum") == 1);
}
30 changes: 12 additions & 18 deletions test/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ TEST_CASE("Make progress with one thread", "[graph]")
.input_family("number"_in("spill"));
g.execute();

CHECK(gen.emitted_cells("/job/spill") == 1000);
CHECK(g.execution_counts("provide_number") == 1000);
CHECK(g.execution_counts("observe_number") == 1000);
CHECK(gen.emitted_cell_count("/job/spill") == 1000);
CHECK(g.execution_count("provide_number") == 1000);
CHECK(g.execution_count("observe_number") == 1000);
}

TEST_CASE("Stop driver when workflow throws exception", "[graph]")
Expand All @@ -63,24 +63,18 @@ TEST_CASE("Stop driver when workflow throws exception", "[graph]")

CHECK_THROWS(g.execute());

// There are N + 1 potential existing threads for a framework job, where N corresponds
// to the number configured by the user, and 1 corresponds to the separate std::jthread
// created by the async_driver. Each "pull" from the async_driver happens in a
// serialized way. However, once an index has been pulled from the async_driver by the
// flow graph, that index is sent to downstream nodes for further processing.
// The framework will see one fewer data cells than were emitted by the generator (for
// the data layer in which the exception was thrown).
//
// The first node that processes that index is a provider that immediately throws an
// exception. This places the framework graph in an error state, where the async_driver
// is short-circuited from doing further processing.
//
// We make the assumption that one of those threads will trigger the exception and the
// remaining threads must be permitted to complete.
CHECK(gen.emitted_cells("/job/spill") <=
static_cast<std::size_t>(experimental::max_allowed_parallelism::active_value() + 1));
// With the current implementation, it is possible that framework graph will not see the
// "/job/spill" data layer before the job ends. In that case, the "/job/spill" layer
// will not have been recorded, and we therefore allow it to be "missing", which is what
// the 'true' argument allows for.
CHECK(gen.emitted_cell_count("/job/spill") == g.seen_cell_count("/job/spill", true) + 1u);

// A node has not "executed" until it has returned successfully. For that reason,
// neither the "throw_exception" provider nor the "downstream_of_exception" observer
// will have executed.
CHECK(g.execution_counts("throw_exception") == 0ull);
CHECK(g.execution_counts("downstream_of_exception") == 0ull);
CHECK(g.execution_count("throw_exception") == 0ull);
CHECK(g.execution_count("downstream_of_exception") == 0ull);
}
Loading
Loading