Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[humble] Handle unsupported serialization formats in rosbag2's. #1851

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};

bag_events::EventCallbackManager callback_manager_;

std::string storage_serialization_format;
};

} // namespace readers
Expand Down
139 changes: 98 additions & 41 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rosbag2_cpp/readers/sequential_reader.hpp"

#include <memory>
#include <stdexcept>
#include <string>
Expand All @@ -20,10 +22,7 @@

#include "rcpputils/asserts.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"


namespace rosbag2_cpp
{
Expand All @@ -40,8 +39,7 @@ std::vector<std::string> resolve_relative_paths(
base_path = rcpputils::fs::path(base_folder).parent_path();
}

rcpputils::require_true(
base_path.exists(), "base folder does not exist: " + base_folder);
rcpputils::require_true(base_path.exists(), "base folder does not exist: " + base_folder);
rcpputils::require_true(
base_path.is_directory(), "base folder has to be a directory: " + base_folder);

Expand All @@ -65,13 +63,11 @@ SequentialReader::SequentialReader(
converter_(nullptr),
metadata_io_(std::move(metadata_io)),
converter_factory_(std::move(converter_factory))
{}

SequentialReader::~SequentialReader()
{
close();
}

SequentialReader::~SequentialReader() {close();}

void SequentialReader::close()
{
if (storage_) {
Expand Down Expand Up @@ -119,13 +115,20 @@ void SequentialReader::open(
ROSBAG2_CPP_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
// Set the current storage serialization format to the output serialization format
storage_serialization_format = converter_options.output_serialization_format;

check_topics_serialization_formats(topics);
check_converter_serialization_format(
converter_options.output_serialization_format,
topics[0].topic_metadata.serialization_format);
converter_options.output_serialization_format, storage_serialization_format);

// Fill topics metadata. The storage serialization format is known by this point,
// so only supported topics will be added
fill_topics_metadata();

// Set initial filter to read all supported topics.
set_filter({});
}

bool SequentialReader::has_next()
Expand Down Expand Up @@ -172,22 +175,46 @@ std::vector<rosbag2_storage::TopicMetadata> SequentialReader::get_all_topics_and
return topics_metadata_;
}

void SequentialReader::set_filter(
const rosbag2_storage::StorageFilter & storage_filter)
void SequentialReader::set_filter(const rosbag2_storage::StorageFilter & storage_filter)
{
topics_filter_ = storage_filter;
// Clear the current topics_filter
topics_filter_ = {};

// Create a new filter that is the intersection of the storage filter and the topics metadata.
if (storage_filter.topics.empty()) {
// Empty filter. Add all topics with a supported serialization format.
for (const auto & topic : topics_metadata_) {
topics_filter_.topics.push_back(topic.name);
}
} else {
// Non-empty filter. Add all requested topics with a supported serialization format.
for (const auto & topic : storage_filter.topics) {
auto it = std::find_if(
topics_metadata_.begin(), topics_metadata_.end(),
[&topic](const auto & topic_metadata) {return topic_metadata.name == topic;});
if (it != topics_metadata_.end()) {
topics_filter_.topics.push_back(topic);
} else {
ROSBAG2_CPP_LOG_WARN(
"Requested topic %s not found or has unsupported serialization format.", topic.c_str());
}
}

// Edge case: we cannot find any supported topic.
// To avoid reading all messages, throw an error.
if (topics_filter_.topics.empty()) {
throw std::runtime_error("No topics found that match the filter.");
}
}

if (storage_) {
storage_->set_filter(topics_filter_);
return;
}
throw std::runtime_error(
"Bag is not open. Call open() before setting filter.");
throw std::runtime_error("Bag is not open. Call open() before setting filter.");
}

void SequentialReader::reset_filter()
{
set_filter(rosbag2_storage::StorageFilter());
}
void SequentialReader::reset_filter() {set_filter(rosbag2_storage::StorageFilter());}

void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
{
Expand All @@ -198,8 +225,7 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
load_current_file();
return;
}
throw std::runtime_error(
"Bag is not open. Call open() before seeking time.");
throw std::runtime_error("Bag is not open. Call open() before seeking time.");
}

bool SequentialReader::has_next_file() const
Expand All @@ -223,7 +249,6 @@ void SequentialReader::load_current_file()
}
// set filters
storage_->seek(seek_time_);
set_filter(topics_filter_);
}

void SequentialReader::load_next_file()
Expand All @@ -237,10 +262,7 @@ void SequentialReader::load_next_file()
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

std::string SequentialReader::get_current_file() const
{
return *current_file_iterator_;
}
std::string SequentialReader::get_current_file() const {return *current_file_iterator_;}

