Skip to content

Commit

Permalink
Upstream quality changes from Apex.AI part 1 (#1903)
Browse files Browse the repository at this point in the history
* Make sure to start with clean subscriptions list in Recorder::record()

- It could be some residual subscriptions in list in case if previous
call to Recorder::record() failed for some reason and Recorder::stop()
wasn't called.

Signed-off-by: Michael Orlov <[email protected]>

* Fix clang16 warning that timestamp comparator is not initialized

Signed-off-by: Michael Orlov <[email protected]>

* Address Axivion warnings in rosbag2_py

1. MisraC++2023-8.1.1 [required]: _transport.cpp:286 This variable shall
 not be implicitly captured in a lambda expression.; Field: []
2. MisraC++2023-8.2.3 [required]: _transport.cpp:50
 Cast removes const qualification; Field: [const char*->char*]

Signed-off-by: Michael Orlov <[email protected]>

* Gracefully handle exceptions from on-play callbacks in player

Signed-off-by: Michael Orlov <[email protected]>

* Move wait_for_playback_to_start() to public section

- Rationale: Need to use in another package
- Added timeout parameter for the wait_for_playback_to_start(..) to make
it non-blocking call optionally and return boolean value.

Signed-off-by: Michael Orlov <[email protected]>

* Fix memory leaks in tests

- Use custom deleter when releasing rcl serialized buffer to a shared
pointer.

Signed-off-by: Michael Orlov <[email protected]>

* Don't keep trying to publish next message if we failed first time

- The Player::play_next() should return false right away if for some
reason failed to publish current message.

Signed-off-by: Michael Orlov <[email protected]>

* Wrap run_play_msg_pre_callbacks(message) in its own try-catch

- Also added `play_next_returns_false_if_pre_callback_throw_exception`
unit test.

Signed-off-by: Michael Orlov <[email protected]>

* Use absolute path instead of relative path in the InfoEndToEndTestFixture

Signed-off-by: Michael Orlov <[email protected]>

* Call discovery_future_.get() if it's ready in the `stop_discovery()`

- Also add missing `include <unordered_set>`

Signed-off-by: Michael Orlov <[email protected]>

* Move wait_for_playback_to_start(..) declaration and fixed wording

Signed-off-by: Michael Orlov <[email protected]>

---------

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
morlov-apexai authored Feb 6, 2025
1 parent 9351e6a commit 871a447
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 51 deletions.
8 changes: 4 additions & 4 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ class Arguments
std::for_each(
arguments_.begin(), arguments_.end(),
[this](const std::string & arg) {
pointers_.push_back(const_cast<char *>(arg.c_str()));
pointers_.push_back(arg.c_str());
}
);
pointers_.push_back(nullptr);
}

char ** argv()
const char ** argv()
{
return arguments_.empty() ? nullptr : pointers_.data();
}
Expand All @@ -65,7 +65,7 @@ class Arguments

private:
std::vector<std::string> arguments_;
std::vector<char *> pointers_;
std::vector<const char *> pointers_;
};

rclcpp::QoS qos_from_handle(const py::handle source)
Expand Down Expand Up @@ -283,7 +283,7 @@ class Player
player->play();

auto wait_for_exit_thread = std::thread(
[&]() {
[this, player]() {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();});
player->stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&std_string_msg, serialized_msg.get());

auto ret = std::make_shared<rcutils_uint8_array_t>();
*ret = serialized_msg->release_rcl_serialized_message();
return ret;
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg->release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}

std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> write_messages_to_mcap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef ROSBAG2_TEST_COMMON__MEMORY_MANAGEMENT_HPP_
#define ROSBAG2_TEST_COMMON__MEMORY_MANAGEMENT_HPP_

#include <gtest/gtest.h>

#include <memory>
#include <string>

Expand All @@ -32,16 +34,18 @@ class MemoryManagement

template<typename T>
inline
std::shared_ptr<rmw_serialized_message_t>
serialize_message(std::shared_ptr<T> message)
std::shared_ptr<rcutils_uint8_array_t> serialize_message(std::shared_ptr<T> message)
{
rclcpp::Serialization<T> ser;
rclcpp::SerializedMessage serialized_message;
ser.serialize_message(message.get(), &serialized_message);
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
ser.serialize_message(message.get(), serialized_msg.get());

auto ret = std::make_shared<rmw_serialized_message_t>();
*ret = serialized_message.release_rcl_serialized_message();
return ret;
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg->release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}

