Skip to content

Commit

Permalink
PR comments and fixes from testing on production
Browse files Browse the repository at this point in the history
  • Loading branch information
sea-bass committed Oct 13, 2023
1 parent 84f7461 commit 6112c81
Show file tree
Hide file tree
Showing 15 changed files with 59 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self, protocol):
protocol.node_handle.get_logger().info("Calling services in existing thread")
protocol.register_operation("call_service", self.call_service)

def call_service(self, message, sleep_time=0):
def call_service(self, message, spin_rate=0):
# Pull out the ID
cid = message.get("id", None)

Expand Down Expand Up @@ -113,7 +113,7 @@ def call_service(self, message, sleep_time=0):

# Run service caller in the same thread.
ServiceCaller(
trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle, sleep_time
trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle, spin_rate
).run()

def _success(self, cid, service, fragment_size, compression, message):
Expand Down
27 changes: 12 additions & 15 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, servicename):


class ServiceCaller(Thread):
def __init__(self, service, args, success_callback, error_callback, node_handle, sleep_time=0):
def __init__(self, service, args, success_callback, error_callback, node_handle, spin_rate=0):
"""Create a service caller for the specified service. Use start()
to start in a separate thread or run() to run in this thread.
Expand All @@ -65,7 +65,7 @@ def __init__(self, service, args, success_callback, error_callback, node_handle,
error_callback -- a callback to call if an error occurs. The
callback will be passed the exception that caused the failure
node_handle -- a ROS 2 node handle to call services.
sleep_time -- if nonzero, puts sleeps between executor spins
spin_rate -- if nonzero, puts sleeps between executor spins
"""
Thread.__init__(self)
self.daemon = True
Expand All @@ -74,14 +74,14 @@ def __init__(self, service, args, success_callback, error_callback, node_handle,
self.success = success_callback
self.error = error_callback
self.node_handle = node_handle
self.sleep_time = sleep_time
self.spin_rate = spin_rate

def run(self):
try:
# Call the service and pass the result to the success handler
self.success(
call_service(
self.node_handle, self.service, args=self.args, sleep_time=self.sleep_time
self.node_handle, self.service, args=self.args, spin_rate=self.spin_rate
)
)
except Exception as e:
Expand All @@ -105,7 +105,7 @@ def args_to_service_request_instance(service, inst, args):
populate_instance(msg, inst)


def call_service(node_handle, service, args=None, sleep_time=0):
def call_service(node_handle, service, args=None, spin_rate=0):
# Given the service name, fetch the type and class of the service,
# and a request instance
service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace())
Expand All @@ -127,19 +127,16 @@ def call_service(node_handle, service, args=None, sleep_time=0):

client = node_handle.create_client(service_class, service)

future = client.call_async(inst)
if sleep_time == 0:
if node_handle.executor:
node_handle.executor.spin_until_future_complete(future)
else:
rclpy.spin_until_future_complete(future)
else:
if spin_rate > 0:
future = client.call_async(inst)
while not future.done():
if node_handle.executor:
node_handle.executor.spin_once(timeout_sec=sleep_time)
node_handle.executor.spin_once(timeout_sec=spin_rate)
else:
rclpy.spin_once(node_handle, timeout_sec=sleep_time)
result = future.result()
rclpy.spin_once(node_handle, timeout_sec=spin_rate)
result = future.result()
else:
result = client.call(inst)

node_handle.destroy_client(client)
if result is not None:
Expand Down
4 changes: 2 additions & 2 deletions rosbridge_library/src/rosbridge_library/util/ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def is_topic_published(node, topic_name):
published_topic_data = node.get_publisher_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return topic_name in [topic[0] for topic in published_topic_data]
return any(topic[0] == topic_name for topic in published_topic_data)


