Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ namespace phlex::experimental {
auto const& msg = most_derived(messages);
auto const& [store, message_id] = std::tie(msg.store, msg.id);
if (store->is_flush()) {
flag_for(store->index()->hash()).flush_received(message_id);
mark_flush_received(store->index()->hash(), message_id);
} else if (accessor a; needs_new(store, a)) {
call(ft, messages, std::make_index_sequence<N>{});
a->second = true;
flag_for(store->index()->hash()).mark_as_processed();
mark_processed(store->index()->hash());
}

if (done_with(store)) {
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ namespace phlex::experimental {
auto const& [store, message_id] = std::tie(msg.store, msg.id);
predicate_result result{};
if (store->is_flush()) {
flag_for(store->index()->hash()).flush_received(message_id);
mark_flush_received(store->index()->hash(), message_id);
} else if (const_accessor a; results_.find(a, store->index()->hash())) {
result = {message_id, a->second.result};
} else if (accessor a; results_.insert(a, store->index()->hash())) {
bool const rc = call(ft, messages, std::make_index_sequence<N>{});
result = a->second = {message_id, rc};
flag_for(store->index()->hash()).mark_as_processed();
mark_processed(store->index()->hash());
}

if (done_with(store)) {
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace phlex::experimental {
auto& [stay_in_graph, to_output] = output;

if (msg.store->is_flush()) {
flag_for(msg.store->index()->hash()).flush_received(msg.original_id);
mark_flush_received(msg.store->index()->hash(), msg.original_id);
stay_in_graph.try_put(msg);
} else {
// Check cache first
Expand Down Expand Up @@ -101,7 +101,7 @@ namespace phlex::experimental {
message const new_msg{store, msg.id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(msg.store->index()->hash()).mark_as_processed();
mark_processed(msg.store->index()->hash());
}

if (done_with(msg.store)) {
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ namespace phlex::experimental {
auto const& [store, message_id] = std::tie(msg.store, msg.id);
auto& [stay_in_graph, to_output] = output;
if (store->is_flush()) {
flag_for(store->index()->hash()).flush_received(msg.original_id);
mark_flush_received(store->index()->hash(), msg.original_id);
stay_in_graph.try_put(msg);
to_output.try_put(msg);
} else {
Expand All @@ -109,7 +109,7 @@ namespace phlex::experimental {
message const new_msg{a->second, message_id};
stay_in_graph.try_put(new_msg);
to_output.try_put(new_msg);
flag_for(store->index()->hash()).mark_as_processed();
mark_processed(store->index()->hash());
} else {
stay_in_graph.try_put({a->second, message_id});
}
Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ namespace phlex::experimental {
auto const& msg = most_derived(messages);
auto const& store = msg.store;
if (store->is_flush()) {
flag_for(store->index()->hash()).flush_received(msg.id);
mark_flush_received(store->index()->hash(), msg.id);
std::get<0>(output).try_put(msg);
} else if (accessor a; stores_.insert(a, store->index()->hash())) {
std::size_t const original_message_id{msg_counter_};
Expand All @@ -117,7 +117,7 @@ namespace phlex::experimental {
message const flush_msg{
g.flush_store(), msg_counter_.fetch_add(1), original_message_id};
std::get<0>(output).try_put(flush_msg);
flag_for(store->index()->hash()).mark_as_processed();
mark_processed(store->index()->hash());
}

if (done_with(store)) {
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/detail/filter_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace phlex::experimental {
{
decltype(stores_)::accessor a;
if (stores_.insert(a, msg_id)) {
a->second = std::vector<product_store_const_ptr>(nargs_);
a->second.resize(nargs_);
}
auto& elem = a->second;
if (nargs_ == 1ull) {
Expand Down
12 changes: 0 additions & 12 deletions phlex/core/multiplexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <ranges>
#include <stdexcept>

using namespace std::chrono;
using namespace phlex::experimental;

namespace {
Expand Down Expand Up @@ -63,23 +62,12 @@ namespace phlex::experimental {
return {};
}

auto start_time = steady_clock::now();

for (auto const& [product_label, port] : provider_input_ports_ | std::views::values) {
if (auto store_to_send = store_for(store, product_label.layer())) {
port->try_put({std::move(store_to_send), message_id});
}
}

execution_time_ += duration_cast<microseconds>(steady_clock::now() - start_time);
return {};
}

multiplexer::~multiplexer()
{
spdlog::debug("Routed {} messages in {} microseconds ({:.3f} microseconds per message)",
received_messages_,
execution_time_.count(),
execution_time_.count() / received_messages_);
}
}
4 changes: 0 additions & 4 deletions phlex/core/multiplexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "oneapi/tbb/concurrent_hash_map.h"
#include "oneapi/tbb/flow_graph.h"

#include <chrono>
#include <functional>
#include <map>
#include <set>
Expand All @@ -20,8 +19,6 @@ namespace phlex::experimental {
using base = tbb::flow::function_node<message>;

public:
~multiplexer();

struct named_input_port {
product_query product_label;
tbb::flow::receiver<message>* port;
Expand All @@ -40,7 +37,6 @@ namespace phlex::experimental {
input_ports_t provider_input_ports_;
bool debug_;
std::atomic<std::size_t> received_messages_{};
std::chrono::duration<float, std::chrono::microseconds::period> execution_time_{};
};

}
Expand Down
23 changes: 17 additions & 6 deletions phlex/core/store_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,30 @@ namespace phlex::experimental {

unsigned int store_flag::original_message_id() const noexcept { return original_message_id_; }

store_flag& detect_flush_flag::flag_for(data_cell_index::hash_type const hash)
void detect_flush_flag::mark_flush_received(data_cell_index::hash_type const hash,
std::size_t const original_message_id)
{
flag_accessor fa;
flags_.emplace(fa, hash, std::make_unique<store_flag>());
return *fa->second;
if (flags_.insert(fa, hash)) {
fa->second = std::make_unique<store_flag>();
}
fa->second->flush_received(original_message_id);
}

void detect_flush_flag::mark_processed(data_cell_index::hash_type const hash)
{
flag_accessor fa;
if (flags_.insert(fa, hash)) {
fa->second = std::make_unique<store_flag>();
}
fa->second->mark_as_processed();
}

bool detect_flush_flag::done_with(product_store_const_ptr const& store)
{
auto const h = store->index()->hash();
if (const_flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) {
flags_.erase(fa);
return true;
if (flag_accessor fa; flags_.find(fa, h) && fa->second->is_complete()) {
return flags_.erase(fa);
}
return false;
}
Expand Down
6 changes: 4 additions & 2 deletions phlex/core/store_counters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ namespace phlex::experimental {
private:
std::atomic<bool> flush_received_{false};
std::atomic<bool> processed_{false};
std::size_t original_message_id_{}; // Necessary for matching inputs to downstream join nodes.
std::atomic<std::size_t>
original_message_id_{}; // Necessary for matching inputs to downstream join nodes.
};

class detect_flush_flag {
protected:
store_flag& flag_for(data_cell_index::hash_type hash);
void mark_flush_received(data_cell_index::hash_type hash, std::size_t original_message_id);
void mark_processed(data_cell_index::hash_type hash);
bool done_with(product_store_const_ptr const& store);

private:
Expand Down
17 changes: 9 additions & 8 deletions phlex/model/products.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ namespace phlex::experimental {
using size_type = collection_t::size_type;

template <typename T>
void add(std::string const& product_name, T&& t)
void add(std::string const& product_name, T t)
{
add(product_name, std::make_unique<product<std::remove_cvref_t<T>>>(std::forward<T>(t)));
products_.emplace(product_name,
std::make_unique<product<std::remove_cvref_t<T>>>(std::move(t)));
}

template <typename T>
void add(std::string const& product_name, std::unique_ptr<product<T>>&& t)
void add(std::string const& product_name, std::unique_ptr<product<T>> t)
{
products_.emplace(product_name, std::move(t));
}

template <typename Ts>
void add_all(product_specifications const& names, Ts&& ts)
void add_all(product_specifications const& names, Ts ts)
{
assert(names.size() == 1ull);
add(names[0].name(), std::forward<Ts>(ts));
add(names[0].name(), std::move(ts));
}

template <typename... Ts>
void add_all(product_specifications const& names, std::tuple<Ts...> ts)
{
assert(names.size() == sizeof...(Ts));
[this, &names]<std::size_t... Is>(auto const& ts, std::index_sequence<Is...>) {
(this->add(names[Is].name(), std::get<Is>(ts)), ...);
}(ts, std::index_sequence_for<Ts...>{});
[this, &names]<std::size_t... Is>(auto tuple, std::index_sequence<Is...>) {
(this->add(names[Is].name(), std::move(std::get<Is>(tuple))), ...);
}(std::move(ts), std::index_sequence_for<Ts...>{});
}

template <typename T>
Expand Down
Loading