From 7e9ba2b87fea56d33c650a2c0bcbd3833c9a9e8b Mon Sep 17 00:00:00 2001 From: Alberto Soragna Date: Wed, 19 Jun 2024 11:26:58 -0700 Subject: [PATCH 1/2] avoid adding notify waitable twice to events-executor entities collection Signed-off-by: Alberto Soragna --- .../events_executor/events_executor.cpp | 6 +- .../test/rclcpp/executors/test_executors.cpp | 61 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ce6a103ab2..ff450cfe76 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -415,13 +415,13 @@ EventsExecutor::refresh_current_collection_from_callback_groups() // We could explicitly check for the notify waitable ID when we receive a waitable event // but I think that it's better if the waitable was in the collection and it could be // retrieved in the "standard" way. - // To do it, we need to add the notify waitable as an entry in both the new and - // current collections such that it's neither added or removed. + // To do it, we need to add the notify waitable as an entry in the new collection + // such that it's neither added or removed (it should have already been added + // to the current collection in the constructor) this->add_notify_waitable_to_collection(new_collection.waitables); // Acquire lock before modifying the current collection std::lock_guard lock(collection_mutex_); - this->add_notify_waitable_to_collection(current_entities_collection_->waitables); this->refresh_current_collection(new_collection); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index a82b702db5..dfbdbb8f4c 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -807,6 +807,67 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode) } } +// Check that executors are correctly notified while they are spinning +// we notify twice to ensure that the notify waitable is still working +// after the first notification +TYPED_TEST(TestExecutors, notifyTwiceWhileSpinning) +{ + using ExecutorType = TypeParam; + + // Create executor, add the node and start spinning + ExecutorType executor; + executor.add_node(this->node); + std::thread spinner([&]() {executor.spin();}); + + // Wait for executor to be spinning + while (!executor.is_spinning()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Create the first subscription while the executor is already spinning + std::atomic sub1_msg_count {0}; + auto sub1 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub1_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub1_msg_count++; + }); + + // Publish a message and verify it's received + this->publisher->publish(test_msgs::msg::Empty()); + auto start = std::chrono::steady_clock::now(); + while (sub1_msg_count == 0 && (std::chrono::steady_clock::now() - start) < 10s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 1u); + + // Create a second subscription while the executor is already spinning + std::atomic sub2_msg_count {0}; + auto sub2 = this->node->template create_subscription( + this->publisher->get_topic_name(), + rclcpp::QoS(10), + [&sub2_msg_count](test_msgs::msg::Empty::ConstSharedPtr) { + sub2_msg_count++; + }); + + // Publish a message and verify it's received by both subscriptions + this->publisher->publish(test_msgs::msg::Empty()); + start = std::chrono::steady_clock::now(); + while ( + sub1_msg_count == 1 && + sub2_msg_count == 0 && + (std::chrono::steady_clock::now() - start) < 10s) + { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(sub1_msg_count, 2u); + EXPECT_EQ(sub2_msg_count, 1u); + + // Cancel needs to be called before join, so that executor.spin() returns. + executor.cancel(); + spinner.join(); +} + // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { From cbde5da8ed42359b940ef7b00013fa0130e7da8a Mon Sep 17 00:00:00 2001 From: Alberto Soragna Date: Sat, 22 Jun 2024 09:01:28 -0700 Subject: [PATCH 2/2] remove redundant mutex lock Signed-off-by: Alberto Soragna --- .../experimental/executors/events_executor/events_executor.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index ff450cfe76..c7d6be7e44 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -420,9 +420,6 @@ EventsExecutor::refresh_current_collection_from_callback_groups() // to the current collection in the constructor) this->add_notify_waitable_to_collection(new_collection.waitables); - // Acquire lock before modifying the current collection - std::lock_guard lock(collection_mutex_); - this->refresh_current_collection(new_collection); }