Skip to content

Commit

Permalink
Refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
asherikov committed Jan 10, 2025
1 parent 9498be2 commit cdf8404
Show file tree
Hide file tree
Showing 13 changed files with 261 additions and 188 deletions.
3 changes: 3 additions & 0 deletions .utils/qa/scspell.dict
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ cppcheck
defgroup
eprosima
fastcdr
gmtime
google
gtest
https
ingroup
intrometry
intrometryfixture
iomanip
killall
mcap
msgs
Expand All @@ -33,6 +35,7 @@ rclcpp
rdparty
rosout
sherikov
sstream
stringstream
strstream
yaml
Expand Down
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,20 @@ does NOT depend on any ROS components. The resulting files can also be viewed
by `PlotJuggler`.


Using backends with cmake
-------------------------
Using library
-------------

### cmake

```
find_package(intrometry_<BACKEND> REQUIRED)
target_link_libraries(my_library intrometry::<BACKEND>)
```

### C++

```
find_package(intrometry_pjmsg_mcap REQUIRED)
target_link_libraries(my_library intrometry::pjmsg_mcap)
#include <intrometry/<BACKEND>/<BACKEND>.h>
```


Expand Down
1 change: 1 addition & 0 deletions frontend/include/intrometry/backend/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace intrometry::backend

std::string getRandomId(const std::size_t length);
std::string normalizeId(const std::string &input_id);
std::string getDateString();


class RateTimer
Expand Down
18 changes: 17 additions & 1 deletion frontend/src/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <algorithm>
#include <ratio>
#include <thread>
#include <sstream>
#include <iomanip>

#include <intrometry/backend/utils.h>

Expand All @@ -20,7 +22,7 @@ namespace
namespace intrometry_private::backend
{
const std::string valid_chars = "0123456789abcdefghijklmnopqrstuvwxyz";
} // namespace intrometry_private::backend
} // namespace intrometry_private::backend
} // namespace


Expand Down Expand Up @@ -94,6 +96,20 @@ namespace intrometry::backend
}


std::string getDateString()
{
const std::time_t date_now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::stringstream date_stream;
// thread-unsafe
date_stream << std::put_time(std::gmtime(&date_now), "%Y%m%d_%H%M%S"); // NOLINT

return (date_stream.str());
}
} // namespace intrometry::backend