def is_topic_subscribed(node, topic_name):
"""Checks if a topic is subscribed to by a node."""
subscribed_topic_data = node.get_subscriber_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return topic_name in [topic[0] for topic in subscribed_topic_data]
return any(topic[0] == topic_name for topic in subscribed_topic_data)
4 changes: 2 additions & 2 deletions rosbridge_library/test/capabilities/test_advertise.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from json import dumps, loads

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.capabilities.advertise import Advertise
from rosbridge_library.internal import ros_loader
Expand All @@ -20,7 +20,7 @@
class TestAdvertise(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_advertise")
self.executor.add_node(self.node)

Expand Down
10 changes: 5 additions & 5 deletions rosbridge_library/test/capabilities/test_call_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from json import dumps, loads

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.capabilities.call_service import CallService
from rosbridge_library.internal.exceptions import (
Expand Down Expand Up @@ -33,7 +33,7 @@ def set_bool_cb(self, request, response):

def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_call_service")
self.executor.add_node(self.node)

Expand Down Expand Up @@ -82,7 +82,7 @@ def cb(msg, cid=None, compression="none"):

proto.send = cb

s.call_service(send_msg)
s.call_service(send_msg, spin_rate=0.01)

timeout = 1.0
start = time.time()
Expand Down Expand Up @@ -119,7 +119,7 @@ def cb(msg, cid=None, compression="none"):

proto.send = cb

s.call_service(send_msg)
s.call_service(send_msg, spin_rate=0.01)

timeout = 1.0
start = time.time()
Expand Down Expand Up @@ -158,7 +158,7 @@ def cb(msg, cid=None, compression="none"):

proto.send = cb

s.call_service(send_msg)
s.call_service(send_msg, spin_rate=0.01)

timeout = 1.0
start = time.time()
Expand Down
4 changes: 2 additions & 2 deletions rosbridge_library/test/capabilities/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from json import dumps, loads

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.capabilities.publish import Publish
Expand All @@ -19,7 +19,7 @@
class TestAdvertise(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_publish")
self.executor.add_node(self.node)

Expand Down
12 changes: 6 additions & 6 deletions rosbridge_library/test/capabilities/test_service_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from threading import Thread

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.capabilities.advertise_service import AdvertiseService
from rosbridge_library.capabilities.call_service import CallService
Expand All @@ -21,7 +21,7 @@
class TestServiceCapabilities(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_service_capabilities")
self.executor.add_node(self.node)

Expand Down Expand Up @@ -114,8 +114,8 @@ def test_call_advertised_service(self):
)
)
self.received_message = None
sleep_time = 0.01
Thread(target=call_service.call_service, args=(call_msg, sleep_time)).start()
spin_rate = 0.01
Thread(target=call_service.call_service, args=(call_msg, spin_rate)).start()

loop_iterations = 0
while self.received_message is None:
Expand Down Expand Up @@ -183,8 +183,8 @@ def test_unadvertise_with_live_request(self):
)
)
self.received_message = None
sleep_time = 0.01
Thread(target=call_service.call_service, args=(call_msg, sleep_time)).start()
spin_rate = 0.01
Thread(target=call_service.call_service, args=(call_msg, spin_rate)).start()

loop_iterations = 0
while self.received_message is None:
Expand Down
4 changes: 2 additions & 2 deletions rosbridge_library/test/capabilities/test_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from json import dumps, loads

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.capabilities import subscribe
Expand All @@ -19,7 +19,7 @@
class TestSubscribe(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_subscribe")
self.executor.add_node(self.node)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class TestMultiPublisher(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = MultiThreadedExecutor(num_threads=2)
self.node = Node("test_multi_publisher")
self.executor.add_node(self.node)

Expand Down Expand Up @@ -146,6 +146,7 @@ def cb(msg):

p = MultiPublisher(topic, self.node, msg_type)
p.publish(msg)
time.sleep(0.1)
self.executor.spin_once()
time.sleep(0.1)
self.assertEqual(received["msg"].data, msg["data"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.internal.message_conversion import FieldTypeMismatchException
Expand All @@ -22,7 +22,7 @@
class TestPublisherManager(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_publisher_manager")
self.executor.add_node(self.node)

Expand Down
19 changes: 14 additions & 5 deletions rosbridge_library/test/internal/services/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
import rclpy
from rcl_interfaces.srv import ListParameters
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.internal import message_conversion as c
from rosbridge_library.internal import ros_loader, services
Expand Down Expand Up @@ -48,7 +48,9 @@ def start(self):
gen = c.extract_values(req)
gen = populate_random_args(gen)
self.input = gen
thread = services.ServiceCaller(self.name, gen, self.success, self.error, self.node)
thread = services.ServiceCaller(
self.name, gen, self.success, self.error, self.node, spin_rate=0.01
)
thread.start()
thread.join()

Expand Down Expand Up @@ -85,7 +87,7 @@ def validate(self, equality_function):
class TestServices(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_node")
self.executor.add_node(self.node)

Expand Down Expand Up @@ -157,7 +159,9 @@ def test_service_call(self):
self.node.destroy_client(p)

# Now, call using the services
json_ret = services.call_service(self.node, self.node.get_name() + "/list_parameters")
json_ret = services.call_service(
self.node, self.node.get_name() + "/list_parameters", spin_rate=0.01
)
for x, y in zip(ret.result().result.names, json_ret["result"]["names"]):
self.assertEqual(x, y)

Expand All @@ -182,7 +186,12 @@ def error():

# Now, call using the services
services.ServiceCaller(
self.node.get_name() + "/list_parameters", None, success, error, self.node
self.node.get_name() + "/list_parameters",
None,
success,
error,
self.node,
spin_rate=0.01,
).start()

time.sleep(0.2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.internal.subscribers import MultiSubscriber
Expand All @@ -17,7 +17,7 @@ def setUp(self):
self.client_id = "test_client_id"

rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_multi_subscriber")
self.executor.add_node(self.node)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import DurabilityPolicy, QoSProfile
from rosbridge_library.internal.subscribers import manager
Expand All @@ -18,7 +18,7 @@
class TestSubscriberManager(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_subscriber_manager")
self.executor.add_node(self.node)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import unittest

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.internal import subscription_modifiers


class TestMessageHandlers(unittest.TestCase):
def setUp(self):
rclpy.init()
self.executor = MultiThreadedExecutor()
self.executor = SingleThreadedExecutor()
self.node = Node("test_subscription_modifiers")
self.executor.add_node(self.node)

Expand Down
Loading

0 comments on commit 6112c81

Please sign in to comment.