From 8de4b90512469d74802871e6c88db2e3135720e7 Mon Sep 17 00:00:00 2001
From: Alberto Soragna <alberto.soragna@gmail.com>
Date: Sat, 29 Jun 2024 14:55:28 -0700
Subject: [PATCH] avoid adding notify waitable twice to events-executor
 collection (#2564)

* avoid adding notify waitable twice to events-executor entities collection

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

* remove redundant mutex lock

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>

---------

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
---
 .../events_executor/events_executor.cpp       |  9 +--
 .../test/rclcpp/executors/test_executors.cpp  | 61 +++++++++++++++++++
 2 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
index ce6a103ab2..c7d6be7e44 100644
--- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
+++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp
@@ -415,14 +415,11 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
   // We could explicitly check for the notify waitable ID when we receive a waitable event
   // but I think that it's better if the waitable was in the collection and it could be
   // retrieved in the "standard" way.
-  // To do it, we need to add the notify waitable as an entry in both the new and
-  // current collections such that it's neither added or removed.
+  // To do it, we need to add the notify waitable as an entry in the new collection
+  // such that it's neither added or removed (it should have already been added
+  // to the current collection in the constructor)
   this->add_notify_waitable_to_collection(new_collection.waitables);
 
-  // Acquire lock before modifying the current collection
-  std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
-  this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
-
   this->refresh_current_collection(new_collection);
 }
 
diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp
index a82b702db5..dfbdbb8f4c 100644
--- a/rclcpp/test/rclcpp/executors/test_executors.cpp
+++ b/rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -807,6 +807,67 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
   }
 }
 
+// Check that executors are correctly notified while they are spinning
+// we notify twice to ensure that the notify waitable is still working
+// after the first notification
+TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning)
+{
+  using ExecutorType = TypeParam;
+
+  // Create executor, add the node and start spinning
+  ExecutorType executor;
+  executor.add_node(this->node);
+  std::thread spinner([&]() {executor.spin();});
+
+  // Wait for executor to be spinning
+  while (!executor.is_spinning()) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+  }
+
+  // Create the first subscription while the executor is already spinning
+  std::atomic<size_t> sub1_msg_count {0};
+  auto sub1 = this->node->template create_subscription<test_msgs::msg::Empty>(
+    this->publisher->get_topic_name(),
+    rclcpp::QoS(10),
+    [&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
+      sub1_msg_count++;
+    });
+
+  // Publish a message and verify it's received
+  this->publisher->publish(test_msgs::msg::Empty());
+  auto start = std::chrono::steady_clock::now();
+  while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) {
+    std::this_thread::sleep_for(1ms);
+  }
+  EXPECT_EQ(sub1_msg_count, 1u);
+
+  // Create a second subscription while the executor is already spinning
+  std::atomic<size_t> sub2_msg_count {0};
+  auto sub2 = this->node->template create_subscription<test_msgs::msg::Empty>(
+    this->publisher->get_topic_name(),
+    rclcpp::QoS(10),
+    [&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) {
+      sub2_msg_count++;
+    });
+
+  // Publish a message and verify it's received by both subscriptions
+  this->publisher->publish(test_msgs::msg::Empty());
+  start = std::chrono::steady_clock::now();
+  while (
+    sub1_msg_count == 1 &&
+    sub2_msg_count == 0 &&
+    (std::chrono::steady_clock::now() - start) < 10s)
+  {
+    std::this_thread::sleep_for(1ms);
+  }
+  EXPECT_EQ(sub1_msg_count, 2u);
+  EXPECT_EQ(sub2_msg_count, 1u);
+
+  // Cancel needs to be called before join, so that executor.spin() returns.
+  executor.cancel();
+  spinner.join();
+}
+
 // Check spin_until_future_complete with node base pointer (instantiates its own executor)
 TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
 {