diff --git a/.utils/qa/scspell.dict b/.utils/qa/scspell.dict index fa7d79e..bd960bd 100644 --- a/.utils/qa/scspell.dict +++ b/.utils/qa/scspell.dict @@ -40,7 +40,9 @@ sherikov sstream stringstream strstream +todo typeid +typeindex yaml zstd diff --git a/frontend/include/intrometry/backend/utils.h b/frontend/include/intrometry/backend/utils.h index 46019f4..982fbbe 100644 --- a/frontend/include/intrometry/backend/utils.h +++ b/frontend/include/intrometry/backend/utils.h @@ -14,6 +14,7 @@ #include #include #include +#include #include @@ -60,67 +61,102 @@ namespace intrometry::backend class INTROMETRY_HIDDEN SourceContainer { protected: - // passthrough hasher + using Key = std::pair; + struct Hasher { - std::size_t operator()(const std::size_t key) const + std::size_t operator()(const Key &key) const { - return key; + std::size_t result = std::hash{}(key.first); + if (not key.second.empty()) + { + result ^= std::hash{}(key.second) + 0x9e3779b9 + (result << 6) + (result >> 2); + } + return (result); } }; - using SourceMap = std::unordered_map; + using SourceMap = std::unordered_map; + using CollisionMap = std::unordered_map; protected: SourceMap sources_; + CollisionMap collision_counters_; std::mutex update_mutex_; - std::mutex drain_mutex_; + std::mutex flush_mutex_; protected: - static std::size_t hash(const ariles2::DefaultBase &source) + /// @todo requires string copy + static Key getKey(const std::string &id, const ariles2::DefaultBase &source) { - // redundant - // const std::size_t ariles_hash = std::hash{}(source.arilesDefaultID()); - return (typeid(source).hash_code()); + // source.arilesDefaultID() + // this is redundant and less reliable than type_index + // since it is provided by the user + + // id + // this is also provided by the user and is also unreliable, + // but in general provides extra information than type_index + // and should be added to hash + return (Key(std::type_index(typeid(source)), id)); + } + + std::string getUniqueId(const std::string &id) + { + const typename CollisionMap::iterator collision_counter_it = collision_counters_.find(id); + if (collision_counters_.end() == collision_counter_it) + { + collision_counters_[id] = 0; + return (id); + } + + ++collision_counter_it->second; + return (str_concat(id, "_intrometry", std::to_string(collision_counter_it->second))); } public: void tryVisit(const std::function visitor) { - if (drain_mutex_.try_lock()) + if (flush_mutex_.try_lock()) { - for (std::pair &source : sources_) + for (std::pair &source : sources_) { visitor(source.second); } - drain_mutex_.unlock(); + flush_mutex_.unlock(); } } template - void tryEmplace(const ariles2::DefaultBase &source, t_Args &&...args) + void tryEmplace(const std::string &id, const ariles2::DefaultBase &source, t_Args &&...args) { const std::lock_guard update_lock(update_mutex_); - const std::lock_guard drain_lock(drain_mutex_); + const std::lock_guard flush_lock(flush_mutex_); - sources_.try_emplace(hash(source), source, std::forward(args)...); + sources_.try_emplace( + getKey(id, source), + source, + getUniqueId(id.empty() ? source.arilesDefaultID() : id), + std::forward(args)...); } - void erase(const ariles2::DefaultBase &source) + void erase(const std::string &id, const ariles2::DefaultBase &source) { const std::lock_guard update_lock(update_mutex_); - const std::lock_guard drain_lock(drain_mutex_); + const std::lock_guard flush_lock(flush_mutex_); - sources_.erase(hash(source)); + sources_.erase(getKey(id, source)); } - bool tryVisit(const ariles2::DefaultBase &source, const std::function visitor) + bool tryVisit( + const std::string &id, + const ariles2::DefaultBase &source, + const std::function visitor) { if (update_mutex_.try_lock()) { - const typename SourceMap::iterator source_it = sources_.find(hash(source)); + const typename SourceMap::iterator source_it = sources_.find(getKey(id, source)); if (sources_.end() == source_it) { diff --git a/frontend/include/intrometry/sink.h b/frontend/include/intrometry/sink.h index 5187d09..ff09a59 100644 --- a/frontend/include/intrometry/sink.h +++ b/frontend/include/intrometry/sink.h @@ -41,7 +41,12 @@ namespace intrometry * * @note Does nothing if initialization failed. */ + void assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters = Source::Parameters()) + { + assign(std::string(), source, parameters); + } virtual void assign( + const std::string &id, // use to resolve ambiguity between sources of the same type const ariles2::DefaultBase &source, const Source::Parameters ¶meters = Source::Parameters()) = 0; @@ -52,7 +57,11 @@ namespace intrometry * @note Does nothing if initialization failed. * @note Does nothing if the source has not been assigned. */ - virtual void retract(const ariles2::DefaultBase &source) = 0; + void retract(const ariles2::DefaultBase &source) + { + retract(std::string(), source); + } + virtual void retract(const std::string &id, const ariles2::DefaultBase &source) = 0; /** * Write data. Data is copied to internal buffers, and scheduled for @@ -61,7 +70,11 @@ namespace intrometry * @note Does nothing if source is unassigned. * @note Does nothing if initialization failed. */ - virtual void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0) = 0; + void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0) + { + write(std::string(), source, timestamp); + } + virtual void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0) = 0; /// Batch assignment diff --git a/pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h b/pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h index 6548ae4..04ed867 100644 --- a/pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h +++ b/pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h @@ -37,7 +37,7 @@ namespace intrometry::pjmsg_mcap public: Parameters(const std::string &id = ""); // NOLINT - Parameters(const char *id = ""); // NOLINT + Parameters(const char *id = ""); // NOLINT Parameters &rate(const std::size_t value); Parameters &id(const std::string &value); @@ -55,11 +55,17 @@ namespace intrometry::pjmsg_mcap { public: using SinkPIMPLBase::SinkPIMPLBase; + using SinkPIMPLBase::assign; + using SinkPIMPLBase::write; + using SinkPIMPLBase::retract; ~Sink(); bool initialize(); - void assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters = Source::Parameters()); - void retract(const ariles2::DefaultBase &source); - void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0); + void assign( + const std::string &id, + const ariles2::DefaultBase &source, + const Source::Parameters ¶meters = Source::Parameters()); + void retract(const std::string &id, const ariles2::DefaultBase &source); + void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0); }; } // namespace intrometry::pjmsg_mcap diff --git a/pjmsg_mcap/src/intrometry.cpp b/pjmsg_mcap/src/intrometry.cpp index dd03375..9540bb2 100644 --- a/pjmsg_mcap/src/intrometry.cpp +++ b/pjmsg_mcap/src/intrometry.cpp @@ -206,6 +206,7 @@ namespace { public: std::mutex mutex_; + const std::string id_; ariles2::namevalue2::Writer::Parameters writer_parameters_; std::shared_ptr data_; ariles2::namevalue2::Writer writer_; @@ -213,8 +214,12 @@ namespace bool serialized_; public: - WriterWrapper(const ariles2::DefaultBase &source, const bool persistent_structure, uint32_t &names_version) - : data_(std::make_shared()), writer_(data_) + WriterWrapper( + const ariles2::DefaultBase &source, + std::string id, + const bool persistent_structure, + uint32_t &names_version) + : id_(std::move(id)), data_(std::make_shared()), writer_(data_) { writer_parameters_ = writer_.getDefaultParameters(); if (persistent_structure) @@ -223,7 +228,7 @@ namespace } // write to allocate memory - ariles2::apply(writer_, source); + ariles2::apply(writer_, source, id_); data_->finalize(writer_parameters_.persistent_structure_, 0, names_version); serialized_ = true; // do not serialize on assignment @@ -252,7 +257,7 @@ namespace { if (mutex_.try_lock()) { - ariles2::apply(writer_, source); + ariles2::apply(writer_, source, id_); data_->finalize(writer_parameters_.persistent_structure_, timestamp, names_version); serialized_ = false; @@ -302,12 +307,13 @@ namespace intrometry::pjmsg_mcap::sink class Implementation { protected: - tut::thread::Supervisor<> thread_supervisor_; + McapCDRWriter mcap_writer_; + + public: uint32_t names_version_; intrometry::backend::SourceContainer sources_; - - McapCDRWriter mcap_writer_; + tut::thread::Supervisor<> thread_supervisor_; public: Implementation(const std::filesystem::path &directory, const std::string &sink_id, const std::size_t rate) @@ -369,31 +375,6 @@ namespace intrometry::pjmsg_mcap::sink } thread_supervisor_.interrupt(); } - - - void assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters) - { - sources_.tryEmplace(source, parameters.persistent_structure_, names_version_); - } - - void retract(const ariles2::DefaultBase &source) - { - sources_.erase(source); - } - - void write(const ariles2::DefaultBase &source, const uint64_t timestamp) - { - if (not sources_.tryVisit( - source, - [this, &source, timestamp](WriterWrapper &writer) { - writer.write( - source, (0 == timestamp) ? intrometry::backend::now() : timestamp, names_version_); - })) - { - thread_supervisor_.log( - "Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID()); - } - } }; } // namespace intrometry::pjmsg_mcap::sink @@ -414,29 +395,41 @@ namespace intrometry::pjmsg_mcap } - void Sink::assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters) + void Sink::assign(const std::string &id, const ariles2::DefaultBase &source, const Source::Parameters ¶meters) { if (pimpl_) { - pimpl_->assign(source, parameters); + pimpl_->sources_.tryEmplace(id, source, parameters.persistent_structure_, pimpl_->names_version_); } } - void Sink::retract(const ariles2::DefaultBase &source) + void Sink::retract(const std::string &id, const ariles2::DefaultBase &source) { if (pimpl_) { - pimpl_->retract(source); + pimpl_->sources_.erase(id, source); } } - void Sink::write(const ariles2::DefaultBase &source, const uint64_t timestamp) + void Sink::write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp) { if (pimpl_) { - pimpl_->write(source, timestamp); + if (not pimpl_->sources_.tryVisit( + id, + source, + [this, &source, ×tamp](WriterWrapper &writer) { + writer.write( + source, + (0 == timestamp) ? intrometry::backend::now() : timestamp, + pimpl_->names_version_); + })) + { + pimpl_->thread_supervisor_.log( + "Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID()); + } } } } // namespace intrometry::pjmsg_mcap diff --git a/pjmsg_topic/include/intrometry/pjmsg_topic/sink.h b/pjmsg_topic/include/intrometry/pjmsg_topic/sink.h index f5f0e9c..a67cedd 100644 --- a/pjmsg_topic/include/intrometry/pjmsg_topic/sink.h +++ b/pjmsg_topic/include/intrometry/pjmsg_topic/sink.h @@ -30,7 +30,7 @@ namespace intrometry::pjmsg_topic public: Parameters(const std::string &id = ""); // NOLINT - Parameters(const char *id = ""); // NOLINT + Parameters(const char *id = ""); // NOLINT Parameters &rate(const std::size_t value); Parameters &id(const std::string &value); @@ -47,11 +47,17 @@ namespace intrometry::pjmsg_topic { public: using SinkPIMPLBase::SinkPIMPLBase; + using SinkPIMPLBase::assign; + using SinkPIMPLBase::write; + using SinkPIMPLBase::retract; ~Sink(); bool initialize(); - void assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters = Source::Parameters()); - void retract(const ariles2::DefaultBase &source); - void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0); + void assign( + const std::string &id, + const ariles2::DefaultBase &source, + const Source::Parameters ¶meters = Source::Parameters()); + void retract(const std::string &id, const ariles2::DefaultBase &source); + void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0); }; } // namespace intrometry::pjmsg_topic diff --git a/pjmsg_topic/src/intrometry.cpp b/pjmsg_topic/src/intrometry.cpp index 1e1aba2..81be45e 100644 --- a/pjmsg_topic/src/intrometry.cpp +++ b/pjmsg_topic/src/intrometry.cpp @@ -93,6 +93,7 @@ namespace { public: std::mutex mutex_; + const std::string id_; ariles2::namevalue2::Writer::Parameters writer_parameters_; std::shared_ptr data_; ariles2::namevalue2::Writer writer_; @@ -101,8 +102,12 @@ namespace public: - WriterWrapper(const ariles2::DefaultBase &source, const bool persistent_structure, uint32_t &names_version) - : data_(std::make_shared()), writer_(data_) + WriterWrapper( + const ariles2::DefaultBase &source, + std::string id, + const bool persistent_structure, + uint32_t &names_version) + : id_(std::move(id)), data_(std::make_shared()), writer_(data_) { writer_parameters_ = writer_.getDefaultParameters(); if (persistent_structure) @@ -111,7 +116,7 @@ namespace } // write to allocate memory - ariles2::apply(writer_, source); + ariles2::apply(writer_, source, id_); data_->finalize(writer_parameters_.persistent_structure_, rclcpp::Time(0), names_version); published_ = true; // do not publish on assignment @@ -141,7 +146,7 @@ namespace { if (mutex_.try_lock()) { - ariles2::apply(writer_, source); + ariles2::apply(writer_, source, id_); data_->finalize(writer_parameters_.persistent_structure_, timestamp, names_version); published_ = false; @@ -216,16 +221,19 @@ namespace intrometry::pjmsg_topic { class Implementation { - protected: + public: std::shared_ptr node_; - tut::thread::Supervisor thread_supervisor_; - uint32_t names_version_; - - intrometry::backend::SourceContainer sources_; + protected: NamesPublisherPtr names_publisher_; ValuesPublisherPtr values_publisher_; + public: + uint32_t names_version_; + + intrometry::backend::SourceContainer sources_; + tut::thread::Supervisor thread_supervisor_; + public: Implementation(const std::string &sink_id, const std::size_t rate) { @@ -293,33 +301,6 @@ namespace intrometry::pjmsg_topic } thread_supervisor_.interrupt(); } - - - void assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters) - { - sources_.tryEmplace(source, parameters.persistent_structure_, names_version_); - } - - void retract(const ariles2::DefaultBase &source) - { - sources_.erase(source); - } - - void write(const ariles2::DefaultBase &source, const rclcpp::Time ×tamp) - { - if (not sources_.tryVisit( - source, - [this, &source, ×tamp](WriterWrapper &writer) { - writer.write( - source, - (0 == timestamp.nanoseconds()) ? node_->now() : timestamp, - names_version_); - })) - { - thread_supervisor_.log( - "Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID()); - } - } }; } // namespace sink } // namespace intrometry::pjmsg_topic @@ -342,29 +323,43 @@ namespace intrometry::pjmsg_topic } - void Sink::assign(const ariles2::DefaultBase &source, const Source::Parameters ¶meters) + void Sink::assign(const std::string &id, const ariles2::DefaultBase &source, const Source::Parameters ¶meters) { if (pimpl_) { - pimpl_->assign(source, parameters); + pimpl_->sources_.tryEmplace(id, source, parameters.persistent_structure_, pimpl_->names_version_); } } - void Sink::retract(const ariles2::DefaultBase &source) + void Sink::retract(const std::string &id, const ariles2::DefaultBase &source) { if (pimpl_) { - pimpl_->retract(source); + pimpl_->sources_.erase(id, source); } } - void Sink::write(const ariles2::DefaultBase &source, const uint64_t timestamp) + void Sink::write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp) { if (pimpl_) { - pimpl_->write(source, rclcpp::Time(static_cast(timestamp))); + if (not pimpl_->sources_.tryVisit( + id, + source, + [this, &source, ×tamp](WriterWrapper &writer) + { + writer.write( + source, + (0 == timestamp) ? pimpl_->node_->now() : + rclcpp::Time(static_cast(timestamp)), + pimpl_->names_version_); + })) + { + pimpl_->thread_supervisor_.log( + "Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID()); + } } } } // namespace intrometry::pjmsg_topic