Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port unit tests to ROS 2 + Fix CBOR conversion and PNG compression #882

Merged
merged 17 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rosapi/src/rosapi/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ def _get_param_names(node_name):

request = ListParameters.Request()
future = client.call_async(request)
rclpy.spin_until_future_complete(_node, future)
if _node.executor:
_node.executor.spin_until_future_complete(future)
else:
rclpy.spin_until_future_complete(_node, future)
response = future.result()

if response is not None:
Expand Down
9 changes: 2 additions & 7 deletions rosbridge_library/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ ament_package()

if (BUILD_TESTING)
find_package(ament_cmake_pytest REQUIRED)

# TODO: Enable tests
# ament_add_pytest_test(test_capabilities "test/capabilities")
# ament_add_pytest_test(test_internal "test/internal")
# ament_add_pytest_test(test_services "test/internal/test_services.py")
ament_add_pytest_test(test_ros_loader "test/internal/test_ros_loader.py")
ament_add_pytest_test(test_message_conversion "test/internal/test_message_conversion.py")
ament_add_pytest_test(test_capabilities "test/capabilities/")
ament_add_pytest_test(test_internal "test/internal/")
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,17 @@ def graceful_shutdown(self):
f"Service {self.service_name} was unadvertised with a service call in progress, "
f"aborting service calls with request IDs {incomplete_ids}",
)
for future in self.request_futures.values():
for future_id in self.request_futures:
future = self.request_futures[future_id]
future.set_exception(RuntimeError(f"Service {self.service_name} was unadvertised"))
self.protocol.node_handle.destroy_service(self.service_handle)
self.service_handle.destroy()
self.protocol.send(
{
"op": "service_response",
"id": future,
"result": False,
}
)


class AdvertiseService(Capability):
Expand Down Expand Up @@ -138,4 +146,4 @@ def advertise_service(self, message):
service_type = message["type"]
service_handler = AdvertisedServiceHandler(service_name, service_type, self.protocol)
self.protocol.external_service_list[service_name] = service_handler
self.protocol.log("info", "Advertised service %s." % service_name)
self.protocol.log("info", f"Advertised service {service_name}")
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class CallService(Capability):
services_glob = None

def __init__(self, protocol):
# Call superclas constructor
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
Expand All @@ -69,7 +69,7 @@ def __init__(self, protocol):
protocol.node_handle.get_logger().info("Calling services in existing thread")
protocol.register_operation("call_service", self.call_service)

def call_service(self, message):
def call_service(self, message, sleep_time=0):
# Pull out the ID
cid = message.get("id", None)

Expand Down Expand Up @@ -112,7 +112,9 @@ def call_service(self, message):
e_cb = partial(self._failure, cid, service)

# Run service caller in the same thread.
ServiceCaller(trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle).run()
ServiceCaller(
trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle, sleep_time
).run()

def _success(self, cid, service, fragment_size, compression, message):
outgoing_message = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,11 @@ def unadvertise_service(self, message):
# unregister service in ROS
if service_name in self.protocol.external_service_list.keys():
self.protocol.external_service_list[service_name].graceful_shutdown()
self.protocol.external_service_list[service_name].service_handle.shutdown(
"Unadvertise request."
)
self.protocol.external_service_list[service_name].service_handle.destroy()
del self.protocol.external_service_list[service_name]
self.protocol.log("info", "Unadvertised service %s." % service_name)
self.protocol.log("info", f"Unadvertised service {service_name}")
else:
self.protocol.log(
"error",
"Service %s has not been advertised via rosbridge, can't unadvertise."
% service_name,
f"Service {service_name} has not been advertised via rosbridge, can't unadvertise.",
)
38 changes: 21 additions & 17 deletions rosbridge_library/src/rosbridge_library/internal/cbor_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@
"int64",
"uint64",
]
FLOAT_TYPES = ["float32", "float64"]
FLOAT_TYPES = ["float", "double"]
STRING_TYPES = ["string"]
BOOL_TYPES = ["bool"]
BOOL_TYPES = ["boolean"]
TIME_TYPES = ["time", "duration"]
BOOL_ARRAY_TYPES = ["bool[]"]
BYTESTREAM_TYPES = ["uint8[]", "char[]"]
BOOL_ARRAY_TYPES = ["sequence<boolean>"]
STRING_ARRAY_TYPES = ["sequence<string>"]
BYTESTREAM_TYPES = ["sequence<uint8>", "sequence<char>"]

