Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data race fixes #2494

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,16 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/**
* This function triggers a recollect of all entities
* that are registered to the executor.
*
* Calling this function is thread safe.
*
* @param notify if true will execute a trigger that will wake up a waiting executor
*/
void trigger_entity_recollect(bool notify);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/wait_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class WaitResult final

if (this->kind() == WaitResultKind::Ready) {
auto & wait_set = this->get_wait_set();
auto rcl_wait_set = wait_set.get_rcl_wait_set();
auto & rcl_wait_set = wait_set.get_rcl_wait_set();
while (next_waitable_index_ < wait_set.size_of_waitables()) {
auto cur_waitable = wait_set.waitables(next_waitable_index_++);
if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) {
Expand Down
115 changes: 55 additions & 60 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <cassert>
#include <chrono>
#include <iterator>
#include <memory>
Expand Down Expand Up @@ -72,13 +73,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
}
});

notify_waitable_->set_on_ready_callback(
[this](auto, auto) {
this->entities_need_rebuild_.store(true);
});

notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

wait_set_.add_waitable(notify_waitable_);
}

Executor::~Executor()
Expand Down Expand Up @@ -122,6 +120,20 @@ Executor::~Executor()
}
}

void Executor::trigger_entity_recollect(bool notify)
{
this->entities_need_rebuild_.store(true);

if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if(notify) {
interrupt_guard_condition_->trigger();
}
}

std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
Expand Down Expand Up @@ -152,19 +164,12 @@ Executor::add_callback_group(
(void) node_ptr;
this->collector_.add_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
}

Expand All @@ -173,19 +178,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
{
this->collector_.add_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
}
}

Expand All @@ -196,18 +194,12 @@ Executor::remove_callback_group(
{
this->collector_.remove_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}
}

Expand All @@ -222,19 +214,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
{
this->collector_.remove_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
}
}
}

Expand Down Expand Up @@ -379,6 +364,9 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
return;
}

assert((void("Internal error, tried to execute a AnyExecutable without a valid callback group"),
any_exec.callback_group));

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
Expand All @@ -403,9 +391,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}

// Reset the callback_group, regardless of type
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
any_exec.callback_group->can_be_taken_from().store(true);
jmachowinski marked this conversation as resolved.
Show resolved Hide resolved
}

template<typename Taker, typename Handler>
Expand Down Expand Up @@ -642,7 +628,6 @@ Executor::collect_entities()
// In the case that an entity already has an expired weak pointer
// before being removed from the waitset, additionally prune the waitset.
this->wait_set_.prune_deleted_entities();
this->entities_need_rebuild_.store(false);
}

void
Expand All @@ -655,7 +640,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

{
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
}
Expand All @@ -664,6 +649,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
} else {
if(this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) {
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
if(notify_waitable_->is_ready(rcl_wait_set)) {
notify_waitable_->execute(notify_waitable_->take_data());
}
}
}
}

Expand All @@ -689,7 +681,8 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
if (entity_iter != current_collection_.timers.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
current_timer_index++;
continue;
}
// At this point the timer is either ready for execution or was perhaps
Expand All @@ -699,13 +692,15 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
wait_result_->clear_timer_with_index(current_timer_index);
// Check that the timer should be called still, i.e. it wasn't canceled.
if (!timer->call()) {
current_timer_index++;
continue;
}
any_executable.timer = timer;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
current_timer_index++;
}
}

Expand All @@ -715,7 +710,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
subscription->get_subscription_handle().get());
if (entity_iter != current_collection_.subscriptions.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.subscription = subscription;
Expand All @@ -731,7 +726,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
if (entity_iter != current_collection_.services.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.service = service;
Expand All @@ -747,7 +742,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
if (entity_iter != current_collection_.clients.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.client = client;
Expand All @@ -763,7 +758,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.waitables.find(waitable.get());
if (entity_iter != current_collection_.waitables.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.waitable = waitable;
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/src/rclcpp/executors/executor_entities_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
if (!entity->call()) {
Expand All @@ -176,7 +176,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -196,7 +196,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -216,7 +216,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -236,7 +236,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entry.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
{
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ if(TARGET test_executors)
target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS})
endif()

ament_add_gtest(
test_executors_callback_group_behavior
executors/test_executors_callback_group_behavior.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors)
target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME})
endif()

ament_add_gtest(
test_executors_intraprocess
executors/test_executors_intraprocess.cpp
Expand Down
Loading