template<typename T>
Expand All @@ -58,10 +62,13 @@ class MemoryManagement

std::shared_ptr<rmw_serialized_message_t> make_initialized_message()
{
rclcpp::SerializedMessage serialized_message(0u);
auto ret = std::make_shared<rmw_serialized_message_t>();
*ret = serialized_message.release_rcl_serialized_message();
return ret;
rclcpp::SerializedMessage serialized_msg(0u);
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg.release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class InfoEndToEndTestFixture : public Test, public WithParamInterface<std::stri
InfoEndToEndTestFixture()
{
// _SRC_RESOURCES_DIR_PATH defined in CMakeLists.txt
bags_path_ = (fs::path(_SRC_RESOURCES_DIR_PATH) / GetParam()).generic_string();
bags_path_ = fs::absolute(fs::path(_SRC_RESOURCES_DIR_PATH) / GetParam()).generic_string();
}

std::string bags_path_;
Expand Down
13 changes: 8 additions & 5 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
bool play();

/// \brief Waits on the condition variable until the play thread starts and the message's queue
/// will be filled.
/// @param timeout Maximum time in the fraction of seconds to wait for player to start.
/// If timeout is negative, the wait_for_playback_to_start will be a blocking call.
/// @return true if playback successfully started during timeout, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool wait_for_playback_to_start(std::chrono::duration<double> timeout = std::chrono::seconds(-1));

/// \brief Waits on the condition variable until the play thread finishes.
/// @param timeout Maximum time in the fraction of seconds to wait for player to finish.
/// If timeout is negative, the wait_for_playback_to_finish will be a blocking call.
Expand Down Expand Up @@ -336,11 +344,6 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr get_clock_publisher();

/// \brief Blocks and wait on condition variable until first message will be taken from read
/// queue
ROSBAG2_TRANSPORT_PUBLIC
void wait_for_playback_to_start();

/// \brief Getter for the number of registered on_play_msg_pre_callbacks
/// \return Number of registered on_play_msg_pre_callbacks
ROSBAG2_TRANSPORT_PUBLIC
Expand Down
77 changes: 50 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,11 @@ class PlayerImpl
/// \return Shared pointer to the inner clock_publisher
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr get_clock_publisher();

/// \brief Blocks and wait on condition variable until first message will be taken from read
/// queue
void wait_for_playback_to_start();
/// \brief Waits on the condition variable until first message will be taken from read queue
/// @param timeout Maximum time in the fraction of seconds to wait for player to start.
/// If timeout is negative, the wait_for_playback_to_start will be a blocking call.
/// @return true if playback successfully started during timeout, otherwise false.
bool wait_for_playback_to_start(std::chrono::duration<double> timeout = std::chrono::seconds(-1));

/// \brief Waits on the condition variable until the play thread finishes.
/// @param timeout Maximum time in the fraction of seconds to wait for player to finish.
Expand Down Expand Up @@ -374,10 +376,10 @@ class PlayerImpl

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

BagMessageComparator get_bag_message_comparator(const MessageOrder & order);
static BagMessageComparator get_bag_message_comparator(const MessageOrder & order);

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp.
struct
static inline const struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
Expand All @@ -388,7 +390,7 @@ class PlayerImpl
} bag_message_chronological_recv_timestamp_comparator;

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp.
struct
static inline const struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
Expand Down Expand Up @@ -586,7 +588,7 @@ bool PlayerImpl::play()
ready_to_play_from_queue_cv_.notify_all();
}
} while (rclcpp::ok() && !stop_playback_ && play_options_.loop);
} catch (std::runtime_error & e) {
} catch (const std::exception & e) {
RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what());
load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
Expand Down Expand Up @@ -917,10 +919,17 @@ rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr PlayerImpl::get_clock_pu
return clock_publisher_;
}

void PlayerImpl::wait_for_playback_to_start()
bool PlayerImpl::wait_for_playback_to_start(std::chrono::duration<double> timeout)
{
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
if (timeout.count() < 0) {
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
return true;
} else {
return ready_to_play_from_queue_cv_.wait_for(
lk, timeout, [this] {return is_ready_to_play_from_queue_;}
);
}
}

