Skip to content

Commit

Permalink
backend/SourceContainer: improve source id handling
Browse files Browse the repository at this point in the history
  • Loading branch information
asherikov committed Jan 17, 2025
1 parent 7006a34 commit 82ef8bb
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 110 deletions.
2 changes: 2 additions & 0 deletions .utils/qa/scspell.dict
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ sherikov
sstream
stringstream
strstream
todo
typeid
typeindex
yaml
zstd

76 changes: 56 additions & 20 deletions frontend/include/intrometry/backend/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <unordered_map>
#include <functional>
#include <typeinfo>
#include <typeindex>

#include <ariles2/ariles.h>

Expand Down Expand Up @@ -60,67 +61,102 @@ namespace intrometry::backend
class INTROMETRY_HIDDEN SourceContainer
{
protected:
// passthrough hasher
using Key = std::pair<std::type_index, std::string>;

struct Hasher
{
std::size_t operator()(const std::size_t key) const
std::size_t operator()(const Key &key) const
{
return key;
std::size_t result = std::hash<std::type_index>{}(key.first);
if (not key.second.empty())
{
result ^= std::hash<std::string>{}(key.second) + 0x9e3779b9 + (result << 6) + (result >> 2);
}
return (result);
}
};

using SourceMap = std::unordered_map<std::size_t, t_Value, Hasher>;
using SourceMap = std::unordered_map<Key, t_Value, Hasher>;
using CollisionMap = std::unordered_map<std::string, std::size_t>;

protected:
SourceMap sources_;
CollisionMap collision_counters_;

std::mutex update_mutex_;
std::mutex drain_mutex_;
std::mutex flush_mutex_;

protected:
static std::size_t hash(const ariles2::DefaultBase &source)
/// @todo requires string copy
static Key getKey(const std::string &id, const ariles2::DefaultBase &source)
{
// redundant
// const std::size_t ariles_hash = std::hash<std::string>{}(source.arilesDefaultID());
return (typeid(source).hash_code());
// source.arilesDefaultID()
// this is redundant and less reliable than type_index
// since it is provided by the user

// id
// this is also provided by the user and is also unreliable,
// but in general provides extra information than type_index
// and should be added to hash
return (Key(std::type_index(typeid(source)), id));
}

std::string getUniqueId(const std::string &id)
{
const typename CollisionMap::iterator collision_counter_it = collision_counters_.find(id);
if (collision_counters_.end() == collision_counter_it)
{
collision_counters_[id] = 0;
return (id);
}

++collision_counter_it->second;
return (str_concat(id, "_intrometry", std::to_string(collision_counter_it->second)));
}

public:
void tryVisit(const std::function<void(t_Value &)> visitor)
{
if (drain_mutex_.try_lock())
if (flush_mutex_.try_lock())
{
for (std::pair<const std::size_t, t_Value> &source : sources_)
for (std::pair<const Key, t_Value> &source : sources_)
{
visitor(source.second);
}

drain_mutex_.unlock();
flush_mutex_.unlock();
}
}

template <class... t_Args>
void tryEmplace(const ariles2::DefaultBase &source, t_Args &&...args)
void tryEmplace(const std::string &id, const ariles2::DefaultBase &source, t_Args &&...args)
{
const std::lock_guard<std::mutex> update_lock(update_mutex_);
const std::lock_guard<std::mutex> drain_lock(drain_mutex_);
const std::lock_guard<std::mutex> flush_lock(flush_mutex_);

sources_.try_emplace(hash(source), source, std::forward<t_Args>(args)...);
sources_.try_emplace(
getKey(id, source),
source,
getUniqueId(id.empty() ? source.arilesDefaultID() : id),
std::forward<t_Args>(args)...);
}

void erase(const ariles2::DefaultBase &source)
void erase(const std::string &id, const ariles2::DefaultBase &source)
{
const std::lock_guard<std::mutex> update_lock(update_mutex_);
const std::lock_guard<std::mutex> drain_lock(drain_mutex_);
const std::lock_guard<std::mutex> flush_lock(flush_mutex_);

sources_.erase(hash(source));
sources_.erase(getKey(id, source));
}

bool tryVisit(const ariles2::DefaultBase &source, const std::function<void(t_Value &)> visitor)
bool tryVisit(
const std::string &id,
const ariles2::DefaultBase &source,
const std::function<void(t_Value &)> visitor)
{
if (update_mutex_.try_lock())
{
const typename SourceMap::iterator source_it = sources_.find(hash(source));
const typename SourceMap::iterator source_it = sources_.find(getKey(id, source));

if (sources_.end() == source_it)
{
Expand Down
17 changes: 15 additions & 2 deletions frontend/include/intrometry/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ namespace intrometry
*
* @note Does nothing if initialization failed.
*/
void assign(const ariles2::DefaultBase &source, const Source::Parameters &parameters = Source::Parameters())
{
assign(std::string(), source, parameters);
}
virtual void assign(
const std::string &id, // use to resolve ambiguity between sources of the same type
const ariles2::DefaultBase &source,
const Source::Parameters &parameters = Source::Parameters()) = 0;

Expand All @@ -52,7 +57,11 @@ namespace intrometry
* @note Does nothing if initialization failed.
* @note Does nothing if the source has not been assigned.
*/
virtual void retract(const ariles2::DefaultBase &source) = 0;
void retract(const ariles2::DefaultBase &source)
{
retract(std::string(), source);
}
virtual void retract(const std::string &id, const ariles2::DefaultBase &source) = 0;

/**
* Write data. Data is copied to internal buffers, and scheduled for
Expand All @@ -61,7 +70,11 @@ namespace intrometry
* @note Does nothing if source is unassigned.
* @note Does nothing if initialization failed.
*/
virtual void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0) = 0;
void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0)
{
write(std::string(), source, timestamp);
}
virtual void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0) = 0;


/// Batch assignment
Expand Down
14 changes: 10 additions & 4 deletions pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace intrometry::pjmsg_mcap

public:
Parameters(const std::string &id = ""); // NOLINT
Parameters(const char *id = ""); // NOLINT
Parameters(const char *id = ""); // NOLINT

Parameters &rate(const std::size_t value);
Parameters &id(const std::string &value);
Expand All @@ -55,11 +55,17 @@ namespace intrometry::pjmsg_mcap
{
public:
using SinkPIMPLBase::SinkPIMPLBase;
using SinkPIMPLBase::assign;
using SinkPIMPLBase::write;
using SinkPIMPLBase::retract;
~Sink();

bool initialize();
void assign(const ariles2::DefaultBase &source, const Source::Parameters &parameters = Source::Parameters());
void retract(const ariles2::DefaultBase &source);
void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0);
void assign(
const std::string &id,
const ariles2::DefaultBase &source,
const Source::Parameters &parameters = Source::Parameters());
void retract(const std::string &id, const ariles2::DefaultBase &source);
void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0);
};
} // namespace intrometry::pjmsg_mcap
69 changes: 31 additions & 38 deletions pjmsg_mcap/src/intrometry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,20 @@ namespace
{
public:
std::mutex mutex_;
const std::string id_;
ariles2::namevalue2::Writer::Parameters writer_parameters_;
std::shared_ptr<NameValueContainer> data_;
ariles2::namevalue2::Writer writer_;

bool serialized_;

public:
WriterWrapper(const ariles2::DefaultBase &source, const bool persistent_structure, uint32_t &names_version)
: data_(std::make_shared<NameValueContainer>()), writer_(data_)
WriterWrapper(
const ariles2::DefaultBase &source,
std::string id,
const bool persistent_structure,
uint32_t &names_version)
: id_(std::move(id)), data_(std::make_shared<NameValueContainer>()), writer_(data_)
{
writer_parameters_ = writer_.getDefaultParameters();
if (persistent_structure)
Expand All @@ -223,7 +228,7 @@ namespace
}

// write to allocate memory
ariles2::apply(writer_, source);
ariles2::apply(writer_, source, id_);
data_->finalize(writer_parameters_.persistent_structure_, 0, names_version);

serialized_ = true; // do not serialize on assignment
Expand Down Expand Up @@ -252,7 +257,7 @@ namespace
{
if (mutex_.try_lock())
{
ariles2::apply(writer_, source);
ariles2::apply(writer_, source, id_);
data_->finalize(writer_parameters_.persistent_structure_, timestamp, names_version);
serialized_ = false;

Expand Down Expand Up @@ -302,12 +307,13 @@ namespace intrometry::pjmsg_mcap::sink
class Implementation
{
protected:
tut::thread::Supervisor<> thread_supervisor_;
McapCDRWriter mcap_writer_;

public:
uint32_t names_version_;

intrometry::backend::SourceContainer<WriterWrapper> sources_;

McapCDRWriter mcap_writer_;
tut::thread::Supervisor<> thread_supervisor_;

public:
Implementation(const std::filesystem::path &directory, const std::string &sink_id, const std::size_t rate)
Expand Down Expand Up @@ -369,31 +375,6 @@ namespace intrometry::pjmsg_mcap::sink
}
thread_supervisor_.interrupt();
}


void assign(const ariles2::DefaultBase &source, const Source::Parameters &parameters)
{
sources_.tryEmplace(source, parameters.persistent_structure_, names_version_);
}

void retract(const ariles2::DefaultBase &source)
{
sources_.erase(source);
}

void write(const ariles2::DefaultBase &source, const uint64_t timestamp)
{
if (not sources_.tryVisit(
source,
[this, &source, timestamp](WriterWrapper &writer) {
writer.write(
source, (0 == timestamp) ? intrometry::backend::now() : timestamp, names_version_);
}))
{
thread_supervisor_.log(
"Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID());
}
}
};
} // namespace intrometry::pjmsg_mcap::sink

Expand All @@ -414,29 +395,41 @@ namespace intrometry::pjmsg_mcap
}


