Skip to content

Commit

Permalink
Initial action support and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sea-bass committed Oct 31, 2023
1 parent 1d1cd6b commit 000a9ae
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion, ros_loader


class ActionResult(Capability):

action_result_msg_fields = [
(True, "action", str),
(False, "id", str),
(False, "values", dict),
(True, "result", bool),
]

def __init__(self, protocol):
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("action_result", self.action_result)

def action_result(self, message):
# Typecheck the args
self.basic_type_check(message, self.action_result_msg_fields)

# check for the action
action_name = message["action"]
if action_name in self.protocol.external_action_list:
action_handler = self.protocol.external_action_list[action_name]
# parse the message
goal_id = message["id"]
values = message["values"]
# create a message instance
result = ros_loader.get_action_result_instance(action_handler.action_type)
message_conversion.populate_instance(values, result)
# pass along the result
action_handler.handle_result(goal_id, result)
else:
self.protocol.log(
"error",
f"Action {action_name} has not been advertised via rosbridge.",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import fnmatch

import rclpy
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion
from rosbridge_library.internal.ros_loader import get_action_class


class AdvertisedActionHandler:

id_counter = 1

def __init__(self, action_name, action_type, protocol):
self.goal_futures = {}
self.action_name = action_name
self.action_type = action_type
self.protocol = protocol
# setup the action
self.action_server = ActionServer(
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
)

def next_id(self):
id = self.id_counter
self.id_counter += 1
return id

async def execute_callback(self, goal):
# generate a unique ID
goal_id = f"action_goal:{self.action}:{self.next_id()}"

future = rclpy.task.Future()
self.request_futures[goal_id] = future

# build a request to send to the external client
goal_message = {
"op": "send_action_goal",
"id": goal_id,
"action": self.action_name,
"args": message_conversion.extract_values(goal),
}
self.protocol.send(goal_message)

try:
return await future
finally:
del self.goal_futures[goal_id]

def handle_result(self, goal_id, res):
"""
Called by the ActionResult capability to handle an action result from the external client.
"""
if goal_id in self.goal_futures:
self.goal_futures[goal_id].set_result(res)
else:
self.protocol.log("warning", f"Received action result for unrecognized id: {goal_id}")

def graceful_shutdown(self):
"""
Signal the AdvertisedActionHandler to shutdown.
"""
if self.goal_futures:
incomplete_ids = ", ".join(self.goal_futures.keys())
self.protocol.log(
"warning",
f"Action {self.action_name} was unadvertised with an action in progress, "
f"aborting action goals with request IDs {incomplete_ids}",
)
for future_id in self.goal_futures:
future = self.goal_futures[future_id]
future.set_exception(RuntimeError(f"Goal {self.action_name} was unadvertised"))
self.action_server.destroy()


class AdvertiseAction(Capability):
actions_glob = None

advertise_action_msg_fields = [(True, "action", str), (True, "type", str)]

def __init__(self, protocol):
# Call superclass constructor
Capability.__init__(self, protocol)

# Register the operations that this capability provides
protocol.register_operation("advertise_action", self.advertise_action)

def advertise_action(self, message):
# Typecheck the args
self.basic_type_check(message, self.advertise_action_msg_fields)

# parse the incoming message
action_name = message["action"]

if AdvertiseAction.actions_glob is not None and AdvertiseAction.actions_glob:
self.protocol.log(
"debug",
"Action security glob enabled, checking action: " + action_name,
)
match = False
for glob in AdvertiseAction.actions_glob:
if fnmatch.fnmatch(action_name, glob):
self.protocol.log(
"debug",
"Found match with glob " + glob + ", continuing action advertisement...",
)
match = True
break
if not match:
self.protocol.log(
"warn",
"No match found for action, cancelling action advertisement for: "
+ action_name,
)
return
else:
self.protocol.log(
"debug", "No action security glob, not checking action advertisement."
)

# check for an existing entry
if action_name in self.protocol.external_action_list.keys():
self.protocol.log("warn", f"Duplicate action advertised. Overwriting {action_name}.")
self.protocol.external_action_list[action_name].graceful_shutdown()
del self.protocol.external_action_list[action_name]

# setup and store the action information
action_type = message["type"]
action_handler = AdvertisedActionHandler(action_name, action_type, self.protocol)
self.protocol.external_action_list[action_name] = action_handler
self.protocol.log("info", f"Advertised action {action_name}")
136 changes: 136 additions & 0 deletions rosbridge_library/src/rosbridge_library/internal/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2023, PickNik Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import time
from threading import Thread

import rclpy
from rclpy.action import ActionClient
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.expand_topic_name import expand_topic_name
from rosbridge_library.internal.message_conversion import (
extract_values,
populate_instance,
)
from rosbridge_library.internal.ros_loader import (
get_action_class,
get_action_goal_instance,
)


class InvalidActionException(Exception):
def __init__(self, action_name):
Exception.__init__(self, f"Action {action_name} does not exist")


class ActionClientHandler(Thread):
def __init__(self, action, action_type, args, success_callback, error_callback, node_handle):
"""
Create a client handler for the specified action.
Use start() to start in a separate thread or run() to run in this thread.
Keyword arguments:
action -- the name of the action to execute.
args -- arguments to pass to the action. Can be an
ordered list, or a dict of name-value pairs. Anything else will be
treated as though no arguments were provided (which is still valid for
some kinds of actions)
success_callback -- a callback to call with the JSON result of the
service call
error_callback -- a callback to call if an error occurs. The
callback will be passed the exception that caused the failure
node_handle -- a ROS 2 node handle to call services.
"""
Thread.__init__(self)
self.daemon = True
self.action = action
self.action_type = action_type
self.args = args
self.success = success_callback
self.error = error_callback
self.node_handle = node_handle

def run(self):
try:
# Call the service and pass the result to the success handler
self.success(send_goal(self.node_handle, self.action, self.action_type, args=self.args))
except Exception as e:
# On error, just pass the exception to the error handler
self.error(e)


def args_to_action_goal_instance(action, inst, args):
""" "
Populate an action goal instance with the provided args
args can be a dictionary of values, or a list, or None
Propagates any exceptions that may be raised.
"""
msg = {}
if isinstance(args, list):
msg = dict(zip(inst.get_fields_and_field_types().keys(), args))
elif isinstance(args, dict):
msg = args

# Populate the provided instance, propagating any exceptions
populate_instance(msg, inst)


def send_goal(node_handle, action, action_type, args=None, sleep_time=0.001):
# Given the action nam and type, fetch a request instance
action_name = expand_topic_name(action, node_handle.get_name(), node_handle.get_namespace())
action_class = get_action_class(action_type)
inst = get_action_goal_instance(action_type)

# Populate the instance with the provided args
args_to_action_goal_instance(action_name, inst, args)

client = ActionClient(node_handle, action_class, action_name)
send_goal_future = client.send_goal_async(inst)
while rclpy.ok() and not send_goal_future.done():
time.sleep(sleep_time)
goal_handle = send_goal_future.result()

if not goal_handle.accepted:
raise Exception("Action goal was rejected") # TODO: Catch better

result = goal_handle.get_result()
client.destroy()

if result is not None:
# Turn the response into JSON and pass to the callback
json_response = extract_values(result)
else:
raise Exception(result)

return json_response
4 changes: 2 additions & 2 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@


class InvalidServiceException(Exception):
def __init__(self, servicename):
Exception.__init__(self, "Service %s does not exist" % servicename)
def __init__(self, service_name):
Exception.__init__(self, f"Service {service_name} does not exist")


class ServiceCaller(Thread):
Expand Down
2 changes: 2 additions & 0 deletions rosbridge_library/src/rosbridge_library/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Protocol:
delay_between_messages = 0
# global list of non-ros advertised services
external_service_list = {}
# global list of non-ros advertised actions
external_action_list = {}
# Use only BSON for the whole communication if the server has been started with bson_only_mode:=True
bson_only_mode = False

Expand Down
Loading

0 comments on commit 000a9ae

Please sign in to comment.