Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions test/demo-giantdata/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,13 @@ add_library(
)
target_link_libraries(giantdata PRIVATE TBB::tbb)

# Register the Catch2-based test with CTest via cet_test.
# (The diff residue duplicated the superseded add_executable/target_link_libraries
# rule here; the merged version uses cet_test with USE_CATCH2_MAIN, which both
# builds the executable and registers it, so the old rule is gone.)
cet_test(
  unfold_transform_fold
  SOURCE
  unfold_transform_fold.cpp
  LIBRARIES
  giantdata
  phlex::core
  layer_generator   # provides driver_for_test / layer_generator used by the test
  USE_CATCH2_MAIN   # Catch2 supplies main(); the source defines only TEST_CASEs
)
220 changes: 98 additions & 122 deletions test/demo-giantdata/unfold_transform_fold.cpp
Original file line number Diff line number Diff line change
@@ -1,137 +1,113 @@
#include "phlex/core/framework_graph.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/async_driver.hpp"
#include "test/products_for_output.hpp"
#include "plugins/layer_generator.hpp"

#include "test/demo-giantdata/user_algorithms.hpp"
#include "test/demo-giantdata/waveform_generator.hpp"
#include "test/demo-giantdata/waveform_generator_input.hpp"

#include <algorithm>
#include <ranges>
#include <string>
#include <vector>
#include "catch2/catch_test_macros.hpp"

#include <atomic>

using namespace phlex::experimental;

namespace {
  // Tracks pipeline execution so the test can verify that fold operations begin
  // before all unfold operations complete — i.e. the framework pipelines work
  // through the graph rather than running the stages as sequential batches.
  //
  // All counters are atomics because the unfold and fold lambdas that update
  // them run with concurrency::unlimited (potentially from many TBB threads).
  struct ExecutionTracker {
    // Total number of unfold operations completed.
    std::atomic<std::size_t> unfold_completed{0};

    // Total number of fold operations started.
    std::atomic<std::size_t> fold_started{0};

    // Number of unfold operations that had completed when the first fold
    // started; latched exactly once via compare_exchange_strong from 0.
    std::atomic<std::size_t> unfold_completed_at_first_fold{0};

    // Expected total number of operations (spills * APAs per spill);
    // set by the test body, read only after g.execute() returns.
    std::size_t total_expected{0};
  };
}

// Builds an unfold -> transform -> fold graph over generated spill data and
// checks (a) every operation ran and (b) the first fold started before the
// last unfold finished, i.e. execution was pipelined, not batched.
TEST_CASE("Unfold-transform-fold pipeline", "[concurrency][unfold][fold]")
{
  // Test parameters - moderate scale to ensure sustained concurrent execution.
  constexpr std::size_t n_runs = 1;
  constexpr std::size_t n_subruns = 1;
  constexpr std::size_t n_spills = 20;
  constexpr int apas_per_spill = 20;
  constexpr std::size_t wires_per_spill = apas_per_spill * 256ull;
  constexpr std::size_t chunksize = 256; // waveforms produced per unfold step

  ExecutionTracker tracker;
  // One unfold (and hence one fold input) per APA in every spill.
  tracker.total_expected = n_spills * apas_per_spill;

  // Create the job -> run -> subrun -> spill data-layer hierarchy with the
  // layer generator instead of a hand-written source driver.
  layer_generator gen;
  gen.add_layer("run", {"job", n_runs});
  gen.add_layer("subrun", {"run", n_subruns});
  gen.add_layer("spill", {"subrun", n_spills});

  framework_graph g{driver_for_test(gen)};

  // Provide the waveform-generator input product for each spill.
  g.provide("provide_wgen",
            [](data_cell_index const& spill_index) {
              return demo::WGI(wires_per_spill,
                               spill_index.parent()->parent()->number(),
                               spill_index.parent()->number(),
                               spill_index.number());
            })
    .output_product("wgen"_in("spill"));

  // Unfold each spill into per-APA waveform chunks, counting completions.
  g.unfold<demo::WaveformGenerator>(
     "WaveformGenerator",
     &demo::WaveformGenerator::predicate,
     [&tracker](demo::WaveformGenerator const& wg, std::size_t running_value) {
       auto result = wg.op(running_value, chunksize);
       tracker.unfold_completed.fetch_add(1, std::memory_order_relaxed);
       return result;
     },
     concurrency::unlimited,
     "APA")
    .input_family("wgen"_in("spill"))
    .output_products("waves_in_apa");

  // Add the transform node to the graph.
  auto wrapped_user_function = [](handle<demo::Waveforms> hwf) {
    return demo::clampWaveforms(*hwf);
  };

  g.transform("clamp_node", wrapped_user_function, concurrency::unlimited)
    .input_family("waves_in_apa"_in("APA"))
    .output_products("clamped_waves");

  // Add the fold node with instrumentation to detect pipelined execution.
  g.fold(
     "accum_for_spill",
     [&tracker](demo::SummedClampedWaveforms& scw, handle<demo::Waveforms> hwf) {
       // Latch how many unfolds had completed when the first fold started;
       // the CAS from 0 succeeds exactly once across all fold invocations.
       std::size_t expected = 0;
       tracker.unfold_completed_at_first_fold.compare_exchange_strong(
         expected, tracker.unfold_completed.load(std::memory_order_relaxed));

       tracker.fold_started.fetch_add(1, std::memory_order_relaxed);
       demo::accumulateSCW(scw, *hwf);
     },
     concurrency::unlimited,
     "spill") // partition the output by the spill
    .input_family("clamped_waves"_in("APA"))
    .output_products("summed_waveforms");

  // Execute the graph.
  g.execute();

  // Verify pipelined execution: the first fold started before all unfolds
  // completed. (If execution were batched, this would equal total_expected.)
  auto const unfolds_at_first_fold = tracker.unfold_completed_at_first_fold.load();
  CHECK(unfolds_at_first_fold < tracker.total_expected);

  // Verify all operations completed.
  CHECK(tracker.unfold_completed == tracker.total_expected);
  CHECK(tracker.fold_started == tracker.total_expected);
}
Loading