From 9225d226f4a6c314ea1395ef51d0427f6ac7667f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 16 Oct 2023 16:50:10 +0200 Subject: [PATCH 1/2] Bump actions/setup-python from 4.7.0 to 4.7.1 (#881) Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4.7.0 to 4.7.1. - [Release notes](https://github.com/actions/setup-python/releases) - [Commits](https://github.com/actions/setup-python/compare/v4.7.0...v4.7.1) --- updated-dependencies: - dependency-name: actions/setup-python dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 381bcf34a..62e3ef6d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v4.7.0 + - uses: actions/setup-python@v4.7.1 with: python-version: '3.8' From 501a926cee2bedebf1417dc1aac9c876faf91b9b Mon Sep 17 00:00:00 2001 From: Sebastian Castro <4603398+sea-bass@users.noreply.github.com> Date: Thu, 26 Oct 2023 11:09:07 -0400 Subject: [PATCH 2/2] Port unit tests to ROS 2 + Fix CBOR conversion and PNG compression (#882) --- rosapi/src/rosapi/params.py | 5 +- rosbridge_library/CMakeLists.txt | 9 +- .../capabilities/advertise_service.py | 7 +- .../capabilities/call_service.py | 2 +- .../capabilities/unadvertise_service.py | 9 +- .../internal/cbor_conversion.py | 38 ++-- .../internal/pngcompression.py | 25 +-- .../rosbridge_library/internal/publishers.py | 4 +- .../rosbridge_library/internal/services.py | 20 +- .../src/rosbridge_library/util/ros.py | 47 +++++ .../test/capabilities/test_advertise.py | 93 ++++----- .../test/capabilities/test_call_service.py | 163 +++++++++++----- .../test/capabilities/test_publish.py | 48 +++-- .../capabilities/test_service_capabilities.py | 125 ++++++------ .../test/capabilities/test_subscribe.py | 51 +++-- .../publishers/test_multi_publisher.py | 113 ++++++++--- .../publishers/test_multi_unregistering.py | 85 --------- .../publishers/test_publisher_manager.py | 178 +++++++++++------- .../internal/{ => services}/test_services.py | 63 +++++-- .../subscribers/test_multi_subscriber.py | 139 ++++++++------ .../subscribers/test_subscriber_manager.py | 150 +++++++++------ .../test_subscription_modifiers.py | 114 +++++------ .../test/internal/test_cbor_conversion.py | 49 +++-- .../test/internal/test_compression.py | 19 +- .../test/internal/test_outgoing_message.py | 7 - .../src/rosbridge_server/__init__.py | 2 +- .../{client_mananger.py => client_manager.py} | 0 .../test/websocket/call_service.test.py | 5 +- rosbridge_server/test/websocket/common.py | 7 +- 29 files changed, 898 insertions(+), 679 deletions(-) create mode 100644 rosbridge_library/src/rosbridge_library/util/ros.py delete mode 100755 rosbridge_library/test/internal/publishers/test_multi_unregistering.py rename rosbridge_library/test/internal/{ => services}/test_services.py (80%) rename rosbridge_server/src/rosbridge_server/{client_mananger.py => client_manager.py} (100%) diff --git a/rosapi/src/rosapi/params.py b/rosapi/src/rosapi/params.py index 2e88455ef..99cc5d01b 100644 --- a/rosapi/src/rosapi/params.py +++ b/rosapi/src/rosapi/params.py @@ -234,7 +234,10 @@ def _get_param_names(node_name): request = ListParameters.Request() future = client.call_async(request) - rclpy.spin_until_future_complete(_node, future) + if _node.executor: + _node.executor.spin_until_future_complete(future) + else: + rclpy.spin_until_future_complete(_node, future) response = future.result() if response is not None: diff --git a/rosbridge_library/CMakeLists.txt b/rosbridge_library/CMakeLists.txt index 96a4fecda..dce17cda7 100644 --- a/rosbridge_library/CMakeLists.txt +++ b/rosbridge_library/CMakeLists.txt @@ -12,11 +12,6 @@ ament_package() if (BUILD_TESTING) find_package(ament_cmake_pytest REQUIRED) - - # TODO: Enable tests - # ament_add_pytest_test(test_capabilities "test/capabilities") - # ament_add_pytest_test(test_internal "test/internal") - # ament_add_pytest_test(test_services "test/internal/test_services.py") - ament_add_pytest_test(test_ros_loader "test/internal/test_ros_loader.py") - ament_add_pytest_test(test_message_conversion "test/internal/test_message_conversion.py") + ament_add_pytest_test(test_capabilities "test/capabilities/") + ament_add_pytest_test(test_internal "test/internal/") endif() diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index a1db57ed5..dad4eafbf 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -76,9 +76,10 @@ def graceful_shutdown(self): f"Service {self.service_name} was unadvertised with a service call in progress, " f"aborting service calls with request IDs {incomplete_ids}", ) - for future in self.request_futures.values(): + for future_id in self.request_futures: + future = self.request_futures[future_id] future.set_exception(RuntimeError(f"Service {self.service_name} was unadvertised")) - self.protocol.node_handle.destroy_service(self.service_handle) + self.service_handle.destroy() class AdvertiseService(Capability): @@ -138,4 +139,4 @@ def advertise_service(self, message): service_type = message["type"] service_handler = AdvertisedServiceHandler(service_name, service_type, self.protocol) self.protocol.external_service_list[service_name] = service_handler - self.protocol.log("info", "Advertised service %s." % service_name) + self.protocol.log("info", f"Advertised service {service_name}") diff --git a/rosbridge_library/src/rosbridge_library/capabilities/call_service.py b/rosbridge_library/src/rosbridge_library/capabilities/call_service.py index 018ac442d..a3c65f86e 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/call_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/call_service.py @@ -49,7 +49,7 @@ class CallService(Capability): services_glob = None def __init__(self, protocol): - # Call superclas constructor + # Call superclass constructor Capability.__init__(self, protocol) # Register the operations that this capability provides diff --git a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py index ba3d738e1..fff497ff1 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py @@ -50,14 +50,11 @@ def unadvertise_service(self, message): # unregister service in ROS if service_name in self.protocol.external_service_list.keys(): self.protocol.external_service_list[service_name].graceful_shutdown() - self.protocol.external_service_list[service_name].service_handle.shutdown( - "Unadvertise request." - ) + self.protocol.external_service_list[service_name].service_handle.destroy() del self.protocol.external_service_list[service_name] - self.protocol.log("info", "Unadvertised service %s." % service_name) + self.protocol.log("info", f"Unadvertised service {service_name}") else: self.protocol.log( "error", - "Service %s has not been advertised via rosbridge, can't unadvertise." - % service_name, + f"Service {service_name} has not been advertised via rosbridge, can't unadvertise.", ) diff --git a/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py b/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py index 3e2128771..2c3d06d27 100644 --- a/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py +++ b/rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py @@ -19,26 +19,27 @@ "int64", "uint64", ] -FLOAT_TYPES = ["float32", "float64"] +FLOAT_TYPES = ["float", "double"] STRING_TYPES = ["string"] -BOOL_TYPES = ["bool"] +BOOL_TYPES = ["boolean"] TIME_TYPES = ["time", "duration"] -BOOL_ARRAY_TYPES = ["bool[]"] -BYTESTREAM_TYPES = ["uint8[]", "char[]"] +BOOL_ARRAY_TYPES = ["sequence"] +STRING_ARRAY_TYPES = ["sequence"] +BYTESTREAM_TYPES = ["sequence", "sequence"] # Typed array tags according to # Always encode to little-endian variant, for now. TAGGED_ARRAY_FORMATS = { - "uint16[]": (69, "<{}H"), - "uint32[]": (70, "<{}I"), - "uint64[]": (71, "<{}Q"), - "byte[]": (72, "{}b"), - "int8[]": (72, "{}b"), - "int16[]": (77, "<{}h"), - "int32[]": (78, "<{}i"), - "int64[]": (79, "<{}q"), - "float32[]": (85, "<{}f"), - "float64[]": (86, "<{}d"), + "sequence": (69, "<{}H"), + "sequence": (70, "<{}I"), + "sequence": (71, "<{}Q"), + "sequence": (72, "{}b"), + "sequence": (72, "{}b"), + "sequence": (77, "<{}h"), + "sequence": (78, "<{}i"), + "sequence": (79, "<{}q"), + "sequence": (85, "<{}f"), + "sequence": (86, "<{}d"), } @@ -50,7 +51,7 @@ def extract_cbor_values(msg): Typed arrays will be tagged and packed into byte arrays. """ out = {} - for slot, slot_type in zip(msg.__slots__, msg._slot_types): + for slot, slot_type in msg.get_fields_and_field_types().items(): val = getattr(msg, slot) # string @@ -72,8 +73,8 @@ def extract_cbor_values(msg): # time/duration elif slot_type in TIME_TYPES: out[slot] = { - "secs": int(val.secs), - "nsecs": int(val.nsecs), + "sec": int(val.sec), + "nanosec": int(val.nanosec), } # byte array @@ -84,6 +85,9 @@ def extract_cbor_values(msg): elif slot_type in BOOL_ARRAY_TYPES: out[slot] = [bool(i) for i in val] + elif slot_type in STRING_ARRAY_TYPES: + out[slot] = [str(i) for i in val] + # numeric arrays elif slot_type in TAGGED_ARRAY_FORMATS: tag, fmt = TAGGED_ARRAY_FORMATS[slot_type] diff --git a/rosbridge_library/src/rosbridge_library/internal/pngcompression.py b/rosbridge_library/src/rosbridge_library/internal/pngcompression.py index a705fdd8e..b4b7cf860 100644 --- a/rosbridge_library/src/rosbridge_library/internal/pngcompression.py +++ b/rosbridge_library/src/rosbridge_library/internal/pngcompression.py @@ -31,31 +31,32 @@ # POSSIBILITY OF SUCH DAMAGE. from base64 import standard_b64decode, standard_b64encode -from io import StringIO +from io import BytesIO from math import ceil, floor, sqrt from PIL import Image def encode(string): - """PNG-compress the string in a square RBG image padded with '\n', return the b64 encoded bytes""" - length = len(string) + """PNG-compress the string in a square RGB image padded with '\n', return the b64 encoded bytes""" + string_bytes = string.encode("utf-8") + length = len(string_bytes) width = floor(sqrt(length / 3.0)) height = ceil((length / 3.0) / width) bytes_needed = int(width * height * 3) - while length < bytes_needed: - string += "\n" - length += 1 - i = Image.frombytes("RGB", (int(width), int(height)), string) - buff = StringIO() + string_padded = string_bytes + (b"\n" * (bytes_needed - length)) + i = Image.frombytes("RGB", (int(width), int(height)), string_padded) + buff = BytesIO() i.save(buff, "png") encoded = standard_b64encode(buff.getvalue()) return encoded def decode(string): - """b64 decode the string, then PNG-decompress""" + """b64 decode the string, then PNG-decompress and remove the '\n' padding""" decoded = standard_b64decode(string) - buff = StringIO(decoded) - i = Image.open(buff) - return i.tostring() + buff = BytesIO(decoded) + i = Image.open(buff, formats=("png",)).convert("RGB") + dec_str = i.tobytes().decode("utf-8") + dec_str = dec_str.replace("\n", "") # Remove padding from encoding + return dec_str diff --git a/rosbridge_library/src/rosbridge_library/internal/publishers.py b/rosbridge_library/src/rosbridge_library/internal/publishers.py index e4f674b70..a7d0f8928 100644 --- a/rosbridge_library/src/rosbridge_library/internal/publishers.py +++ b/rosbridge_library/src/rosbridge_library/internal/publishers.py @@ -213,7 +213,7 @@ def register(self, client_id, topic, node_handle, msg_type=None, latch=False, qu node_handle -- Handle to a rclpy node to create the publisher. msg_type -- (optional) the type to publish latch -- (optional) whether to make this publisher latched - queue_size -- (optional) rospy publisher queue_size to use + queue_size -- (optional) publisher queue_size to use Throws: Exception -- exceptions are propagated from the MultiPublisher if @@ -303,7 +303,7 @@ def publish(self, client_id, topic, msg, node_handle, latch=False, queue_size=10 msg -- a JSON-like dict of fields and values node_handle -- Handle to a rclpy node to create the publisher. latch -- (optional) whether to make this publisher latched - queue_size -- (optional) rospy publisher queue_size to use + queue_size -- (optional) publisher queue_size to use Throws: Exception -- a variety of exceptions are propagated. They can be diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index fc9a89637..7a95814d5 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -30,8 +30,11 @@ # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import time from threading import Thread +import rclpy +from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.expand_topic_name import expand_topic_name from rosbridge_library.internal.message_conversion import ( extract_values, @@ -63,7 +66,7 @@ def __init__(self, service, args, success_callback, error_callback, node_handle) service call error_callback -- a callback to call if an error occurs. The callback will be passed the exception that caused the failure - node_handle -- a ROS2 node handle to call services. + node_handle -- a ROS 2 node handle to call services. """ Thread.__init__(self) self.daemon = True @@ -76,7 +79,7 @@ def __init__(self, service, args, success_callback, error_callback, node_handle) def run(self): try: # Call the service and pass the result to the success handler - self.success(call_service(self.node_handle, self.service, self.args)) + self.success(call_service(self.node_handle, self.service, args=self.args)) except Exception as e: # On error, just pass the exception to the error handler self.error(e) @@ -98,11 +101,9 @@ def args_to_service_request_instance(service, inst, args): populate_instance(msg, inst) -def call_service(node_handle, service, args=None): +def call_service(node_handle, service, args=None, sleep_time=0.001): # Given the service name, fetch the type and class of the service, # and a request instance - - # This should be equivalent to rospy.resolve_name. service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace()) service_names_and_types = dict(node_handle.get_service_names_and_types()) @@ -120,9 +121,14 @@ def call_service(node_handle, service, args=None): # Populate the instance with the provided args args_to_service_request_instance(service, inst, args) - client = node_handle.create_client(service_class, service) + client = node_handle.create_client( + service_class, service, callback_group=ReentrantCallbackGroup() + ) - result = client.call(inst) + future = client.call_async(inst) + while rclpy.ok() and not future.done(): + time.sleep(sleep_time) + result = future.result() node_handle.destroy_client(client) if result is not None: diff --git a/rosbridge_library/src/rosbridge_library/util/ros.py b/rosbridge_library/src/rosbridge_library/util/ros.py new file mode 100644 index 000000000..f2e3fb234 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/util/ros.py @@ -0,0 +1,47 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2023, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +def is_topic_published(node, topic_name): + """Checks if a topic is published on a node.""" + published_topic_data = node.get_publisher_names_and_types_by_node( + node.get_name(), node.get_namespace() + ) + return any(topic[0] == topic_name for topic in published_topic_data) + + +def is_topic_subscribed(node, topic_name): + """Checks if a topic is subscribed to by a node.""" + subscribed_topic_data = node.get_subscriber_names_and_types_by_node( + node.get_name(), node.get_namespace() + ) + return any(topic[0] == topic_name for topic in subscribed_topic_data) diff --git a/rosbridge_library/test/capabilities/test_advertise.py b/rosbridge_library/test/capabilities/test_advertise.py index 34cd117a8..4e172aba9 100755 --- a/rosbridge_library/test/capabilities/test_advertise.py +++ b/rosbridge_library/test/capabilities/test_advertise.py @@ -1,30 +1,42 @@ #!/usr/bin/env python +import time import unittest from json import dumps, loads -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node from rosbridge_library.capabilities.advertise import Advertise from rosbridge_library.internal import ros_loader -from rosbridge_library.internal.publishers import manager -from rosbridge_library.protocol import ( +from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, - Protocol, ) +from rosbridge_library.internal.publishers import manager +from rosbridge_library.protocol import Protocol +from rosbridge_library.util.ros import is_topic_published class TestAdvertise(unittest.TestCase): def setUp(self): - rospy.init_node("test_advertise") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_advertise") + self.executor.add_node(self.node) + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + manager.unregister_timeout = 1.0 - def is_topic_published(self, topicname): - return topicname in dict(rospy.get_published_topics()).keys() + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_missing_arguments(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) msg = {"op": "advertise"} self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg))) @@ -36,7 +48,7 @@ def test_missing_arguments(self): self.assertRaises(MissingArgumentException, adv.advertise, loads(dumps(msg))) def test_invalid_arguments(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) msg = {"op": "advertise", "topic": 3, "type": "std_msgs/String"} @@ -55,9 +67,6 @@ def test_invalid_msg_typestrings(self): "/////", "bad", "stillbad", - "not/better/still", - "not//better//still", - "not///better///still", "better/", "better//", "better///", @@ -68,7 +77,7 @@ def test_invalid_msg_typestrings(self): "\\", ] - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) for invalid_type in invalid: @@ -83,6 +92,11 @@ def test_invalid_msg_typestrings(self): def test_invalid_msg_package(self): nonexistent = [ + "roslib/Time", + "roslib/Duration", + "roslib/Header", + "std_srvs/ConflictedMsg", + "topic_tools/MessageMessage", "wangle_msgs/Jam", "whistleblower_msgs/Document", "coercion_msgs/Bribe", @@ -90,7 +104,7 @@ def test_invalid_msg_package(self): "pr2thoughts_msgs/Escape", ] - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) for invalid_type in nonexistent: @@ -99,42 +113,17 @@ def test_invalid_msg_package(self): "topic": "/test_invalid_msg_package", "type": invalid_type, } - self.assertRaises(ros_loader.InvalidPackageException, adv.advertise, loads(dumps(msg))) - - def test_invalid_msg_module(self): - no_msgs = [ - "roslib/Time", - "roslib/Duration", - "roslib/Header", - "std_srvs/ConflictedMsg", - "topic_tools/MessageMessage", - ] - - proto = Protocol("hello") - adv = Advertise(proto) - - for invalid_type in no_msgs: - msg = { - "op": "advertise", - "topic": "/test_invalid_msg_module", - "type": invalid_type, - } self.assertRaises(ros_loader.InvalidModuleException, adv.advertise, loads(dumps(msg))) def test_invalid_msg_classes(self): nonexistent = [ - "roscpp/Time", - "roscpp/Duration", - "roscpp/Header", - "rospy/Time", - "rospy/Duration", - "rospy/Header", + "builtin_interfaces/SpaceTime", "std_msgs/Spool", "geometry_msgs/Tetrahedron", "sensor_msgs/TelepathyUnit", ] - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) for invalid_type in nonexistent: @@ -161,7 +150,7 @@ def test_valid_msg_classes(self): "sensor_msgs/PointCloud2", ] - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) for valid_type in assortedmsgs: @@ -170,21 +159,15 @@ def test_valid_msg_classes(self): adv.unadvertise(loads(dumps(msg))) def test_do_advertise(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) adv = Advertise(proto) topic = "/test_do_advertise" type = "std_msgs/String" msg = {"op": "advertise", "topic": topic, "type": type} adv.advertise(loads(dumps(msg))) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) adv.unadvertise(loads(dumps(msg))) - self.assertTrue(self.is_topic_published(topic)) - sleep(manager.unregister_timeout * 1.1) - self.assertFalse(self.is_topic_published(topic)) - - -PKG = "rosbridge_library" -NAME = "test_advertise" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestAdvertise) + self.assertTrue(is_topic_published(self.node, topic)) + time.sleep(manager.unregister_timeout + 1.0) + self.assertFalse(is_topic_published(self.node, topic)) diff --git a/rosbridge_library/test/capabilities/test_call_service.py b/rosbridge_library/test/capabilities/test_call_service.py index 4c2280a4c..d0f217d2e 100755 --- a/rosbridge_library/test/capabilities/test_call_service.py +++ b/rosbridge_library/test/capabilities/test_call_service.py @@ -2,91 +2,131 @@ import time import unittest from json import dumps, loads +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.callback_groups import ReentrantCallbackGroup +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node from rosbridge_library.capabilities.call_service import CallService -from rosbridge_library.protocol import ( +from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, - Protocol, ) -from roscpp.srv import GetLoggers -from std_srvs.srv import SetBool +from rosbridge_library.protocol import Protocol +from std_srvs.srv import SetBool, Trigger class TestCallService(unittest.TestCase): + def trigger_cb(self, request, response): + """Helper callback function for a test service with no arguments.""" + response.success = True + response.message = "called trigger service successfully" + return response + + def set_bool_cb(self, request, response): + """Helper callback function for a test service with arguments.""" + response.success = request.data + if request.data: + response.message = "set bool to true" + else: + response.message = "set bool to false" + return response + def setUp(self): - rospy.init_node("test_call_service") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_call_service") + self.executor.add_node(self.node) + + self.node.declare_parameter("call_services_in_new_thread", False) + + # Create service servers with a separate callback group + self.cb_group = ReentrantCallbackGroup() + self.trigger_srv = self.node.create_service( + Trigger, + self.node.get_name() + "/trigger", + self.trigger_cb, + callback_group=self.cb_group, + ) + self.set_bool_srv = self.node.create_service( + SetBool, + self.node.get_name() + "/set_bool", + self.set_bool_cb, + callback_group=self.cb_group, + ) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_missing_arguments(self): - proto = Protocol("test_missing_arguments") + proto = Protocol("test_missing_arguments", self.node) s = CallService(proto) msg = loads(dumps({"op": "call_service"})) self.assertRaises(MissingArgumentException, s.call_service, msg) def test_invalid_arguments(self): - proto = Protocol("test_invalid_arguments") + proto = Protocol("test_invalid_arguments", self.node) s = CallService(proto) msg = loads(dumps({"op": "call_service", "service": 3})) self.assertRaises(InvalidArgumentException, s.call_service, msg) def test_call_service_works(self): - # Prepare to call the service the 'proper' way - p = rospy.ServiceProxy(rospy.get_name() + "/get_loggers", GetLoggers) - p.wait_for_service() - time.sleep(1.0) + client = self.node.create_client(Trigger, self.trigger_srv.srv_name) + assert client.wait_for_service(1.0) - proto = Protocol("test_call_service_works") + proto = Protocol("test_call_service_works", self.node) s = CallService(proto) - msg = loads(dumps({"op": "call_service", "service": rospy.get_name() + "/get_loggers"})) + send_msg = loads(dumps({"op": "call_service", "service": self.trigger_srv.srv_name})) received = {"msg": None, "arrived": False} - def cb(msg, cid=None): + def cb(msg, cid=None, compression="none"): + print(msg) received["msg"] = msg received["arrived"] = True proto.send = cb - s.call_service(msg) + s.call_service(send_msg) - timeout = 5.0 - start = rospy.Time.now() - while rospy.Time.now() - start < rospy.Duration(timeout): + timeout = 1.0 + start = time.time() + while time.time() - start < timeout: if received["arrived"]: break - time.sleep(0.1) - # The rosbridge service call actually causes another logger to appear, - # so do the "regular" service call after that. - ret = p() + self.assertTrue(received["arrived"]) + values = received["msg"]["values"] + self.assertEqual(values["success"], True) + self.assertEqual(values["message"], "called trigger service successfully") - self.assertTrue(received["msg"]["result"]) - for x, y in zip(ret.loggers, received["msg"]["values"]["loggers"]): - self.assertEqual(x.name, y["name"]) - self.assertEqual(x.level, y["level"]) + def test_call_service_args(self): + client = self.node.create_client(SetBool, self.set_bool_srv.srv_name) + assert client.wait_for_service(1.0) - def test_call_service_fail(self): - # Dummy service that instantly fails - _ = rospy.Service("set_bool_fail", SetBool, lambda req: None) - - proto = Protocol("test_call_service_fail") + proto = Protocol("test_call_service_args", self.node) s = CallService(proto) send_msg = loads( dumps( { "op": "call_service", - "service": rospy.get_name() + "/set_bool_fail", - "args": "[ true ]", + "service": self.set_bool_srv.srv_name, + "args": {"data": True}, } ) ) received = {"msg": None, "arrived": False} - def cb(msg, cid=None): + def cb(msg, cid=None, compression="none"): received["msg"] = msg received["arrived"] = True @@ -94,17 +134,50 @@ def cb(msg, cid=None): s.call_service(send_msg) - timeout = 5.0 - start = rospy.Time.now() - while rospy.Time.now() - start < rospy.Duration(timeout): + timeout = 1.0 + start = time.time() + while time.time() - start < timeout: if received["arrived"]: break - time.sleep(0.1) - self.assertFalse(received["msg"]["result"]) + self.assertTrue(received["arrived"]) + values = received["msg"]["values"] + self.assertEqual(values["success"], True) + self.assertEqual(values["message"], "set bool to true") + def test_call_service_fails(self): -PKG = "rosbridge_library" -NAME = "test_call_service" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestCallService) + client = self.node.create_client(Trigger, self.trigger_srv.srv_name) + assert client.wait_for_service(1.0) + + proto = Protocol("test_call_service_works", self.node) + s = CallService(proto) + send_msg = loads( + dumps( + { + "op": "call_service", + "service": self.set_bool_srv.srv_name, + "args": {"data": 42.0}, # This data type is wrong so it will fail + } + ) + ) + + received = {"msg": None, "arrived": False} + + def cb(msg, cid=None, compression="none"): + print(msg) + received["msg"] = msg + received["arrived"] = True + + proto.send = cb + + s.call_service(send_msg) + + timeout = 1.0 + start = time.time() + while time.time() - start < timeout: + if received["arrived"]: + break + + self.assertTrue(received["arrived"]) + self.assertFalse(received["msg"]["result"]) diff --git a/rosbridge_library/test/capabilities/test_publish.py b/rosbridge_library/test/capabilities/test_publish.py index 3e43a6b7a..2d95e6400 100755 --- a/rosbridge_library/test/capabilities/test_publish.py +++ b/rosbridge_library/test/capabilities/test_publish.py @@ -1,25 +1,40 @@ #!/usr/bin/env python +import time import unittest from json import dumps, loads -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.capabilities.publish import Publish -from rosbridge_library.protocol import ( +from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, - Protocol, ) +from rosbridge_library.protocol import Protocol from std_msgs.msg import String class TestAdvertise(unittest.TestCase): def setUp(self): - rospy.init_node("test_advertise") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_publish") + self.executor.add_node(self.node) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_missing_arguments(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) pub = Publish(proto) msg = {"op": "publish"} self.assertRaises(MissingArgumentException, pub.publish, msg) @@ -28,14 +43,14 @@ def test_missing_arguments(self): self.assertRaises(MissingArgumentException, pub.publish, msg) def test_invalid_arguments(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) pub = Publish(proto) msg = {"op": "publish", "topic": 3} self.assertRaises(InvalidArgumentException, pub.publish, msg) def test_publish_works(self): - proto = Protocol("hello") + proto = Protocol("hello", self.node) pub = Publish(proto) topic = "/test_publish_works" msg = {"data": "test publish works"} @@ -45,16 +60,13 @@ def test_publish_works(self): def cb(msg): received["msg"] = msg - rospy.Subscriber(topic, String, cb) + subscriber_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_subscription(String, topic, cb, subscriber_qos) pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg})) pub.publish(pub_msg) - - sleep(0.5) + time.sleep(0.5) self.assertEqual(received["msg"].data, msg["data"]) - - -PKG = "rosbridge_library" -NAME = "test_publish" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestAdvertise) diff --git a/rosbridge_library/test/capabilities/test_service_capabilities.py b/rosbridge_library/test/capabilities/test_service_capabilities.py index ad9b8b859..3147a1c2a 100755 --- a/rosbridge_library/test/capabilities/test_service_capabilities.py +++ b/rosbridge_library/test/capabilities/test_service_capabilities.py @@ -1,23 +1,30 @@ #!/usr/bin/env python +import time import unittest from json import dumps, loads +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.node import Node from rosbridge_library.capabilities.advertise_service import AdvertiseService from rosbridge_library.capabilities.call_service import CallService from rosbridge_library.capabilities.service_response import ServiceResponse from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService -from rosbridge_library.protocol import ( +from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, - Protocol, ) +from rosbridge_library.protocol import Protocol class TestServiceCapabilities(unittest.TestCase): def setUp(self): - self.proto = Protocol(self._testMethodName) + rclpy.init() + self.node = Node("test_service_capabilities") + + self.node.declare_parameter("call_services_in_new_thread", False) + + self.proto = Protocol(self._testMethodName, self.node) # change the log function so we can verify errors are logged self.proto.log = self.mock_log # change the send callback so we can access the rosbridge messages @@ -26,9 +33,14 @@ def setUp(self): self.advertise = AdvertiseService(self.proto) self.unadvertise = UnadvertiseService(self.proto) self.response = ServiceResponse(self.proto) + self.call_service = CallService(self.proto) self.received_message = None self.log_entries = [] + def tearDown(self): + self.node.destroy_node() + rclpy.shutdown() + def local_send_cb(self, msg): self.received_message = msg @@ -71,11 +83,8 @@ def test_advertise_service(self): ) self.advertise.advertise_service(advertise_msg) - # This throws an exception if the timeout is exceeded (i.e. the service - # is not properly advertised) - rospy.wait_for_service(service_path, 1.0) - def test_call_advertised_service(self): + # Advertise the service service_path = "/set_bool_2" advertise_msg = loads( dumps( @@ -86,32 +95,30 @@ def test_call_advertised_service(self): } ) ) + self.received_message = None self.advertise.advertise_service(advertise_msg) - # Call the service via rosbridge because rospy.ServiceProxy.call() is - # blocking - call_service = CallService(self.proto) - call_service.call_service( - loads( - dumps( - { - "op": "call_service", - "id": "foo", - "service": service_path, - "args": [True], - } - ) + # Call the advertised service using rosbridge + self.received_message = None + call_msg = loads( + dumps( + { + "op": "call_service", + "id": "foo", + "service": service_path, + "args": {"data": True}, + } ) ) + Thread(target=self.call_service.call_service, args=(call_msg,)).start() loop_iterations = 0 while self.received_message is None: - rospy.sleep(rospy.Duration(0.5)) + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.5) loop_iterations += 1 if loop_iterations > 3: - self.fail( - "did not receive service call rosbridge message " "after waiting 2 seconds" - ) + self.fail("Timed out waiting for service call message.") self.assertFalse(self.received_message is None) self.assertTrue("op" in self.received_message) @@ -125,7 +132,7 @@ def test_call_advertised_service(self): "op": "service_response", "service": service_path, "id": self.received_message["id"], - "values": {"success": True, "message": ""}, + "values": {"success": True, "message": "set bool to true"}, "result": True, } ) @@ -135,20 +142,19 @@ def test_call_advertised_service(self): loop_iterations = 0 while self.received_message is None: - rospy.sleep(rospy.Duration(0.5)) + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.5) loop_iterations += 1 if loop_iterations > 3: - self.fail( - "did not receive service response rosbridge message " "after waiting 2 seconds" - ) + self.fail("Timed out waiting for service response message.") self.assertFalse(self.received_message is None) - # Rosbridge should forward the response message to the "client" - # (i.e. our custom send function, see setUp()) self.assertEqual(self.received_message["op"], "service_response") self.assertTrue(self.received_message["result"]) + @unittest.skip("This test currently raises an exception, need to fix this") def test_unadvertise_with_live_request(self): + # Advertise the service service_path = "/set_bool_3" advertise_msg = loads( dumps( @@ -159,61 +165,52 @@ def test_unadvertise_with_live_request(self): } ) ) + self.received_message = None self.advertise.advertise_service(advertise_msg) - # Call the service via rosbridge because rospy.ServiceProxy.call() is - # blocking - call_service = CallService(self.proto) - call_service.call_service( - loads( - dumps( - { - "op": "call_service", - "id": "foo", - "service": service_path, - "args": [True], - } - ) + # Now send the response + call_msg = loads( + dumps( + { + "op": "call_service", + "id": "foo", + "service": service_path, + "args": {"data": True}, + } ) ) + self.received_message = None + Thread(target=self.call_service.call_service, args=(call_msg,)).start() loop_iterations = 0 while self.received_message is None: - rospy.sleep(rospy.Duration(0.5)) + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.5) loop_iterations += 1 if loop_iterations > 3: - self.fail( - "did not receive service call rosbridge message " "after waiting 2 seconds" - ) + self.fail("Timed out waiting for service call message.") self.assertFalse(self.received_message is None) self.assertTrue("op" in self.received_message) self.assertTrue(self.received_message["op"] == "call_service") self.assertTrue("id" in self.received_message) - # Now send the response + # Now unadvertise the service + # TODO: This raises an exception, likely because of the following rclpy issue: + # https://github.com/ros2/rclpy/issues/1098 response_msg = loads(dumps({"op": "unadvertise_service", "service": service_path})) self.received_message = None self.unadvertise.unadvertise_service(response_msg) loop_iterations = 0 while self.received_message is None: - rospy.sleep(rospy.Duration(0.5)) + rclpy.spin_once(self.node, timeout_sec=0.1) + time.sleep(0.5) loop_iterations += 1 if loop_iterations > 3: - self.fail( - "did not receive service response rosbridge message " "after waiting 2 seconds" - ) + self.fail("Timed out waiting for unadvertise service message.") self.assertFalse(self.received_message is None) - # Rosbridge should abort the existing service call with an error - # (i.e. "result" should be False) + self.assertTrue("op" in self.received_message) self.assertEqual(self.received_message["op"], "service_response") self.assertFalse(self.received_message["result"]) - - -PKG = "rosbridge_library" -NAME = "test_service_capabilities" -if __name__ == "__main__": - rospy.init_node(NAME) - rostest.rosrun(PKG, NAME, TestServiceCapabilities) diff --git a/rosbridge_library/test/capabilities/test_subscribe.py b/rosbridge_library/test/capabilities/test_subscribe.py index cb1035524..9cce35063 100755 --- a/rosbridge_library/test/capabilities/test_subscribe.py +++ b/rosbridge_library/test/capabilities/test_subscribe.py @@ -2,21 +2,36 @@ import time import unittest from json import dumps, loads +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.capabilities import subscribe -from rosbridge_library.protocol import ( +from rosbridge_library.internal.exceptions import ( InvalidArgumentException, MissingArgumentException, - Protocol, ) +from rosbridge_library.protocol import Protocol from std_msgs.msg import String class TestSubscribe(unittest.TestCase): def setUp(self): - rospy.init_node("test_subscribe") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_subscribe") + self.executor.add_node(self.node) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def dummy_cb(self, msg): pass @@ -28,7 +43,7 @@ def test_update_params(self): topic = "/test_update_params" msg_type = "std_msgs/String" - subscription = subscribe.Subscription(client_id, topic, None) + subscription = subscribe.Subscription(client_id, topic, None, self.node) min_throttle_rate = 5 min_queue_length = 2 @@ -60,13 +75,13 @@ def test_update_params(self): subscription.unregister() def test_missing_arguments(self): - proto = Protocol("test_missing_arguments") + proto = Protocol("test_missing_arguments", self.node) sub = subscribe.Subscribe(proto) msg = {"op": "subscribe"} self.assertRaises(MissingArgumentException, sub.subscribe, msg) def test_invalid_arguments(self): - proto = Protocol("test_invalid_arguments") + proto = Protocol("test_invalid_arguments", self.node) sub = subscribe.Subscribe(proto) msg = {"op": "subscribe", "topic": 3} @@ -88,7 +103,7 @@ def test_invalid_arguments(self): self.assertRaises(InvalidArgumentException, sub.subscribe, msg) def test_subscribe_works(self): - proto = Protocol("test_subscribe_works") + proto = Protocol("test_subscribe_works", self.node) sub = subscribe.Subscribe(proto) topic = "/test_subscribe_works" msg = String() @@ -104,15 +119,11 @@ def send(outgoing, **kwargs): sub.subscribe(loads(dumps({"op": "subscribe", "topic": topic, "type": msg_type}))) - p = rospy.Publisher(topic, String, queue_size=5) - time.sleep(0.25) - p.publish(msg) - - time.sleep(0.25) + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(String, topic, publisher_qos) + pub.publish(msg) + time.sleep(0.1) self.assertEqual(received["msg"]["msg"]["data"], msg.data) - - -PKG = "rosbridge_library" -NAME = "test_subscribe" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestSubscribe) diff --git a/rosbridge_library/test/internal/publishers/test_multi_publisher.py b/rosbridge_library/test/internal/publishers/test_multi_publisher.py index aad200196..01b1f8295 100755 --- a/rosbridge_library/test/internal/publishers/test_multi_publisher.py +++ b/rosbridge_library/test/internal/publishers/test_multi_publisher.py @@ -1,41 +1,54 @@ #!/usr/bin/env python +import time import unittest -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import MultiThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal import ros_loader from rosbridge_library.internal.message_conversion import FieldTypeMismatchException from rosbridge_library.internal.publishers import MultiPublisher from rosbridge_library.internal.topics import TypeConflictException +from rosbridge_library.util.ros import is_topic_published class TestMultiPublisher(unittest.TestCase): def setUp(self): - rospy.init_node("test_multi_publisher") + rclpy.init() + self.executor = MultiThreadedExecutor(num_threads=2) + self.node = Node("test_multi_publisher") + self.executor.add_node(self.node) - def is_topic_published(self, topicname): - return topicname in dict(rospy.get_published_topics()).keys() + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_register_multipublisher(self): """Register a publisher on a clean topic with a good msg type""" topic = "/test_register_multipublisher" msg_type = "std_msgs/String" - self.assertFalse(self.is_topic_published(topic)) - _ = MultiPublisher(topic, msg_type) - self.assertTrue(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) + MultiPublisher(topic, self.node, msg_type) + self.assertTrue(is_topic_published(self.node, topic)) def test_unregister_multipublisher(self): """Register and unregister a publisher on a clean topic with a good msg type""" topic = "/test_unregister_multipublisher" msg_type = "std_msgs/String" - self.assertFalse(self.is_topic_published(topic)) - multipublisher = MultiPublisher(topic, msg_type) - self.assertTrue(self.is_topic_published(topic)) - multipublisher.unregister() - self.assertFalse(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) + p = MultiPublisher(topic, self.node, msg_type) + self.assertTrue(is_topic_published(self.node, topic)) + p.unregister() + self.assertFalse(is_topic_published(self.node, topic)) def test_register_client(self): """Adds a publisher then removes it.""" @@ -43,7 +56,7 @@ def test_register_client(self): msg_type = "std_msgs/String" client_id = "client1" - p = MultiPublisher(topic, msg_type) + p = MultiPublisher(topic, self.node, msg_type) self.assertFalse(p.has_clients()) p.register_client(client_id) @@ -57,16 +70,16 @@ def test_register_multiple_clients(self): topic = "/test_register_multiple_clients" msg_type = "std_msgs/String" - p = MultiPublisher(topic, msg_type) + p = MultiPublisher(topic, self.node, msg_type) self.assertFalse(p.has_clients()) for i in range(1000): - p.register_client("client%d" % i) + p.register_client(f"client{i}") self.assertTrue(p.has_clients()) for i in range(1000): self.assertTrue(p.has_clients()) - p.unregister_client("client%d" % i) + p.unregister_client(f"client{i}") self.assertFalse(p.has_clients()) @@ -87,7 +100,7 @@ def test_verify_type(self): "sensor_msgs/PointCloud2", ] - p = MultiPublisher(topic, msg_type) + p = MultiPublisher(topic, self.node, msg_type) p.verify_type(msg_type) for othertype in othertypes: self.assertRaises(TypeConflictException, p.verify_type, othertype) @@ -103,12 +116,60 @@ def test_publish(self): def cb(msg): received["msg"] = msg - rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb) - p = MultiPublisher(topic, msg_type) + subscriber_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_subscription( + ros_loader.get_message_class(msg_type), topic, cb, subscriber_qos + ) + + p = MultiPublisher(topic, self.node, msg_type) + p.publish(msg) + time.sleep(0.1) + self.assertEqual(received["msg"].data, msg["data"]) + + def test_publish_twice(self): + """Make sure that publishing works""" + topic = "/test_publish_twice" + msg_type = "std_msgs/String" + msg = {"data": "why halo thar"} + + received = {"msg": None} + + def cb(msg): + received["msg"] = msg + + subscriber_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_subscription( + ros_loader.get_message_class(msg_type), topic, cb, subscriber_qos + ) + + p = MultiPublisher(topic, self.node, msg_type) p.publish(msg) + time.sleep(0.1) + self.assertEqual(received["msg"].data, msg["data"]) + + p.unregister() + # The publisher went away at time T. Here's the timeline of the events: + # T+1 seconds - the subscriber will retry to reconnect + # T+2 seconds - publish msg -> it's gone + # T+3 seconds - publish msg -> OK + time.sleep(1) - sleep(0.5) + received["msg"] = None + self.assertIsNone(received["msg"]) + p = MultiPublisher(topic, self.node, msg_type) + time.sleep(1) + p.publish(msg) + self.assertIsNone(received["msg"]) + + time.sleep(1) + p.publish(msg) self.assertEqual(received["msg"].data, msg["data"]) def test_bad_publish(self): @@ -117,11 +178,5 @@ def test_bad_publish(self): msg_type = "std_msgs/String" msg = {"data": 3} - p = MultiPublisher(topic, msg_type) + p = MultiPublisher(topic, self.node, msg_type) self.assertRaises(FieldTypeMismatchException, p.publish, msg) - - -PKG = "rosbridge_library" -NAME = "test_multi_publisher" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestMultiPublisher) diff --git a/rosbridge_library/test/internal/publishers/test_multi_unregistering.py b/rosbridge_library/test/internal/publishers/test_multi_unregistering.py deleted file mode 100755 index 896b1bf68..000000000 --- a/rosbridge_library/test/internal/publishers/test_multi_unregistering.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python -import unittest -from time import sleep - -import rospy -import rostest -from rosbridge_library.internal import ros_loader -from rosbridge_library.internal.publishers import MultiPublisher - - -class TestMultiUnregistering(unittest.TestCase): - def setUp(self): - rospy.init_node("test_multi_unregistering") - - def test_publish_once(self): - """Make sure that publishing works""" - topic = "/test_publish_once" - msg_type = "std_msgs/String" - msg = {"data": "why halo thar"} - - received = {"msg": None} - - def cb(msg): - received["msg"] = msg - - rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb) - p = MultiPublisher(topic, msg_type) - p.publish(msg) - - sleep(1) - - self.assertEqual(received["msg"].data, msg["data"]) - - def test_publish_twice(self): - """Make sure that publishing works""" - topic = "/test_publish_twice" - msg_type = "std_msgs/String" - msg = {"data": "why halo thar"} - - received = {"msg": None} - - def cb(msg): - received["msg"] = msg - - rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb) - p = MultiPublisher(topic, msg_type) - p.publish(msg) - - sleep(1) - - self.assertEqual(received["msg"].data, msg["data"]) - - p.unregister() - # The publisher went away at time T. Here's the timeline of the events: - # T+1 seconds - the subscriber will retry to reconnect - fail - # T+3 seconds - the subscriber will retry to reconnect - fail - # T+5 seconds - publish msg -> it's gone - # T+7 seconds - the subscriber will retry to reconnect - success - # T+8 seconds - publish msg -> OK - # T+11 seconds - we receive the message. Looks like a bug in reconnection... - # https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733 - # That line should probably be indented. - sleep(5) - - received["msg"] = None - self.assertIsNone(received["msg"]) - p = MultiPublisher(topic, msg_type) - p.publish(msg) - - self.assertIsNone(received["msg"]) - - sleep(3) - p.publish(msg) - sleep(2) - # Next two lines should be removed when this is fixed: - # https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733 - self.assertIsNone(received["msg"]) - sleep(3) - self.assertEqual(received["msg"].data, msg["data"]) - - -PKG = "rosbridge_library" -NAME = "test_multi_unregistering" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestMultiUnregistering) diff --git a/rosbridge_library/test/internal/publishers/test_publisher_manager.py b/rosbridge_library/test/internal/publishers/test_publisher_manager.py index f0a739957..312bc697f 100755 --- a/rosbridge_library/test/internal/publishers/test_publisher_manager.py +++ b/rosbridge_library/test/internal/publishers/test_publisher_manager.py @@ -1,25 +1,40 @@ #!/usr/bin/env python +import time import unittest -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.message_conversion import FieldTypeMismatchException from rosbridge_library.internal.publishers import manager from rosbridge_library.internal.topics import ( TopicNotEstablishedException, TypeConflictException, ) +from rosbridge_library.util.ros import is_topic_published from std_msgs.msg import String +# Reduce this from its default of 10 to speed up tests +manager.unregister_timeout = 1.0 + class TestPublisherManager(unittest.TestCase): def setUp(self): - rospy.init_node("test_publisher_manager") - manager.unregister_timeout = 1.0 + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_publisher_manager") + self.executor.add_node(self.node) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() - def is_topic_published(self, topicname): - return topicname in dict(rospy.get_published_topics()).keys() + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_register_publisher(self): """Register a publisher on a clean topic with a good msg type""" @@ -28,18 +43,19 @@ def test_register_publisher(self): client = "client_test_register_publisher" self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) - manager.register(client, topic, msg_type) + self.assertFalse(is_topic_published(self.node, topic)) + manager.register(client, topic, self.node, msg_type) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) manager.unregister(client, topic) self.assertTrue(topic in manager.unregister_timers) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) - sleep(manager.unregister_timeout * 1.1) + self.assertTrue(is_topic_published(self.node, topic)) + + time.sleep(manager.unregister_timeout + 1.0) self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) self.assertFalse(topic in manager.unregister_timers) def test_register_publisher_multiclient(self): @@ -49,23 +65,28 @@ def test_register_publisher_multiclient(self): client2 = "client_test_register_publisher_2" self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) - manager.register(client1, topic, msg_type) + self.assertFalse(is_topic_published(self.node, topic)) + + manager.register(client1, topic, self.node, msg_type) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) - manager.register(client2, topic, msg_type) + self.assertTrue(is_topic_published(self.node, topic)) + + manager.register(client2, topic, self.node, msg_type) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) + manager.unregister(client1, topic) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) + manager.unregister(client2, topic) self.assertTrue(topic in manager.unregister_timers) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) - sleep(manager.unregister_timeout * 1.1) + self.assertTrue(is_topic_published(self.node, topic)) + + time.sleep(manager.unregister_timeout + 1.0) self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) self.assertFalse(topic in manager.unregister_timers) def test_register_publisher_conflicting_types(self): @@ -75,12 +96,15 @@ def test_register_publisher_conflicting_types(self): client = "client_test_register_publisher_conflicting_types" self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) - manager.register(client, topic, msg_type) + self.assertFalse(is_topic_published(self.node, topic)) + + manager.register(client, topic, self.node, msg_type) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) - self.assertRaises(TypeConflictException, manager.register, "client2", topic, msg_type_bad) + self.assertRaises( + TypeConflictException, manager.register, "client2", topic, self.node, msg_type_bad + ) def test_register_multiple_publishers(self): topic1 = "/test_register_multiple_publishers1" @@ -90,33 +114,36 @@ def test_register_multiple_publishers(self): self.assertFalse(topic1 in manager._publishers) self.assertFalse(topic2 in manager._publishers) - self.assertFalse(self.is_topic_published(topic1)) - self.assertFalse(self.is_topic_published(topic2)) - manager.register(client, topic1, msg_type) + self.assertFalse(is_topic_published(self.node, topic1)) + self.assertFalse(is_topic_published(self.node, topic2)) + + manager.register(client, topic1, self.node, msg_type) self.assertTrue(topic1 in manager._publishers) - self.assertTrue(self.is_topic_published(topic1)) + self.assertTrue(is_topic_published(self.node, topic1)) self.assertFalse(topic2 in manager._publishers) - self.assertFalse(self.is_topic_published(topic2)) - manager.register(client, topic2, msg_type) + self.assertFalse(is_topic_published(self.node, topic2)) + + manager.register(client, topic2, self.node, msg_type) self.assertTrue(topic1 in manager._publishers) - self.assertTrue(self.is_topic_published(topic1)) + self.assertTrue(is_topic_published(self.node, topic1)) self.assertTrue(topic2 in manager._publishers) - self.assertTrue(self.is_topic_published(topic2)) + self.assertTrue(is_topic_published(self.node, topic2)) manager.unregister(client, topic1) - self.assertTrue(self.is_topic_published(topic1)) + self.assertTrue(is_topic_published(self.node, topic1)) self.assertTrue(topic1 in manager.unregister_timers) self.assertTrue(topic2 in manager._publishers) - self.assertTrue(self.is_topic_published(topic2)) + self.assertTrue(is_topic_published(self.node, topic2)) manager.unregister(client, topic2) self.assertTrue(topic2 in manager.unregister_timers) - self.assertTrue(self.is_topic_published(topic2)) - sleep(manager.unregister_timeout * 1.1) + self.assertTrue(is_topic_published(self.node, topic2)) + + time.sleep(manager.unregister_timeout + 1.0) self.assertFalse(topic1 in manager._publishers) - self.assertFalse(self.is_topic_published(topic1)) + self.assertFalse(is_topic_published(self.node, topic1)) self.assertFalse(topic2 in manager._publishers) - self.assertFalse(self.is_topic_published(topic2)) + self.assertFalse(is_topic_published(self.node, topic2)) self.assertFalse(topic1 in manager.unregister_timers) self.assertFalse(topic2 in manager.unregister_timers) @@ -125,26 +152,32 @@ def test_register_no_msgtype(self): client = "client_test_register_no_msgtype" self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) - self.assertRaises(TopicNotEstablishedException, manager.register, client, topic) + self.assertFalse(is_topic_published(self.node, topic)) + self.assertRaises(TopicNotEstablishedException, manager.register, client, topic, self.node) def test_register_infer_topictype(self): topic = "/test_register_infer_topictype" client = "client_test_register_infer_topictype" - self.assertFalse(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) - rospy.Publisher(topic, String) + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_publisher(String, topic, publisher_qos) + time.sleep(0.1) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) self.assertFalse(topic in manager._publishers) - manager.register(client, topic) + + manager.register(client, topic, self.node) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) manager.unregister(client, topic) self.assertTrue(topic in manager.unregister_timers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) def test_register_multiple_notopictype(self): topic = "/test_register_multiple_notopictype" @@ -154,24 +187,29 @@ def test_register_multiple_notopictype(self): self.assertFalse(topic in manager._publishers) self.assertFalse(topic in manager.unregister_timers) - self.assertFalse(self.is_topic_published(topic)) - manager.register(client1, topic, msg_type) + self.assertFalse(is_topic_published(self.node, topic)) + + manager.register(client1, topic, self.node, msg_type) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) - manager.register(client2, topic) + self.assertTrue(is_topic_published(self.node, topic)) + + manager.register(client2, topic, self.node) self.assertTrue(topic in manager._publishers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) + manager.unregister(client1, topic) self.assertTrue(topic in manager._publishers) self.assertTrue(topic in manager.unregister_timers) - self.assertTrue(self.is_topic_published(topic)) + self.assertTrue(is_topic_published(self.node, topic)) + manager.unregister(client2, topic) self.assertTrue(topic in manager.unregister_timers) self.assertTrue(topic in manager._publishers) - sleep(manager.unregister_timeout * 1.1) + + time.sleep(manager.unregister_timeout + 1.0) self.assertFalse(topic in manager._publishers) self.assertFalse(topic in manager.unregister_timers) - self.assertFalse(self.is_topic_published(topic)) + self.assertFalse(is_topic_published(self.node, topic)) def test_publish_not_registered(self): topic = "/test_publish_not_registered" @@ -179,8 +217,10 @@ def test_publish_not_registered(self): client = "client_test_publish_not_registered" self.assertFalse(topic in manager._publishers) - self.assertFalse(self.is_topic_published(topic)) - self.assertRaises(TopicNotEstablishedException, manager.publish, client, topic, msg) + self.assertFalse(is_topic_published(self.node, topic)) + self.assertRaises( + TopicNotEstablishedException, manager.publish, client, topic, msg, self.node + ) def test_publisher_manager_publish(self): """Make sure that publishing works""" @@ -193,10 +233,14 @@ def test_publisher_manager_publish(self): def cb(msg): received["msg"] = msg - rospy.Subscriber(topic, String, cb) - manager.publish(client, topic, msg) - sleep(0.5) + subscriber_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_subscription(String, topic, cb, subscriber_qos) + manager.publish(client, topic, msg, self.node) + time.sleep(0.5) self.assertEqual(received["msg"].data, msg["data"]) def test_publisher_manager_bad_publish(self): @@ -206,11 +250,7 @@ def test_publisher_manager_bad_publish(self): msg_type = "std_msgs/String" msg = {"data": 3} - manager.register(client, topic, msg_type) - self.assertRaises(FieldTypeMismatchException, manager.publish, client, topic, msg) - - -PKG = "rosbridge_library" -NAME = "test_publisher_manager" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestPublisherManager) + manager.register(client, topic, self.node, msg_type) + self.assertRaises( + FieldTypeMismatchException, manager.publish, client, topic, msg, self.node + ) diff --git a/rosbridge_library/test/internal/test_services.py b/rosbridge_library/test/internal/services/test_services.py similarity index 80% rename from rosbridge_library/test/internal/test_services.py rename to rosbridge_library/test/internal/services/test_services.py index d96638bf3..b549cc7b3 100755 --- a/rosbridge_library/test/internal/test_services.py +++ b/rosbridge_library/test/internal/services/test_services.py @@ -2,10 +2,12 @@ import random import time import unittest +from threading import Thread import numpy as np import rclpy from rcl_interfaces.srv import ListParameters +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.internal import message_conversion as c from rosbridge_library.internal import ros_loader, services @@ -31,24 +33,35 @@ def populate_random_args(d): class ServiceTester: - def __init__(self, name, srv_type): + def __init__(self, executor, name, srv_type): self.name = name + self.executor = executor self.node = Node("service_tester_" + srv_type.replace("/", "_")) + self.executor.add_node(self.node) self.srvClass = ros_loader.get_service_class(srv_type) self.service = self.node.create_service(self.srvClass, name, self.callback) + def __del__(self): + self.executor.remove_node(self.node) + def start(self): req = self.srvClass.Request() gen = c.extract_values(req) gen = populate_random_args(gen) self.input = gen - thread = services.ServiceCaller(self.name, gen, self.success, self.error, self.node) + thread = services.ServiceCaller( + self.name, + gen, + self.success, + self.error, + self.node, + ) thread.start() thread.join() def callback(self, req, res): self.req = req - time.sleep(0.5) + time.sleep(0.1) gen = c.extract_values(res) gen = populate_random_args(gen) try: @@ -79,9 +92,16 @@ def validate(self, equality_function): class TestServices(unittest.TestCase): def setUp(self): rclpy.init() + self.executor = SingleThreadedExecutor() self.node = Node("test_node") + self.executor.add_node(self.node) + + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() def tearDown(self): + self.executor.remove_node(self.node) + self.executor.shutdown() rclpy.shutdown() def msgs_equal(self, msg1, msg2): @@ -144,10 +164,15 @@ def test_service_call(self): p = self.node.create_client(ListParameters, self.node.get_name() + "/list_parameters") p.wait_for_service(0.5) ret = p.call_async(ListParameters.Request()) - rclpy.spin_until_future_complete(self.node, ret) + while not ret.done(): + time.sleep(0.1) + self.node.destroy_client(p) # Now, call using the services - json_ret = services.call_service(self.node, self.node.get_name() + "/list_parameters") + json_ret = services.call_service( + self.node, + self.node.get_name() + "/list_parameters", + ) for x, y in zip(ret.result().result.names, json_ret["result"]["names"]): self.assertEqual(x, y) @@ -160,7 +185,9 @@ def test_service_caller(self): p = self.node.create_client(ListParameters, self.node.get_name() + "/list_parameters") p.wait_for_service(0.5) ret = p.call_async(ListParameters.Request()) - rclpy.spin_until_future_complete(self.node, ret) + while not ret.done(): + time.sleep(0.1) + self.node.destroy_client(p) rcvd = {"json": None} @@ -172,18 +199,24 @@ def error(): # Now, call using the services services.ServiceCaller( - self.node.get_name() + "/list_parameters", None, success, error, self.node + self.node.get_name() + "/list_parameters", + None, + success, + error, + self.node, ).start() - time.sleep(0.5) + time.sleep(0.2) for x, y in zip(ret.result().result.names, rcvd["json"]["result"]["names"]): self.assertEqual(x, y) def test_service_tester(self): - t = ServiceTester("/test_service_tester", "rosbridge_test_msgs/TestRequestAndResponse") + t = ServiceTester( + self.executor, "/test_service_tester", "rosbridge_test_msgs/TestRequestAndResponse" + ) t.start() - time.sleep(1.0) + time.sleep(0.2) t.validate(self.msgs_equal) def test_service_tester_alltypes(self): @@ -197,11 +230,13 @@ def test_service_tester_alltypes(self): "TestMultipleRequestFields", "TestArrayRequest", ]: - t = ServiceTester("/test_service_tester_alltypes_" + srv, "rosbridge_test_msgs/" + srv) + t = ServiceTester( + self.executor, "/test_service_tester_alltypes_" + srv, "rosbridge_test_msgs/" + srv + ) t.start() ts.append(t) - time.sleep(1.0) + time.sleep(0.2) for t in ts: t.validate(self.msgs_equal) @@ -219,11 +254,11 @@ def test_random_service_types(self): ] ts = [] for srv in common: - t = ServiceTester("/test_random_service_types/" + srv, srv) + t = ServiceTester(self.executor, "/test_random_service_types/" + srv, srv) t.start() ts.append(t) - time.sleep(1.0) + time.sleep(0.2) for t in ts: t.validate(self.msgs_equal) diff --git a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py index 915b92f93..76f084b5f 100755 --- a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py +++ b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py @@ -1,44 +1,57 @@ #!/usr/bin/env python +import time import unittest -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.subscribers import MultiSubscriber from rosbridge_library.internal.topics import TypeConflictException -from rosgraph import Master +from rosbridge_library.util.ros import is_topic_subscribed from std_msgs.msg import Int32, String class TestMultiSubscriber(unittest.TestCase): def setUp(self): - rospy.init_node("test_multi_subscriber") + self.client_id = "test_client_id" - def is_topic_published(self, topicname): - return topicname in dict(rospy.get_published_topics()).keys() + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_multi_subscriber") + self.executor.add_node(self.node) - def is_topic_subscribed(self, topicname): - return topicname in dict(Master("test_multi_subscriber").getSystemState()[1]) + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_register_multisubscriber(self): """Register a subscriber on a clean topic with a good msg type""" topic = "/test_register_multisubscriber" msg_type = "std_msgs/String" - self.assertFalse(self.is_topic_subscribed(topic)) - MultiSubscriber(topic, msg_type) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) + MultiSubscriber(topic, self.client_id, lambda *args: None, self.node, msg_type=msg_type) + self.assertTrue(is_topic_subscribed(self.node, topic)) def test_unregister_multisubscriber(self): """Register and unregister a subscriber on a clean topic with a good msg type""" topic = "/test_unregister_multisubscriber" msg_type = "std_msgs/String" - self.assertFalse(self.is_topic_subscribed(topic)) - multi = MultiSubscriber(topic, msg_type) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) + multi = MultiSubscriber( + topic, self.client_id, lambda *args: None, self.node, msg_type=msg_type + ) + self.assertTrue(is_topic_subscribed(self.node, topic)) multi.unregister() - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_verify_type(self): topic = "/test_verify_type" @@ -57,7 +70,7 @@ def test_verify_type(self): "sensor_msgs/PointCloud2", ] - s = MultiSubscriber(topic, msg_type) + s = MultiSubscriber(topic, self.client_id, lambda *args: None, self.node, msg_type=msg_type) s.verify_type(msg_type) for othertype in othertypes: self.assertRaises(TypeConflictException, s.verify_type, othertype) @@ -65,93 +78,98 @@ def test_verify_type(self): def test_subscribe_unsubscribe(self): topic = "/test_subscribe_unsubscribe" msg_type = "std_msgs/String" - client = "client_test_subscribe_unsubscribe" - self.assertFalse(self.is_topic_subscribed(topic)) - multi = MultiSubscriber(topic, msg_type) - self.assertTrue(self.is_topic_subscribed(topic)) - self.assertFalse(multi.has_subscribers()) + self.assertFalse(is_topic_subscribed(self.node, topic)) + multi = MultiSubscriber( + topic, self.client_id, lambda *args: None, self.node, msg_type=msg_type + ) + self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertEqual(len(multi.new_subscriptions), 0) - multi.subscribe(client, None) - self.assertTrue(multi.has_subscribers()) + multi.subscribe(self.client_id, None) + self.assertEqual(len(multi.new_subscriptions), 1) - multi.unsubscribe(client) - self.assertFalse(multi.has_subscribers()) + multi.unsubscribe(self.client_id) + self.assertEqual(len(multi.new_subscriptions), 0) multi.unregister() - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_subscribe_receive_json(self): topic = "/test_subscribe_receive_json" msg_type = "std_msgs/String" - client = "client_test_subscribe_receive_json" msg = String() msg.data = "dsajfadsufasdjf" - pub = rospy.Publisher(topic, String) - multi = MultiSubscriber(topic, msg_type) - + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(String, topic, publisher_qos) received = {"msg": None} def cb(msg): received["msg"] = msg.get_json_values() - multi.subscribe(client, cb) - sleep(0.5) + MultiSubscriber(topic, self.client_id, cb, self.node, msg_type=msg_type) + time.sleep(0.1) pub.publish(msg) - sleep(0.5) + time.sleep(0.1) self.assertEqual(msg.data, received["msg"]["data"]) def test_subscribe_receive_json_multiple(self): topic = "/test_subscribe_receive_json_multiple" msg_type = "std_msgs/Int32" - client = "client_test_subscribe_receive_json_multiple" numbers = list(range(100)) - pub = rospy.Publisher(topic, Int32) - multi = MultiSubscriber(topic, msg_type) - + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(Int32, topic, publisher_qos) received = {"msgs": []} def cb(msg): received["msgs"].append(msg.get_json_values()["data"]) - multi.subscribe(client, cb) - sleep(0.5) + MultiSubscriber(topic, self.client_id, cb, self.node, msg_type=msg_type) + time.sleep(0.1) for x in numbers: msg = Int32() msg.data = x pub.publish(msg) - sleep(0.5) + time.sleep(0.01) + time.sleep(0.1) self.assertEqual(numbers, received["msgs"]) def test_unsubscribe_does_not_receive_further_msgs(self): topic = "/test_unsubscribe_does_not_receive_further_msgs" msg_type = "std_msgs/String" - client = "client_test_unsubscribe_does_not_receive_further_msgs" msg = String() msg.data = "dsajfadsufasdjf" - pub = rospy.Publisher(topic, String) - multi = MultiSubscriber(topic, msg_type) - + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(String, topic, publisher_qos) received = {"count": 0} def cb(msg): received["count"] = received["count"] + 1 - multi.subscribe(client, cb) - sleep(0.5) + multi = MultiSubscriber(topic, self.client_id, cb, self.node, msg_type=msg_type) + time.sleep(0.1) pub.publish(msg) - sleep(0.5) + time.sleep(0.1) self.assertEqual(received["count"], 1) - multi.unsubscribe(client) - sleep(0.5) + multi.unsubscribe(self.client_id) + time.sleep(0.1) pub.publish(msg) - sleep(0.5) + time.sleep(0.1) self.assertEqual(received["count"], 1) def test_multiple_subscribers(self): @@ -163,8 +181,11 @@ def test_multiple_subscribers(self): msg = String() msg.data = "dsajfadsufasdjf" - pub = rospy.Publisher(topic, String) - multi = MultiSubscriber(topic, msg_type) + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(String, topic, publisher_qos) received = {"msg1": None, "msg2": None} @@ -174,16 +195,10 @@ def cb1(msg): def cb2(msg): received["msg2"] = msg.get_json_values() - multi.subscribe(client1, cb1) + multi = MultiSubscriber(topic, client1, cb1, self.node, msg_type=msg_type) multi.subscribe(client2, cb2) - sleep(0.5) + time.sleep(0.1) pub.publish(msg) - sleep(0.5) + time.sleep(0.1) self.assertEqual(msg.data, received["msg1"]["data"]) self.assertEqual(msg.data, received["msg2"]["data"]) - - -PKG = "rosbridge_library" -NAME = "test_multi_subscriber" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestMultiSubscriber) diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index 9e0f82769..a23693b80 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -1,27 +1,36 @@ #!/usr/bin/env python +import time import unittest -from time import sleep +from threading import Thread -import rospy -import rostest +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.subscribers import manager from rosbridge_library.internal.topics import ( TopicNotEstablishedException, TypeConflictException, ) -from rosgraph import Master +from rosbridge_library.util.ros import is_topic_subscribed from std_msgs.msg import String class TestSubscriberManager(unittest.TestCase): def setUp(self): - rospy.init_node("test_subscriber_manager") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_subscriber_manager") + self.executor.add_node(self.node) - def is_topic_published(self, topicname): - return topicname in dict(rospy.get_published_topics()).keys() + self.exec_thread = Thread(target=self.executor.spin) + self.exec_thread.start() - def is_topic_subscribed(self, topicname): - return topicname in dict(Master("test_subscriber_manager").getSystemState()[1]) + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() def test_subscribe(self): """Register a publisher on a clean topic with a good msg type""" @@ -30,14 +39,14 @@ def test_subscribe(self): client = "client_test_subscribe" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - manager.subscribe(client, topic, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic)) + manager.subscribe(client, topic, None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client, topic) self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_register_subscriber_multiclient(self): topic = "/test_register_subscriber_multiclient" @@ -46,19 +55,22 @@ def test_register_subscriber_multiclient(self): client2 = "client_test_register_subscriber_multiclient_2" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - manager.subscribe(client1, topic, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic)) + manager.subscribe(client1, topic, None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) - manager.subscribe(client2, topic, None, msg_type) + self.assertTrue(is_topic_subscribed(self.node, topic)) + + manager.subscribe(client2, topic, None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) + manager.unsubscribe(client1, topic) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) + manager.unsubscribe(client2, topic) self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_register_publisher_conflicting_types(self): topic = "/test_register_publisher_conflicting_types" @@ -67,10 +79,10 @@ def test_register_publisher_conflicting_types(self): client = "client_test_register_publisher_conflicting_types" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - manager.subscribe(client, topic, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic)) + manager.subscribe(client, topic, None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) self.assertRaises( TypeConflictException, @@ -78,6 +90,7 @@ def test_register_publisher_conflicting_types(self): "client2", topic, None, + self.node, msg_type_bad, ) @@ -89,56 +102,65 @@ def test_register_multiple_publishers(self): self.assertFalse(topic1 in manager._subscribers) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic1)) - self.assertFalse(self.is_topic_subscribed(topic2)) - manager.subscribe(client, topic1, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic1)) + self.assertFalse(is_topic_subscribed(self.node, topic2)) + + manager.subscribe(client, topic1, None, self.node, msg_type) self.assertTrue(topic1 in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic1)) + self.assertTrue(is_topic_subscribed(self.node, topic1)) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic2)) - manager.subscribe(client, topic2, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic2)) + + manager.subscribe(client, topic2, None, self.node, msg_type) self.assertTrue(topic1 in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic1)) + self.assertTrue(is_topic_subscribed(self.node, topic1)) self.assertTrue(topic2 in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic2)) + self.assertTrue(is_topic_subscribed(self.node, topic2)) manager.unsubscribe(client, topic1) self.assertFalse(topic1 in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic1)) + self.assertFalse(is_topic_subscribed(self.node, topic1)) self.assertTrue(topic2 in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic2)) + self.assertTrue(is_topic_subscribed(self.node, topic2)) manager.unsubscribe(client, topic2) self.assertFalse(topic1 in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic1)) + self.assertFalse(is_topic_subscribed(self.node, topic1)) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic2)) + self.assertFalse(is_topic_subscribed(self.node, topic2)) def test_register_no_msgtype(self): topic = "/test_register_no_msgtype" client = "client_test_register_no_msgtype" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None) + self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertRaises( + TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node + ) def test_register_infer_topictype(self): topic = "/test_register_infer_topictype" client = "client_test_register_infer_topictype" - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) - rospy.Subscriber(topic, String, None) + subscriber_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + self.node.create_subscription(String, topic, lambda *args: None, subscriber_qos) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) self.assertFalse(topic in manager._subscribers) - manager.subscribe(client, topic, None) + + manager.subscribe(client, topic, None, self.node) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client, topic) self.assertFalse(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) def test_register_multiple_notopictype(self): topic = "/test_register_multiple_notopictype" @@ -147,27 +169,33 @@ def test_register_multiple_notopictype(self): client2 = "client_test_register_multiple_notopictype_2" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - manager.subscribe(client1, topic, None, msg_type) + self.assertFalse(is_topic_subscribed(self.node, topic)) + + manager.subscribe(client1, topic, None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) - manager.subscribe(client2, topic, None) + self.assertTrue(is_topic_subscribed(self.node, topic)) + + manager.subscribe(client2, topic, None, self.node) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) + manager.unsubscribe(client1, topic) self.assertTrue(topic in manager._subscribers) - self.assertTrue(self.is_topic_subscribed(topic)) + self.assertTrue(is_topic_subscribed(self.node, topic)) + manager.unsubscribe(client2, topic) self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_subscribe_not_registered(self): topic = "/test_subscribe_not_registered" client = "client_test_subscribe_not_registered" self.assertFalse(topic in manager._subscribers) - self.assertFalse(self.is_topic_subscribed(topic)) - self.assertRaises(TopicNotEstablishedException, manager.subscribe, client, topic, None) + self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertRaises( + TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node + ) def test_publisher_manager_publish(self): topic = "/test_publisher_manager_publish" @@ -177,20 +205,18 @@ def test_publisher_manager_publish(self): msg = String() msg.data = "dsajfadsufasdjf" - pub = rospy.Publisher(topic, String) + publisher_qos = QoSProfile( + depth=10, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + ) + pub = self.node.create_publisher(String, topic, publisher_qos) received = {"msg": None} def cb(msg): received["msg"] = msg.get_json_values() - manager.subscribe(client, topic, cb, msg_type) - sleep(0.5) + manager.subscribe(client, topic, cb, self.node, msg_type) + time.sleep(0.1) pub.publish(msg) - sleep(0.5) + time.sleep(0.1) self.assertEqual(msg.data, received["msg"]["data"]) - - -PKG = "rosbridge_library" -NAME = "test_subscriber_manager" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestSubscriberManager) diff --git a/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py b/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py index 40fad012c..27d149b80 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py +++ b/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py @@ -2,28 +2,41 @@ import time import unittest -import rospy -import rostest -from rosbridge_library.internal import subscription_modifiers as subscribe +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rosbridge_library.internal import subscription_modifiers class TestMessageHandlers(unittest.TestCase): def setUp(self): - rospy.init_node("test_message_handlers") + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_subscription_modifiers") + self.executor.add_node(self.node) + + def tearDown(self): + self.executor.remove_node(self.node) + self.node.destroy_node() + rclpy.shutdown() def dummy_cb(self, msg): pass def test_default_message_handler(self): - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) self.help_test_default(handler) def test_throttle_message_handler(self): - handler = subscribe.ThrottleMessageHandler(subscribe.MessageHandler(None, self.dummy_cb)) + handler = subscription_modifiers.ThrottleMessageHandler( + subscription_modifiers.MessageHandler(None, self.dummy_cb) + ) self.help_test_throttle(handler, 50) def test_queue_message_handler_passes_msgs(self): - handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, self.dummy_cb)) + handler = subscription_modifiers.QueueMessageHandler( + subscription_modifiers.MessageHandler(None, self.dummy_cb) + ) self.help_test_queue(handler, 1000) handler.finish() @@ -33,7 +46,9 @@ def test_queue_message_handler_stops(self): def cb(msg): received["msgs"].append(msg) - handler = subscribe.QueueMessageHandler(subscribe.MessageHandler(None, cb)) + handler = subscription_modifiers.QueueMessageHandler( + subscription_modifiers.MessageHandler(None, cb) + ) self.assertTrue(handler.is_alive()) @@ -49,11 +64,11 @@ def cb(msg): msgs = range(1000) - handler = subscribe.MessageHandler(None, cb) + handler = subscription_modifiers.MessageHandler(None, cb) handler = handler.set_throttle_rate(10000) handler = handler.set_queue_length(10) - self.assertIsInstance(handler, subscribe.QueueMessageHandler) + self.assertIsInstance(handler, subscription_modifiers.QueueMessageHandler) # 'hello' is handled immediately handler.handle_message("hello") @@ -85,10 +100,10 @@ def cb(msg): queue_length = 5 msgs = range(queue_length * 5) - handler = subscribe.MessageHandler(None, cb) + handler = subscription_modifiers.MessageHandler(None, cb) handler = handler.set_queue_length(queue_length) - self.assertIsInstance(handler, subscribe.QueueMessageHandler) + self.assertIsInstance(handler, subscription_modifiers.QueueMessageHandler) # send all messages at once. # only the first and the last queue_length should get through, @@ -97,13 +112,13 @@ def cb(msg): handler.handle_message(x) # yield the thread so the first callback can append, # otherwise the first handled value is non-deterministic. - time.sleep(0) + time.sleep(0.01) # wait long enough for all the callbacks, and then some. time.sleep(queue_length + 3) try: - self.assertEqual([msgs[0]] + msgs[-queue_length:], received["msgs"]) + self.assertEqual([msgs[0]] + list(msgs[-queue_length:]), received["msgs"]) except: # noqa: E722 # Will finish and raise handler.finish() raise @@ -111,7 +126,7 @@ def cb(msg): handler.finish() def test_queue_message_handler_rate(self): - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) self.help_test_queue_rate(handler, 50, 10) handler.finish() @@ -120,7 +135,7 @@ def test_queue_message_handler_rate(self): def help_test_default(self, handler): handler = handler.set_queue_length(0) handler = handler.set_throttle_rate(0) - self.assertIsInstance(handler, subscribe.MessageHandler) + self.assertIsInstance(handler, subscription_modifiers.MessageHandler) msg = "test_default_message_handler" received = {"msg": None} @@ -156,7 +171,7 @@ def cb(msg): def help_test_throttle(self, handler, throttle_rate): handler = handler.set_queue_length(0) handler = handler.set_throttle_rate(throttle_rate) - self.assertIsInstance(handler, subscribe.ThrottleMessageHandler) + self.assertIsInstance(handler, subscription_modifiers.ThrottleMessageHandler) msg = "test_throttle_message_handler" @@ -199,7 +214,7 @@ def cb(msg): def help_test_queue(self, handler, queue_length): handler = handler.set_queue_length(queue_length) - self.assertIsInstance(handler, subscribe.QueueMessageHandler) + self.assertIsInstance(handler, subscription_modifiers.QueueMessageHandler) received = {"msgs": []} @@ -220,7 +235,7 @@ def cb(msg): def help_test_queue_rate(self, handler, throttle_rate, queue_length): handler = handler.set_throttle_rate(throttle_rate) handler = handler.set_queue_length(queue_length) - self.assertIsInstance(handler, subscribe.QueueMessageHandler) + self.assertIsInstance(handler, subscription_modifiers.QueueMessageHandler) received = {"msg": None} @@ -251,125 +266,112 @@ def cb(msg): # Test that each transition works and is stable def test_transitions(self): # MessageHandler.transition is stable - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) next_handler = handler.transition() self.assertEqual(handler, next_handler) # Going from MessageHandler to ThrottleMessageHandler... - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) next_handler = handler.set_throttle_rate(100) - self.assertIsInstance(next_handler, subscribe.ThrottleMessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.ThrottleMessageHandler) handler = next_handler # Testing transition returns another ThrottleMessageHandler next_handler = handler.transition() self.assertEqual(handler, next_handler) # And finally going back to MessageHandler next_handler = handler.set_throttle_rate(0) - self.assertIsInstance(next_handler, subscribe.MessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.MessageHandler) # Same for QueueMessageHandler - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) next_handler = handler.set_queue_length(100) - self.assertIsInstance(next_handler, subscribe.QueueMessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.QueueMessageHandler) handler = next_handler next_handler = handler.transition() self.assertEqual(handler, next_handler) next_handler = handler.set_queue_length(0) - self.assertIsInstance(next_handler, subscribe.MessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.MessageHandler) # Checking a QueueMessageHandler with rate limit can be generated both ways - handler = subscribe.MessageHandler(None, self.dummy_cb) + handler = subscription_modifiers.MessageHandler(None, self.dummy_cb) next_handler = handler.set_queue_length(100).set_throttle_rate(100) - self.assertIsInstance(next_handler, subscribe.QueueMessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.QueueMessageHandler) next_handler.finish() next_handler = handler.set_throttle_rate(100).set_queue_length(100) - self.assertIsInstance(next_handler, subscribe.QueueMessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.QueueMessageHandler) next_handler.finish() handler = next_handler next_handler = handler.transition() self.assertEqual(handler, next_handler) # Check both steps on the way back to plain MessageHandler next_handler = handler.set_throttle_rate(0) - self.assertIsInstance(next_handler, subscribe.QueueMessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.QueueMessageHandler) next_handler = handler.set_queue_length(0) - self.assertIsInstance(next_handler, subscribe.MessageHandler) + self.assertIsInstance(next_handler, subscription_modifiers.MessageHandler) def test_transition_functionality(self): # Test individually - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_queue(handler, 10) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_throttle(handler, 50) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_default(handler) handler.finish() # Test combinations - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_queue(handler, 10) handler = self.help_test_throttle(handler, 50) handler = self.help_test_default(handler) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_queue(handler, 10) handler = self.help_test_default(handler) handler = self.help_test_throttle(handler, 50) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_throttle(handler, 50) handler = self.help_test_queue_rate(handler, 50, 10) handler = self.help_test_default(handler) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_throttle(handler, 50) handler = self.help_test_default(handler) handler = self.help_test_queue_rate(handler, 50, 10) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_default(handler) handler = self.help_test_throttle(handler, 50) handler = self.help_test_queue_rate(handler, 50, 10) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_default(handler) handler = self.help_test_queue(handler, 10) handler = self.help_test_throttle(handler, 50) handler.finish() # Test duplicates - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_queue_rate(handler, 50, 10) handler = self.help_test_queue_rate(handler, 100, 10) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_throttle(handler, 50) handler = self.help_test_throttle(handler, 100) handler.finish() - handler = subscribe.MessageHandler(None, None) + handler = subscription_modifiers.MessageHandler(None, None) handler = self.help_test_default(handler) handler = self.help_test_default(handler) handler.finish() - - -# handler = self.help_test_throttle(handler, 50) -# handler = self.help_test_default(handler) -# handler = self.help_test_throttle(handler, 50) -# handler = self.help_test_default(handler) -# handler = self.help_test_throttle(handler, 50) - - -PKG = "rosbridge_library" -NAME = "test_message_handlers" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestMessageHandlers) diff --git a/rosbridge_library/test/internal/test_cbor_conversion.py b/rosbridge_library/test/internal/test_cbor_conversion.py index 8a5f0e4f1..a03505bb4 100755 --- a/rosbridge_library/test/internal/test_cbor_conversion.py +++ b/rosbridge_library/test/internal/test_cbor_conversion.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 import struct import unittest +from array import array -import rostest from rosbridge_library.internal.cbor_conversion import ( TAGGED_ARRAY_FORMATS, extract_cbor_values, @@ -13,9 +13,9 @@ except ImportError: from rosbridge_library.util.cbor import Tag +from builtin_interfaces.msg import Duration, Time from std_msgs.msg import ( Bool, - Duration, Float32, Float32MultiArray, Float64, @@ -31,7 +31,6 @@ MultiArrayDimension, MultiArrayLayout, String, - Time, UInt8, UInt8MultiArray, UInt16, @@ -86,10 +85,10 @@ def test_time(self): msg = msg_type() extracted = extract_cbor_values(msg) - self.assertEqual(extracted["data"]["secs"], msg.data.secs, f"type={msg_type}") - self.assertEqual(extracted["data"]["nsecs"], msg.data.nsecs, f"type={msg_type}") - self.assertEqual(type(extracted["data"]["secs"]), int, f"type={msg_type}") - self.assertEqual(type(extracted["data"]["nsecs"]), int, f"type={msg_type}") + self.assertEqual(extracted["sec"], msg.sec, f"type={msg_type}") + self.assertEqual(extracted["nanosec"], msg.nanosec, f"type={msg_type}") + self.assertEqual(type(extracted["sec"]), int, f"type={msg_type}") + self.assertEqual(type(extracted["nanosec"]), int, f"type={msg_type}") def test_byte_array(self): msg = UInt8MultiArray(data=[0, 1, 2]) @@ -100,7 +99,7 @@ def test_byte_array(self): for i, val in enumerate(msg.data): self.assertEqual(data[i], val) - def test_numeric_array(self): + def test_integer_array(self): for msg_type in [ Int8MultiArray, Int16MultiArray, @@ -109,6 +108,28 @@ def test_numeric_array(self): UInt16MultiArray, UInt32MultiArray, UInt64MultiArray, + ]: + msg = msg_type(data=[0, 1, 2]) + extracted = extract_cbor_values(msg) + + tag = extracted["data"] + self.assertEqual(type(tag), Tag, f"type={msg_type}") + self.assertEqual(type(tag.value), bytes, f"type={msg_type}") + + # This is as consistent as the message definitions. + array_type = msg.get_fields_and_field_types()["data"] + + expected_tag = TAGGED_ARRAY_FORMATS[array_type][0] + self.assertEqual(tag.tag, expected_tag, f"type={msg_type}") + + fmt = TAGGED_ARRAY_FORMATS[array_type][1] + fmt_to_length = fmt.format(len(msg.data)) + unpacked = list(struct.unpack(fmt_to_length, tag.value)) + + self.assertEqual(array("b", unpacked), msg.data, f"type={msg_type}") + + def test_float_array(self): + for msg_type in [ Float32MultiArray, Float64MultiArray, ]: @@ -119,8 +140,8 @@ def test_numeric_array(self): self.assertEqual(type(tag), Tag, f"type={msg_type}") self.assertEqual(type(tag.value), bytes, f"type={msg_type}") - # This is as consistent as the message defs.. - array_type = msg._slot_types[1] + # This is as consistent as the message definitions. + array_type = msg.get_fields_and_field_types()["data"] expected_tag = TAGGED_ARRAY_FORMATS[array_type][0] self.assertEqual(tag.tag, expected_tag, f"type={msg_type}") @@ -129,7 +150,7 @@ def test_numeric_array(self): fmt_to_length = fmt.format(len(msg.data)) unpacked = list(struct.unpack(fmt_to_length, tag.value)) - self.assertEqual(unpacked, msg.data, f"type={msg_type}") + self.assertEqual(array("f", unpacked), msg.data, f"type={msg_type}") def test_nested_messages(self): msg = UInt8MultiArray( @@ -167,9 +188,3 @@ def test_unicode_keys(self): keys = extracted.keys() for key in keys: self.assertEqual(type(key), str) - - -PKG = "rosbridge_library" -NAME = "test_cbor_conversion" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestCBORConversion) diff --git a/rosbridge_library/test/internal/test_compression.py b/rosbridge_library/test/internal/test_compression.py index 7e5ebfcfb..a7f7d5fff 100755 --- a/rosbridge_library/test/internal/test_compression.py +++ b/rosbridge_library/test/internal/test_compression.py @@ -1,31 +1,20 @@ #!/usr/bin/env python import unittest -import rospy -import rostest from rosbridge_library.internal import pngcompression class TestCompression(unittest.TestCase): - def setUp(self): - rospy.init_node("test_compression") - def test_compress(self): - bytes = list(range(128)) * 10000 - string = str(bytearray(bytes)) + bytes_data = list(range(128)) * 10000 + string = str(bytearray(bytes_data)) encoded = pngcompression.encode(string) self.assertNotEqual(string, encoded) def test_compress_decompress(self): - bytes = list(range(128)) * 10000 - string = str(bytearray(bytes)) + bytes_data = list(range(128)) * 10000 + string = str(bytes(bytes_data)) encoded = pngcompression.encode(string) self.assertNotEqual(string, encoded) decoded = pngcompression.decode(encoded) self.assertEqual(string, decoded) - - -PKG = "rosbridge_library" -NAME = "test_compression" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestCompression) diff --git a/rosbridge_library/test/internal/test_outgoing_message.py b/rosbridge_library/test/internal/test_outgoing_message.py index 08d6cf601..8293822d2 100755 --- a/rosbridge_library/test/internal/test_outgoing_message.py +++ b/rosbridge_library/test/internal/test_outgoing_message.py @@ -1,7 +1,6 @@ #!/usr/bin/env python import unittest -import rostest from rosbridge_library.internal.outgoing_message import OutgoingMessage from std_msgs.msg import String @@ -26,9 +25,3 @@ def test_cbor_values(self): again = outgoing.get_cbor_values() self.assertTrue(result is again) - - -PKG = "rosbridge_library" -NAME = "test_outgoing_message" -if __name__ == "__main__": - rostest.unitrun(PKG, NAME, TestOutgoingMessage) diff --git a/rosbridge_server/src/rosbridge_server/__init__.py b/rosbridge_server/src/rosbridge_server/__init__.py index fe3ed3131..5a9f49cba 100644 --- a/rosbridge_server/src/rosbridge_server/__init__.py +++ b/rosbridge_server/src/rosbridge_server/__init__.py @@ -1,2 +1,2 @@ -from .client_mananger import ClientManager # noqa: F401 +from .client_manager import ClientManager # noqa: F401 from .websocket_handler import RosbridgeWebSocket # noqa: F401 diff --git a/rosbridge_server/src/rosbridge_server/client_mananger.py b/rosbridge_server/src/rosbridge_server/client_manager.py similarity index 100% rename from rosbridge_server/src/rosbridge_server/client_mananger.py rename to rosbridge_server/src/rosbridge_server/client_manager.py diff --git a/rosbridge_server/test/websocket/call_service.test.py b/rosbridge_server/test/websocket/call_service.test.py index 549b4cbb3..c8e041819 100644 --- a/rosbridge_server/test/websocket/call_service.test.py +++ b/rosbridge_server/test/websocket/call_service.test.py @@ -3,6 +3,7 @@ import sys import unittest +from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.node import Node from std_srvs.srv import SetBool from twisted.python import log @@ -26,7 +27,9 @@ def service_cb(req, res): res.message = "Hello, world!" return res - service = node.create_service(SetBool, "/test_service", service_cb) + service = node.create_service( + SetBool, "/test_service", service_cb, callback_group=ReentrantCallbackGroup() + ) ws_client = await make_client() responses_future, ws_client.message_handler = expect_messages( diff --git a/rosbridge_server/test/websocket/common.py b/rosbridge_server/test/websocket/common.py index 460f145d5..450534ae9 100644 --- a/rosbridge_server/test/websocket/common.py +++ b/rosbridge_server/test/websocket/common.py @@ -119,7 +119,7 @@ def run_websocket_test( context = rclpy.Context() rclpy.init(context=context) executor = SingleThreadedExecutor(context=context) - node = rclpy.create_node(node_name, context=context) + node = Node(node_name, context=context) executor.add_node(node) async def task(): @@ -128,11 +128,12 @@ async def task(): future = executor.create_task(task) - reactor.callInThread(rclpy.spin_until_future_complete, node, future, executor) + reactor.callInThread(executor.spin_until_future_complete, future) reactor.run(installSignalHandlers=False) - rclpy.shutdown(context=context) + executor.remove_node(node) node.destroy_node() + rclpy.shutdown(context=context) def sleep(node: Node, duration: float) -> Awaitable[None]: