Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-threaded Executor starvation fix #2702

Open
wants to merge 7 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ class Executor
void
execute_any_executable(AnyExecutable & any_exec);

/// Trigger the notify guard condition to wake up the executor.
/**
* This function is thread safe.
*/
RCLCPP_PUBLIC
void
trigger_executor_notify();

/// Run subscription executable.
/**
* Do necessary setup and tear-down as well as executing the subscription.
Expand Down Expand Up @@ -520,6 +528,27 @@ class Executor
void
wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Prepare the wait set to bee waited on.
/**
* Builds a set of waitable entities, which are passed to the middleware.
* After building wait set, waits on middleware to notify.
* \throws std::runtime_error if the wait set can be cleared
*/
RCLCPP_PUBLIC
void
prepare_work();

/// Block until more work becomes avilable or timeout is reached.
/**
* Builds a set of waitable entities, which are passed to the middleware.
* After building wait set, waits on middleware to notify.
* \param[in] timeout duration to wait for new work to become available.
* \throws std::runtime_error if the wait set can be cleared
*/
RCLCPP_PUBLIC
void
wait_for_work_simple(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Check for executable in ready state and populate union structure.
/**
* \param[out] any_executable populated union structure of ready executable
Expand Down Expand Up @@ -568,6 +597,10 @@ class Executor

mutable std::mutex mutex_;

mutable std::mutex notify_mutex_;

std::vector<rclcpp::CallbackGroup::SharedPtr> wait_for_work_cbgs_;

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;

Expand Down Expand Up @@ -595,6 +628,10 @@ class Executor
rclcpp::WaitSet wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// Previous WaitSet to be waited on.
rclcpp::WaitSet previous_wait_set_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>> previous_wait_result_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// Hold the current state of the collection being waited on by the waitset
rclcpp::executors::ExecutorEntitiesCollection current_collection_ RCPPUTILS_TSA_GUARDED_BY(
mutex_);
Expand Down
67 changes: 65 additions & 2 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rclcpp/executors/executor_notify_waitable.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/subscription_wait_set_mask.hpp"
#include "rcpputils/scope_exit.hpp"

Expand Down Expand Up @@ -496,9 +497,18 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}
}

// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
void
Executor::trigger_executor_notify()
{
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group change: ") + ex.what());
}
}

template<typename Taker, typename Handler>
Expand Down Expand Up @@ -770,6 +780,53 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
}
}

void Executor::prepare_work()
{
{
std::lock_guard<std::mutex> guard(mutex_);
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Rebuilding executor entities");
this->collect_entities();
}

auto callback_groups = this->collector_.get_all_callback_groups();
wait_for_work_cbgs_.resize(callback_groups.size());
for(const auto & w_ptr : callback_groups) {
auto shr_ptr = w_ptr.lock();
if(shr_ptr) {
wait_for_work_cbgs_.push_back(std::move(shr_ptr));
}
}
}
}

void
Executor::wait_for_work_simple(std::chrono::nanoseconds timeout)
{
TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());

this->wait_result_.reset();
this->wait_result_.emplace(wait_set_.wait(timeout));

// drop references to the callback groups, before trying to execute anything
wait_for_work_cbgs_.clear();

if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
} else {
if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Getting wait set");
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
RCLCPP_INFO(rclcpp::get_logger("rclcpp"), "Executing notify waitable");
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
}

bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
Expand Down Expand Up @@ -896,20 +953,26 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
notify_mutex_.lock();
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
success = get_next_ready_executable(any_executable);
// If there are none
if (!success) {
// Wait for subscriptions or timers to work on
prepare_work();
notify_mutex_.unlock();
wait_for_work(timeout);
if (!spinning.load()) {
return false;
}
// Try again
success = get_next_ready_executable(any_executable);
}
else {
notify_mutex_.unlock();
}
return success;
}

Expand Down
16 changes: 8 additions & 8 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ MultiThreadedExecutor::run([[maybe_unused]] size_t this_thread_number)

execute_any_executable(any_exec);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group change: ") + ex.what());
std::lock_guard wait_lock{notify_mutex_};

any_exec.callback_group->can_be_taken_from().store(true);

if (any_exec.callback_group &&
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
trigger_executor_notify();
}
}

Expand Down
51 changes: 51 additions & 0 deletions rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,54 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) {
executor.add_node(node);
executor.spin();
}

/*
Test that no tasks are starved
*/
TEST_F(TestMultiThreadedExecutor, starvation) {
rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(),
2u);

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_starvation");

std::atomic_int timer_one_count{0};
std::atomic_int timer_two_count{0};

rclcpp::TimerBase::SharedPtr timer_one;
rclcpp::TimerBase::SharedPtr timer_two;

auto timer_one_callback = [&timer_one_count, &timer_two_count]() {
std::this_thread::sleep_for(100ms);

timer_one_count++;

auto diff = std::abs(timer_one_count - timer_two_count);

std::cout << "Difference in counts: " << diff << std::endl;

if (timer_one_count > 10 || timer_two_count > 10) {
rclcpp::shutdown();
ASSERT_LE(diff, 1);
}
};

auto timer_two_callback = [&timer_one_count, &timer_two_count]() {
std::this_thread::sleep_for(100ms);

timer_two_count++;

auto diff = std::abs(timer_one_count - timer_two_count);

if (timer_one_count > 10 || timer_two_count > 10) {
rclcpp::shutdown();
ASSERT_LE(diff, 1);
}
};

timer_one = node->create_wall_timer(10ms, timer_one_callback);
timer_two = node->create_wall_timer(10ms, timer_two_callback);

executor.add_node(node);
executor.spin();
}