namespace intrometry::backend
{
class RateTimer::Implementation
{
public:
Expand Down
11 changes: 11 additions & 0 deletions pjmsg_mcap/include/intrometry/pjmsg_mcap/pjmsg_mcap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
@file
@author Alexander Sherikov
@copyright 2025 Alexander Sherikov. Licensed under the Apache License,
Version 2.0. (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
*/

#pragma once

#include <intrometry/intrometry.h>
#include "sink.h"
10 changes: 9 additions & 1 deletion pjmsg_mcap/include/intrometry/pjmsg_mcap/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

#pragma once

#include "intrometry/sink.h"
#include <filesystem>

#include <intrometry/sink.h>


namespace intrometry::pjmsg_mcap
Expand All @@ -27,12 +29,18 @@ namespace intrometry::pjmsg_mcap
/// id of the sink, disables publishing if empty
std::string id_;


/// output directory
std::filesystem::path directory_;


public:
Parameters(const std::string &id = ""); // NOLINT
Parameters(const char *id = ""); // NOLINT

Parameters &rate(const std::size_t value);
Parameters &id(const std::string &value);
Parameters &directory(const std::filesystem::path &value);
};

class Implementation;
Expand Down
176 changes: 93 additions & 83 deletions pjmsg_mcap/src/intrometry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
#include "intrometry/backend/utils.h"
#include "intrometry/pjmsg_mcap/sink.h"

#include "schema_names.h"
#include "schema_values.h"
#include "messages.h"


namespace
Expand Down Expand Up @@ -95,111 +94,109 @@ namespace
{
class McapCDRWriter
{
public:
mcap::Message mcap_message_names_;
mcap::Message mcap_message_values_;

std::vector<std::byte> buffer_;
eprosima::fastcdr::CdrSizeCalculator cdr_size_calculator_;
mcap::McapWriter writer_;

public:
McapCDRWriter() : cdr_size_calculator_(eprosima::fastcdr::CdrVersion::XCDRv1)
protected:
template <class t_Message>
class Channel
{
}
protected:
mcap::Message message_;
eprosima::fastcdr::CdrSizeCalculator cdr_size_calculator_;

~McapCDRWriter()
{
writer_.close();
}
protected:
uint32_t getSize(const t_Message &message)
{
size_t current_alignment{ 0 };
return (cdr_size_calculator_.calculate_serialized_size(message, current_alignment)
+ 4u /*encapsulation*/);
}

void initialize(const std::string &filename, const std::string &topic_prefix) // NOLINT
{
public:
Channel() : cdr_size_calculator_(eprosima::fastcdr::CdrVersion::XCDRv1)
{
const mcap::McapWriterOptions options = mcap::McapWriterOptions("ros2msg");
const mcap::Status res = writer_.open(filename, options);
if (not res.ok())
{
throw std::runtime_error(intrometry::backend::str_concat(
"Failed to open ", filename, " for writing: ", res.message));
}
}

void initialize(mcap::McapWriter &writer, const std::string_view &msg_topic)
{
mcap::Schema schema(
"plotjuggler_msgs/msg/StatisticsNames",
intrometry_private::pjmsg_mcap::Message<t_Message>::type,
"ros2msg",
intrometry_private::pjmsg_mcap::schema::names);
writer_.addSchema(schema);
intrometry_private::pjmsg_mcap::Message<t_Message>::schema);
writer.addSchema(schema);

mcap::Channel channel(intrometry::backend::str_concat(topic_prefix, "/names"), "ros2msg", schema.id);
writer_.addChannel(channel);
mcap::Channel channel(msg_topic, "ros2msg", schema.id);
writer.addChannel(channel);

mcap_message_names_.channelId = channel.id;
message_.channelId = channel.id;
}

void write(mcap::McapWriter &writer, std::vector<std::byte> &buffer, const t_Message &message)
{
mcap::Schema schema(
"plotjuggler_msgs/msg/StatisticsValues",
"ros2msg",
intrometry_private::pjmsg_mcap::schema::values);
writer_.addSchema(schema);

mcap::Channel channel(intrometry::backend::str_concat(topic_prefix, "/values"), "ros2msg", schema.id);
writer_.addChannel(channel);
buffer.resize(getSize(message));
message_.data = buffer.data();

mcap_message_values_.channelId = channel.id;
}
}
{
eprosima::fastcdr::FastBuffer cdr_buffer(
reinterpret_cast<char *>(buffer.data()), buffer.size()); // NOLINT
eprosima::fastcdr::Cdr ser(
cdr_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);

template <class t_Message>
uint32_t getSize(const t_Message &message)
{
size_t current_alignment{ 0 };
return (cdr_size_calculator_.calculate_serialized_size(message, current_alignment) + 4u /*encapsulation*/);
}
ser.serialize_encapsulation();
ser << message;
ser.set_dds_cdr_options({ 0, 0 });

template <class t_Message>
void write(mcap::Message &mcap_message, const t_Message &message)
{
buffer_.resize(getSize(message));
mcap_message.data = buffer_.data();
message_.dataSize = ser.get_serialized_data_length();
}

{
eprosima::fastcdr::FastBuffer cdr_buffer(
reinterpret_cast<char *>(buffer_.data()), buffer_.size()); // NOLINT
eprosima::fastcdr::Cdr ser(
cdr_buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);
message_.logTime = intrometry::backend::now();
message_.publishTime = message_.logTime;

ser.serialize_encapsulation();
ser << message;
ser.set_dds_cdr_options({ 0, 0 });

mcap_message.dataSize = ser.get_serialized_data_length();
const mcap::Status res = writer.write(message_);
if (not res.ok())
{
throw std::runtime_error(
intrometry::backend::str_concat("Failed to write a message: ", res.message));
}
}
};

mcap_message.logTime = intrometry::backend::now();
mcap_message.publishTime = mcap_message.logTime;
public:
std::tuple<Channel<plotjuggler_msgs::msg::StatisticsNames>, Channel<plotjuggler_msgs::msg::StatisticsValues>>
channels_;

std::vector<std::byte> buffer_;
mcap::McapWriter writer_;

const mcap::Status res = writer_.write(mcap_message);
if (not res.ok())
{
throw std::runtime_error(intrometry::backend::str_concat("Failed to write a message: ", res.message));
}
public:
~McapCDRWriter()
{
writer_.close();
}

template <class t_Message>
void writeValues(const t_Message &message)
void initialize(const std::filesystem::path &filename, const std::string &topic_prefix)
{
write(mcap_message_values_, message);
{
const mcap::McapWriterOptions options = mcap::McapWriterOptions("ros2msg");
const mcap::Status res = writer_.open(filename.native(), options);
if (not res.ok())
{
throw std::runtime_error(intrometry::backend::str_concat(
"Failed to open ", filename.native(), " for writing: ", res.message));
}
}

std::get<Channel<plotjuggler_msgs::msg::StatisticsNames>>(channels_).initialize(
writer_, intrometry::backend::str_concat(topic_prefix, "/names"));

std::get<Channel<plotjuggler_msgs::msg::StatisticsValues>>(channels_).initialize(
writer_, intrometry::backend::str_concat(topic_prefix, "/values"));
}

template <class t_Message>
void writeNames(const t_Message &message)
void write(const t_Message &message)
{
write(mcap_message_names_, message);
std::get<Channel<t_Message>>(channels_).write(writer_, buffer_, message);
}
};
} // namespace
Expand Down Expand Up @@ -241,11 +238,11 @@ namespace
{
if (data_->new_names_version_)
{
mcap_writer.writeNames(data_->names_);
mcap_writer.write(data_->names_);
data_->new_names_version_ = false;
}

mcap_writer.writeValues(data_->values_);
mcap_writer.write(data_->values_);
serialized_ = true;
}
mutex_.unlock();
Expand Down Expand Up @@ -292,6 +289,12 @@ namespace intrometry::pjmsg_mcap::sink
id_ = value;
return (*this);
}