size_t PlayerImpl::get_number_of_registered_on_play_msg_pre_callbacks()
Expand Down Expand Up @@ -1045,14 +1054,11 @@ void PlayerImpl::play_messages_from_queue()
// If we tried to publish because of play_next(), jump the clock
if (play_next_.load()) {
clock_->jump(get_message_order_timestamp(message_ptr));
// If we successfully played next, notify that we're done, otherwise keep trying
if (message_published) {
play_next_ = false;
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
finished_play_next_ = true;
play_next_result_ = true;
finished_play_next_cv_.notify_all();
}
play_next_ = false;
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
finished_play_next_ = true;
play_next_result_ = message_published;
finished_play_next_cv_.notify_all();
}
}
message_ptr = take_next_message_from_queue();
Expand Down Expand Up @@ -1357,20 +1363,37 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
{
auto pub_iter = publishers_.find(message->topic_name);
if (pub_iter != publishers_.end()) {
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);
bool message_published = false;
bool pre_callbacks_failed = true;
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);
pre_callbacks_failed = false;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
owner_->get_logger(), "Failed to publish message on '" << message->topic_name <<
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to call on play message pre-callback on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}

if (!pre_callbacks_failed) {
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to publish message on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}
}

// Calling on play message post-callbacks
run_play_msg_post_callbacks(message);
try {
// Calling on play message post-callbacks
run_play_msg_post_callbacks(message);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to call on play message post-callback on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}
return message_published;
}

Expand Down Expand Up @@ -1897,9 +1920,9 @@ rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr Player::get_clock_publis
return pimpl_->get_clock_publisher();
}

void Player::wait_for_playback_to_start()
bool Player::wait_for_playback_to_start(std::chrono::duration<double> timeout)
{
pimpl_->wait_for_playback_to_start();
return pimpl_->wait_for_playback_to_start(timeout);
}

size_t Player::get_number_of_registered_on_play_msg_pre_callbacks()
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -267,6 +268,7 @@ void RecorderImpl::record()
throw std::runtime_error("No serialization format specified!");
}

subscriptions_.clear();
writer_->open(
storage_options_,
{rmw_get_serialization_format(), record_options_.rmw_serialization_format});
Expand Down Expand Up @@ -460,6 +462,8 @@ void RecorderImpl::stop_discovery()
"discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() <<
") return status: " <<
(status == std::future_status::timeout ? "timeout" : "deferred"));
} else {
discovery_future_.get();
}
}
} else {
Expand Down
78 changes: 78 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,81 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
// All we care is that any messages arrived
EXPECT_THAT(replayed_topic2, SizeIs(Eq(3u)));
}

TEST_F(RosBag2PlayTestFixture, play_next_calls_pre_callback_only_once_if_fail) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
size_t callbacks_counter = 0;
{
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);
player->pause();
ASSERT_TRUE(player->is_paused());

player->play();
player->wait_for_playback_to_start();
ASSERT_TRUE(player->is_paused());
auto callback = [&callbacks_counter](std::shared_ptr<rosbag2_storage::SerializedBagMessage>) {
callbacks_counter++;
throw std::runtime_error("Throw exception from callback");
};
player->add_on_play_message_pre_callback(callback);
ASSERT_FALSE(player->play_next());
}
ASSERT_EQ(callbacks_counter, 1);
}

TEST_F(RosBag2PlayTestFixture, play_next_returns_false_if_pre_callback_throw_exception) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);
player->pause();

ASSERT_TRUE(player->is_paused());
ASSERT_FALSE(player->play_next());

player->play();
player->wait_for_playback_to_start();
ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
auto callback_with_exception = [](std::shared_ptr<rosbag2_storage::SerializedBagMessage>) {
throw std::runtime_error("Throw exception from callback");
};
auto pre_callback_handle = player->add_on_play_message_pre_callback(callback_with_exception);
ASSERT_FALSE(player->play_next());
player->delete_on_play_message_callback(pre_callback_handle);

auto post_callback_handle = player->add_on_play_message_post_callback(callback_with_exception);
// play_next shall return true if post_callback fails, because message was published.
ASSERT_TRUE(player->play_next());
player->delete_on_play_message_callback(post_callback_handle);
}

0 comments on commit 871a447

Please sign in to comment.