From f6d652fa2d5d3a8164cc69fb543ae0693e958262 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 22 Mar 2023 21:36:56 +0800 Subject: [PATCH] Implement matched event (#1083) Signed-off-by: Barry Xu --- rclpy/rclpy/event_handler.py | 28 +++++++++ rclpy/src/rclpy/event_handle.cpp | 19 +++++- rclpy/test/test_qos_event.py | 102 +++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/event_handler.py b/rclpy/rclpy/event_handler.py index 5c588bdd4..d813117fd 100644 --- a/rclpy/rclpy/event_handler.py +++ b/rclpy/rclpy/event_handler.py @@ -43,12 +43,18 @@ # Payload type for Subscription Incompatible QoS callback. QoSRequestedIncompatibleQoSInfo = _rclpy.rmw_requested_qos_incompatible_event_status_t +# Payload type for Subscription matched callback. +QoSSubscriptionMatchedInfo = _rclpy.rmw_matched_status_t + # Payload type for Publisher Deadline callback. QoSOfferedDeadlineMissedInfo = _rclpy.rmw_offered_deadline_missed_status_t # Payload type for Publisher Liveliness callback. QoSLivelinessLostInfo = _rclpy.rmw_liveliness_lost_status_t +# Payload type for Publisher matched callback. +QoSPublisherMatchedInfo = _rclpy.rmw_matched_status_t + """ Payload type for Publisher Incompatible QoS callback. @@ -154,6 +160,7 @@ def __init__( liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None, message_lost: Optional[Callable[[QoSMessageLostInfo], None]] = None, incompatible_type: Optional[Callable[[IncompatibleTypeInfo], None]] = None, + matched: Optional[Callable[[QoSSubscriptionMatchedInfo], None]] = None, use_default_callbacks: bool = True, ) -> None: """ @@ -168,6 +175,8 @@ def __init__( :param message_lost: A user-defined callback that is called when a messages is lost. :param incompatible_type: A user-defined callback that is called when a topic type doesn't match. + :param matched: A user-defined callback that is called when a Publisher is connected or + disconnected. :param use_default_callbacks: Whether or not to use default callbacks when the user doesn't supply one """ @@ -176,6 +185,7 @@ def __init__( self.liveliness = liveliness self.message_lost = message_lost self.incompatible_type = incompatible_type + self.matched = matched self.use_default_callbacks = use_default_callbacks def create_event_handlers( @@ -250,6 +260,13 @@ def _default_incompatible_type_callback(event): except UnsupportedEventTypeError: pass + if self.matched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.matched, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED, + parent_impl=subscription)) + return event_handlers @@ -263,6 +280,7 @@ def __init__( liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None, incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None, incompatible_type: Optional[Callable[[IncompatibleTypeInfo], None]] = None, + matched: Optional[Callable[[QoSPublisherMatchedInfo], None]] = None, use_default_callbacks: bool = True, ) -> None: """ @@ -276,6 +294,8 @@ def __init__( with incompatible QoS policies is discovered on subscribed topic. :param incompatible_type: A user-defined callback that is called when a topic type doesn't match. + :param matched: A user-defined callback that is called when a Subscription is connected or + disconnected. :param use_default_callbacks: Whether or not to use default callbacks when the user doesn't supply one """ @@ -283,6 +303,7 @@ def __init__( self.liveliness = liveliness self.incompatible_qos = incompatible_qos self.incompatible_type = incompatible_type + self.matched = matched self.use_default_callbacks = use_default_callbacks def create_event_handlers( @@ -350,4 +371,11 @@ def _default_incompatible_type_callback(event): except UnsupportedEventTypeError: pass + if self.matched: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.matched, + event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED, + parent_impl=publisher)) + return event_handlers diff --git a/rclpy/src/rclpy/event_handle.cpp b/rclpy/src/rclpy/event_handle.cpp index 72ca7265d..e8fd679d4 100644 --- a/rclpy/src/rclpy/event_handle.cpp +++ b/rclpy/src/rclpy/event_handle.cpp @@ -108,10 +108,12 @@ typedef union event_callback_data { rmw_liveliness_changed_status_t liveliness_changed; rmw_message_lost_status_t message_lost; rmw_requested_qos_incompatible_event_status_t requested_incompatible_qos; + rmw_matched_status_t subscription_matched; // Publisher events rmw_offered_deadline_missed_status_t offered_deadline_missed; rmw_liveliness_lost_status_t liveliness_lost; rmw_offered_qos_incompatible_event_status_t offered_incompatible_qos; + rmw_matched_status_t publisher_matched; rmw_incompatible_type_status_t incompatible_type; } event_callback_data_t; @@ -144,6 +146,8 @@ EventHandle::take_event() return py::cast(data.requested_incompatible_qos); case RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE: return py::cast(data.incompatible_type); + case RCL_SUBSCRIPTION_MATCHED: + return py::cast(data.subscription_matched); default: // suggests a misalignment between C and Python interfaces throw py::value_error("event type for subscriptions not understood"); @@ -158,6 +162,8 @@ EventHandle::take_event() return py::cast(data.offered_incompatible_qos); case RCL_PUBLISHER_INCOMPATIBLE_TYPE: return py::cast(data.incompatible_type); + case RCL_PUBLISHER_MATCHED: + return py::cast(data.publisher_matched); default: // suggests a misalignment between C and Python interfaces throw py::value_error("event type for publishers not understood"); @@ -186,13 +192,15 @@ define_event_handle(py::module module) .value("RCL_SUBSCRIPTION_LIVELINESS_CHANGED", RCL_SUBSCRIPTION_LIVELINESS_CHANGED) .value("RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS", RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS) .value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST) - .value("RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE", RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE); + .value("RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE", RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE) + .value("RCL_SUBSCRIPTION_MATCHED", RCL_SUBSCRIPTION_MATCHED); py::enum_(module, "rcl_publisher_event_type_t") .value("RCL_PUBLISHER_OFFERED_DEADLINE_MISSED", RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) .value("RCL_PUBLISHER_LIVELINESS_LOST", RCL_PUBLISHER_LIVELINESS_LOST) .value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS) - .value("RCL_PUBLISHER_INCOMPATIBLE_TYPE", RCL_PUBLISHER_INCOMPATIBLE_TYPE); + .value("RCL_PUBLISHER_INCOMPATIBLE_TYPE", RCL_PUBLISHER_INCOMPATIBLE_TYPE) + .value("RCL_PUBLISHER_MATCHED", RCL_PUBLISHER_MATCHED); py::class_( module, "rmw_requested_deadline_missed_status_t") @@ -231,6 +239,13 @@ define_event_handle(py::module module) .def_readonly("total_count", &rmw_liveliness_lost_status_t::total_count) .def_readonly("total_count_change", &rmw_liveliness_lost_status_t::total_count_change); + py::class_(module, "rmw_matched_status_t") + .def(py::init<>()) + .def_readonly("total_count", &rmw_matched_status_t::total_count) + .def_readonly("total_count_change", &rmw_matched_status_t::total_count_change) + .def_readonly("current_count", &rmw_matched_status_t::current_count) + .def_readonly("current_count_change", &rmw_matched_status_t::current_count_change); + py::enum_(module, "rmw_qos_policy_kind_t") .value("RMW_QOS_POLICY_INVALID", RMW_QOS_POLICY_INVALID) .value("RMW_QOS_POLICY_DURABILITY", RMW_QOS_POLICY_DURABILITY) diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index 1ac87a5c8..09359ae30 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -23,9 +23,11 @@ from rclpy.event_handler import QoSOfferedDeadlineMissedInfo from rclpy.event_handler import QoSOfferedIncompatibleQoSInfo from rclpy.event_handler import QoSPublisherEventType +from rclpy.event_handler import QoSPublisherMatchedInfo from rclpy.event_handler import QoSRequestedDeadlineMissedInfo from rclpy.event_handler import QoSRequestedIncompatibleQoSInfo from rclpy.event_handler import QoSSubscriptionEventType +from rclpy.event_handler import QoSSubscriptionMatchedInfo from rclpy.event_handler import SubscriptionEventCallbacks from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.qos import QoSDurabilityPolicy @@ -208,6 +210,8 @@ def test_publisher_event_create_destroy(self): publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) self._do_create_destroy( publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS) + self._do_create_destroy( + publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED) self.node.destroy_publisher(publisher) def test_subscription_event_create_destroy(self): @@ -220,6 +224,8 @@ def test_subscription_event_create_destroy(self): subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) self._do_create_destroy( subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS) + self._do_create_destroy( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED) self.node.destroy_subscription(subscription) def test_call_publisher_rclpy_event_apis(self): @@ -354,3 +360,99 @@ def test_call_subscription_rclpy_event_apis(self): pass self.node.destroy_subscription(subscription) + + def test_call_publisher_rclpy_event_matched(self): + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) + with self.context.handle: + wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle) + + matched_event_handle = self._create_event_handle( + publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED) + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + wait_set.wait(0) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + subscription = self.node.create_subscription(EmptyMsg, self.topic_name, Mock(), 10) + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertIsInstance(matched_status, QoSPublisherMatchedInfo) + self.assertEqual(matched_status.total_count, 1) + self.assertEqual(matched_status.total_count_change, 1) + self.assertEqual(matched_status.current_count, 1) + self.assertEqual(matched_status.current_count_change, 1) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + subscription.destroy() + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertEqual(matched_status.total_count, 1) + self.assertEqual(matched_status.total_count_change, 0) + self.assertEqual(matched_status.current_count, 0) + self.assertEqual(matched_status.current_count_change, -1) + + def test_call_subscription_rclpy_event_matched_unmatched(self): + message_callback = Mock() + subscription = self.node.create_subscription( + EmptyMsg, self.topic_name, message_callback, 10) + with self.context.handle: + wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle) + + matched_event_handle = self._create_event_handle( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED) + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + wait_set.wait(0) + self.assertFalse(wait_set.is_ready('event', matched_event_index)) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10) + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertIsInstance(matched_status, QoSSubscriptionMatchedInfo) + self.assertEqual(matched_status.total_count, 1) + self.assertEqual(matched_status.total_count_change, 1) + self.assertEqual(matched_status.current_count, 1) + self.assertEqual(matched_status.current_count_change, 1) + + wait_set.clear_entities() + with matched_event_handle: + matched_event_index = wait_set.add_event(matched_event_handle) + self.assertIsNotNone(matched_event_index) + + publisher.destroy() + # wait 500ms + wait_set.wait(500000000) + self.assertTrue(wait_set.is_ready('event', matched_event_index)) + + matched_status = matched_event_handle.take_event() + self.assertEqual(matched_status.total_count, 1) + self.assertEqual(matched_status.total_count_change, 0) + self.assertEqual(matched_status.current_count, 0) + self.assertEqual(matched_status.current_count_change, -1)