# Typed array tags according to <https://tools.ietf.org/html/draft-ietf-cbor-array-tags-00>
# Always encode to little-endian variant, for now.
TAGGED_ARRAY_FORMATS = {
"uint16[]": (69, "<{}H"),
"uint32[]": (70, "<{}I"),
"uint64[]": (71, "<{}Q"),
"byte[]": (72, "{}b"),
"int8[]": (72, "{}b"),
"int16[]": (77, "<{}h"),
"int32[]": (78, "<{}i"),
"int64[]": (79, "<{}q"),
"float32[]": (85, "<{}f"),
"float64[]": (86, "<{}d"),
"sequence<uint16>": (69, "<{}H"),
"sequence<uint32>": (70, "<{}I"),
"sequence<uint64>": (71, "<{}Q"),
"sequence<byte>": (72, "{}b"),
"sequence<int8>": (72, "{}b"),
"sequence<int16>": (77, "<{}h"),
"sequence<int32>": (78, "<{}i"),
"sequence<int64>": (79, "<{}q"),
"sequence<float>": (85, "<{}f"),
"sequence<double>": (86, "<{}d"),
}


Expand All @@ -50,7 +51,7 @@ def extract_cbor_values(msg):
Typed arrays will be tagged and packed into byte arrays.
"""
out = {}
for slot, slot_type in zip(msg.__slots__, msg._slot_types):
for slot, slot_type in msg.get_fields_and_field_types().items():
val = getattr(msg, slot)

# string
Expand All @@ -72,8 +73,8 @@ def extract_cbor_values(msg):
# time/duration
elif slot_type in TIME_TYPES:
out[slot] = {
"secs": int(val.secs),
"nsecs": int(val.nsecs),
"sec": int(val.sec),
"nanosec": int(val.nanosec),
}

# byte array
Expand All @@ -84,6 +85,9 @@ def extract_cbor_values(msg):
elif slot_type in BOOL_ARRAY_TYPES:
out[slot] = [bool(i) for i in val]

elif slot_type in STRING_ARRAY_TYPES:
out[slot] = [str(i) for i in val]

# numeric arrays
elif slot_type in TAGGED_ARRAY_FORMATS:
tag, fmt = TAGGED_ARRAY_FORMATS[slot_type]
Expand Down
25 changes: 13 additions & 12 deletions rosbridge_library/src/rosbridge_library/internal/pngcompression.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,32 @@
# POSSIBILITY OF SUCH DAMAGE.

from base64 import standard_b64decode, standard_b64encode
from io import StringIO
from io import BytesIO
from math import ceil, floor, sqrt

from PIL import Image


def encode(string):
"""PNG-compress the string in a square RBG image padded with '\n', return the b64 encoded bytes"""
length = len(string)
"""PNG-compress the string in a square RGB image padded with '\n', return the b64 encoded bytes"""
string_bytes = string.encode("utf-8")
length = len(string_bytes)
width = floor(sqrt(length / 3.0))
height = ceil((length / 3.0) / width)
bytes_needed = int(width * height * 3)
while length < bytes_needed:
string += "\n"
length += 1
i = Image.frombytes("RGB", (int(width), int(height)), string)
buff = StringIO()
string_padded = string_bytes + (b"\n" * (bytes_needed - length))
i = Image.frombytes("RGB", (int(width), int(height)), string_padded)
buff = BytesIO()
i.save(buff, "png")
encoded = standard_b64encode(buff.getvalue())
return encoded


def decode(string):
"""b64 decode the string, then PNG-decompress"""
"""b64 decode the string, then PNG-decompress and remove the '\n' padding"""
decoded = standard_b64decode(string)
buff = StringIO(decoded)
i = Image.open(buff)
return i.tostring()
buff = BytesIO(decoded)
i = Image.open(buff, formats=("png",)).convert("RGB")
dec_str = i.tobytes().decode("utf-8")
dec_str = dec_str.replace("\n", "") # Remove padding from encoding
return dec_str
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def register(self, client_id, topic, node_handle, msg_type=None, latch=False, qu
node_handle -- Handle to a rclpy node to create the publisher.
msg_type -- (optional) the type to publish
latch -- (optional) whether to make this publisher latched
queue_size -- (optional) rospy publisher queue_size to use
queue_size -- (optional) publisher queue_size to use

Throws:
Exception -- exceptions are propagated from the MultiPublisher if
Expand Down Expand Up @@ -303,7 +303,7 @@ def publish(self, client_id, topic, msg, node_handle, latch=False, queue_size=10
msg -- a JSON-like dict of fields and values
node_handle -- Handle to a rclpy node to create the publisher.
latch -- (optional) whether to make this publisher latched
queue_size -- (optional) rospy publisher queue_size to use
queue_size -- (optional) publisher queue_size to use

Throws:
Exception -- a variety of exceptions are propagated. They can be
Expand Down
31 changes: 24 additions & 7 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from threading import Thread

import rclpy
from rclpy.expand_topic_name import expand_topic_name
from rosbridge_library.internal.message_conversion import (
extract_values,
Expand All @@ -49,7 +50,7 @@ def __init__(self, servicename):


class ServiceCaller(Thread):
def __init__(self, service, args, success_callback, error_callback, node_handle):
def __init__(self, service, args, success_callback, error_callback, node_handle, sleep_time=0):
"""Create a service caller for the specified service. Use start()
to start in a separate thread or run() to run in this thread.

