diff --git a/phlex/CMakeLists.txt b/phlex/CMakeLists.txt index 542636260..f208a812a 100644 --- a/phlex/CMakeLists.txt +++ b/phlex/CMakeLists.txt @@ -2,7 +2,6 @@ cet_register_export_set(SET_NAME phlex NAMESPACE phlex SET_DEFAULT) add_subdirectory(metaprogramming) add_subdirectory(utilities) -add_subdirectory(graph) add_subdirectory(model) add_subdirectory(core) diff --git a/phlex/core/CMakeLists.txt b/phlex/core/CMakeLists.txt index 60c307cc5..c59d0ccd1 100644 --- a/phlex/core/CMakeLists.txt +++ b/phlex/core/CMakeLists.txt @@ -37,7 +37,6 @@ cet_make_library( phlex::model phlex::utilities PRIVATE - phlex::graph Boost::json spdlog::spdlog ) @@ -101,7 +100,6 @@ cet_make_library( LIBRARIES phlex_core phlex::metaprogramming - phlex::graph phlex::model phlex::utilities Boost::json diff --git a/phlex/graph/CMakeLists.txt b/phlex/graph/CMakeLists.txt deleted file mode 100644 index 3b668d173..000000000 --- a/phlex/graph/CMakeLists.txt +++ /dev/null @@ -1,30 +0,0 @@ -cet_make_library( - LIBRARY_NAME - phlex_graph - SHARED - SOURCE - serializer_node.cpp - LIBRARIES - PRIVATE - Boost::boost - TBB::tbb - phlex::utilities - fmt::fmt -) - -install(FILES serial_node.hpp serializer_node.hpp DESTINATION include/phlex/graph) - -target_include_directories(phlex_graph PRIVATE ${PROJECT_SOURCE_DIR}) - -# Interface library -cet_make_library( - LIBRARY_NAME - phlex_graph_int - EXPORT_NAME - graph - INTERFACE - NO_SOURCE - LIBRARIES - phlex_graph - TBB::tbb -) diff --git a/phlex/graph/serial_node.hpp b/phlex/graph/serial_node.hpp deleted file mode 100644 index 11e93b7a6..000000000 --- a/phlex/graph/serial_node.hpp +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef PHLEX_GRAPH_SERIAL_NODE_HPP -#define PHLEX_GRAPH_SERIAL_NODE_HPP - -#include "phlex/graph/serializer_node.hpp" -#include "phlex/utilities/sized_tuple.hpp" - -#include "oneapi/tbb/flow_graph.h" - -namespace phlex::experimental { - - template - using base = tbb::flow::composite_node, std::tuple>; - - namespace detail { - template - using join_tuple = concatenated_tuples, sized_tuple>; - } - - template - class serial_node : public base { - template - void make_edges(std::index_sequence, Serializers const& serializers) - { - (make_edge(std::get(serializers), input_port(join_)), ...); - } - - template - explicit serial_node(tbb::flow::graph& g, - std::size_t concurrency, - FT f, - Serializers serializers, - std::index_sequence iseq) : - base{g}, - buffered_msgs_{g}, - join_{g}, - serialized_function_{g, - concurrency, - [serialized_resources = std::move(serializers), function = std::move(f)]( - detail::join_tuple const& tup) mutable { - (void)serialized_resources; // To silence unused warning when N == 0 - auto input = std::get<0>(tup); - function(input); - (std::get(serialized_resources).try_put(1), ...); - return input; - }} - { - // Need way to route null messages around the join. - make_edge(buffered_msgs_, input_port<0>(join_)); - make_edges(iseq, serializers); - make_edge(join_, serialized_function_); - base::set_external_ports( - typename base::input_ports_type{buffered_msgs_}, - typename base::output_ports_type{serialized_function_}); - } - - public: - template - explicit serial_node(tbb::flow::graph& g, std::size_t concurrency, FT f) : - serial_node{g, concurrency, std::move(f), std::tuple{}, std::make_index_sequence<0>{}} - { - } - - template - explicit serial_node(tbb::flow::graph& g, - std::tuple const& serializers, - FT f) : - serial_node{g, - (sizeof...(Serializers) > 0 ? tbb::flow::serial : tbb::flow::unlimited), - std::move(f), - serializers, - std::make_index_sequence{}} - { - } - - private: - tbb::flow::buffer_node buffered_msgs_; - tbb::flow::join_node, tbb::flow::reserving> join_; - tbb::flow::function_node, Input> serialized_function_; - }; -} - -#endif // PHLEX_GRAPH_SERIAL_NODE_HPP diff --git a/phlex/graph/serializer_node.cpp b/phlex/graph/serializer_node.cpp deleted file mode 100644 index e30062b6f..000000000 --- a/phlex/graph/serializer_node.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include "phlex/graph/serializer_node.hpp" - -namespace phlex::experimental { - serializers::serializers(tbb::flow::graph& g) : graph_{g} {} - - serializer_node& serializers::get(std::string const& name) - { - return serializers_.try_emplace(name, serializer_node{graph_, name}).first->second; - } - - void serializers::activate() - { - for (auto& [name, serializer] : serializers_) { - serializer.activate(); - } - } -} diff --git a/phlex/graph/serializer_node.hpp b/phlex/graph/serializer_node.hpp deleted file mode 100644 index 5e5975d69..000000000 --- a/phlex/graph/serializer_node.hpp +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef PHLEX_GRAPH_SERIALIZER_NODE_HPP -#define PHLEX_GRAPH_SERIALIZER_NODE_HPP - -#include "phlex/utilities/sized_tuple.hpp" - -#include "oneapi/tbb/flow_graph.h" - -#include -#include -#include - -namespace phlex::experimental { - - using token_t = int; - using base_impl = tbb::flow::buffer_node; - class serializer_node : public base_impl { - public: - explicit serializer_node(tbb::flow::graph& g, std::string const& name) : - base_impl{g}, name_{name} - { - } - - void activate() - { - // The serializer must not be activated until it resides in its final resting spot. - // IOW, if a container of serializers grows, the locations of the serializers can - // move around, introducing memory errors if try_put(...) has been attempted in a - // different location than when it's used during the graph execution. - try_put(1); - } - - auto const& name() const { return name_; } - - private: - std::string name_; - }; - - class serializers { - public: - explicit serializers(tbb::flow::graph& g); - void activate(); - - auto get(auto... resources) -> sized_tuple - { - // FIXME: Need to make sure there are no duplicates! - return std::tie(get(std::string(resources))...); - } - - private: - serializer_node& get(std::string const& name); - tbb::flow::graph& graph_; - std::map serializers_; - }; -} - -#endif // PHLEX_GRAPH_SERIALIZER_NODE_HPP diff --git a/phlex/model/CMakeLists.txt b/phlex/model/CMakeLists.txt index 256bae49f..6b5eabf27 100644 --- a/phlex/model/CMakeLists.txt +++ b/phlex/model/CMakeLists.txt @@ -18,7 +18,6 @@ cet_make_library( spdlog::spdlog fmt::fmt PRIVATE - phlex::graph phlex::utilities TBB::tbb ) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 6b418e98b..c58f1ec19 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -198,17 +198,7 @@ cet_test( phlex::utilities spdlog::spdlog ) -cet_test( - serializer - USE_CATCH2_MAIN - SOURCE - serializer.cpp - LIBRARIES - phlex::core - TBB::tbb - phlex::graph - spdlog::spdlog -) + cet_test(product_query USE_CATCH2_MAIN SOURCE product_query.cpp LIBRARIES phlex::core ) @@ -256,6 +246,7 @@ add_subdirectory(utilities) add_subdirectory(mock-workflow) add_subdirectory(demo-giantdata) # add_subdirectory(python) +add_subdirectory(tbb-preview) if(PHLEX_USE_FORM) add_subdirectory(form) diff --git a/test/serializer.cpp b/test/serializer.cpp deleted file mode 100644 index ba2680fc4..000000000 --- a/test/serializer.cpp +++ /dev/null @@ -1,123 +0,0 @@ -#include "phlex/graph/serial_node.hpp" -#include "phlex/utilities/thread_counter.hpp" - -#include "catch2/catch_test_macros.hpp" -#include "fmt/std.h" -#include "oneapi/tbb/flow_graph.h" -#include "spdlog/spdlog.h" - -#include -#include - -using namespace phlex::experimental; -using namespace oneapi::tbb; -using namespace spdlog; - -TEST_CASE("Serialize functions based on resource", "[multithreading]") -{ - flow::graph g; - unsigned int i{}; - flow::input_node src{g, [&i](flow_control& fc) { - if (i < 10) { - return ++i; - } - fc.stop(); - return 0u; - }}; - - serializers serialized_resources{g}; - - std::atomic root_counter{}, genie_counter{}; - - serial_node node1{ - g, serialized_resources.get("ROOT"), [&root_counter](unsigned int const i) { - thread_counter c{root_counter}; - debug("Processing from node 1 {}", i); - return i; - }}; - - serial_node node2{g, - serialized_resources.get("ROOT", "GENIE"), - [&root_counter, &genie_counter](unsigned int const i) { - thread_counter c1{root_counter}; - thread_counter c2{genie_counter}; - debug("Processing from node 2 {}", i); - return i; - }}; - - serial_node node3{ - g, serialized_resources.get("GENIE"), [&genie_counter](unsigned int const i) { - thread_counter c{genie_counter}; - debug("Processing from node 3 {}", i); - return i; - }}; - - serial_node node4{ - g, tbb::flow::unlimited, [](unsigned int const i) { return i; }}; - - auto receiving_node_for = [](tbb::flow::graph& g, std::string const& label) { - return flow::function_node{ - g, flow::unlimited, [&label](unsigned int const i) { - debug("Processed {} task {}", label, i); - return i; - }}; - }; - - auto receiving_node_1 = receiving_node_for(g, "ROOT"); - auto receiving_node_2 = receiving_node_for(g, "ROOT/GENIE"); - auto receiving_node_3 = receiving_node_for(g, "GENIE"); - auto receiving_node_4 = receiving_node_for(g, "unlimited"); - - make_edge(src, node1); - make_edge(src, node2); - make_edge(src, node3); - make_edge(src, node4); - - make_edge(node1, receiving_node_1); - make_edge(node2, receiving_node_2); - make_edge(node3, receiving_node_3); - make_edge(node4, receiving_node_4); - - serialized_resources.activate(); - src.activate(); - g.wait_for_all(); -} - -TEST_CASE("Serialize functions in unfold/merge graph", "[multithreading]") -{ - flow::graph g; - flow::input_node src{g, [i = 0u](flow_control& fc) mutable { - if (i < 10u) { - return ++i; - } - fc.stop(); - return 0u; - }}; - - serializers serialized_resources{g}; - - std::atomic root_counter{}; - - auto root_resource = serialized_resources.get("ROOT"); - auto serial_node_for = [&root_resource, &root_counter](auto& g, int label) { - return serial_node{ - g, root_resource, [&root_counter, label](unsigned int const i) { - thread_counter c{root_counter}; - debug("Processing from node {} {}", label, i); - return i; - }}; - }; - - auto node1 = serial_node_for(g, 1); - auto node2 = serial_node_for(g, 2); - auto node3 = serial_node_for(g, 3); - - make_edge(src, node1); - make_edge(src, node2); - make_edge(node1, node3); - make_edge(node2, node3); - - serialized_resources.activate(); - src.activate(); - g.wait_for_all(); -} diff --git a/test/tbb-preview/CMakeLists.txt b/test/tbb-preview/CMakeLists.txt new file mode 100644 index 000000000..183100238 --- /dev/null +++ b/test/tbb-preview/CMakeLists.txt @@ -0,0 +1,54 @@ +include(CheckCXXSourceCompiles) + +# Check if flow::resource_provider is available in TBB +set(CMAKE_REQUIRED_LIBRARIES_SAVE ${CMAKE_REQUIRED_LIBRARIES}) +set(CMAKE_REQUIRED_INCLUDES_SAVE ${CMAKE_REQUIRED_INCLUDES}) +set(CMAKE_REQUIRED_FLAGS_SAVE "${CMAKE_REQUIRED_FLAGS}") + +get_target_property(_TBB_INCLUDES TBB::tbb INTERFACE_INCLUDE_DIRECTORIES) +get_target_property(_TBB_LOCATION TBB::tbb LOCATION) +if(_TBB_INCLUDES) + set(CMAKE_REQUIRED_INCLUDES ${_TBB_INCLUDES}) +endif() +if(_TBB_LOCATION) + set(CMAKE_REQUIRED_LIBRARIES ${_TBB_LOCATION}) +endif() +set(CMAKE_REQUIRED_FLAGS "-std=c++${CMAKE_CXX_STANDARD}") + +check_cxx_source_compiles( + " + #define TBB_PREVIEW_FLOW_GRAPH_RESOURCE_LIMITED_NODE 1 + #include + + using type = oneapi::tbb::flow::resource_provider; + + int main() {} +" + HAVE_TBB_RESOURCE_PROVIDER +) + +set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES_SAVE}) +set(CMAKE_REQUIRED_INCLUDES ${CMAKE_REQUIRED_INCLUDES_SAVE}) +set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS_SAVE}") + +if(HAVE_TBB_RESOURCE_PROVIDER) + cet_test( + resource_limiting + USE_CATCH2_MAIN + SOURCE + resource_limiting_test.cpp + LIBRARIES + phlex::core + TBB::tbb + spdlog::spdlog + ) + target_compile_definitions( + resource_limiting + PRIVATE TBB_PREVIEW_FLOW_GRAPH_RESOURCE_LIMITED_NODE=1 + ) +else() + message( + STATUS + "Skipping resource_limiting test: flow::resource_provider not available in TBB" + ) +endif() diff --git a/test/tbb-preview/resource_limiting_test.cpp b/test/tbb-preview/resource_limiting_test.cpp new file mode 100644 index 000000000..5654f360f --- /dev/null +++ b/test/tbb-preview/resource_limiting_test.cpp @@ -0,0 +1,269 @@ +#include "phlex/utilities/sleep_for.hpp" +#include "phlex/utilities/thread_counter.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/sinks/basic_file_sink.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include + +using namespace phlex::experimental; +using namespace oneapi::tbb; + +namespace { + + // ROOT is the representation of the ROOT resource. + struct ROOT {}; + + // GENIE is the representation of the GENIE resource. + struct GENIE {}; + + // DB is the representation of the DB resource. + // The value id is the database ID. + struct DB { + unsigned int id; + }; + + void start(std::string_view algorithm, unsigned int spill, unsigned int data = 0u) + { + spdlog::info("Start\t{}\t{}\t{}", algorithm, spill, data); + } + + void stop(std::string_view algorithm, unsigned int spill, unsigned int data = 0u) + { + spdlog::info("Stop\t{}\t{}\t{}", algorithm, spill, data); + } + + void setup_file_logger(std::string const& test_name) + { + auto logger = spdlog::basic_logger_mt(test_name, test_name + ".txt", true); + spdlog::set_default_logger(logger); + } + +} // namespace + +TEST_CASE("Serialize functions based on resource", "[multithreading]") +{ + setup_file_logger("serialize_functions_based_on_resource"); + flow::graph g; + unsigned int i{}; + flow::input_node src{g, [&i](flow_control& fc) { + if (i < 10) { + return ++i; + } + fc.stop(); + return 0u; + }}; + + std::atomic root_counter{}; + std::atomic genie_counter{}; + + ROOT const root_resource{}; + + flow::resource_provider root_resource_provider{&root_resource}; + flow::resource_provider genie_resource{GENIE{}}; + + flow::resource_limited_node> node1{ + g, + flow::unlimited, + std::tie(root_resource_provider), + [&root_counter](unsigned int const i, auto& outputs, ROOT const*) { + thread_counter c{root_counter}; + spdlog::info("Processing from node 1 {} with root token", i); + std::get<0>(outputs).try_put(i); + }}; + + flow::resource_limited_node> node2{ + g, + flow::unlimited, + std::tie(root_resource_provider, genie_resource), + [&root_counter, &genie_counter](unsigned int const i, auto& outputs, ROOT const*, GENIE) { + thread_counter c1{root_counter}; + thread_counter c2{genie_counter}; + spdlog::info("Processing from node 2 {}", i); + std::get<0>(outputs).try_put(i); + }}; + + flow::resource_limited_node> node3{ + g, + flow::unlimited, + std::tie(genie_resource), + [&genie_counter](unsigned int const i, auto& outputs, GENIE) { + thread_counter c{genie_counter}; + spdlog::info("Processing from node 3 {}", i); + std::get<0>(outputs).try_put(i); + }}; + + auto receiving_node_for = [](flow::graph& g, std::string const& label) { + return flow::function_node{ + g, flow::unlimited, [label](unsigned int const i) { + spdlog::info("Processed {} task {}", label, i); + return i; + }}; + }; + + auto receiving_node_1 = receiving_node_for(g, "ROOT"); + auto receiving_node_2 = receiving_node_for(g, "ROOT/GENIE"); + auto receiving_node_3 = receiving_node_for(g, "GENIE"); + + make_edge(src, node1); + make_edge(src, node2); + make_edge(src, node3); + + make_edge(node1, receiving_node_1); + make_edge(node2, receiving_node_2); + make_edge(node3, receiving_node_3); + + src.activate(); + g.wait_for_all(); +} + +TEST_CASE("Serialize functions in diamond graph", "[multithreading]") +{ + setup_file_logger("serialize_functions_in_diamond_graph"); + flow::graph g; + flow::input_node src{g, [i = 0u](flow_control& fc) mutable -> unsigned int { + if (i < 10u) { + return ++i; + } + fc.stop(); + return 0u; + }}; + + flow::resource_provider root_resource{ROOT{}}; + + std::atomic root_counter{}; + + auto serial_node_for = [&root_resource, &root_counter](auto& g, int label) { + return flow::resource_limited_node>{ + g, + flow::unlimited, + std::tie(root_resource), + [&root_counter, label](unsigned int const i, auto& outputs, ROOT) { + thread_counter c{root_counter}; + spdlog::info("Processing from node {} {}", label, i); + std::get<0>(outputs).try_put(i); + }}; + }; + + auto node1 = serial_node_for(g, 1); + auto node2 = serial_node_for(g, 2); + auto node3 = serial_node_for(g, 3); + + make_edge(src, node1); + make_edge(src, node2); + make_edge(node1, node3); + make_edge(node2, node3); + + src.activate(); + g.wait_for_all(); +} + +TEST_CASE("Test based on oneTBB PR 1677 (RFC)", "[multithreading]") +{ + using namespace std::chrono_literals; + + setup_file_logger("test_based_on_onetbb_pr_1677"); + + // We first print a message that describes the different fields in our log messages + spdlog::set_pattern("%v"); + spdlog::info("time\tthread\tevent\tnode\tmessage\tdata"); + + // Now set the actual pattern for remaining logged messages + spdlog::set_pattern("%H:%M:%S.%f\t%t\t%v"); + flow::graph g; + unsigned int i{}; + flow::input_node src{g, [&i](flow_control& fc) { + if (i < 50) { + start("Source", i + 1); // The message is the spill id we will emit + auto j = ++i; + stop("Source", i); + return j; + } + fc.stop(); + return 0u; + }}; + + // Declare the counters that we use to verify that the resource constraints are being met. + std::atomic root_counter{}; + std::atomic genie_counter{}; + std::atomic db_counter{}; + + flow::resource_provider root_limiter{ROOT{}}; + flow::resource_provider genie_limiter{GENIE{}}; + + DB const db1{1}; + DB const db13{13}; + flow::resource_provider db_limiter{&db1, &db13}; + + auto fill_histo = [&root_counter](unsigned int const i, auto& outputs, ROOT) { + thread_counter c{root_counter}; + start("Histogramming", i); + spin_for(10ms); + stop("Histogramming", i); + std::get<0>(outputs).try_put(i); + }; + + auto gen_fill_histo = [&root_counter, + &genie_counter](unsigned int const i, auto& outputs, ROOT, GENIE) { + thread_counter c1{root_counter}; + thread_counter c2{genie_counter}; + start("Histo-generating", i); + spin_for(10ms); + stop("Histo-generating", i); + std::get<0>(outputs).try_put(i); + }; + + auto generate = [&genie_counter](unsigned int const i, auto& outputs, GENIE) { + thread_counter c{genie_counter}; + start("Generating", i); + spin_for(10ms); + stop("Generating", i); + std::get<0>(outputs).try_put(i); + }; + + auto propagate = [](unsigned int const i) -> unsigned int { + start("Propagating", i); + spin_for(150ms); + stop("Propagating", i); + return i; + }; + + using rl_node = flow::resource_limited_node>; + + rl_node histogrammer{g, flow::unlimited, std::tie(root_limiter), fill_histo}; + rl_node histo_generator{ + g, flow::unlimited, std::tie(root_limiter, genie_limiter), gen_fill_histo}; + rl_node generator{g, flow::unlimited, std::tie(genie_limiter), generate}; + flow::function_node propagator{g, flow::unlimited, propagate}; + + // Nodes that use the DB resource limited to 2 tokens + auto make_calibrator = [&db_counter](std::string_view algorithm) { + return [&db_counter, algorithm](unsigned int const i, auto& outputs, DB const* db) { + thread_counter c{db_counter, 2}; + start(algorithm, i, db->id); + spin_for(10ms); + stop(algorithm, i, db->id); + std::get<0>(outputs).try_put(i); + }; + }; + + rl_node calibratorA{g, flow::unlimited, std::tie(db_limiter), make_calibrator("Calibration[A]")}; + rl_node calibratorB{g, flow::unlimited, std::tie(db_limiter), make_calibrator("Calibration[B]")}; + rl_node calibratorC{g, flow::serial, std::tie(db_limiter), make_calibrator("Calibration[C]")}; + + make_edge(src, histogrammer); + make_edge(src, histo_generator); + make_edge(src, generator); + make_edge(src, propagator); + make_edge(src, calibratorA); + make_edge(src, calibratorB); + make_edge(src, calibratorC); + + src.activate(); + g.wait_for_all(); +}