diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 42a4d274dc..95b0d7fcc8 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -366,24 +366,52 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + // clear the wait result and wait for work without blocking to collect the work + // for the first time + // both spin_some and spin_all wait for work at the beginning + wait_result_.reset(); + wait_for_work(std::chrono::milliseconds(0)); + bool just_waited = true; + + // The logic of this while loop is as follows: + // + // - while not shutdown, and spinning (not canceled), and not max duration reached... + // - try to get an executable item to execute, and execute it if available + // - otherwise, reset the wait result, and ... + // - if there was no work available just after waiting, break the loop unconditionally + // - this is appropriate for both spin_some and spin_all which use this function + // - else if exhaustive = true, then wait for work again + // - this is only used for spin_all and not spin_some + // - else break + // - this only occurs with spin_some + // + // The logic of this loop is subtle and should be carefully changed if at all. + // See also: + // https://github.com/ros2/rclcpp/issues/2508 + // https://github.com/ros2/rclcpp/pull/2517 while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { - if (!wait_result_.has_value()) { - wait_for_work(std::chrono::milliseconds(0)); - } - AnyExecutable any_exec; if (get_next_ready_executable(any_exec)) { execute_any_executable(any_exec); + just_waited = false; } else { - // If nothing is ready, reset the result to signal we are - // ready to wait again + // if nothing is ready, reset the result to clear it wait_result_.reset(); - } - if (!wait_result_.has_value() && !exhaustive) { - // In the case of spin some, then we can exit - // In the case of spin all, then we will allow ourselves to wait again. - break; + if (just_waited) { + // there was no work after just waiting, always exit in this case + // before the exhaustive condition can be checked + break; + } + + if (exhaustive) { + // if exhaustive, wait for work again + // this only happens for spin_all; spin_some only waits at the start + wait_for_work(std::chrono::milliseconds(0)); + just_waited = true; + } else { + break; + } } } } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1a538eaa30..a82b702db5 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -357,6 +357,7 @@ class TestWaitable : public rclcpp::Waitable bool is_ready(const rcl_wait_set_t & wait_set) override { + is_ready_count_++; for (size_t i = 0; i < wait_set.size_of_guard_conditions; ++i) { auto rcl_guard_condition = wait_set.guard_conditions[i]; if (&gc_.get_rcl_guard_condition() == rcl_guard_condition) { @@ -424,8 +425,15 @@ class TestWaitable : public rclcpp::Waitable return count_; } + size_t + get_is_ready_call_count() const + { + return is_ready_count_; + } + private: std::atomic trigger_count_ = 0; + std::atomic is_ready_count_ = 0; std::atomic count_ = 0; rclcpp::GuardCondition gc_; std::function on_execute_callback_ = nullptr; @@ -869,3 +877,155 @@ TEST(TestExecutors, testSpinWithNonDefaultContext) rclcpp::shutdown(non_default_context); } + +template +class TestBusyWaiting : public ::testing::Test +{ +public: + void SetUp() override + { + rclcpp::init(0, nullptr); + + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + std::stringstream test_name; + test_name << test_info->test_case_name() << "_" << test_info->name(); + node = std::make_shared("node", test_name.str()); + callback_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + /* automatically_add_to_executor_with_node =*/ false); + + auto waitable_interfaces = node->get_node_waitables_interface(); + waitable = std::make_shared(); + waitable_interfaces->add_waitable(waitable, callback_group); + + executor = std::make_shared(); + executor->add_callback_group(callback_group, node->get_node_base_interface()); + } + + void TearDown() override + { + rclcpp::shutdown(); + } + + void + set_up_and_trigger_waitable(std::function extra_callback = nullptr) + { + this->has_executed = false; + this->waitable->set_on_execute_callback([this, extra_callback]() { + if (!this->has_executed) { + // trigger once to see if the second trigger is handled or not + // this follow up trigger simulates new entities becoming ready while + // the executor is executing something else, e.g. subscription got data + // or a timer expired, etc. + // spin_some would not handle this second trigger, since it collects + // work only once, whereas spin_all should handle it since it + // collects work multiple times + this->waitable->trigger(); + this->has_executed = true; + } + if (nullptr != extra_callback) { + extra_callback(); + } + }); + this->waitable->trigger(); + } + + void + check_for_busy_waits(std::chrono::steady_clock::time_point start_time) + { + // rough time based check, since the work to be done was very small it + // should be safe to check that we didn't use more than half the + // max duration, which itself is much larger than necessary + // however, it could still produce a false-positive + EXPECT_LT( + std::chrono::steady_clock::now() - start_time, + max_duration / 2) + << "executor took a long time to execute when it should have done " + << "nothing and should not have blocked either, but this could be a " + << "false negative if the computer is really slow"; + + // this check is making some assumptions about the implementation of the + // executors, but it should be safe to say that a busy wait may result in + // hundreds or thousands of calls to is_ready(), but "normal" executor + // behavior should be within an order of magnitude of the number of + // times that the waitable was executed + ASSERT_LT(waitable->get_is_ready_call_count(), 10u * this->waitable->get_count()); + } + + static constexpr auto max_duration = 10s; + + rclcpp::Node::SharedPtr node; + rclcpp::CallbackGroup::SharedPtr callback_group; + std::shared_ptr waitable; + std::chrono::steady_clock::time_point start_time; + std::shared_ptr executor; + bool has_executed; +}; + +TYPED_TEST_SUITE(TestBusyWaiting, ExecutorTypes, ExecutorTypeNames); + +TYPED_TEST(TestBusyWaiting, test_spin_all) +{ + this->set_up_and_trigger_waitable(); + + auto start_time = std::chrono::steady_clock::now(); + this->executor->spin_all(this->max_duration); + this->check_for_busy_waits(start_time); + // this should get the initial trigger, and the follow up from in the callback + ASSERT_EQ(this->waitable->get_count(), 2u); +} + +TYPED_TEST(TestBusyWaiting, test_spin_some) +{ + this->set_up_and_trigger_waitable(); + + auto start_time = std::chrono::steady_clock::now(); + this->executor->spin_some(this->max_duration); + this->check_for_busy_waits(start_time); + // this should get the inital trigger, but not the follow up in the callback + ASSERT_EQ(this->waitable->get_count(), 1u); +} + +TYPED_TEST(TestBusyWaiting, test_spin) +{ + std::condition_variable cv; + std::mutex cv_m; + bool first_check_passed = false; + + this->set_up_and_trigger_waitable([&cv, &cv_m, &first_check_passed]() { + cv.notify_one(); + if (!first_check_passed) { + std::unique_lock lk(cv_m); + cv.wait_for(lk, 1s, [&]() {return first_check_passed;}); + } + }); + + auto start_time = std::chrono::steady_clock::now(); + std::thread t([this]() { + this->executor->spin(); + }); + + // wait until thread has started (first execute of waitable) + { + std::unique_lock lk(cv_m); + cv.wait_for(lk, 10s); + } + EXPECT_GT(this->waitable->get_count(), 0u); + + first_check_passed = true; + cv.notify_one(); + + // wait until the executor has finished (second execute of waitable) + { + std::unique_lock lk(cv_m); + cv.wait_for(lk, 10s); + } + EXPECT_EQ(this->waitable->get_count(), 2u); + + this->executor->cancel(); + t.join(); + + this->check_for_busy_waits(start_time); + // this should get the initial trigger, and the follow up from in the callback + ASSERT_EQ(this->waitable->get_count(), 2u); +}