Fixup Executor::spin_all() regression fix #2517

Merged
merged 9 commits on May 2, 2024
50 changes: 39 additions & 11 deletions rclcpp/src/rclcpp/executor.cpp
@@ -366,24 +366,52 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
   }
   RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

+  // clear the wait result and wait for work without blocking to collect the work
+  // for the first time
+  // both spin_some and spin_all wait for work at the beginning
+  wait_result_.reset();
+  wait_for_work(std::chrono::milliseconds(0));
+  bool just_waited = true;
Comment on lines +369 to +374
Member Author
This change wasn't part of the original report (the busy wait in spin_all), but it is related and needed. It ensures that spin_some and spin_all don't start out by iterating over existing work left behind by a previous spin method. Ideally the other spin methods would reset the result before returning so this wouldn't be necessary, but doing it here is safer. We do want this: in the case of spin_some, the old behavior could mean it never waits at all, iterating over old work and exiting right after, which isn't what we want.

Also, it is safe to throw away a wait result and wait again: everything we wait on must tolerate being waited on, becoming ready, not being handled, being waited on again, and becoming ready again; otherwise it must be OK with never being handled. Put another way, being waited on and then becoming ready does not guarantee being executed, and it may require being waited on again.
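To make the scenario concrete, here is a minimal usage sketch of how a caller could hit the problem described above. The node and timer names are made up for illustration and this is not code from this PR, only an assumption about a typical sequence of calls:

```cpp
#include <chrono>
#include <memory>

#include "rclcpp/rclcpp.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto node = std::make_shared<rclcpp::Node>("spin_some_demo");
  auto timer = node->create_wall_timer(
    std::chrono::milliseconds(10), []() {/* periodic work */});

  rclcpp::executors::SingleThreadedExecutor executor;
  executor.add_node(node);

  // A first spin call can leave a collected-but-unhandled wait result behind.
  executor.spin_some(std::chrono::milliseconds(50));

  // Without the reset-and-wait at the top of spin_some_impl(), this second call
  // could iterate over that stale result and return without ever waiting for
  // new work; with the change above it waits for fresh work first.
  executor.spin_some(std::chrono::milliseconds(50));

  rclcpp::shutdown();
  return 0;
}
```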


+  // The logic of this while loop is as follows:
+  //
+  // - while not shutdown, and spinning (not canceled), and not max duration reached...
+  //   - try to get an executable item to execute, and execute it if available
+  //   - otherwise, reset the wait result, and ...
+  //     - if there was no work available just after waiting, break the loop unconditionally
+  //       - this is appropriate for both spin_some and spin_all which use this function
+  //     - else if exhaustive = true, then wait for work again
+  //       - this is only used for spin_all and not spin_some
+  //     - else break
+  //       - this only occurs with spin_some
+  //
+  // The logic of this loop is subtle and should be carefully changed if at all.
+  // See also:
+  //   https://github.com/ros2/rclcpp/issues/2508
+  //   https://github.com/ros2/rclcpp/pull/2517
   while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
-    if (!wait_result_.has_value()) {
-      wait_for_work(std::chrono::milliseconds(0));
-    }
-
     AnyExecutable any_exec;
     if (get_next_ready_executable(any_exec)) {
       execute_any_executable(any_exec);
+      just_waited = false;
     } else {
-      // If nothing is ready, reset the result to signal we are
-      // ready to wait again
+      // if nothing is ready, reset the result to clear it
       wait_result_.reset();
-    }

-    if (!wait_result_.has_value() && !exhaustive) {
-      // In the case of spin some, then we can exit
-      // In the case of spin all, then we will allow ourselves to wait again.
-      break;
+      if (just_waited) {
+        // there was no work after just waiting, always exit in this case
+        // before the exhaustive condition can be checked
+        break;
+      }
+
+      if (exhaustive) {
+        // if exhaustive, wait for work again
+        // this only happens for spin_all; spin_some only waits at the start
+        wait_for_work(std::chrono::milliseconds(0));
+        just_waited = true;
+      } else {
+        break;
+      }
     }
   }
 }
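For reference, a minimal sketch of the symptom from ros2/rclcpp#2508 that the reworked loop is meant to avoid. The node name is illustrative and the timing claims are an assumption about typical behavior, not a guarantee:

```cpp
#include <chrono>
#include <memory>

#include "rclcpp/rclcpp.hpp"

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  // A node with no timers, subscriptions, or other work that becomes ready.
  auto idle_node = std::make_shared<rclcpp::Node>("idle_node");

  rclcpp::executors::SingleThreadedExecutor executor;
  executor.add_node(idle_node);

  // Before this fix, spin_all() with a max duration would keep re-waiting with
  // a zero timeout even though nothing was ready, busy waiting for most of the
  // 10 seconds. With the just_waited check it should return almost immediately
  // after the first empty wait.
  executor.spin_all(std::chrono::seconds(10));

  rclcpp::shutdown();
  return 0;
}
```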
160 changes: 160 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -357,6 +357,7 @@ class TestWaitable : public rclcpp::Waitable
   bool
   is_ready(const rcl_wait_set_t & wait_set) override
   {
+    is_ready_count_++;
     for (size_t i = 0; i < wait_set.size_of_guard_conditions; ++i) {
       auto rcl_guard_condition = wait_set.guard_conditions[i];
       if (&gc_.get_rcl_guard_condition() == rcl_guard_condition) {
@@ -424,8 +425,15 @@ class TestWaitable : public rclcpp::Waitable
     return count_;
   }

+  size_t
+  get_is_ready_call_count() const
+  {
+    return is_ready_count_;
+  }
+
 private:
   std::atomic<size_t> trigger_count_ = 0;
+  std::atomic<size_t> is_ready_count_ = 0;
   std::atomic<size_t> count_ = 0;
   rclcpp::GuardCondition gc_;
   std::function<void()> on_execute_callback_ = nullptr;
@@ -869,3 +877,155 @@ TEST(TestExecutors, testSpinWithNonDefaultContext)

rclcpp::shutdown(non_default_context);
}

template<typename T>
class TestBusyWaiting : public ::testing::Test
{
public:
void SetUp() override
{
rclcpp::init(0, nullptr);

const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
node = std::make_shared<rclcpp::Node>("node", test_name.str());
callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
/* automatically_add_to_executor_with_node =*/ false);

auto waitable_interfaces = node->get_node_waitables_interface();
waitable = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(waitable, callback_group);

executor = std::make_shared<T>();
executor->add_callback_group(callback_group, node->get_node_base_interface());
}

void TearDown() override
{
rclcpp::shutdown();
}

void
set_up_and_trigger_waitable(std::function<void()> extra_callback = nullptr)
{
this->has_executed = false;
this->waitable->set_on_execute_callback([this, extra_callback]() {
if (!this->has_executed) {
// trigger once to see if the second trigger is handled or not
// this follow up trigger simulates new entities becoming ready while
// the executor is executing something else, e.g. subscription got data
// or a timer expired, etc.
// spin_some would not handle this second trigger, since it collects
// work only once, whereas spin_all should handle it since it
// collects work multiple times
this->waitable->trigger();
this->has_executed = true;
}
if (nullptr != extra_callback) {
extra_callback();
}
});
this->waitable->trigger();
}

void
check_for_busy_waits(std::chrono::steady_clock::time_point start_time)
{
// rough time based check, since the work to be done was very small it
// should be safe to check that we didn't use more than half the
// max duration, which itself is much larger than necessary
// however, it could still produce a false-positive
EXPECT_LT(
std::chrono::steady_clock::now() - start_time,
max_duration / 2)
<< "executor took a long time to execute when it should have done "
<< "nothing and should not have blocked either, but this could be a "
<< "false negative if the computer is really slow";

// this check is making some assumptions about the implementation of the
// executors, but it should be safe to say that a busy wait may result in
// hundreds or thousands of calls to is_ready(), but "normal" executor
// behavior should be within an order of magnitude of the number of
// times that the waitable was executed
ASSERT_LT(waitable->get_is_ready_call_count(), 10u * this->waitable->get_count());
}

static constexpr auto max_duration = 10s;

rclcpp::Node::SharedPtr node;
rclcpp::CallbackGroup::SharedPtr callback_group;
std::shared_ptr<TestWaitable> waitable;
std::chrono::steady_clock::time_point start_time;
std::shared_ptr<T> executor;
bool has_executed;
};

TYPED_TEST_SUITE(TestBusyWaiting, ExecutorTypes, ExecutorTypeNames);

TYPED_TEST(TestBusyWaiting, test_spin_all)
{
this->set_up_and_trigger_waitable();

auto start_time = std::chrono::steady_clock::now();
this->executor->spin_all(this->max_duration);
this->check_for_busy_waits(start_time);
// this should get the initial trigger, and the follow-up from the callback
ASSERT_EQ(this->waitable->get_count(), 2u);
}

TYPED_TEST(TestBusyWaiting, test_spin_some)
{
this->set_up_and_trigger_waitable();

auto start_time = std::chrono::steady_clock::now();
this->executor->spin_some(this->max_duration);
this->check_for_busy_waits(start_time);
// this should get the initial trigger, but not the follow-up in the callback
ASSERT_EQ(this->waitable->get_count(), 1u);
}

TYPED_TEST(TestBusyWaiting, test_spin)
{
std::condition_variable cv;
std::mutex cv_m;
bool first_check_passed = false;

this->set_up_and_trigger_waitable([&cv, &cv_m, &first_check_passed]() {
cv.notify_one();
if (!first_check_passed) {
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 1s, [&]() {return first_check_passed;});
}
});

auto start_time = std::chrono::steady_clock::now();
std::thread t([this]() {
this->executor->spin();
});

// wait until thread has started (first execute of waitable)
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 10s);
}
EXPECT_GT(this->waitable->get_count(), 0u);

Contributor
This test is racy and does not really check for busy waits. You need to wait a bit here to give a busy-waiting executor time to call is_ready() a lot of times. As the test is implemented right now, it could pass even if the executor is busy waiting.
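One possible way to address this, sketched here as an assumption rather than the change actually made in the PR, is to pause before releasing the callback so a busy-waiting executor would have time to accumulate a large is_ready() call count (this assumes <thread> and <chrono> are available in the test translation unit):

```cpp
// Hypothetical addition before setting first_check_passed: give a busy-waiting
// executor time to call is_ready() many times while the callback is blocked.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
```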

first_check_passed = true;
Contributor
You need to lock cv_m while accessing the shared state (e.g. first_check_passed). If you don't do this, you get race conditions.
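A sketch of the locking this comment asks for, again as an assumption about a possible fix rather than the change that was merged:

```cpp
// Update the shared flag under the mutex, then notify the waiting callback.
{
  std::lock_guard<std::mutex> lock(cv_m);
  first_check_passed = true;
}
cv.notify_one();
```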

cv.notify_one();

// wait until the executor has finished (second execute of waitable)
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 10s);
}
EXPECT_EQ(this->waitable->get_count(), 2u);

this->executor->cancel();
t.join();

this->check_for_busy_waits(start_time);
// this should get the initial trigger, and the follow-up from the callback
ASSERT_EQ(this->waitable->get_count(), 2u);
}