From aef38126c5c2f942d7f4a360bf770fef95f2d8df Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Wed, 11 Feb 2026 13:31:19 -0600 Subject: [PATCH] Resolve various thread-unsafe patterns detected by Copilot --- phlex/core/declared_observer.hpp | 4 ++-- phlex/core/declared_predicate.hpp | 4 ++-- phlex/core/declared_provider.hpp | 4 ++-- phlex/core/declared_transform.hpp | 4 ++-- phlex/core/declared_unfold.hpp | 4 ++-- phlex/core/detail/filter_impl.cpp | 2 +- phlex/core/multiplexer.cpp | 12 ------------ phlex/core/multiplexer.hpp | 4 ---- phlex/core/store_counters.cpp | 23 +++++++++++++++++------ phlex/core/store_counters.hpp | 6 ++++-- phlex/model/products.hpp | 17 +++++++++-------- 11 files changed, 41 insertions(+), 43 deletions(-) diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index 6be0d5aac..eebe0235b 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -74,11 +74,11 @@ namespace phlex::experimental { auto const& msg = most_derived(messages); auto const& [store, message_id] = std::tie(msg.store, msg.id); if (store->is_flush()) { - flag_for(store->index()->hash()).flush_received(message_id); + mark_flush_received(store->index()->hash(), message_id); } else if (accessor a; needs_new(store, a)) { call(ft, messages, std::make_index_sequence{}); a->second = true; - flag_for(store->index()->hash()).mark_as_processed(); + mark_processed(store->index()->hash()); } if (done_with(store)) { diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index 8c5ed608e..2606cdc1c 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -81,13 +81,13 @@ namespace phlex::experimental { auto const& [store, message_id] = std::tie(msg.store, msg.id); predicate_result result{}; if (store->is_flush()) { - flag_for(store->index()->hash()).flush_received(message_id); + mark_flush_received(store->index()->hash(), message_id); } else if (const_accessor a; results_.find(a, store->index()->hash())) { result = {message_id, a->second.result}; } else if (accessor a; results_.insert(a, store->index()->hash())) { bool const rc = call(ft, messages, std::make_index_sequence{}); result = a->second = {message_id, rc}; - flag_for(store->index()->hash()).mark_as_processed(); + mark_processed(store->index()->hash()); } if (done_with(store)) { diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 5447a066a..57e2a3f93 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -73,7 +73,7 @@ namespace phlex::experimental { auto& [stay_in_graph, to_output] = output; if (msg.store->is_flush()) { - flag_for(msg.store->index()->hash()).flush_received(msg.original_id); + mark_flush_received(msg.store->index()->hash(), msg.original_id); stay_in_graph.try_put(msg); } else { // Check cache first @@ -101,7 +101,7 @@ namespace phlex::experimental { message const new_msg{store, msg.id}; stay_in_graph.try_put(new_msg); to_output.try_put(new_msg); - flag_for(msg.store->index()->hash()).mark_as_processed(); + mark_processed(msg.store->index()->hash()); } if (done_with(msg.store)) { diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 9ec286c27..fdb04738b 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -92,7 +92,7 @@ namespace phlex::experimental { auto const& [store, message_id] = std::tie(msg.store, msg.id); auto& [stay_in_graph, to_output] = output; if (store->is_flush()) { - flag_for(store->index()->hash()).flush_received(msg.original_id); + mark_flush_received(store->index()->hash(), msg.original_id); stay_in_graph.try_put(msg); to_output.try_put(msg); } else { @@ -109,7 +109,7 @@ namespace phlex::experimental { message const new_msg{a->second, message_id}; stay_in_graph.try_put(new_msg); to_output.try_put(new_msg); - flag_for(store->index()->hash()).mark_as_processed(); + mark_processed(store->index()->hash()); } else { stay_in_graph.try_put({a->second, message_id}); } diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 393c37d65..967f76d0c 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -107,7 +107,7 @@ namespace phlex::experimental { auto const& msg = most_derived(messages); auto const& store = msg.store; if (store->is_flush()) { - flag_for(store->index()->hash()).flush_received(msg.id); + mark_flush_received(store->index()->hash(), msg.id); std::get<0>(output).try_put(msg); } else if (accessor a; stores_.insert(a, store->index()->hash())) { std::size_t const original_message_id{msg_counter_}; @@ -117,7 +117,7 @@ namespace phlex::experimental { message const flush_msg{ g.flush_store(), msg_counter_.fetch_add(1), original_message_id}; std::get<0>(output).try_put(flush_msg); - flag_for(store->index()->hash()).mark_as_processed(); + mark_processed(store->index()->hash()); } if (done_with(store)) { diff --git a/phlex/core/detail/filter_impl.cpp b/phlex/core/detail/filter_impl.cpp index a7156cc90..9b2e21516 100644 --- a/phlex/core/detail/filter_impl.cpp +++ b/phlex/core/detail/filter_impl.cpp @@ -67,7 +67,7 @@ namespace phlex::experimental { { decltype(stores_)::accessor a; if (stores_.insert(a, msg_id)) { - a->second = std::vector(nargs_); + a->second.resize(nargs_); } auto& elem = a->second; if (nargs_ == 1ull) { diff --git a/phlex/core/multiplexer.cpp b/phlex/core/multiplexer.cpp index 0a1bc346f..b8e7e3753 100644 --- a/phlex/core/multiplexer.cpp +++ b/phlex/core/multiplexer.cpp @@ -10,7 +10,6 @@ #include #include -using namespace std::chrono; using namespace phlex::experimental; namespace { @@ -63,23 +62,12 @@ namespace phlex::experimental { return {}; } - auto start_time = steady_clock::now(); - for (auto const& [product_label, port] : provider_input_ports_ | std::views::values) { if (auto store_to_send = store_for(store, product_label.layer())) { port->try_put({std::move(store_to_send), message_id}); } } - execution_time_ += duration_cast(steady_clock::now() - start_time); return {}; } - - multiplexer::~multiplexer() - { - spdlog::debug("Routed {} messages in {} microseconds ({:.3f} microseconds per message)", - received_messages_, - execution_time_.count(), - execution_time_.count() / received_messages_); - } } diff --git a/phlex/core/multiplexer.hpp b/phlex/core/multiplexer.hpp index 69241ccf3..846aed779 100644 --- a/phlex/core/multiplexer.hpp +++ b/phlex/core/multiplexer.hpp @@ -7,7 +7,6 @@ #include "oneapi/tbb/concurrent_hash_map.h" #include "oneapi/tbb/flow_graph.h" -#include #include #include #include @@ -20,8 +19,6 @@ namespace phlex::experimental { using base = tbb::flow::function_node; public: - ~multiplexer(); - struct named_input_port { product_query product_label; tbb::flow::receiver* port; @@ -40,7 +37,6 @@ namespace phlex::experimental { input_ports_t provider_input_ports_; bool debug_; std::atomic received_messages_{}; - std::chrono::duration execution_time_{}; }; } diff --git a/phlex/core/store_counters.cpp b/phlex/core/store_counters.cpp index a4fec0386..1a8afdeb1 100644 --- a/phlex/core/store_counters.cpp +++ b/phlex/core/store_counters.cpp @@ -18,19 +18,30 @@ namespace phlex::experimental { unsigned int store_flag::original_message_id() const noexcept { return original_message_id_; } - store_flag& detect_flush_flag::flag_for(data_cell_index::hash_type const hash) + void detect_flush_flag::mark_flush_received(data_cell_index::hash_type const hash, + std::size_t const original_message_id) { flag_accessor fa; - flags_.emplace(fa, hash, std::make_unique()); - return *fa->second; + if (flags_.insert(fa, hash)) { + fa->second = std::make_unique(); + } + fa->second->flush_received(original_message_id); + } + + void detect_flush_flag::mark_processed(data_cell_index::hash_type const hash) + { + flag_accessor fa; + if (flags_.insert(fa, hash)) { + fa->second = std::make_unique(); + } + fa->second->mark_as_processed(); } bool detect_flush_flag::done_with(product_store_const_ptr const& store) { auto const h = store->index()->hash(); - if (const_flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) { - flags_.erase(fa); - return true; + if (flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) { + return flags_.erase(fa); } return false; } diff --git a/phlex/core/store_counters.hpp b/phlex/core/store_counters.hpp index 99216c944..f509c7a8c 100644 --- a/phlex/core/store_counters.hpp +++ b/phlex/core/store_counters.hpp @@ -24,12 +24,14 @@ namespace phlex::experimental { private: std::atomic flush_received_{false}; std::atomic processed_{false}; - std::size_t original_message_id_{}; // Necessary for matching inputs to downstream join nodes. + std::atomic + original_message_id_{}; // Necessary for matching inputs to downstream join nodes. }; class detect_flush_flag { protected: - store_flag& flag_for(data_cell_index::hash_type hash); + void mark_flush_received(data_cell_index::hash_type hash, std::size_t original_message_id); + void mark_processed(data_cell_index::hash_type hash); bool done_with(product_store_const_ptr const& store); private: diff --git a/phlex/model/products.hpp b/phlex/model/products.hpp index 6d330a5c5..1729427f6 100644 --- a/phlex/model/products.hpp +++ b/phlex/model/products.hpp @@ -40,31 +40,32 @@ namespace phlex::experimental { using size_type = collection_t::size_type; template - void add(std::string const& product_name, T&& t) + void add(std::string const& product_name, T t) { - add(product_name, std::make_unique>>(std::forward(t))); + products_.emplace(product_name, + std::make_unique>>(std::move(t))); } template - void add(std::string const& product_name, std::unique_ptr>&& t) + void add(std::string const& product_name, std::unique_ptr> t) { products_.emplace(product_name, std::move(t)); } template - void add_all(product_specifications const& names, Ts&& ts) + void add_all(product_specifications const& names, Ts ts) { assert(names.size() == 1ull); - add(names[0].name(), std::forward(ts)); + add(names[0].name(), std::move(ts)); } template void add_all(product_specifications const& names, std::tuple ts) { assert(names.size() == sizeof...(Ts)); - [this, &names](auto const& ts, std::index_sequence) { - (this->add(names[Is].name(), std::get(ts)), ...); - }(ts, std::index_sequence_for{}); + [this, &names](auto tuple, std::index_sequence) { + (this->add(names[Is].name(), std::move(std::get(tuple))), ...); + }(std::move(ts), std::index_sequence_for{}); } template