Skip to content

Commit

Permalink
Get in the upstream executor_notify_waitable
Browse files Browse the repository at this point in the history
Get the upstream executor_notify_waitable.cpp/.hpp and the
test_executors_timer_cancel_behavior unit-test
  • Loading branch information
Josh Finken authored and apojomovsky committed May 16, 2024
1 parent 3071426 commit f42ed52
Show file tree
Hide file tree
Showing 4 changed files with 506 additions and 11 deletions.
33 changes: 31 additions & 2 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
std::shared_ptr<void>
take_data() override;

/// Take the data from an entity ID so that it can be consumed with `execute`.
/**
* \param[in] id ID of the entity to take data from.
* \return If available, data to be used, otherwise nullptr
* \sa rclcpp::Waitable::take_data_by_entity_id
*/
RCLCPP_PUBLIC
std::shared_ptr<void>
take_data_by_entity_id(size_t id) override;

/// Set a callback to be called whenever the waitable becomes ready.
/**
* \param[in] callback callback to set
* \sa rclcpp::Waitable::set_on_ready_callback
*/
RCLCPP_PUBLIC
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override;

/// Add a guard condition to be waited on.
/**
* \param[in] guard_condition The guard condition to add.
Expand All @@ -96,13 +115,21 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
void
add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);

/// Unset any callback registered via set_on_ready_callback.
/**
* \sa rclcpp::Waitable::clear_on_ready_callback
*/
RCLCPP_PUBLIC
void
clear_on_ready_callback() override;

/// Remove a guard condition from being waited on.
/**
* \param[in] guard_condition The guard condition to remove.
* \param[in] weak_guard_condition The guard condition to remove.
*/
RCLCPP_PUBLIC
void
remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);
remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition);

/// Get the number of ready guard_conditions
/**
Expand All @@ -118,6 +145,8 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable

std::mutex guard_condition_mutex_;

std::function<void(size_t)> on_ready_callback_;

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
Expand Down
60 changes: 57 additions & 3 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,76 @@ ExecutorNotifyWaitable::take_data()
return nullptr;
}

std::shared_ptr<void>
ExecutorNotifyWaitable::take_data_by_entity_id(size_t id)
{
(void) id;
return nullptr;
}

void
ExecutorNotifyWaitable::set_on_ready_callback(std::function<void(size_t, int)> callback)
{
// The second argument of the callback could be used to identify which guard condition
// triggered the event.
// We could indicate which of the guard conditions was triggered, but the executor
// is already going to check that.
auto gc_callback = [callback](size_t count) {
callback(count, 0);
};

std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = gc_callback;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
gc->set_on_trigger_callback(on_ready_callback_);
}
}

RCLCPP_PUBLIC
void
ExecutorNotifyWaitable::clear_on_ready_callback()
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

on_ready_callback_ = nullptr;
for (auto weak_gc : notify_guard_conditions_) {
auto gc = weak_gc.lock();
if (!gc) {
continue;
}
gc->set_on_trigger_callback(nullptr);
}
}

void
ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
auto guard_condition = weak_guard_condition.lock();
if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) {
notify_guard_conditions_.insert(weak_guard_condition);
if (on_ready_callback_) {
guard_condition->set_on_trigger_callback(on_ready_callback_);
}
}
}

void
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition)
ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);
if (notify_guard_conditions_.count(guard_condition) != 0) {
notify_guard_conditions_.erase(guard_condition);
if (notify_guard_conditions_.count(weak_guard_condition) != 0) {
notify_guard_conditions_.erase(weak_guard_condition);
auto guard_condition = weak_guard_condition.lock();
// If this notify waitable doesn't have an on_ready_callback, then there's nothing to unset
if (guard_condition && on_ready_callback_) {
guard_condition->set_on_trigger_callback(nullptr);
}
}
}

Expand Down
16 changes: 10 additions & 6 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -628,18 +628,22 @@ if(TARGET test_interface_traits)
target_link_libraries(test_interface_traits ${PROJECT_NAME})
endif()

# TODO(brawner) remove when destroying Node for Connext is resolved. See:
# https://github.com/ros2/rclcpp/issues/1250
ament_add_gtest(
test_executors
executors/test_executors.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors)
ament_target_dependencies(test_executors
"rcl"
"test_msgs")
target_link_libraries(test_executors ${PROJECT_NAME})
target_link_libraries(test_executors ${PROJECT_NAME} rcl::rcl ${test_msgs_TARGETS})
endif()

ament_add_gtest(
test_executors_timer_cancel_behavior
executors/test_executors_timer_cancel_behavior.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors)
target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS})
endif()

ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp
Expand Down
Loading

0 comments on commit f42ed52

Please sign in to comment.