diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp index 43884388ed..d60e01bfdb 100644 --- a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp @@ -70,6 +70,8 @@ class ExecutorEntitiesCollector RCLCPP_PUBLIC ~ExecutorEntitiesCollector(); + bool has_pending(); + /// Add a node to the entity collector /** * \param[in] node_ptr a shared pointer that points to a node base interface @@ -164,63 +166,66 @@ class ExecutorEntitiesCollector using WeakNodesToGuardConditionsMap = std::map< rclcpp::node_interfaces::NodeBaseInterface::WeakPtr, - const rclcpp::GuardCondition *, + rclcpp::GuardCondition::WeakPtr, std::owner_less>; using WeakGroupsToGuardConditionsMap = std::map< rclcpp::CallbackGroup::WeakPtr, - const rclcpp::GuardCondition *, + rclcpp::GuardCondition::WeakPtr, std::owner_less>; RCLCPP_PUBLIC NodeCollection::iterator - remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_); + remove_weak_node(NodeCollection::iterator weak_node); RCLCPP_PUBLIC CallbackGroupCollection::iterator remove_weak_callback_group( CallbackGroupCollection::iterator weak_group_it, - CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + CallbackGroupCollection & collection); RCLCPP_PUBLIC void add_callback_group_to_collection( rclcpp::CallbackGroup::SharedPtr group_ptr, - CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + CallbackGroupCollection & collection); RCLCPP_PUBLIC void remove_callback_group_from_collection( rclcpp::CallbackGroup::SharedPtr group_ptr, - CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + CallbackGroupCollection & collection); RCLCPP_PUBLIC void - add_automatically_associated_callback_groups( - const NodeCollection & nodes_to_check) - RCPPUTILS_TSA_REQUIRES(mutex_); + process_queues(); RCLCPP_PUBLIC void - prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_); + add_automatically_associated_callback_groups( + const NodeCollection & nodes_to_check); - mutable std::mutex mutex_; + RCLCPP_PUBLIC + void + prune_invalid_nodes_and_groups(); /// Callback groups that were added via `add_callback_group` - CallbackGroupCollection - manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + CallbackGroupCollection manually_added_groups_; /// Callback groups that were added by their association with added nodes - CallbackGroupCollection - automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + CallbackGroupCollection automatically_added_groups_; /// nodes that are associated with the executor - NodeCollection - weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + NodeCollection weak_nodes_; - WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::mutex pending_mutex_; + NodeCollection pending_added_nodes_; + NodeCollection pending_removed_nodes_; + CallbackGroupCollection pending_manually_added_groups_; + CallbackGroupCollection pending_manually_removed_groups_; - WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_; std::shared_ptr notify_waitable_; }; diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp index 5fb2532dec..aac0820dbe 100644 --- a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "rclcpp/guard_condition.hpp" #include "rclcpp/waitable.hpp" @@ -33,6 +34,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable public: RCLCPP_SMART_PTR_DEFINITIONS(ExecutorNotifyWaitable) + // Constructor /** * \param[in] on_execute_callback Callback to execute when one of the conditions @@ -45,6 +47,13 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable RCLCPP_PUBLIC ~ExecutorNotifyWaitable() override = default; + RCLCPP_PUBLIC + ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other); + + + RCLCPP_PUBLIC + ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other); + /// Add conditions to the wait set /** * \param[inout] wait_set structure that conditions will be added to @@ -85,7 +94,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable */ RCLCPP_PUBLIC void - add_guard_condition(const rclcpp::GuardCondition * guard_condition); + add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition); /// Remove a guard condition from being waited on. /** @@ -93,7 +102,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable */ RCLCPP_PUBLIC void - remove_guard_condition(const rclcpp::GuardCondition * guard_condition); + remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition); /// Get the number of ready guard_conditions /** @@ -107,17 +116,11 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable /// Callback to run when waitable executes std::function execute_callback_; - /// The collection of guard conditions to be waited on. std::mutex guard_condition_mutex_; /// The collection of guard conditions to be waited on. - std::list notify_guard_conditions_; - - /// The collection of guard conditions to be waited on. - std::list to_add_; - - /// The collection of guard conditions to be waited on. - std::list to_remove_; + std::set> notify_guard_conditions_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index a6c84e4aa0..6131681b80 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -122,9 +122,13 @@ class NodeBase : public NodeBaseInterface, public std::enable_shared_from_this notify_guard_condition_; bool notify_guard_condition_is_valid_; }; diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index fd4f64b22b..eb87771eab 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -152,9 +152,14 @@ class NodeBaseInterface */ RCLCPP_PUBLIC virtual - rclcpp::GuardCondition & + rclcpp::GuardCondition::SharedPtr get_notify_guard_condition() = 0; + RCLCPP_PUBLIC + virtual + void + trigger_notify_guard_condition() = 0; + /// Return the default preference for using intra process communication. RCLCPP_PUBLIC virtual diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp index 7c17f68939..4afdeb3a2b 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp @@ -31,8 +31,6 @@ ExecutorEntitiesCollector::ExecutorEntitiesCollector( ExecutorEntitiesCollector::~ExecutorEntitiesCollector() { - std::lock_guard guard{mutex_}; - for (auto weak_node_it = weak_nodes_.begin(); weak_node_it != weak_nodes_.end(); ) { weak_node_it = remove_weak_node(weak_node_it); } @@ -48,13 +46,40 @@ ExecutorEntitiesCollector::~ExecutorEntitiesCollector() { weak_group_it = remove_weak_callback_group(weak_group_it, manually_added_groups_); } + + std::lock_guard pending_lock(pending_mutex_); + + for (auto weak_node_ptr : pending_added_nodes_) { + auto node_ptr = weak_node_ptr.lock(); + if (node_ptr) { + node_ptr->get_associated_with_executor_atomic().store(false); + } + } + pending_added_nodes_.clear(); + pending_removed_nodes_.clear(); + + for (auto weak_group_ptr : pending_manually_added_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + group_ptr->get_associated_with_executor_atomic().store(false); + } + } + pending_manually_added_groups_.clear(); + pending_manually_removed_groups_.clear(); +} + +bool ExecutorEntitiesCollector::has_pending() +{ + std::lock_guard pending_lock(pending_mutex_); + return pending_manually_added_groups_.size() != 0 || + pending_manually_removed_groups_.size() != 0 || + pending_added_nodes_.size() != 0 || + pending_removed_nodes_.size() != 0; } void ExecutorEntitiesCollector::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) { - std::lock_guard guard{mutex_}; - // If the node already has an executor std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); if (has_executor.exchange(true)) { @@ -62,64 +87,88 @@ ExecutorEntitiesCollector::add_node(rclcpp::node_interfaces::NodeBaseInterface:: std::string("Node '") + node_ptr->get_fully_qualified_name() + "' has already been added to an executor."); } - weak_nodes_.insert(node_ptr); - this->add_automatically_associated_callback_groups({node_ptr}); - // Store node guard condition in map and add it to the notify waitable - rclcpp::GuardCondition * node_guard_condition = &node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_.insert({node_ptr, node_guard_condition}); + std::lock_guard pending_lock(pending_mutex_); + bool associated = weak_nodes_.count(node_ptr) != 0; + bool add_queued = pending_added_nodes_.count(node_ptr) != 0; + bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0; + + if ((associated || add_queued) && !remove_queued) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' has already been added to this executor."); + } - this->notify_waitable_->add_guard_condition(node_guard_condition); + this->pending_added_nodes_.insert(node_ptr); } void ExecutorEntitiesCollector::remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) { - std::lock_guard guard{mutex_}; - if (!node_ptr->get_associated_with_executor_atomic().load()) { throw std::runtime_error("Node needs to be associated with an executor."); } - auto node_it = weak_nodes_.find(node_ptr); - if (node_it != weak_nodes_.end()) { - remove_weak_node(node_it); - } else { - throw std::runtime_error("Node needs to be associated with this executor."); - } + std::lock_guard pending_lock(pending_mutex_); + bool associated = weak_nodes_.count(node_ptr) != 0; + bool add_queued = pending_added_nodes_.count(node_ptr) != 0; + bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0; - for (auto group_it = automatically_added_groups_.begin(); - group_it != automatically_added_groups_.end(); ) - { - auto group_ptr = group_it->lock(); - if (node_ptr->callback_group_in_node(group_ptr)) { - group_it = remove_weak_callback_group(group_it, automatically_added_groups_); - } else { - ++group_it; - } + if (!(associated || add_queued) || remove_queued) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' needs to be associated with this executor."); } + + this->pending_removed_nodes_.insert(node_ptr); } void ExecutorEntitiesCollector::add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) { - std::lock_guard guard{mutex_}; - this->add_callback_group_to_collection(group_ptr, manually_added_groups_); + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + + std::lock_guard pending_lock(pending_mutex_); + bool associated = manually_added_groups_.count(group_ptr) != 0; + bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0; + bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0; + + if ((associated || add_queued) && !remove_queued) { + throw std::runtime_error("Callback group has already been added to this executor."); + } + + this->pending_manually_added_groups_.insert(group_ptr); } void ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) { - std::lock_guard guard{mutex_}; - this->remove_callback_group_from_collection(group_ptr, manually_added_groups_); + if (!group_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Callback group needs to be associated with an executor."); + } + + auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr); + + std::lock_guard pending_lock(pending_mutex_); + bool associated = manually_added_groups_.count(group_ptr) != 0; + bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0; + bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0; + + if ((associated || add_queued) && !remove_queued) { + throw std::runtime_error("Callback group needs to be associated with this executor."); + } + + this->pending_manually_removed_groups_.insert(group_ptr); } std::vector ExecutorEntitiesCollector::get_all_callback_groups() { std::vector groups; - std::lock_guard guard{mutex_}; for (const auto & group_ptr : manually_added_groups_) { groups.push_back(group_ptr); @@ -134,7 +183,6 @@ std::vector ExecutorEntitiesCollector::get_manually_added_callback_groups() { std::vector groups; - std::lock_guard guard{mutex_}; for (const auto & group_ptr : manually_added_groups_) { groups.push_back(group_ptr); } @@ -145,7 +193,6 @@ std::vector ExecutorEntitiesCollector::get_automatically_added_callback_groups() { std::vector groups; - std::lock_guard guard{mutex_}; for (auto const & group_ptr : automatically_added_groups_) { groups.push_back(group_ptr); } @@ -155,7 +202,7 @@ ExecutorEntitiesCollector::get_automatically_added_callback_groups() void ExecutorEntitiesCollector::update_collections() { - std::lock_guard guard{mutex_}; + this->process_queues(); this->add_automatically_associated_callback_groups(this->weak_nodes_); this->prune_invalid_nodes_and_groups(); } @@ -224,7 +271,7 @@ ExecutorEntitiesCollector::add_callback_group_to_collection( } // Store node guard condition in map and add it to the notify waitable - rclcpp::GuardCondition * group_guard_condition = group_ptr->get_notify_guard_condition().get(); + auto group_guard_condition = group_ptr->get_notify_guard_condition(); weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition}); this->notify_waitable_->add_guard_condition(group_guard_condition); } @@ -235,7 +282,73 @@ ExecutorEntitiesCollector::remove_callback_group_from_collection( CallbackGroupCollection & collection) { auto group_it = collection.find(group_ptr); - remove_weak_callback_group(group_it, collection); + + if (group_it != collection.end()) { + remove_weak_callback_group(group_it, collection); + } else { + throw std::runtime_error("Attempting to remove a callback group not added to this executor."); + } +} + +void +ExecutorEntitiesCollector::process_queues() +{ + std::lock_guard pending_lock(pending_mutex_); + + for (auto weak_node_ptr: pending_added_nodes_) { + auto node_ptr = weak_node_ptr.lock(); + if (!node_ptr) { + continue; + } + weak_nodes_.insert(weak_node_ptr); + this->add_automatically_associated_callback_groups({weak_node_ptr}); + + // Store node guard condition in map and add it to the notify waitable + auto node_guard_condition = node_ptr->get_notify_guard_condition(); + weak_nodes_to_guard_conditions_.insert({weak_node_ptr, node_guard_condition}); + this->notify_waitable_->add_guard_condition(node_guard_condition); + } + pending_added_nodes_.clear(); + + for (auto weak_node_ptr: pending_removed_nodes_) { + auto node_it = weak_nodes_.find(weak_node_ptr); + if (node_it != weak_nodes_.end()) { + remove_weak_node(node_it); + } else { + throw std::runtime_error("Node needs to be associated with this executor."); + } + + auto node_ptr = weak_node_ptr.lock(); + if (node_ptr) { + for (auto group_it = automatically_added_groups_.begin(); + group_it != automatically_added_groups_.end(); ) + { + auto group_ptr = group_it->lock(); + if (node_ptr->callback_group_in_node(group_ptr)) { + group_it = remove_weak_callback_group(group_it, automatically_added_groups_); + } else { + ++group_it; + } + } + } + } + pending_removed_nodes_.clear(); + + for (auto weak_group_ptr: pending_manually_added_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + this->add_callback_group_to_collection(group_ptr, manually_added_groups_); + } + } + pending_manually_added_groups_.clear(); + + for (auto weak_group_ptr: pending_manually_removed_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + this->remove_callback_group_from_collection(group_ptr, manually_added_groups_); + } + } + pending_manually_removed_groups_.clear(); } void diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index 78e2db91a2..11ff07a732 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -27,20 +27,39 @@ ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function on_exec { } +ExecutorNotifyWaitable::ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other) +: ExecutorNotifyWaitable(other.execute_callback_) +{ + this->notify_guard_conditions_ = other.notify_guard_conditions_; +} + +ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(const ExecutorNotifyWaitable & other) +{ + if (this != &other) { + this->execute_callback_ = other.execute_callback_; + this->notify_guard_conditions_ = other.notify_guard_conditions_; + } + return *this; +} + void ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) { std::lock_guard lock(guard_condition_mutex_); - for (auto guard_condition : this->notify_guard_conditions_) { - auto rcl_guard_condition = guard_condition->get_rcl_guard_condition(); - rcl_ret_t ret = rcl_wait_set_add_guard_condition( - wait_set, - &rcl_guard_condition, NULL); + for (auto weak_guard_condition : this->notify_guard_conditions_) { + auto guard_condition = weak_guard_condition.lock(); + if (guard_condition) { + auto rcl_guard_condition = &guard_condition->get_rcl_guard_condition(); + + rcl_ret_t ret = rcl_wait_set_add_guard_condition( + wait_set, + rcl_guard_condition, NULL); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to add guard condition to wait set"); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } } } } @@ -49,20 +68,22 @@ bool ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) { std::lock_guard lock(guard_condition_mutex_); + + bool any_ready = false; for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) { auto rcl_guard_condition = wait_set->guard_conditions[ii]; if (nullptr == rcl_guard_condition) { continue; } - - for (auto guard_condition : this->notify_guard_conditions_) { - if (&guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { - return true; + for (auto weak_guard_condition : this->notify_guard_conditions_) { + auto guard_condition = weak_guard_condition.lock(); + if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { + any_ready = true; } } } - return false; + return any_ready; } void @@ -79,52 +100,27 @@ ExecutorNotifyWaitable::take_data() } void -ExecutorNotifyWaitable::add_guard_condition(const rclcpp::GuardCondition * guard_condition) +ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition) { - if (guard_condition == nullptr) { - throw std::runtime_error("Attempting to add null guard condition."); - } std::lock_guard lock(guard_condition_mutex_); - to_add_.push_back(guard_condition); + if (notify_guard_conditions_.count(guard_condition) == 0) { + notify_guard_conditions_.insert(guard_condition); + } } void -ExecutorNotifyWaitable::remove_guard_condition(const rclcpp::GuardCondition * guard_condition) +ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition) { - if (guard_condition == nullptr) { - throw std::runtime_error("Attempting to remove null guard condition."); - } std::lock_guard lock(guard_condition_mutex_); - to_remove_.push_back(guard_condition); + if (notify_guard_conditions_.count(guard_condition) != 0) { + notify_guard_conditions_.erase(guard_condition); + } } size_t ExecutorNotifyWaitable::get_number_of_ready_guard_conditions() { std::lock_guard lock(guard_condition_mutex_); - - for (auto add_guard_condition : to_add_) { - auto guard_it = std::find( - notify_guard_conditions_.begin(), - notify_guard_conditions_.end(), - add_guard_condition); - if (guard_it == notify_guard_conditions_.end()) { - notify_guard_conditions_.push_back(add_guard_condition); - } - } - to_add_.clear(); - - for (auto remove_guard_condition : to_remove_) { - auto guard_it = std::find( - notify_guard_conditions_.begin(), - notify_guard_conditions_.end(), - remove_guard_condition); - if (guard_it != notify_guard_conditions_.end()) { - notify_guard_conditions_.erase(guard_it); - } - } - to_remove_.clear(); - return notify_guard_conditions_.size(); } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index 7b4a70d411..2b08b746b6 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -45,7 +45,7 @@ NodeBase::NodeBase( node_handle_(nullptr), default_callback_group_(default_callback_group), associated_with_executor_(false), - notify_guard_condition_(context), + notify_guard_condition_(std::make_shared(context)), notify_guard_condition_is_valid_(false) { // Create the rcl node and store it in a shared_ptr with a custom destructor. @@ -254,7 +254,7 @@ NodeBase::get_associated_with_executor_atomic() return associated_with_executor_; } -rclcpp::GuardCondition & +rclcpp::GuardCondition::SharedPtr NodeBase::get_notify_guard_condition() { std::lock_guard notify_condition_lock(notify_guard_condition_mutex_); @@ -264,6 +264,16 @@ NodeBase::get_notify_guard_condition() return notify_guard_condition_; } +void +NodeBase::trigger_notify_guard_condition() +{ + std::lock_guard notify_condition_lock(notify_guard_condition_mutex_); + if (!notify_guard_condition_is_valid_) { + throw std::runtime_error("failed to trigger notify guard condition because it is invalid"); + } + notify_guard_condition_->trigger(); +} + bool NodeBase::get_use_intra_process_default() const { diff --git a/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp b/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp index e13ec7cd4c..bf47a1e3b5 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp @@ -533,9 +533,8 @@ NodeGraph::notify_graph_change() } } graph_cv_.notify_all(); - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string("failed to notify wait set on graph change: ") + ex.what()); diff --git a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp index 2f1afd3224..e9c4a5266e 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp @@ -42,9 +42,8 @@ NodeServices::add_service( group->add_service(service_base_ptr); // Notify the executor that a new service was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( @@ -69,9 +68,8 @@ NodeServices::add_client( group->add_client(client_base_ptr); // Notify the executor that a new client was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp b/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp index d2e821a9e6..96097def22 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp @@ -42,9 +42,8 @@ NodeTimers::add_timer( } callback_group->add_timer(timer); - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 167a35f35d..659129d62c 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -70,9 +70,8 @@ NodeTopics::add_publisher( } // Notify the executor that a new publisher was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( @@ -119,9 +118,8 @@ NodeTopics::add_subscription( } // Notify the executor that a new subscription was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp b/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp index 1d1fe2ce59..02a9de82b0 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp @@ -42,9 +42,8 @@ NodeWaitables::add_waitable( group->add_waitable(waitable_ptr); // Notify the executor that a new waitable was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp b/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp index 8c9aaad819..a286de5fa8 100644 --- a/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp +++ b/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp @@ -61,10 +61,10 @@ TEST_F(TestExecutorNotifyWaitable, add_remove_guard_conditions) { std::make_shared(on_execute_callback); auto node = std::make_shared("my_node", "/ns"); - auto & notify_guard_condition = node->get_node_base_interface()->get_notify_guard_condition(); + auto notify_guard_condition = node->get_node_base_interface()->get_notify_guard_condition(); - EXPECT_NO_THROW(waitable->add_guard_condition(¬ify_guard_condition)); - EXPECT_NO_THROW(waitable->remove_guard_condition(¬ify_guard_condition)); + EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition)); + EXPECT_NO_THROW(waitable->remove_guard_condition(notify_guard_condition)); } TEST_F(TestExecutorNotifyWaitable, wait) { @@ -75,8 +75,8 @@ TEST_F(TestExecutorNotifyWaitable, wait) { std::make_shared(on_execute_callback); auto node = std::make_shared("my_node", "/ns"); - auto & notify_guard_condition = node->get_node_base_interface()->get_notify_guard_condition(); - EXPECT_NO_THROW(waitable->add_guard_condition(¬ify_guard_condition)); + auto notify_guard_condition = node->get_node_base_interface()->get_notify_guard_condition(); + EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition)); auto default_cbg = node->get_node_base_interface()->get_default_callback_group(); ASSERT_NE(nullptr, default_cbg->get_notify_guard_condition());