std::string SequentialReader::get_current_uri() const
{
Expand All @@ -252,27 +274,53 @@ std::string SequentialReader::get_current_uri() const
void SequentialReader::check_topics_serialization_formats(
const std::vector<rosbag2_storage::TopicInformation> & topics)
{
auto storage_serialization_format = topics[0].topic_metadata.serialization_format;
// Check if we have any messages stored in the output serialization format.
// If that's the case, we don't need to check all converter plugins.
bool need_topic_conversion = true;
for (const auto & topic : topics) {
if (topic.topic_metadata.serialization_format != storage_serialization_format) {
throw std::runtime_error(
"Topics with different rwm serialization format have been found. "
"All topics must have the same serialization format.");
if (topic.topic_metadata.serialization_format == storage_serialization_format) {
need_topic_conversion = false;
break;
}
}

// If we haven't found any messages in the output serialization format,
// lets see if we have any serialization format we can convert from.
if (need_topic_conversion) {
bool found_topic_to_convert = false;
// Clarify that we haven't found any serialization format to convert from.
storage_serialization_format = "";
for (const auto & topic : topics) {
if (
converter_factory_->load_deserializer(topic.topic_metadata.serialization_format) !=
nullptr)
{
storage_serialization_format = topic.topic_metadata.serialization_format;
found_topic_to_convert = true;
break;
}
}

// If we cannot convert, fail.
if (!found_topic_to_convert) {
throw std::runtime_error("No topics with a known serialization format have been found. ");
}
}

// User is warned of non-supported topics in set_filter
}

void SequentialReader::check_converter_serialization_format(
const std::string & converter_serialization_format,
const std::string & storage_serialization_format)
{
if (converter_serialization_format.empty()) {return;}
if (converter_serialization_format.empty()) {
return;
}

if (converter_serialization_format != storage_serialization_format) {
converter_ = std::make_unique<Converter>(
storage_serialization_format,
converter_serialization_format,
converter_factory_);
storage_serialization_format, converter_serialization_format, converter_factory_);
auto topics = storage_->get_all_topics_and_types();
for (const auto & topic_with_type : topics) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
Expand All @@ -283,19 +331,28 @@ void SequentialReader::check_converter_serialization_format(
void SequentialReader::fill_topics_metadata()
{
rcpputils::check_true(storage_ != nullptr, "Bag is not open. Call open() before reading.");

// Add only topics with the same serialization format as the storage serialization format
topics_metadata_.clear();
topics_metadata_.reserve(metadata_.topics_with_message_count.size());
for (const auto & topic_information : metadata_.topics_with_message_count) {
topics_metadata_.push_back(topic_information.topic_metadata);
if (topic_information.topic_metadata.serialization_format == storage_serialization_format) {
topics_metadata_.push_back(topic_information.topic_metadata);
} else {
ROSBAG2_CPP_LOG_WARN(
"Topic %s with serialization format %s doesn't match the storage format %s.",
topic_information.topic_metadata.name.c_str(),
topic_information.topic_metadata.serialization_format.c_str(),
storage_serialization_format.c_str());
}
}
}

void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks)
{
if (callbacks.read_split_callback) {
callback_manager_.add_event_callback(
callbacks.read_split_callback,
bag_events::BagEvent::READ_SPLIT);
callbacks.read_split_callback, bag_events::BagEvent::READ_SPLIT);
}
}

Expand Down
23 changes: 17 additions & 6 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ class SequentialReaderTest : public Test
topic_with_type.name = "topic";
topic_with_type.type = "test_msgs/BasicTypes";
topic_with_type.serialization_format = storage_serialization_format_;
auto topics_and_types = std::vector<rosbag2_storage::TopicMetadata>{topic_with_type};
rosbag2_storage::TopicMetadata topic2_with_type;
topic2_with_type.name = "topic2";
topic2_with_type.type = "test_msgs/BasicTypes";
topic2_with_type.serialization_format = storage_serialization_format_;
auto topics_and_types =
std::vector<rosbag2_storage::TopicMetadata>{topic_with_type, topic2_with_type};

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = topic_with_type.name;
Expand All @@ -72,6 +77,7 @@ class SequentialReaderTest : public Test
metadata_.relative_file_paths = {bag_file_1_path_.string(), bag_file_2_path_.string()};
metadata_.version = 4;
metadata_.topics_with_message_count.push_back({{topic_with_type}, 6});
metadata_.topics_with_message_count.push_back({{topic2_with_type}, 1});
metadata_.storage_identifier = "mock_storage";

EXPECT_CALL(*metadata_io, read_metadata(_)).WillRepeatedly(Return(metadata_));
Expand Down Expand Up @@ -128,13 +134,16 @@ class SequentialReaderTest : public Test
TEST_F(SequentialReaderTest, read_next_uses_converters_to_convert_serialization_format) {
std::string output_format = "rmw2_format";

auto format1_converter = std::make_unique<StrictMock<MockConverter>>();
auto format2_converter = std::make_unique<StrictMock<MockConverter>>();
EXPECT_CALL(*format1_converter, deserialize(_, _, _)).Times(1);
EXPECT_CALL(*format2_converter, serialize(_, _, _)).Times(1);

EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_))
.WillOnce(Return(ByMove(std::move(format1_converter))));
.WillRepeatedly(
[](const std::string &) {
auto deserializer = std::make_unique<StrictMock<MockConverter>>();
EXPECT_CALL(*deserializer, deserialize(_, _, _)).Times(AtMost(1));
return deserializer;
});
EXPECT_CALL(*converter_factory_, load_serializer(output_format))
.WillOnce(Return(ByMove(std::move(format2_converter))));

Expand All @@ -145,9 +154,11 @@ TEST_F(SequentialReaderTest, read_next_uses_converters_to_convert_serialization_
TEST_F(SequentialReaderTest, open_throws_error_if_converter_plugin_does_not_exist) {
std::string output_format = "rmw2_format";

auto format1_converter = std::make_unique<StrictMock<MockConverter>>();
EXPECT_CALL(*converter_factory_, load_deserializer(storage_serialization_format_))
.WillOnce(Return(ByMove(std::move(format1_converter))));
.WillRepeatedly(
[](const std::string &) {
return std::make_unique<StrictMock<MockConverter>>();
});
EXPECT_CALL(*converter_factory_, load_serializer(output_format))
.WillOnce(Return(ByMove(nullptr)));

Expand Down
Loading