void Sink::assign(const ariles2::DefaultBase &source, const Source::Parameters &parameters)
void Sink::assign(const std::string &id, const ariles2::DefaultBase &source, const Source::Parameters &parameters)
{
if (pimpl_)
{
pimpl_->assign(source, parameters);
pimpl_->sources_.tryEmplace(id, source, parameters.persistent_structure_, pimpl_->names_version_);
}
}


void Sink::retract(const ariles2::DefaultBase &source)
void Sink::retract(const std::string &id, const ariles2::DefaultBase &source)
{
if (pimpl_)
{
pimpl_->retract(source);
pimpl_->sources_.erase(id, source);
}
}


void Sink::write(const ariles2::DefaultBase &source, const uint64_t timestamp)
void Sink::write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp)
{
if (pimpl_)
{
pimpl_->write(source, timestamp);
if (not pimpl_->sources_.tryVisit(
id,
source,
[this, &source, &timestamp](WriterWrapper &writer) {
writer.write(
source,
(0 == timestamp) ? intrometry::backend::now() : timestamp,
pimpl_->names_version_);
}))
{
pimpl_->thread_supervisor_.log(
"Measurement source handler is not assigned, skipping id: ", source.arilesDefaultID());
}
}
}
} // namespace intrometry::pjmsg_mcap
14 changes: 10 additions & 4 deletions pjmsg_topic/include/intrometry/pjmsg_topic/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace intrometry::pjmsg_topic

public:
Parameters(const std::string &id = ""); // NOLINT
Parameters(const char *id = ""); // NOLINT
Parameters(const char *id = ""); // NOLINT

Parameters &rate(const std::size_t value);
Parameters &id(const std::string &value);
Expand All @@ -47,11 +47,17 @@ namespace intrometry::pjmsg_topic
{
public:
using SinkPIMPLBase::SinkPIMPLBase;
using SinkPIMPLBase::assign;
using SinkPIMPLBase::write;
using SinkPIMPLBase::retract;
~Sink();

bool initialize();
void assign(const ariles2::DefaultBase &source, const Source::Parameters &parameters = Source::Parameters());
void retract(const ariles2::DefaultBase &source);
void write(const ariles2::DefaultBase &source, const uint64_t timestamp = 0);
void assign(
const std::string &id,
const ariles2::DefaultBase &source,
const Source::Parameters &parameters = Source::Parameters());
void retract(const std::string &id, const ariles2::DefaultBase &source);
void write(const std::string &id, const ariles2::DefaultBase &source, const uint64_t timestamp = 0);
};
} // namespace intrometry::pjmsg_topic
Loading

0 comments on commit 82ef8bb

Please sign in to comment.