Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions phlex/app/load_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ namespace phlex::experimental {
creator(g.source_proxy(config), config);
}

detail::next_index_t load_driver(boost::json::object const& raw_config)
driver_bundle load_driver(boost::json::object const& raw_config)
{
configuration const config{raw_config};
auto const& spec = config.get<std::string>("cpp");
create_driver = plugin_loader<detail::driver_creator_t>(spec, "create_driver");
return create_driver(config);
driver_proxy const proxy{};
return create_driver(proxy, config);
}
}
4 changes: 1 addition & 3 deletions phlex/app/load_module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

#include "boost/json.hpp"

#include <functional>

namespace phlex::experimental {
namespace detail {
// Adjust_config adds the module_label as a parameter, and it checks if the 'py'
Expand All @@ -17,7 +15,7 @@ namespace phlex::experimental {

void load_module(framework_graph& g, std::string const& label, boost::json::object config);
void load_source(framework_graph& g, std::string const& label, boost::json::object config);
detail::next_index_t load_driver(boost::json::object const& config);
driver_bundle load_driver(boost::json::object const& config);
}

#endif // PHLEX_APP_LOAD_MODULE_HPP
26 changes: 15 additions & 11 deletions phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,29 @@
#include <iostream>

namespace phlex::experimental {
framework_graph::framework_graph(data_cell_index_ptr index, int const max_parallelism) :
framework_graph{[index](framework_driver& driver) { driver.yield(index); }, max_parallelism}
framework_graph::framework_graph(int const max_parallelism) :
framework_graph{[](framework_driver& driver) { driver.yield(data_cell_index::job()); },
max_parallelism}
{
}

framework_graph::framework_graph(detail::next_index_t next_index, int const max_parallelism) :
framework_graph{driver_bundle{std::move(next_index), {}}, max_parallelism}
{
}

framework_graph::framework_graph(driver_bundle bundle, int const max_parallelism) :
parallelism_limit_{static_cast<std::size_t>(max_parallelism)},
driver_{std::move(next_index)},
fixed_hierarchy_{std::move(bundle.hierarchy)},
driver_{std::move(bundle.driver)},
src_{graph_,
[this](tbb::flow_control& fc) mutable -> data_cell_index_ptr {
auto item = driver_();
if (not item) {
index_router_.drain();
fc.stop();
return {};
if (auto item = driver_()) {
return index_router_.route(*item);
}

return index_router_.route(*item);
index_router_.drain();
fc.stop();
return {};
}},
index_router_{graph_},
hierarchy_node_{graph_,
Expand Down Expand Up @@ -175,5 +180,4 @@ namespace phlex::experimental {
make_edge(node->output_index_port(), hierarchy_node_);
}
}

}
6 changes: 4 additions & 2 deletions phlex/core/framework_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ namespace phlex {
namespace phlex::experimental {
class framework_graph {
public:
explicit framework_graph(data_cell_index_ptr index,
explicit framework_graph(int max_parallelism = oneapi::tbb::info::default_concurrency());
explicit framework_graph(detail::next_index_t next_index,
int max_parallelism = oneapi::tbb::info::default_concurrency());
explicit framework_graph(detail::next_index_t f,
explicit framework_graph(driver_bundle bundle,
int max_parallelism = oneapi::tbb::info::default_concurrency());
~framework_graph();

Expand Down Expand Up @@ -151,6 +152,7 @@ namespace phlex::experimental {

resource_usage graph_resource_usage_{};
max_allowed_parallelism parallelism_limit_;
fixed_hierarchy fixed_hierarchy_;
data_layer_hierarchy hierarchy_{};
node_catalog nodes_{};
std::map<std::string, filter> filters_{};
Expand Down
19 changes: 19 additions & 0 deletions phlex/detail/plugin_macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,23 @@
BOOST_DLL_ALIAS(func_name, dll_alias) \
PHLEX_DETAIL_SELECT_SIGNATURE(token_type, func_name, __VA_ARGS__)

#define PHLEX_DETAIL_CREATE_DRIVER_1ARG(func_name, d) \
phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy const& d, \
phlex::configuration const&)

#define PHLEX_DETAIL_CREATE_DRIVER_2ARGS(func_name, d, config) \
phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy const& d, \
phlex::configuration const& config)

#define PHLEX_DETAIL_SELECT_DRIVER_SIGNATURE(func_name, ...) \
BOOST_PP_IF(BOOST_PP_EQUAL(PHLEX_DETAIL_NARGS(__VA_ARGS__), 1), \
PHLEX_DETAIL_CREATE_DRIVER_1ARG, \
PHLEX_DETAIL_CREATE_DRIVER_2ARGS) \
(func_name, __VA_ARGS__)

#define PHLEX_DETAIL_REGISTER_DRIVER_PLUGIN(func_name, dll_alias, ...) \
static PHLEX_DETAIL_SELECT_DRIVER_SIGNATURE(func_name, __VA_ARGS__); \
BOOST_DLL_ALIAS(func_name, dll_alias) \
PHLEX_DETAIL_SELECT_DRIVER_SIGNATURE(func_name, __VA_ARGS__)

#endif // PHLEX_DETAIL_PLUGIN_MACROS_HPP
83 changes: 32 additions & 51 deletions phlex/driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,51 @@

#include "phlex/configuration.hpp"
#include "phlex/core/fwd.hpp"
#include "phlex/detail/plugin_macros.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/fixed_hierarchy.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/utilities/async_driver.hpp"

#include <concepts>
#include <memory>
#include <functional>
#include <utility>

namespace phlex {
using framework_driver = experimental::async_driver<data_cell_index_ptr>;
}

namespace phlex::experimental::detail {
namespace phlex::experimental {
class driver_proxy;
struct driver_bundle;

// See note below.
template <typename T>
auto make(configuration const& config)
{
if constexpr (requires { T{config}; }) {
return std::make_shared<T>(config);
} else {
return std::make_shared<T>();
}
}
using framework_driver = experimental::async_driver<data_cell_index_ptr>;

template <typename T>
concept next_function_with_driver = requires(T t, framework_driver& driver) {
{ t.next(driver) } -> std::same_as<void>;
namespace detail {
using next_index_t = std::function<void(framework_driver&)>;
using driver_creator_t = driver_bundle(driver_proxy const&, configuration const&);
};

template <typename T>
concept next_function_without_driver = requires(T t) {
{ t.next() } -> std::same_as<void>;
struct driver_bundle {
detail::next_index_t driver;
fixed_hierarchy hierarchy;
};
}

// Workaround for static_assert(false) until P2593R1 is adopted
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2023/p2593r1.html
// static_assert(false) is supported in GCC 13 and newer
template <typename T>
constexpr bool always_false{false};

template <typename T>
std::function<void(framework_driver&)> create_next(configuration const& config = {})
{
// N.B. Because we are initializing an std::function object with a lambda, the lambda
// (and therefore its captured values) must be copy-constructible. This means
// that make<T>(config) must return a copy-constructible object. Because we do not
// know if a user's provided driver class is copyable, we create the object on
// the heap, and capture a shared pointer to the object. This also ensures that
// the driver object is created only once, thus avoiding potential errors in the
// implementations of the driver class' copy/move constructors (e.g. if the
// source is caching an iterator).
if constexpr (next_function_with_driver<T>) {
return [t = make<T>(config)](framework_driver& driver) { t->next(driver); };
} else if constexpr (next_function_without_driver<T>) {
return [t = make<T>(config)](framework_driver&) { t->next(); };
} else {
static_assert(always_false<T>, "Must have a 'next()' function that returns 'void'");
namespace phlex::experimental {
template <typename F>
concept is_driver_like = std::invocable<F, data_cell_cursor const&>;

class driver_proxy {
public:
driver_bundle driver(fixed_hierarchy hierarchy, is_driver_like auto driver_function) const
{
auto h = hierarchy;
return {[f = std::move(driver_function), h = std::move(h)](framework_driver& d) mutable {
f(h.yield_job(d));
},
std::move(hierarchy)};
}
}

using next_index_t = std::function<void(framework_driver&)>;
using driver_creator_t = next_index_t(configuration const&);
};
}

#define PHLEX_EXPERIMENTAL_REGISTER_DRIVER(driver) \
BOOST_DLL_ALIAS(phlex::experimental::detail::create_next<driver>, create_driver)
#define PHLEX_EXPERIMENTAL_REGISTER_DRIVER(...) \
PHLEX_DETAIL_REGISTER_DRIVER_PLUGIN(create, create_driver, __VA_ARGS__)

#endif // PHLEX_DRIVER_HPP
2 changes: 2 additions & 0 deletions phlex/model/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ cet_make_library(
SOURCE
algorithm_name.cpp
data_cell_counter.cpp
fixed_hierarchy.cpp
data_layer_hierarchy.cpp
data_cell_index.cpp
identifier.cpp
Expand All @@ -25,6 +26,7 @@ cet_make_library(
install(
FILES
algorithm_name.hpp
fixed_hierarchy.hpp
fwd.hpp
handle.hpp
data_cell_counter.hpp
Expand Down
7 changes: 3 additions & 4 deletions phlex/model/data_cell_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ namespace phlex {
// FIXME: Should it be an error to create an ID with an empty name?
}

data_cell_index const& data_cell_index::base() { return *base_ptr(); }
data_cell_index_ptr data_cell_index::base_ptr()
data_cell_index_ptr data_cell_index::job()
{
static data_cell_index_ptr base_id{new data_cell_index};
return base_id;
static data_cell_index_ptr job_index{new data_cell_index};
return job_index;
}

experimental::identifier const& data_cell_index::layer_name() const noexcept
Expand Down
3 changes: 1 addition & 2 deletions phlex/model/data_cell_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
namespace phlex {
class data_cell_index : public std::enable_shared_from_this<data_cell_index> {
public:
static data_cell_index const& base();
static data_cell_index_ptr base_ptr();
static data_cell_index_ptr job();

using hash_type = std::size_t;
data_cell_index_ptr make_child(std::string layer_name, std::size_t data_cell_number) const;
Expand Down
108 changes: 108 additions & 0 deletions phlex/model/fixed_hierarchy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include "phlex/model/fixed_hierarchy.hpp"

#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/identifier.hpp"
#include "phlex/utilities/async_driver.hpp"
#include "phlex/utilities/hashing.hpp"

#include "fmt/format.h"

#include <algorithm>
#include <ranges>
#include <set>
#include <span>
#include <stdexcept>

namespace {
// Each path must be non-empty and may only contain "job" as the first element.
std::span<std::string const> validated_path(std::vector<std::string> const& path)
{
if (path.empty()) {
throw std::runtime_error("Layer paths cannot be empty.");
}
auto const rest = std::span{path}.subspan(path[0] == "job" ? 1 : 0);
if (std::ranges::contains(rest, "job")) {
throw std::runtime_error("Layer paths may only contain 'job' as the first element.");
}
return rest;
}

// Builds the set of cumulative layer hashes that define the fixed hierarchy.
// For example, if the layer paths are ["job", "run", "subrun"] and ["job", "spill"],
// the hashes included will correspond to:
// From the first path:
// - "job"
// - "job/run"
// - "job/run/subrun"
//
// From the second path:
// - "job" (already included from the first path)
// - "job/spill"
//
// Each path must be non-empty and may only contain "job" as the first element.
std::set<std::size_t> build_hashes(std::vector<std::vector<std::string>> const& layer_paths)
{
using namespace phlex::experimental;
identifier const job{"job"};
std::set<std::size_t> hashes{job.hash()};
for (std::vector<std::string> const& path : layer_paths) {
std::size_t cumulative_hash = job.hash();
for (auto const& name : validated_path(path)) {
cumulative_hash = hash(cumulative_hash, identifier{name}.hash());
hashes.insert(cumulative_hash);
}
}
return hashes;
}
}

namespace phlex::experimental {

data_cell_cursor::data_cell_cursor(data_cell_index_ptr index,
fixed_hierarchy const& h,
async_driver<data_cell_index_ptr>& d) :
index_{std::move(index)}, hierarchy_{h}, driver_{d}
{
}

data_cell_cursor data_cell_cursor::yield_child(std::string const& layer_name,
std::size_t number) const
{
auto child = index_->make_child(layer_name, number);
hierarchy_.validate(child);
driver_.yield(child);
return data_cell_cursor{child, hierarchy_, driver_};
}

std::string data_cell_cursor::layer_path() const { return index_->layer_path(); }

fixed_hierarchy::fixed_hierarchy(std::initializer_list<std::vector<std::string>> layer_paths) :
fixed_hierarchy(std::vector<std::vector<std::string>>(layer_paths))
{
}

fixed_hierarchy::fixed_hierarchy(std::vector<std::vector<std::string>> layer_paths) :
layer_hashes_(std::from_range, build_hashes(layer_paths))
{
}

void fixed_hierarchy::validate(data_cell_index_ptr const& index) const
{
if (layer_hashes_.empty()) {
return;
}
if (std::ranges::binary_search(layer_hashes_, index->layer_hash())) {
return;
}
throw std::runtime_error(
fmt::format("Layer {} is not part of the fixed hierarchy.", index->layer_path()));
}

data_cell_cursor fixed_hierarchy::yield_job(async_driver<data_cell_index_ptr>& d) const
{
auto job = data_cell_index::job();
d.yield(job);
return data_cell_cursor{job, *this, d};
}

}
Loading
Loading