diff --git a/README.md b/README.md index 7d733c73cd..16499038f6 100644 --- a/README.md +++ b/README.md @@ -29,11 +29,11 @@ rosbag2 is part of the ROS 2 command line interface as `ros2 bag`. These verbs are available for `ros2 bag`: * `ros2 bag burst` -* `ros2 bag convert` -* `ros2 bag info` +* [`ros2 bag convert`](#converting-bags) +* [`ros2 bag info`](#analyzing-data) * `ros2 bag list` -* `ros2 bag play` -* `ros2 bag record` +* [`ros2 bag play`](#replaying-data) +* [`ros2 bag record`](#recording-data) * `ros2 bag reindex` For up-to-date information on the available options for each, use `ros2 bag --help`. @@ -146,6 +146,27 @@ $ ros2 bag play -i -i Messages from all provided bags will be played in order, based on their original recording reception timestamps. +Options: + +* `--topics`: + Space-delimited list of topics to play. +* `--services`: + Space-delimited list of services to play. +* `-e,--regex`: + Play only topics and services matches with regular expression. +* `-x,--exclude-regex`: + Regular expressions to exclude topics and services from replay. +* `--exclude-topics`: + Space-delimited list of topics not to play. +* `--exclude-services`: + Space-delimited list of services not to play. +* `--message-order {received,sent}`: + The reference to use for bag message chronological ordering. + Choices: reception timestamp (`received`), publication timestamp (`sent`). + Default: reception timestamp. + +For more options, run with `--help`. + #### Controlling playback via services The Rosbag2 player provides the following services for remote control, which can be called via `ros2 service` commandline or from your nodes, diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 4373182d70..c645de4318 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -26,6 +26,7 @@ from ros2bag.api import print_warn from ros2bag.verb import VerbExtension from ros2cli.node import NODE_NAME_PREFIX +from rosbag2_py import MessageOrder from rosbag2_py import Player from rosbag2_py import PlayOptions from rosbag2_py import ServiceRequestsSource @@ -167,6 +168,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Determine the source of the service requests to be replayed. This option only ' 'makes sense if the "--publish-service-requests" option is set. By default,' ' the service requests replaying from recorded service introspection message.') + parser.add_argument( + '--message-order', default='received', + choices=['received', 'sent'], + help='The reference to use for bag message chronological ordering. Choices: reception ' + 'timestamp, publication timestamp. Default: reception timestamp. ' + 'If messages are significantly disordered (within a single bag or across ' + 'multiple bags), replayed messages may not be correctly ordered. A possible ' + 'solution could be to increase the read_ahead_queue_size value to buffer (and ' + 'order) more messages.') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -276,6 +286,11 @@ def main(self, *, args): # noqa: D102 play_options.service_requests_source = ServiceRequestsSource.SERVICE_INTROSPECTION else: play_options.service_requests_source = ServiceRequestsSource.CLIENT_INTROSPECTION + # argparse makes sure that we get a valid arg value + play_options.message_order = { + 'received': MessageOrder.RECEIVED_TIMESTAMP, + 'sent': MessageOrder.SENT_TIMESTAMP, + }.get(args.message_order) player = Player(args.log_level) try: diff --git a/rosbag2_py/rosbag2_py/__init__.py b/rosbag2_py/rosbag2_py/__init__.py index f682509976..46354dbc7a 100644 --- a/rosbag2_py/rosbag2_py/__init__.py +++ b/rosbag2_py/rosbag2_py/__init__.py @@ -59,6 +59,7 @@ Info, ) from rosbag2_py._transport import ( + MessageOrder, Player, PlayOptions, ServiceRequestsSource, @@ -98,6 +99,7 @@ 'TopicInformation', 'BagMetadata', 'MessageDefinition', + 'MessageOrder', 'MetadataIo', 'Info', 'Player', diff --git a/rosbag2_py/rosbag2_py/__init__.pyi b/rosbag2_py/rosbag2_py/__init__.pyi index c38eaa3383..7fdbfe63d7 100644 --- a/rosbag2_py/rosbag2_py/__init__.pyi +++ b/rosbag2_py/rosbag2_py/__init__.pyi @@ -4,7 +4,7 @@ from rosbag2_py._message_definitions import LocalMessageDefinitionSource as Loca from rosbag2_py._reader import SequentialCompressionReader as SequentialCompressionReader, SequentialReader as SequentialReader, get_registered_readers as get_registered_readers from rosbag2_py._reindexer import Reindexer as Reindexer from rosbag2_py._storage import BagMetadata as BagMetadata, ConverterOptions as ConverterOptions, FileInformation as FileInformation, MessageDefinition as MessageDefinition, MetadataIo as MetadataIo, ReadOrder as ReadOrder, ReadOrderSortBy as ReadOrderSortBy, StorageFilter as StorageFilter, StorageOptions as StorageOptions, TopicInformation as TopicInformation, TopicMetadata as TopicMetadata, convert_rclcpp_qos_to_rclpy_qos as convert_rclcpp_qos_to_rclpy_qos, get_default_storage_id as get_default_storage_id, to_rclcpp_qos_vector as to_rclcpp_qos_vector -from rosbag2_py._transport import PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite +from rosbag2_py._transport import MessageOrder as MessageOrder, PlayOptions as PlayOptions, Player as Player, RecordOptions as RecordOptions, Recorder as Recorder, ServiceRequestsSource as ServiceRequestsSource, bag_rewrite as bag_rewrite from rosbag2_py._writer import SequentialCompressionWriter as SequentialCompressionWriter, SequentialWriter as SequentialWriter, get_registered_compressors as get_registered_compressors, get_registered_serializers as get_registered_serializers, get_registered_writers as get_registered_writers -__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] +__all__ = ['bag_rewrite', 'convert_rclcpp_qos_to_rclpy_qos', 'CompressionMode', 'CompressionOptions', 'compression_mode_from_string', 'compression_mode_to_string', 'ConverterOptions', 'FileInformation', 'get_default_storage_id', 'get_registered_readers', 'get_registered_writers', 'get_registered_compressors', 'get_registered_serializers', 'to_rclcpp_qos_vector', 'ReadOrder', 'ReadOrderSortBy', 'Reindexer', 'SequentialCompressionReader', 'SequentialCompressionWriter', 'SequentialReader', 'SequentialWriter', 'StorageFilter', 'StorageOptions', 'TopicMetadata', 'TopicInformation', 'BagMetadata', 'MessageDefinition', 'MessageOrder', 'MetadataIo', 'Info', 'Player', 'PlayOptions', 'ServiceRequestsSource', 'Recorder', 'RecordOptions', 'LocalMessageDefinitionSource'] diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi index c2512f483b..c9fc3a23fc 100644 --- a/rosbag2_py/rosbag2_py/_transport.pyi +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -3,6 +3,22 @@ import rosbag2_py._storage from _typeshed import Incomplete from typing import ClassVar, List, overload +class MessageOrder: + __members__: ClassVar[dict] = ... # read-only + RECEIVED_TIMESTAMP: ClassVar[MessageOrder] = ... + SENT_TIMESTAMP: ClassVar[MessageOrder] = ... + __entries: ClassVar[dict] = ... + def __init__(self, value: int) -> None: ... + def __eq__(self, other: object) -> bool: ... + def __hash__(self) -> int: ... + def __index__(self) -> int: ... + def __int__(self) -> int: ... + def __ne__(self, other: object) -> bool: ... + @property + def name(self) -> str: ... + @property + def value(self) -> int: ... + class PlayOptions: clock_publish_frequency: float clock_publish_on_topic_publish: bool @@ -14,6 +30,7 @@ class PlayOptions: exclude_service_events_to_filter: List[str] exclude_topics_to_filter: List[str] loop: bool + message_order: Incomplete node_prefix: str playback_duration: float playback_until_timestamp: int diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index afe813b2f5..efb7fc536d 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -560,6 +560,7 @@ PYBIND11_MODULE(_transport, m) { .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) .def_readwrite("publish_service_requests", &PlayOptions::publish_service_requests) .def_readwrite("service_requests_source", &PlayOptions::service_requests_source) + .def_readwrite("message_order", &PlayOptions::message_order) ; py::enum_(m, "ServiceRequestsSource") @@ -567,6 +568,11 @@ PYBIND11_MODULE(_transport, m) { .value("CLIENT_INTROSPECTION", rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION) ; + py::enum_(m, "MessageOrder") + .value("RECEIVED_TIMESTAMP", rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP) + .value("SENT_TIMESTAMP", rosbag2_transport::MessageOrder::SENT_TIMESTAMP) + ; + py::class_(m, "RecordOptions") .def(py::init<>()) .def_readwrite("all_topics", &RecordOptions::all_topics) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index cb12c11f9d..b7d305dbfc 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -34,6 +34,14 @@ enum class ServiceRequestsSource : int8_t CLIENT_INTROSPECTION = 1 }; +enum class MessageOrder : std::uint8_t +{ + // Order chronologically by message reception timestamp + RECEIVED_TIMESTAMP = 0, + // Order chronologically by message publication timestamp + SENT_TIMESTAMP = 1 +}; + struct PlayOptions { public: @@ -123,6 +131,12 @@ struct PlayOptions // The source of the service request ServiceRequestsSource service_requests_source = ServiceRequestsSource::SERVICE_INTROSPECTION; + + // The reference to use for bag message chronological ordering. + // If messages are significantly disordered (within a single bag or across multiple bags), + // replayed messages may not be correctly ordered. A possible solution could be to increase the + // read_ahead_queue_size value to buffer (and order) more messages. + MessageOrder message_order = MessageOrder::RECEIVED_TIMESTAMP; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp index ef6408db7c..3a29a418e2 100644 --- a/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp +++ b/rosbag2_transport/src/rosbag2_transport/config_options_from_node_params.cpp @@ -237,6 +237,21 @@ PlayOptions get_play_options_from_node_params(rclcpp::Node & node) play_options.publish_service_requests = node.declare_parameter("play.publish_service_request", false); + auto message_order = + node.declare_parameter("play.message_order", "RECEIVED_TIMESTAMP"); + if (message_order == "RECEIVED_TIMESTAMP") { + play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; + } else if (message_order == "SENT_TIMESTAMP") { + play_options.message_order = MessageOrder::SENT_TIMESTAMP; + } else { + play_options.message_order = MessageOrder::RECEIVED_TIMESTAMP; + RCLCPP_ERROR( + node.get_logger(), + "play.message_order doesn't support %s. It must be one of RECEIVED_TIMESTAMP" + " and SENT_TIMESTAMP. Changed it to default value RECEIVED_TIMESTAMP.", + message_order.c_str()); + } + return play_options; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index e9fd594094..1547eba4d3 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -303,6 +303,8 @@ class PlayerImpl void create_control_services(); void configure_play_until_timestamp(); bool shall_stop_at_timestamp(const rcutils_time_point_value_t & msg_timestamp) const; + rcutils_time_point_value_t get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const; static constexpr double read_ahead_lower_bound_percentage_ = 0.9; static const std::chrono::milliseconds queue_read_wait_period_; @@ -320,7 +322,7 @@ class PlayerImpl rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; using BagMessageComparator = std::function< - int( + bool( const rosbag2_storage::SerializedBagMessageSharedPtr &, const rosbag2_storage::SerializedBagMessageSharedPtr &)>; LockedPriorityQueue< @@ -372,6 +374,8 @@ class PlayerImpl std::shared_ptr player_service_client_manager_; + BagMessageComparator get_bag_message_comparator(const MessageOrder & order); + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp. struct { @@ -382,8 +386,33 @@ class PlayerImpl return l->recv_timestamp > r->recv_timestamp; } } bag_message_chronological_recv_timestamp_comparator; + + /// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp. + struct + { + bool operator()( + const rosbag2_storage::SerializedBagMessageSharedPtr & l, + const rosbag2_storage::SerializedBagMessageSharedPtr & r) const + { + return l->send_timestamp > r->send_timestamp; + } + } bag_message_chronological_send_timestamp_comparator; }; +PlayerImpl::BagMessageComparator PlayerImpl::get_bag_message_comparator(const MessageOrder & order) +{ + switch (order) { + case MessageOrder::RECEIVED_TIMESTAMP: + return bag_message_chronological_recv_timestamp_comparator; + case MessageOrder::SENT_TIMESTAMP: + return bag_message_chronological_send_timestamp_comparator; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string(static_cast>(order))); + } +} + PlayerImpl::PlayerImpl( Player * owner, std::vector && readers_with_options, @@ -392,7 +421,7 @@ PlayerImpl::PlayerImpl( : readers_with_options_(std::move(readers_with_options)), owner_(owner), play_options_(play_options), - message_queue_(bag_message_chronological_recv_timestamp_comparator), + message_queue_(get_bag_message_comparator(play_options_.message_order)), keyboard_handler_(std::move(keyboard_handler)), player_service_client_manager_(std::make_shared()) { @@ -987,13 +1016,13 @@ void PlayerImpl::play_messages_from_queue() while (rclcpp::ok() && !stop_playback_) { // While there's a message to play and we haven't reached the end timestamp yet while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(get_message_order_timestamp(message_ptr))) { // Sleep until the message's replay time, do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause // However, skip sleeping if we're trying to play the next message while (rclcpp::ok() && !stop_playback_ && !play_next_.load() && - !clock_->sleep_until(message_ptr->recv_timestamp)) + !clock_->sleep_until(get_message_order_timestamp(message_ptr))) { // Stop sleeping if cancelled if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { @@ -1015,7 +1044,7 @@ void PlayerImpl::play_messages_from_queue() const bool message_published = publish_message(message_ptr); // If we tried to publish because of play_next(), jump the clock if (play_next_.load()) { - clock_->jump(message_ptr->recv_timestamp); + clock_->jump(get_message_order_timestamp(message_ptr)); // If we successfully played next, notify that we're done, otherwise keep trying if (message_published) { play_next_ = false; @@ -1061,6 +1090,22 @@ void PlayerImpl::play_messages_from_queue() } } +rcutils_time_point_value_t PlayerImpl::get_message_order_timestamp( + const rosbag2_storage::SerializedBagMessageSharedPtr & message) const +{ + switch (play_options_.message_order) { + case MessageOrder::RECEIVED_TIMESTAMP: + return message->recv_timestamp; + case MessageOrder::SENT_TIMESTAMP: + return message->send_timestamp; + default: + throw std::runtime_error( + "unknown MessageOrder: " + + std::to_string( + static_cast>(play_options_.message_order))); + } +} + namespace { bool allow_topic( diff --git a/rosbag2_transport/test/resources/player_node_params.yaml b/rosbag2_transport/test/resources/player_node_params.yaml index 79e1b47a93..f471d6cb91 100644 --- a/rosbag2_transport/test/resources/player_node_params.yaml +++ b/rosbag2_transport/test/resources/player_node_params.yaml @@ -39,6 +39,7 @@ player_params_node: disable_loan_message: false publish_service_requests: false service_requests_source: "CLIENT_INTROSPECTION" # SERVICE_INTROSPECTION or CLIENT_INTROSPECTION + message_order: "SENT_TIMESTAMP" # RECEIVED_TIMESTAMP or SENT_TIMESTAMP storage: uri: "path/to/some_bag" diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 594c1a92a3..fa27e58f47 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -55,17 +55,42 @@ class Rosbag2TransportTestFixture : public Test std::shared_ptr serialize_test_message( const std::string & topic, - int64_t milliseconds, + int64_t milliseconds_recv, + std::shared_ptr message) + { + return serialize_test_message(topic, milliseconds_recv, 0, message); + } + + template + std::shared_ptr + serialize_test_message( + const std::string & topic, + int64_t milliseconds_recv, + int64_t milliseconds_send, std::shared_ptr message) { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->recv_timestamp = milliseconds * 1000000; + bag_msg->recv_timestamp = milliseconds_recv * 1000000; + bag_msg->send_timestamp = milliseconds_send * 1000000; bag_msg->topic_name = topic; return bag_msg; } + static std::string format_message_order( + const TestParamInfo & info) + { + switch (info.param) { + case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: + return "received_timestamp"; + case rosbag2_transport::MessageOrder::SENT_TIMESTAMP: + return "sent_timestamp"; + default: + throw std::runtime_error("unknown value"); + } + } + MemoryManagement memory_management_; rosbag2_storage::StorageOptions storage_options_; diff --git a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp index eadd6fbd0a..c9d7fd710c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_composable_player.cpp @@ -177,6 +177,7 @@ TEST_P(ComposablePlayerTests, player_can_parse_parameters_from_file) { EXPECT_EQ( play_options.service_requests_source, rosbag2_transport::ServiceRequestsSource::CLIENT_INTROSPECTION); + EXPECT_EQ(play_options.message_order, rosbag2_transport::MessageOrder::SENT_TIMESTAMP); ASSERT_EQ(1, storage_options.size()); EXPECT_EQ(storage_options[0].uri, uri_str); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 64952fe39f..401ad9b77f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -109,6 +109,9 @@ void spin_thread_and_wait_for_sent_service_requests_to_finish( } } // namespace +class RosBag2PlayTestFixtureMessageOrder + : public RosBag2PlayTestFixture, public WithParamInterface {}; + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -174,7 +177,7 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } -TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_three_bags) +TEST_P(RosBag2PlayTestFixtureMessageOrder, recorded_msgs_are_played_for_all_topics_from_three_bags) { auto msg = get_messages_basic_types()[0]; msg->int32_value = 42; @@ -184,30 +187,31 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ {2u, "topic2", "test_msgs/msg/BasicTypes", "", {}, ""}, }; - // Make sure each reader's/bag's messages are ordered by time - // However, do interlace messages across bags + // Make sure each reader's/bag's messages are ordered by recv_timestamp + // However, do interlace messages based on recv_timestamp across bags and based on send_timestamp + // within a bag and across bags std::vector>> messages_list{}; messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 5, msg), - serialize_test_message("topic1", 8, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 13, msg), - serialize_test_message("topic2", 14, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 5, 2, msg), + serialize_test_message("topic1", 8, 4, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 13, 7, msg), + serialize_test_message("topic2", 14, 15, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 2, msg), - serialize_test_message("topic2", 3, msg), - serialize_test_message("topic1", 6, msg), - serialize_test_message("topic2", 10, msg), - serialize_test_message("topic1", 12, msg), - serialize_test_message("topic2", 16, msg)}); + serialize_test_message("topic1", 2, 1, msg), + serialize_test_message("topic2", 3, 2, msg), + serialize_test_message("topic1", 6, 5, msg), + serialize_test_message("topic2", 10, 8, msg), + serialize_test_message("topic1", 12, 7, msg), + serialize_test_message("topic2", 16, 14, msg)}); messages_list.emplace_back(std::vector>{ - serialize_test_message("topic1", 1, msg), - serialize_test_message("topic2", 4, msg), - serialize_test_message("topic1", 7, msg), - serialize_test_message("topic2", 9, msg), - serialize_test_message("topic1", 11, msg), - serialize_test_message("topic2", 15, msg)}); + serialize_test_message("topic1", 1, 1, msg), + serialize_test_message("topic2", 4, 3, msg), + serialize_test_message("topic1", 7, 2, msg), + serialize_test_message("topic2", 9, 9, msg), + serialize_test_message("topic1", 11, 8, msg), + serialize_test_message("topic2", 15, 7, msg)}); std::vector bags{}; std::size_t total_messages = 0u; for (std::size_t i = 0u; i < messages_list.size(); i++) { @@ -219,13 +223,27 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ } ASSERT_GT(total_messages, 0u); + const rosbag2_transport::MessageOrder message_order = GetParam(); + play_options_.message_order = message_order; auto player = std::make_shared(std::move(bags), play_options_); std::size_t num_played_messages = 0u; rcutils_time_point_value_t last_timetamp = 0; + const auto get_timestamp = + [message_order](std::shared_ptr msg) { + switch (message_order) { + case rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP: + return msg->recv_timestamp; + case rosbag2_transport::MessageOrder::SENT_TIMESTAMP: + return msg->send_timestamp; + default: + throw std::runtime_error("unknown rosbag2_transport::MessageOrder value"); + } + }; const auto callback = [&](std::shared_ptr msg) { // Make sure messages are played in order - EXPECT_LE(last_timetamp, msg->recv_timestamp); - last_timetamp = msg->recv_timestamp; + const auto timestamp = get_timestamp(msg); + EXPECT_LE(last_timetamp, timestamp); + last_timetamp = timestamp; num_played_messages++; }; player->add_on_play_message_pre_callback(callback); @@ -234,6 +252,16 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_from_ EXPECT_EQ(total_messages, num_played_messages); } +INSTANTIATE_TEST_SUITE_P( + ParametrizedPlayTests, + RosBag2PlayTestFixtureMessageOrder, + Values( + rosbag2_transport::MessageOrder::RECEIVED_TIMESTAMP, + rosbag2_transport::MessageOrder::SENT_TIMESTAMP + ), + Rosbag2TransportTestFixture::format_message_order +); + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_services) { const std::string service_name1 = "/test_service1"; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 8e12da5ebf..af64009cb3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -154,6 +154,48 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) EXPECT_EQ(replayed_topic1[1]->int32_value, 1); } +TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration_message_order) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {1u, kTopic1Name_, "test_msgs/msg/BasicTypes", "", {}, ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 46, 45, primitive_message1), + serialize_test_message(kTopic1Name_, 50, 5, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 2 messages from play() due to the send_timestamp order + sub_->add_subscription(kTopic1_, 2u); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; + play_options_.message_order = MessageOrder::SENT_TIMESTAMP; + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + player_->wait_for_playback_to_finish(); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 2); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); +} + TEST_F( RosBag2PlayUntilTestFixture, play_until_intermediate_duration_recorded_messages_with_filtered_topics)