From 6112c81c2e4610ecbc1c144ad852656ffefe9b8a Mon Sep 17 00:00:00 2001 From: Sebastian Castro Date: Fri, 13 Oct 2023 16:02:24 -0400 Subject: [PATCH] PR comments and fixes from testing on production --- .../capabilities/call_service.py | 4 +-- .../rosbridge_library/internal/services.py | 27 +++++++++---------- .../src/rosbridge_library/util/ros.py | 4 +-- .../test/capabilities/test_advertise.py | 4 +-- .../test/capabilities/test_call_service.py | 10 +++---- .../test/capabilities/test_publish.py | 4 +-- .../capabilities/test_service_capabilities.py | 12 ++++----- .../test/capabilities/test_subscribe.py | 4 +-- .../publishers/test_multi_publisher.py | 3 ++- .../publishers/test_publisher_manager.py | 4 +-- .../test/internal/services/test_services.py | 19 +++++++++---- .../subscribers/test_multi_subscriber.py | 4 +-- .../subscribers/test_subscriber_manager.py | 4 +-- .../test_subscription_modifiers.py | 4 +-- rosbridge_server/test/websocket/common.py | 4 +-- 15 files changed, 59 insertions(+), 52 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/call_service.py b/rosbridge_library/src/rosbridge_library/capabilities/call_service.py index 6d2c6961a..28d593431 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/call_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/call_service.py @@ -69,7 +69,7 @@ def __init__(self, protocol): protocol.node_handle.get_logger().info("Calling services in existing thread") protocol.register_operation("call_service", self.call_service) - def call_service(self, message, sleep_time=0): + def call_service(self, message, spin_rate=0): # Pull out the ID cid = message.get("id", None) @@ -113,7 +113,7 @@ def call_service(self, message, sleep_time=0): # Run service caller in the same thread. ServiceCaller( - trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle, sleep_time + trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle, spin_rate ).run() def _success(self, cid, service, fragment_size, compression, message): diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index e9753ba4b..0d861eb4c 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -50,7 +50,7 @@ def __init__(self, servicename): class ServiceCaller(Thread): - def __init__(self, service, args, success_callback, error_callback, node_handle, sleep_time=0): + def __init__(self, service, args, success_callback, error_callback, node_handle, spin_rate=0): """Create a service caller for the specified service. Use start() to start in a separate thread or run() to run in this thread. @@ -65,7 +65,7 @@ def __init__(self, service, args, success_callback, error_callback, node_handle, error_callback -- a callback to call if an error occurs. The callback will be passed the exception that caused the failure node_handle -- a ROS 2 node handle to call services. - sleep_time -- if nonzero, puts sleeps between executor spins + spin_rate -- if nonzero, puts sleeps between executor spins """ Thread.__init__(self) self.daemon = True @@ -74,14 +74,14 @@ def __init__(self, service, args, success_callback, error_callback, node_handle, self.success = success_callback self.error = error_callback self.node_handle = node_handle - self.sleep_time = sleep_time + self.spin_rate = spin_rate def run(self): try: # Call the service and pass the result to the success handler self.success( call_service( - self.node_handle, self.service, args=self.args, sleep_time=self.sleep_time + self.node_handle, self.service, args=self.args, spin_rate=self.spin_rate ) ) except Exception as e: @@ -105,7 +105,7 @@ def args_to_service_request_instance(service, inst, args): populate_instance(msg, inst) -def call_service(node_handle, service, args=None, sleep_time=0): +def call_service(node_handle, service, args=None, spin_rate=0): # Given the service name, fetch the type and class of the service, # and a request instance service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace()) @@ -127,19 +127,16 @@ def call_service(node_handle, service, args=None, sleep_time=0): client = node_handle.create_client(service_class, service) - future = client.call_async(inst) - if sleep_time == 0: - if node_handle.executor: - node_handle.executor.spin_until_future_complete(future) - else: - rclpy.spin_until_future_complete(future) - else: + if spin_rate > 0: + future = client.call_async(inst) while not future.done(): if node_handle.executor: - node_handle.executor.spin_once(timeout_sec=sleep_time) + node_handle.executor.spin_once(timeout_sec=spin_rate) else: - rclpy.spin_once(node_handle, timeout_sec=sleep_time) - result = future.result() + rclpy.spin_once(node_handle, timeout_sec=spin_rate) + result = future.result() + else: + result = client.call(inst) node_handle.destroy_client(client) if result is not None: diff --git a/rosbridge_library/src/rosbridge_library/util/ros.py b/rosbridge_library/src/rosbridge_library/util/ros.py index f38735996..f2e3fb234 100644 --- a/rosbridge_library/src/rosbridge_library/util/ros.py +++ b/rosbridge_library/src/rosbridge_library/util/ros.py @@ -36,7 +36,7 @@ def is_topic_published(node, topic_name): published_topic_data = node.get_publisher_names_and_types_by_node( node.get_name(), node.get_namespace() ) - return topic_name in [topic[0] for topic in published_topic_data] + return any(topic[0] == topic_name for topic in published_topic_data) def is_topic_subscribed(node, topic_name): @@ -44,4 +44,4 @@ def is_topic_subscribed(node, topic_name): subscribed_topic_data = node.get_subscriber_names_and_types_by_node( node.get_name(), node.get_namespace() ) - return topic_name in [topic[0] for topic in subscribed_topic_data] + return any(topic[0] == topic_name for topic in subscribed_topic_data) diff --git a/rosbridge_library/test/capabilities/test_advertise.py b/rosbridge_library/test/capabilities/test_advertise.py index 9a98c6b8b..11468d80e 100755 --- a/rosbridge_library/test/capabilities/test_advertise.py +++ b/rosbridge_library/test/capabilities/test_advertise.py @@ -4,7 +4,7 @@ from json import dumps, loads import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.capabilities.advertise import Advertise from rosbridge_library.internal import ros_loader @@ -20,7 +20,7 @@ class TestAdvertise(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_advertise") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/capabilities/test_call_service.py b/rosbridge_library/test/capabilities/test_call_service.py index d9617eb68..12c114ad0 100755 --- a/rosbridge_library/test/capabilities/test_call_service.py +++ b/rosbridge_library/test/capabilities/test_call_service.py @@ -4,7 +4,7 @@ from json import dumps, loads import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.capabilities.call_service import CallService from rosbridge_library.internal.exceptions import ( @@ -33,7 +33,7 @@ def set_bool_cb(self, request, response): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_call_service") self.executor.add_node(self.node) @@ -82,7 +82,7 @@ def cb(msg, cid=None, compression="none"): proto.send = cb - s.call_service(send_msg) + s.call_service(send_msg, spin_rate=0.01) timeout = 1.0 start = time.time() @@ -119,7 +119,7 @@ def cb(msg, cid=None, compression="none"): proto.send = cb - s.call_service(send_msg) + s.call_service(send_msg, spin_rate=0.01) timeout = 1.0 start = time.time() @@ -158,7 +158,7 @@ def cb(msg, cid=None, compression="none"): proto.send = cb - s.call_service(send_msg) + s.call_service(send_msg, spin_rate=0.01) timeout = 1.0 start = time.time() diff --git a/rosbridge_library/test/capabilities/test_publish.py b/rosbridge_library/test/capabilities/test_publish.py index 8a1b717eb..e276dc220 100755 --- a/rosbridge_library/test/capabilities/test_publish.py +++ b/rosbridge_library/test/capabilities/test_publish.py @@ -4,7 +4,7 @@ from json import dumps, loads import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.capabilities.publish import Publish @@ -19,7 +19,7 @@ class TestAdvertise(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_publish") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/capabilities/test_service_capabilities.py b/rosbridge_library/test/capabilities/test_service_capabilities.py index 36ce6f7b7..2dee143de 100755 --- a/rosbridge_library/test/capabilities/test_service_capabilities.py +++ b/rosbridge_library/test/capabilities/test_service_capabilities.py @@ -5,7 +5,7 @@ from threading import Thread import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.capabilities.advertise_service import AdvertiseService from rosbridge_library.capabilities.call_service import CallService @@ -21,7 +21,7 @@ class TestServiceCapabilities(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_service_capabilities") self.executor.add_node(self.node) @@ -114,8 +114,8 @@ def test_call_advertised_service(self): ) ) self.received_message = None - sleep_time = 0.01 - Thread(target=call_service.call_service, args=(call_msg, sleep_time)).start() + spin_rate = 0.01 + Thread(target=call_service.call_service, args=(call_msg, spin_rate)).start() loop_iterations = 0 while self.received_message is None: @@ -183,8 +183,8 @@ def test_unadvertise_with_live_request(self): ) ) self.received_message = None - sleep_time = 0.01 - Thread(target=call_service.call_service, args=(call_msg, sleep_time)).start() + spin_rate = 0.01 + Thread(target=call_service.call_service, args=(call_msg, spin_rate)).start() loop_iterations = 0 while self.received_message is None: diff --git a/rosbridge_library/test/capabilities/test_subscribe.py b/rosbridge_library/test/capabilities/test_subscribe.py index 70d837189..640b4525b 100755 --- a/rosbridge_library/test/capabilities/test_subscribe.py +++ b/rosbridge_library/test/capabilities/test_subscribe.py @@ -4,7 +4,7 @@ from json import dumps, loads import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.capabilities import subscribe @@ -19,7 +19,7 @@ class TestSubscribe(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_subscribe") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/internal/publishers/test_multi_publisher.py b/rosbridge_library/test/internal/publishers/test_multi_publisher.py index a732f97b5..3ff6eefdf 100755 --- a/rosbridge_library/test/internal/publishers/test_multi_publisher.py +++ b/rosbridge_library/test/internal/publishers/test_multi_publisher.py @@ -16,7 +16,7 @@ class TestMultiPublisher(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = MultiThreadedExecutor(num_threads=2) self.node = Node("test_multi_publisher") self.executor.add_node(self.node) @@ -146,6 +146,7 @@ def cb(msg): p = MultiPublisher(topic, self.node, msg_type) p.publish(msg) + time.sleep(0.1) self.executor.spin_once() time.sleep(0.1) self.assertEqual(received["msg"].data, msg["data"]) diff --git a/rosbridge_library/test/internal/publishers/test_publisher_manager.py b/rosbridge_library/test/internal/publishers/test_publisher_manager.py index d974dcc9a..77ce51b68 100755 --- a/rosbridge_library/test/internal/publishers/test_publisher_manager.py +++ b/rosbridge_library/test/internal/publishers/test_publisher_manager.py @@ -3,7 +3,7 @@ import unittest import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.message_conversion import FieldTypeMismatchException @@ -22,7 +22,7 @@ class TestPublisherManager(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_publisher_manager") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/internal/services/test_services.py b/rosbridge_library/test/internal/services/test_services.py index 709e3f67d..a7a602536 100755 --- a/rosbridge_library/test/internal/services/test_services.py +++ b/rosbridge_library/test/internal/services/test_services.py @@ -6,7 +6,7 @@ import numpy as np import rclpy from rcl_interfaces.srv import ListParameters -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.internal import message_conversion as c from rosbridge_library.internal import ros_loader, services @@ -48,7 +48,9 @@ def start(self): gen = c.extract_values(req) gen = populate_random_args(gen) self.input = gen - thread = services.ServiceCaller(self.name, gen, self.success, self.error, self.node) + thread = services.ServiceCaller( + self.name, gen, self.success, self.error, self.node, spin_rate=0.01 + ) thread.start() thread.join() @@ -85,7 +87,7 @@ def validate(self, equality_function): class TestServices(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_node") self.executor.add_node(self.node) @@ -157,7 +159,9 @@ def test_service_call(self): self.node.destroy_client(p) # Now, call using the services - json_ret = services.call_service(self.node, self.node.get_name() + "/list_parameters") + json_ret = services.call_service( + self.node, self.node.get_name() + "/list_parameters", spin_rate=0.01 + ) for x, y in zip(ret.result().result.names, json_ret["result"]["names"]): self.assertEqual(x, y) @@ -182,7 +186,12 @@ def error(): # Now, call using the services services.ServiceCaller( - self.node.get_name() + "/list_parameters", None, success, error, self.node + self.node.get_name() + "/list_parameters", + None, + success, + error, + self.node, + spin_rate=0.01, ).start() time.sleep(0.2) diff --git a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py index 4d1034997..512346b8d 100755 --- a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py +++ b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py @@ -3,7 +3,7 @@ import unittest import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.subscribers import MultiSubscriber @@ -17,7 +17,7 @@ def setUp(self): self.client_id = "test_client_id" rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_multi_subscriber") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index ad9ffe870..791eb0cd0 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -3,7 +3,7 @@ import unittest import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.subscribers import manager @@ -18,7 +18,7 @@ class TestSubscriberManager(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_subscriber_manager") self.executor.add_node(self.node) diff --git a/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py b/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py index 02e3417e8..27d149b80 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py +++ b/rosbridge_library/test/internal/subscribers/test_subscription_modifiers.py @@ -3,7 +3,7 @@ import unittest import rclpy -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.internal import subscription_modifiers @@ -11,7 +11,7 @@ class TestMessageHandlers(unittest.TestCase): def setUp(self): rclpy.init() - self.executor = MultiThreadedExecutor() + self.executor = SingleThreadedExecutor() self.node = Node("test_subscription_modifiers") self.executor.add_node(self.node) diff --git a/rosbridge_server/test/websocket/common.py b/rosbridge_server/test/websocket/common.py index 563fbfcd4..450534ae9 100644 --- a/rosbridge_server/test/websocket/common.py +++ b/rosbridge_server/test/websocket/common.py @@ -8,7 +8,7 @@ import rclpy.task from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol from rcl_interfaces.srv import GetParameters -from rclpy.executors import MultiThreadedExecutor +from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from twisted.internet import reactor from twisted.internet.endpoints import TCP4ClientEndpoint @@ -118,7 +118,7 @@ def run_websocket_test( ): context = rclpy.Context() rclpy.init(context=context) - executor = MultiThreadedExecutor(context=context) + executor = SingleThreadedExecutor(context=context) node = Node(node_name, context=context) executor.add_node(node)