diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 68138eb82b..47bb94c16f 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -36,8 +36,11 @@ #include "rclcpp/duration.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" +#include "rcpputils/scope_exit.hpp" #include "test_msgs/msg/empty.hpp" +#include "test_msgs/srv/empty.hpp" using namespace std::chrono_literals; @@ -461,19 +464,32 @@ class TestWaitable : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override { + if (trigger_count_ > 0) { + // Keep the gc triggered until the trigger count is reduced back to zero. + // This is necessary if trigger() results in the wait set waking, but not + // executing this waitable, in which case it needs to be re-triggered. + gc_.trigger(); + } rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); } void trigger() { + trigger_count_++; gc_.trigger(); } bool is_ready(rcl_wait_set_t * wait_set) override { - (void)wait_set; - return true; + is_ready_count_++; + for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { + auto rcl_guard_condition = wait_set->guard_conditions[i]; + if (&gc_.get_rcl_guard_condition() == rcl_guard_condition) { + return true; + } + } + return false; } std::shared_ptr @@ -490,11 +506,24 @@ class TestWaitable : public rclcpp::Waitable } void - execute(std::shared_ptr & data) override + execute(std::shared_ptr &) override { - (void) data; + trigger_count_--; count_++; - std::this_thread::sleep_for(3ms); + if (nullptr != on_execute_callback_) { + on_execute_callback_(); + } else { + // TODO(wjwwood): I don't know why this was here, but probably it should + // not be there, or test cases where that is important should use the + // on_execute_callback? + std::this_thread::sleep_for(3ms); + } + } + + void + set_on_execute_callback(std::function on_execute_callback) + { + on_execute_callback_ = on_execute_callback; } void @@ -516,14 +545,23 @@ class TestWaitable : public rclcpp::Waitable get_number_of_ready_guard_conditions() override {return 1;} size_t - get_count() + get_count() const { return count_; } + size_t + get_is_ready_call_count() const + { + return is_ready_count_; + } + private: - size_t count_ = 0; + std::atomic trigger_count_ = 0; + std::atomic is_ready_count_ = 0; + std::atomic count_ = 0; rclcpp::GuardCondition gc_; + std::function on_execute_callback_ = nullptr; }; TYPED_TEST(TestExecutors, spinAll) @@ -837,6 +875,374 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) rclcpp::shutdown(); } +// The purpose of this test is to check that the order of callbacks happen +// in some relation to the order of events and the order in which the callbacks +// were registered. +// This is not a guarantee of executor API, but it is a bit of UB that some +// have come to depend on, see: +// +// https://github.com/ros2/rclcpp/issues/2532 +// +// It should not be changed unless there's a good reason for it (users find it +// the least surprising out come even if it is not guaranteed), but if there +// is a good reason for changing it, then the executors effected can be skipped, +// or the test can be removed. +// The purpose of this test is to catch this regressions and let the authors of +// the change read up on the above context and act accordingly. +TYPED_TEST(TestExecutors, deterministic_execution_order_ub) +{ + using ExecutorType = TypeParam; + + // number of each entity to test + constexpr size_t number_of_entities = 20; + std::vector forward(number_of_entities); + std::iota(std::begin(forward), std::end(forward), 0); + std::vector reverse(number_of_entities); + std::reverse_copy(std::begin(forward), std::end(forward), std::begin(reverse)); + + // The expected results vary based on the registration order (always 0..N-1), + // the call order (what this means varies based on the entity type), the + // entity types, and in some cases the executor type. + // It is also possible that the rmw implementation can play a role in the + // ordering, depending on how the executor uses the rmw layer. + // The follow structure and logic tries to capture these details. + // Each test case represents a case-entity pair, + // e.g. "forward call order for waitables" or "reverse call order for timers" + struct test_case + { + // If this is true, then the test case should be skipped. + bool should_skip; + // Order in which to invoke the entities, where that is possible to control. + // For example, the order in which we trigger() the waitables, or the + // order in which we set the timers up to execute (using increasing periods). + std::vector call_order; + // Order in which we expect the entities to be executed by the executor. + std::vector expected_execution_order; + }; + // tests cases are "test_name: {"entity type": {call_order, expected_execution_order}" + std::map> test_cases = { + { + "forward call order", + { + {"waitable", {false, forward, forward}}, + {"subscription", {false, forward, forward}}, + {"service", {false, forward, forward}}, + {"timer", {false, forward, forward}} + } + }, + { + "reverse call order", + { + {"waitable", {false, reverse, forward}}, + {"subscription", {false, reverse, forward}}, + {"service", {false, reverse, forward}}, + // timers are always called in order of which expires soonest, so + // the registration order doesn't necessarily affect them + {"timer", {false, reverse, reverse}} + } + }, + }; + + // Note use this to exclude or modify expected results for executors if this + // undefined behavior doesn't hold for them: + if ( + std::is_same()) + { + // for the EventsExecutor the call order is the execution order because it + // tracks the individual events (triggers in the case of waitables) and + // executes in that order + test_cases["reverse call order"]["waitable"] = {false, reverse, reverse}; + // timers are unaffected by the above about waitables, as they are always + // executed in "call order" even in the other executors + // but, subscription and service execution order is driven by the rmw impl + // due to how the EventsExecutor uses the rmw interface, so we'll skip those + for (auto & test_case_pair : test_cases) { + for (auto & entity_test_case_pair : test_case_pair.second) { + if ( + entity_test_case_pair.first == "subscription" || + entity_test_case_pair.first == "service") + { + entity_test_case_pair.second = {true, {}, {}}; + } + } + } + } + + // Set up a situation with N waitables, added in order (1, ..., N) and then + // trigger them in various orders between calls to spin, to see that the order + // is impacted by the registration order (in most cases). + // Note that we always add/register, trigger, then wait/spin, because this + // undefined behavior related to execution order only applies to entities + // that were "ready" in between calls to spin, i.e. they appear to become + // "ready" to the executor at the "same time". + // Also note, that this ordering only applies within entities of the same type + // as well, there are other parts of the executor that determine the order + // between entity types, e.g. the default scheduling (at the time of writing) + // prefers timers, then subscriptions, then service servers, then service + // clients, and then waitables, see: Executor::get_next_ready_executable() + // But that might be different for different executors and may change in the + // future. + // So here we just test order withing a few different waitable instances only. + // Further down we test similar set ups with other entities like subscriptions + // and timers. + + constexpr bool automatically_add_to_executor_with_node = false; + auto isolated_callback_group = this->node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + automatically_add_to_executor_with_node); + + // perform each of the test cases for waitables + { + auto waitable_interfaces = this->node->get_node_waitables_interface(); + + std::vector> waitables; + for (size_t i = 0; i < number_of_entities; ++i) { + auto my_waitable = std::make_shared(); + waitable_interfaces->add_waitable(my_waitable, isolated_callback_group); + waitables.push_back(my_waitable); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("waitable"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT( + { + for (size_t i = 0; i < number_of_entities; ++i) { + waitables[i]->set_on_execute_callback(nullptr); + } + }); + + std::vector actual_order; + for (size_t i : test_case.call_order) { + waitables[i]->set_on_execute_callback([&actual_order, i]() {actual_order.push_back(i);}); + waitables[i]->trigger(); + } + + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of waitables in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for subscriptions + { + const std::string test_topic_name = "~/deterministic_execution_order_ub"; + std::map> on_sub_data_callbacks; + std::vector::SharedPtr> subscriptions; + rclcpp::SubscriptionOptions so; + so.callback_group = isolated_callback_group; + for (size_t i = 0; i < number_of_entities; ++i) { + size_t next_sub_index = subscriptions.size(); + auto sub = this->node->template create_subscription( + test_topic_name, + 10, + [&on_sub_data_callbacks, &subscriptions, next_sub_index](const test_msgs::msg::Empty &) { + auto this_sub_pointer = subscriptions[next_sub_index].get(); + auto callback_for_sub_it = on_sub_data_callbacks.find(this_sub_pointer); + ASSERT_NE(callback_for_sub_it, on_sub_data_callbacks.end()); + auto on_sub_data_callback = callback_for_sub_it->second; + if (on_sub_data_callback) { + on_sub_data_callback(); + } + }, + so); + subscriptions.push_back(sub); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("subscription"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT( + { + on_sub_data_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + auto sub = subscriptions[i]; + on_sub_data_callbacks[sub.get()] = [&actual_order, i]() { + actual_order.push_back(i); + }; + } + + // create publisher and wait for all of the subscriptions to match + auto pub = this->node->template create_publisher(test_topic_name, 10); + size_t number_of_matches = pub->get_subscription_count(); + while (number_of_matches < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + number_of_matches = pub->get_subscription_count(); + } + + // publish once and wait for all subscriptions to be handled + pub->publish(test_msgs::msg::Empty()); + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of subscriptions in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for service servers + { + const std::string test_service_name = "~/deterministic_execution_order_ub"; + std::map> on_request_callbacks; + std::vector::SharedPtr> services; + std::vector::SharedPtr> clients; + for (size_t i = 0; i < number_of_entities; ++i) { + size_t next_srv_index = services.size(); + auto srv = this->node->template create_service( + test_service_name + "_" + std::to_string(i), + [&on_request_callbacks, &services, next_srv_index]( + std::shared_ptr, + std::shared_ptr + ) { + auto this_srv_pointer = services[next_srv_index].get(); + auto callback_for_srv_it = on_request_callbacks.find(this_srv_pointer); + ASSERT_NE(callback_for_srv_it, on_request_callbacks.end()); + auto on_request_callback = callback_for_srv_it->second; + if (on_request_callback) { + on_request_callback(); + } + }, + 10, + isolated_callback_group); + services.push_back(srv); + auto client = this->node->template create_client( + test_service_name + "_" + std::to_string(i), + 10, + isolated_callback_group + ); + clients.push_back(client); + } + + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("service"); + if (test_case.should_skip) { + continue; + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT( + { + on_request_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + auto srv = services[i]; + on_request_callbacks[srv.get()] = [&actual_order, i]() { + actual_order.push_back(i); + }; + } + + // wait for all of the services to match + for (const auto & client : clients) { + bool matched = client->wait_for_service(10s); // long timeout, but should be quick + ASSERT_TRUE(matched); + } + + // send requests in order + for (size_t i : test_case.call_order) { + clients[i]->async_send_request(std::make_shared()); + } + + // wait for all the requests to be handled + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of service servers in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } + + // perform each of the test cases for timers + { + for (const auto & test_case_pair : test_cases) { + const std::string & test_case_name = test_case_pair.first; + const auto & test_case = test_case_pair.second.at("timer"); + if (test_case.should_skip) { + continue; + } + + std::map> timer_callbacks; + std::vector timers; + for (size_t i = 0; i < number_of_entities; ++i) { + // "call order" for timers will be simulated by setting them at different + // periods, with the "first" ones having the smallest period. + auto period = 1ms + std::chrono::milliseconds(test_case.call_order[i]); + auto timer = this->node->create_timer( + period, + [&timer_callbacks](rclcpp::TimerBase & timer) { + auto timer_callback_it = timer_callbacks.find(&timer); + ASSERT_NE(timer_callback_it, timer_callbacks.end()); + if (nullptr != timer_callback_it->second) { + timer_callback_it->second(); + } + }, + isolated_callback_group); + timers.push_back(timer); + } + + ExecutorType executor; + executor.add_callback_group(isolated_callback_group, this->node->get_node_base_interface()); + + RCPPUTILS_SCOPE_EXIT( + { + timer_callbacks.clear(); + }); + + std::vector actual_order; + for (size_t i = 0; i < number_of_entities; ++i) { + ASSERT_LT(i, timers.size()); + auto & timer = timers[i]; + timer_callbacks[timer.get()] = [&actual_order, &timer, i]() { + actual_order.push_back(i); + // only allow execution once + timer->cancel(); + }; + } + + while (actual_order.size() < number_of_entities && rclcpp::ok()) { + executor.spin_once(10s); // large timeout because it should normally exit quickly + } + + EXPECT_EQ(actual_order, test_case.expected_execution_order) + << "callback call order of timers in test case '" << test_case_name + << "' different than expected, this may be a false positive, see test " + << "description"; + } + } +} + template class TestIntraprocessExecutors : public ::testing::Test {