diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index 387fe0e26..4d7aa9039 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -41,8 +41,7 @@ namespace phlex::experimental { product_queries input_products); virtual ~declared_fold(); - virtual tbb::flow::sender& sender() = 0; - virtual tbb::flow::sender& to_output() = 0; + virtual tbb::flow::sender& output_port() = 0; virtual tbb::flow::receiver& flush_port() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; @@ -126,7 +125,8 @@ namespace phlex::experimental { auto parent = std::make_shared(fold_index, this->full_name()); commit_(parent); ++product_count_; - output_port<0>(fold_).try_put({parent, counter->original_message_id()}); + tbb::flow::output_port<0>(fold_).try_put( + {.store = parent, .id = counter->original_message_id()}); } } @@ -141,8 +141,7 @@ namespace phlex::experimental { } tbb::flow::receiver& flush_port() override { return flush_receiver_; } - tbb::flow::sender& sender() override { return output_port<0ull>(fold_); } - tbb::flow::sender& to_output() override { return sender(); } + tbb::flow::sender& output_port() override { return tbb::flow::output_port<0>(fold_); } product_specifications const& output() const override { return output_; } template diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 9ba012029..f8a652cc2 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -32,7 +32,7 @@ namespace phlex::experimental { std::string const& layer() const noexcept; virtual tbb::flow::receiver* input_port() = 0; - virtual tbb::flow::sender& sender() = 0; + virtual tbb::flow::sender& output_port() = 0; virtual std::size_t num_calls() const = 0; private: @@ -60,7 +60,6 @@ namespace phlex::experimental { provider_{g, concurrency, [this, ft = alg.release_algorithm()](index_message const& index_msg, auto& output) { - auto& [stay_in_graph, to_output] = output; auto const [index, msg_id, _] = index_msg; auto result = std::invoke(ft, *index); @@ -71,9 +70,7 @@ namespace phlex::experimental { auto store = std::make_shared( index, this->full_name(), std::move(new_products)); - message const new_msg{store, msg_id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); + std::get<0>(output).try_put({.store = std::move(store), .id = msg_id}); }} { spdlog::debug( @@ -82,12 +79,15 @@ namespace phlex::experimental { private: tbb::flow::receiver* input_port() override { return &provider_; } - tbb::flow::sender& sender() override { return output_port<0>(provider_); } + tbb::flow::sender& output_port() override + { + return tbb::flow::output_port<0>(provider_); + } std::size_t num_calls() const final { return calls_.load(); } product_specification output_; - tbb::flow::multifunction_node> provider_; + tbb::flow::multifunction_node> provider_; std::atomic calls_; }; diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 4e9afe81d..8ce6c656c 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -43,8 +43,7 @@ namespace phlex::experimental { product_queries input_products); virtual ~declared_transform(); - virtual tbb::flow::sender& sender() = 0; - virtual tbb::flow::sender& to_output() = 0; + virtual tbb::flow::sender& output_port() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; }; @@ -82,19 +81,17 @@ namespace phlex::experimental { [this, ft = alg.release_algorithm()](messages_t const& messages, auto& output) { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); - auto& [stay_in_graph, to_output] = output; auto result = call(ft, messages, std::make_index_sequence{}); ++calls_; ++product_count_[store->index()->layer_hash()]; + products new_products; new_products.add_all(output_, std::move(result)); auto new_store = std::make_shared( store->index(), this->full_name(), std::move(new_products)); - message const new_msg{std::move(new_store), message_id}; - stay_in_graph.try_put(new_msg); - to_output.try_put(new_msg); + std::get<0>(output).try_put({.store = std::move(new_store), .id = message_id}); }} { if constexpr (N > 1ull) { @@ -113,8 +110,10 @@ namespace phlex::experimental { return input_ports(join_, transform_); } - tbb::flow::sender& sender() override { return output_port<0>(transform_); } - tbb::flow::sender& to_output() override { return output_port<1>(transform_); } + tbb::flow::sender& output_port() override + { + return tbb::flow::output_port<0>(transform_); + } product_specifications const& output() const override { return output_; } template @@ -141,7 +140,7 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; join_or_none_t join_; - tbb::flow::multifunction_node, message_tuple<2u>> transform_; + tbb::flow::multifunction_node, message_tuple<1u>> transform_; std::atomic calls_; tbb::concurrent_unordered_map> product_count_; }; diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 4b3212f92..d64621f1d 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -60,9 +60,8 @@ namespace phlex::experimental { std::string child_layer); virtual ~declared_unfold(); - virtual tbb::flow::sender& sender() = 0; + virtual tbb::flow::sender& output_port() = 0; virtual tbb::flow::sender& output_index_port() = 0; - virtual tbb::flow::sender& to_output() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; virtual flusher_t& flusher() = 0; @@ -113,7 +112,9 @@ namespace phlex::experimental { generator g{store, this->full_name(), child_layer()}; call(p, ufold, store->index(), g, messages, std::make_index_sequence{}); - flusher_.try_put({store->index(), g.flush_result(), original_message_id}); + flusher_.try_put({.index = store->index(), + .counts = g.flush_result(), + .original_id = original_message_id}); }}, flusher_{g} { @@ -132,12 +133,14 @@ namespace phlex::experimental { return input_ports(join_, unfold_); } - tbb::flow::sender& sender() override { return output_port<0>(unfold_); } + tbb::flow::sender& output_port() override + { + return tbb::flow::output_port<0>(unfold_); + } tbb::flow::sender& output_index_port() override { - return output_port<1>(unfold_); + return tbb::flow::output_port<1>(unfold_); } - tbb::flow::sender& to_output() override { return sender(); } product_specifications const& output() const override { return output_; } flusher_t& flusher() override { return flusher_; } @@ -172,13 +175,11 @@ namespace phlex::experimental { running_value = next_value; } ++product_count_; - auto child = g.make_child_for(counter++, std::move(new_products)); - message const child_msg{child, msg_counter_.fetch_add(1)}; - output_port<0>(unfold_).try_put(child_msg); - output_port<1>(unfold_).try_put(child->index()); - // Every data cell needs a flush (for now) - flusher_.try_put({child->index(), nullptr, -1ull}); + auto child = g.make_child_for(counter++, std::move(new_products)); + tbb::flow::output_port<0>(unfold_).try_put( + {.store = child, .id = msg_counter_.fetch_add(1)}); + tbb::flow::output_port<1>(unfold_).try_put(child->index()); } } diff --git a/phlex/core/edge_creation_policy.hpp b/phlex/core/edge_creation_policy.hpp index b149e3b6e..666544978 100644 --- a/phlex/core/edge_creation_policy.hpp +++ b/phlex/core/edge_creation_policy.hpp @@ -21,8 +21,7 @@ namespace phlex::experimental { struct named_output_port { algorithm_name node; - tbb::flow::sender* port; - tbb::flow::sender* to_output; + tbb::flow::sender* output_port; type_id type; }; @@ -47,9 +46,9 @@ namespace phlex::experimental { for (auto const& product_name : node->output()) { if (empty(product_name.name())) continue; - result.emplace( - product_name.name(), - named_output_port{node_name, &node->sender(), &node->to_output(), product_name.type()}); + + result.emplace(product_name.name(), + named_output_port{node_name, &node->output_port(), product_name.type()}); } } return result; diff --git a/phlex/core/edge_maker.cpp b/phlex/core/edge_maker.cpp index 5f38da496..3b28f852d 100644 --- a/phlex/core/edge_maker.cpp +++ b/phlex/core/edge_maker.cpp @@ -28,7 +28,7 @@ namespace phlex::experimental { provider.full_name(), node_name, port.product_label.to_string()); - make_edge(provider.sender(), *(port.port)); + make_edge(provider.output_port(), *(port.port)); found_match = true; break; } diff --git a/phlex/core/edge_maker.hpp b/phlex/core/edge_maker.hpp index ec7b7cc28..07dbef265 100644 --- a/phlex/core/edge_maker.hpp +++ b/phlex/core/edge_maker.hpp @@ -77,7 +77,7 @@ namespace phlex::experimental { continue; } - make_edge(*producer->port, *receiver_port); + make_edge(*producer->output_port, *receiver_port); } } return result; @@ -111,10 +111,10 @@ namespace phlex::experimental { // Create edges to outputs for (auto const& [output_name, output_node] : outputs) { for (auto& [_, provider] : providers) { - make_edge(provider->sender(), output_node->port()); + make_edge(provider->output_port(), output_node->port()); } for (auto const& named_port : producers_.values()) { - make_edge(*named_port.to_output, output_node->port()); + make_edge(*named_port.output_port, output_node->port()); } }