Parameters &Parameters::directory(const std::filesystem::path &value)
{
directory_ = value;
return (*this);
}
} // namespace intrometry::pjmsg_mcap::sink

namespace intrometry::pjmsg_mcap::sink
Expand All @@ -314,15 +317,22 @@ namespace intrometry::pjmsg_mcap::sink
McapCDRWriter mcap_writer_;

public:
Implementation(const std::string &sink_id, const std::size_t rate)
Implementation(const std::filesystem::path &directory, const std::string &sink_id, const std::size_t rate)
{
const std::string node_id = intrometry::backend::normalizeId(sink_id);
const std::string random_id = intrometry::backend::getRandomId(8);
const std::string id =
node_id.empty() ? random_id : intrometry::backend::str_concat(node_id, "_", random_id);
const std::string topic_prefix =
intrometry::backend::str_concat("/intrometry/", node_id.empty() ? random_id : node_id);
const std::string filename = intrometry::backend::str_concat(id, ".mcap");

std::filesystem::create_directories(directory);
const std::filesystem::path filename = directory
/ intrometry::backend::str_concat(
node_id,
node_id.empty() ? "" : "_",
random_id,
"_",
intrometry::backend::getDateString(),
".mcap");


mcap_writer_.initialize(filename, topic_prefix);
Expand Down Expand Up @@ -431,7 +441,7 @@ namespace intrometry::pjmsg_mcap
{
return (false);
}
make_pimpl(parameters_.id_, parameters_.rate_);
make_pimpl(parameters_.directory_, parameters_.id_, parameters_.rate_);
return (true);
}

Expand Down
Loading

0 comments on commit cdf8404

Please sign in to comment.