diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index fce959a7b..f280f1bc7 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -70,6 +70,10 @@ def add_arguments(self, parser, cli_name): group.add_argument( '--stdin', action='store_true', help='Read values from standard input') + group.add_argument( + '--yaml-file', type=str, default=None, + help='YAML file that has message contents, ' + 'e.g STDOUT from ros2 topic echo ') parser.add_argument( '-r', '--rate', metavar='N', type=positive_float, default=1.0, help='Publishing rate in Hz (default: 1)') @@ -131,6 +135,7 @@ def main(args): args.message_type, args.topic_name, values, + args.yaml_file, 1. / args.rate, args.print, times, @@ -146,6 +151,7 @@ def publisher( message_type: MsgType, topic_name: str, values: dict, + yaml_file: str, period: float, print_nth: int, times: int, @@ -159,9 +165,14 @@ def publisher( msg_module = get_message(message_type) except (AttributeError, ModuleNotFoundError, ValueError): raise RuntimeError('The passed message type is invalid') - values_dictionary = yaml.safe_load(values) - if not isinstance(values_dictionary, dict): - return 'The passed value needs to be a dictionary in YAML format' + + msg_reader = None + if yaml_file: + msg_reader = read_msg_from_yaml(yaml_file) + else: + values_dictionary = yaml.safe_load(values) + if not isinstance(values_dictionary, dict): + return 'The passed value needs to be a dictionary in YAML format' pub = node.create_publisher(msg_module, topic_name, qos_profile) @@ -184,15 +195,38 @@ def publisher( total_wait_time += DEFAULT_WAIT_TIME msg = msg_module() - try: - timestamp_fields = set_message_fields( - msg, values_dictionary, expand_header_auto=True, expand_time_now=True) - except Exception as e: - return 'Failed to populate field: {0}'.format(e) + timestamp_fields = None + + if not msg_reader: + # Set the static message from specified values once + try: + timestamp_fields = set_message_fields( + msg, values_dictionary, expand_header_auto=True, expand_time_now=True) + except Exception as e: + return 'Failed to populate field: {0}'.format(e) + print('publisher: beginning loop') count = 0 + more_message = True def timer_callback(): + if msg_reader: + # Try to read out the contents for each message + try: + one_msg = next(msg_reader) + if not isinstance(one_msg, dict): + print('The contents in YAML file need to be a YAML format') + except StopIteration: + nonlocal more_message + more_message = False + return + # Set the message with contents + try: + nonlocal timestamp_fields + timestamp_fields = set_message_fields( + msg, one_msg, expand_header_auto=True, expand_time_now=True) + except Exception as e: + return 'Failed to populate field: {0}'.format(e) stamp_now = node.get_clock().now().to_msg() for field_setter in timestamp_fields: field_setter(stamp_now) @@ -205,7 +239,7 @@ def timer_callback(): timer_callback() if times != 1: timer = node.create_timer(period, timer_callback) - while times == 0 or count < times: + while (times == 0 or count < times) and more_message: rclpy.spin_once(node) # give some time for the messages to reach the wire before exiting time.sleep(keep_alive) @@ -213,3 +247,12 @@ def timer_callback(): else: # give some time for the messages to reach the wire before exiting time.sleep(keep_alive) + + +def read_msg_from_yaml(yaml_file): + with open(yaml_file, 'r') as f: + for document in yaml.load_all(f, Loader=yaml.FullLoader): + if document is None: + continue # Skip if there's no more document + + yield document diff --git a/ros2topic/test/resources/chatter.yaml b/ros2topic/test/resources/chatter.yaml new file mode 100644 index 000000000..4ac2880b8 --- /dev/null +++ b/ros2topic/test/resources/chatter.yaml @@ -0,0 +1,10 @@ +--- +data: 'Hello ROS Users' +--- +--- +data: Hello ROS Developers +--- +data: Hello ROS Developers +--- +--- +data: 'Hello ROS Users' diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index 6182073e5..14b278a11 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -13,6 +13,7 @@ # limitations under the License. import functools +import pathlib import sys import unittest @@ -49,6 +50,8 @@ TEST_NODE = 'cli_echo_pub_test_node' TEST_NAMESPACE = 'cli_echo_pub' +TEST_RESOURCES_DIR = pathlib.Path(__file__).resolve().parent / 'resources' + @pytest.mark.rostest @launch_testing.markers.keep_alive @@ -294,6 +297,42 @@ def test_pub_times(self, launch_service, proc_info, proc_output): strict=True ) + @launch_testing.markers.retry_on_failure(times=5) + def test_pub_yaml(self, launch_service, proc_info, proc_output): + command_action = ExecuteProcess( + # yaml file prevails to the values 'data: hello' + cmd=(['ros2', 'topic', 'pub', '/clitest/topic/chatter', + 'std_msgs/String', '--yaml-file', + str(TEST_RESOURCES_DIR / 'chatter.yaml')]), + additional_env={ + 'PYTHONUNBUFFERED': '1' + }, + output='screen' + ) + with launch_testing.tools.launch_process( + launch_service, command_action, proc_info, proc_output, + output_filter=launch_testing_ros.tools.basic_output_filter( + filtered_rmw_implementation=get_rmw_implementation_identifier() + ) + ) as command: + assert command.wait_for_shutdown(timeout=10) + assert command.exit_code == launch_testing.asserts.EXIT_OK + assert launch_testing.tools.expect_output( + expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='Hello ROS Users')", + '', + "publishing #2: std_msgs.msg.String(data='Hello ROS Developers')", + '', + "publishing #3: std_msgs.msg.String(data='Hello ROS Developers')", + '', + "publishing #4: std_msgs.msg.String(data='Hello ROS Users')", + '', + ], + text=command.output, + strict=True + ) + @launch_testing.markers.retry_on_failure(times=5) def test_echo_basic(self, launch_service, proc_info, proc_output): params = [