Expand All @@ -63,7 +64,8 @@ def __init__(self, service, args, success_callback, error_callback, node_handle)
service call
error_callback -- a callback to call if an error occurs. The
callback will be passed the exception that caused the failure
node_handle -- a ROS2 node handle to call services.
node_handle -- a ROS 2 node handle to call services.
sleep_time -- if nonzero, puts sleeps between executor spins
sea-bass marked this conversation as resolved.
Show resolved Hide resolved
"""
Thread.__init__(self)
self.daemon = True
Expand All @@ -72,11 +74,16 @@ def __init__(self, service, args, success_callback, error_callback, node_handle)
self.success = success_callback
self.error = error_callback
self.node_handle = node_handle
self.sleep_time = sleep_time

def run(self):
try:
# Call the service and pass the result to the success handler
self.success(call_service(self.node_handle, self.service, self.args))
self.success(
call_service(
self.node_handle, self.service, args=self.args, sleep_time=self.sleep_time
)
)
except Exception as e:
# On error, just pass the exception to the error handler
self.error(e)
Expand All @@ -98,11 +105,9 @@ def args_to_service_request_instance(service, inst, args):
populate_instance(msg, inst)


def call_service(node_handle, service, args=None):
def call_service(node_handle, service, args=None, sleep_time=0):
# Given the service name, fetch the type and class of the service,
# and a request instance

# This should be equivalent to rospy.resolve_name.
service = expand_topic_name(service, node_handle.get_name(), node_handle.get_namespace())

service_names_and_types = dict(node_handle.get_service_names_and_types())
Expand All @@ -122,7 +127,19 @@ def call_service(node_handle, service, args=None):

client = node_handle.create_client(service_class, service)

result = client.call(inst)
future = client.call_async(inst)
if sleep_time == 0:
sea-bass marked this conversation as resolved.
Show resolved Hide resolved
if node_handle.executor:
node_handle.executor.spin_until_future_complete(future)
else:
rclpy.spin_until_future_complete(future)
else:
while not future.done():
if node_handle.executor:
node_handle.executor.spin_once(timeout_sec=sleep_time)
else:
rclpy.spin_once(node_handle, timeout_sec=sleep_time)
result = future.result()

node_handle.destroy_client(client)
if result is not None:
Expand Down
47 changes: 47 additions & 0 deletions rosbridge_library/src/rosbridge_library/util/ros.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2023, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


def is_topic_published(node, topic_name):
"""Checks if a topic is published on a node."""
published_topic_data = node.get_publisher_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return topic_name in [topic[0] for topic in published_topic_data]
sea-bass marked this conversation as resolved.
Show resolved Hide resolved


def is_topic_subscribed(node, topic_name):
"""Checks if a topic is subscribed to by a node."""
subscribed_topic_data = node.get_subscriber_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return topic_name in [topic[0] for topic in subscribed_topic_data]
Loading