Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ namespace phlex::experimental {
product_queries input_products);
virtual ~declared_fold();

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
Expand Down Expand Up @@ -126,7 +125,8 @@ namespace phlex::experimental {
auto parent = std::make_shared<product_store>(fold_index, this->full_name());
commit_(parent);
++product_count_;
output_port<0>(fold_).try_put({parent, counter->original_message_id()});
tbb::flow::output_port<0>(fold_).try_put(
{.store = parent, .id = counter->original_message_id()});
}
}

Expand All @@ -141,8 +141,7 @@ namespace phlex::experimental {
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& sender() override { return output_port<0ull>(fold_); }
tbb::flow::sender<message>& to_output() override { return sender(); }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
Expand Down
14 changes: 7 additions & 7 deletions phlex/core/declared_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace phlex::experimental {
std::string const& layer() const noexcept;

virtual tbb::flow::receiver<index_message>* input_port() = 0;
virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& output_port() = 0;
virtual std::size_t num_calls() const = 0;

private:
Expand Down Expand Up @@ -60,7 +60,6 @@ namespace phlex::experimental {
provider_{g,
concurrency,
[this, ft = alg.release_algorithm()](index_message const& index_msg, auto& output) {
auto& [stay_in_graph, to_output] = output;
auto const [index, msg_id, _] = index_msg;

auto result = std::invoke(ft, *index);
Expand All @@ -71,9 +70,7 @@ namespace phlex::experimental {
auto store = std::make_shared<product_store>(
index, this->full_name(), std::move(new_products));

message const new_msg{store, msg_id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
std::get<0>(output).try_put({.store = std::move(store), .id = msg_id});
}}
{
spdlog::debug(
Expand All @@ -82,12 +79,15 @@ namespace phlex::experimental {

private:
tbb::flow::receiver<index_message>* input_port() override { return &provider_; }
tbb::flow::sender<message>& sender() override { return output_port<0>(provider_); }
tbb::flow::sender<message>& output_port() override
{
return tbb::flow::output_port<0>(provider_);
}

std::size_t num_calls() const final { return calls_.load(); }

product_specification output_;
tbb::flow::multifunction_node<index_message, message_tuple<2u>> provider_;
tbb::flow::multifunction_node<index_message, message_tuple<1u>> provider_;
std::atomic<std::size_t> calls_;
};

Expand Down
17 changes: 8 additions & 9 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ namespace phlex::experimental {
product_queries input_products);
virtual ~declared_transform();

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual tbb::flow::sender<message>& output_port() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
};
Expand Down Expand Up @@ -82,19 +81,17 @@ namespace phlex::experimental {
[this, ft = alg.release_algorithm()](messages_t<N> const& messages, auto& output) {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
auto& [stay_in_graph, to_output] = output;

auto result = call(ft, messages, std::make_index_sequence<N>{});
++calls_;
++product_count_[store->index()->layer_hash()];

products new_products;
new_products.add_all(output_, std::move(result));
auto new_store = std::make_shared<product_store>(
store->index(), this->full_name(), std::move(new_products));

message const new_msg{std::move(new_store), message_id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
std::get<0>(output).try_put({.store = std::move(new_store), .id = message_id});
}}
{
if constexpr (N > 1ull) {
Expand All @@ -113,8 +110,10 @@ namespace phlex::experimental {
return input_ports<N>(join_, transform_);
}

tbb::flow::sender<message>& sender() override { return output_port<0>(transform_); }
tbb::flow::sender<message>& to_output() override { return output_port<1>(transform_); }
tbb::flow::sender<message>& output_port() override
{
return tbb::flow::output_port<0>(transform_);
}
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
Expand All @@ -141,7 +140,7 @@ namespace phlex::experimental {
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
product_specifications output_;
join_or_none_t<N> join_;
tbb::flow::multifunction_node<messages_t<N>, message_tuple<2u>> transform_;
tbb::flow::multifunction_node<messages_t<N>, message_tuple<1u>> transform_;
std::atomic<std::size_t> calls_;
tbb::concurrent_unordered_map<std::size_t, std::atomic<std::size_t>> product_count_;
};
Expand Down
25 changes: 13 additions & 12 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ namespace phlex::experimental {
std::string child_layer);
virtual ~declared_unfold();

virtual tbb::flow::sender<message>& sender() = 0;
virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::sender<data_cell_index_ptr>& output_index_port() = 0;
virtual tbb::flow::sender<message>& to_output() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
virtual flusher_t& flusher() = 0;
Expand Down Expand Up @@ -113,7 +112,9 @@ namespace phlex::experimental {
generator g{store, this->full_name(), child_layer()};
call(p, ufold, store->index(), g, messages, std::make_index_sequence<N>{});

flusher_.try_put({store->index(), g.flush_result(), original_message_id});
flusher_.try_put({.index = store->index(),
.counts = g.flush_result(),
.original_id = original_message_id});
}},
flusher_{g}
{
Expand All @@ -132,12 +133,14 @@ namespace phlex::experimental {
return input_ports<N>(join_, unfold_);
}

tbb::flow::sender<message>& sender() override { return output_port<0>(unfold_); }
tbb::flow::sender<message>& output_port() override
{
return tbb::flow::output_port<0>(unfold_);
}
tbb::flow::sender<data_cell_index_ptr>& output_index_port() override
{
return output_port<1>(unfold_);
return tbb::flow::output_port<1>(unfold_);
}
tbb::flow::sender<message>& to_output() override { return sender(); }
product_specifications const& output() const override { return output_; }
flusher_t& flusher() override { return flusher_; }

Expand Down Expand Up @@ -172,13 +175,11 @@ namespace phlex::experimental {
running_value = next_value;
}
++product_count_;
auto child = g.make_child_for(counter++, std::move(new_products));
message const child_msg{child, msg_counter_.fetch_add(1)};
output_port<0>(unfold_).try_put(child_msg);
output_port<1>(unfold_).try_put(child->index());

// Every data cell needs a flush (for now)
flusher_.try_put({child->index(), nullptr, -1ull});
auto child = g.make_child_for(counter++, std::move(new_products));
tbb::flow::output_port<0>(unfold_).try_put(
{.store = child, .id = msg_counter_.fetch_add(1)});
tbb::flow::output_port<1>(unfold_).try_put(child->index());
}
}

Expand Down
9 changes: 4 additions & 5 deletions phlex/core/edge_creation_policy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ namespace phlex::experimental {

struct named_output_port {
algorithm_name node;
tbb::flow::sender<message>* port;
tbb::flow::sender<message>* to_output;
tbb::flow::sender<message>* output_port;
type_id type;
};

Expand All @@ -47,9 +46,9 @@ namespace phlex::experimental {
for (auto const& product_name : node->output()) {
if (empty(product_name.name()))
continue;
result.emplace(
product_name.name(),
named_output_port{node_name, &node->sender(), &node->to_output(), product_name.type()});

result.emplace(product_name.name(),
named_output_port{node_name, &node->output_port(), product_name.type()});
}
}
return result;
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/edge_maker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace phlex::experimental {
provider.full_name(),
node_name,
port.product_label.to_string());
make_edge(provider.sender(), *(port.port));
make_edge(provider.output_port(), *(port.port));
found_match = true;
break;
}
Expand Down
6 changes: 3 additions & 3 deletions phlex/core/edge_maker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace phlex::experimental {
continue;
}

make_edge(*producer->port, *receiver_port);
make_edge(*producer->output_port, *receiver_port);
}
}
return result;
Expand Down Expand Up @@ -111,10 +111,10 @@ namespace phlex::experimental {
// Create edges to outputs
for (auto const& [output_name, output_node] : outputs) {
for (auto& [_, provider] : providers) {
make_edge(provider->sender(), output_node->port());
make_edge(provider->output_port(), output_node->port());
}
for (auto const& named_port : producers_.values()) {
make_edge(*named_port.to_output, output_node->port());
make_edge(*named_port.output_port, output_node->port());
}
}

Expand Down
Loading