From 974e8455824dd1b95a6478c950859549e231a7fd Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Wed, 29 Mar 2023 17:59:14 +0000 Subject: [PATCH] Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll --- rclcpp/CMakeLists.txt | 2 - rclcpp/include/rclcpp/executor.hpp | 169 +---- rclcpp/include/rclcpp/executors.hpp | 1 - .../detail/storage_policy_common.hpp | 4 + .../thread_safe_synchronization.hpp | 3 + rclcpp/include/rclcpp/wait_set_template.hpp | 1 + rclcpp/src/rclcpp/executor.cpp | 663 +++++------------- .../executor_entities_collection.cpp | 1 + .../executors/executor_notify_waitable.cpp | 2 + rclcpp/test/benchmark/benchmark_executor.cpp | 127 ---- rclcpp/test/rclcpp/CMakeLists.txt | 17 - .../test/rclcpp/executors/test_executors.cpp | 10 +- .../test_add_callback_groups_to_executor.cpp | 10 +- rclcpp/test/rclcpp/test_executor.cpp | 109 +-- 14 files changed, 207 insertions(+), 912 deletions(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3fba0d7c08..72fb24de3b 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -59,8 +59,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executors/executor_notify_waitable.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp - src/rclcpp/executors/static_executor_entities_collector.cpp - src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/future_return_code.cpp src/rclcpp/generic_publisher.cpp diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3e654faa54..115d755626 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -29,26 +30,24 @@ #include "rcl/guard_condition.h" #include "rcl/wait.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/context.hpp" #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/executor_options.hpp" +#include "rclcpp/executors/executor_entities_collection.hpp" +#include "rclcpp/executors/executor_entities_collector.hpp" #include "rclcpp/future_return_code.hpp" -#include "rclcpp/memory_strategies.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/wait_set.hpp" namespace rclcpp { -typedef std::map> WeakCallbackGroupsToNodesMap; - // Forward declaration is used in convenience method signature. class Node; @@ -402,17 +401,6 @@ class Executor void cancel(); - /// Support dynamic switching of the memory strategy. - /** - * Switching the memory strategy while the executor is spinning in another threading could have - * unintended consequences. - * \param[in] memory_strategy Shared pointer to the memory strategy to set. - * \throws std::runtime_error if memory_strategy is null - */ - RCLCPP_PUBLIC - void - set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy); - /// Returns true if the executor is currently spinning. /** * This function can be called asynchronously from any thread. @@ -497,6 +485,10 @@ class Executor static void execute_client(rclcpp::ClientBase::SharedPtr client); + RCLCPP_PUBLIC + void + collect_entities(); + /// Block until more work becomes avilable or timeout is reached. /** * Builds a set of waitable entities, which are passed to the middleware. @@ -508,62 +500,6 @@ class Executor void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Find node associated with a callback group - /** - * \param[in] weak_groups_to_nodes map of callback groups to nodes - * \param[in] group callback group to find assocatiated node - * \return Pointer to associated node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr - get_node_by_group( - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group); - - /// Return true if the node has been added to this executor. - /** - * \param[in] node_ptr a shared pointer that points to a node base interface - * \param[in] weak_groups_to_nodes map to nodes to lookup - * \return true if the node is associated with the executor, otherwise false - */ - RCLCPP_PUBLIC - bool - has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; - - /// Find the callback group associated with a timer - /** - * \param[in] timer Timer to find associated callback group - * \return Pointer to callback group node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::CallbackGroup::SharedPtr - get_group_by_timer(rclcpp::TimerBase::SharedPtr timer); - - /// Add a callback group to an executor - /** - * \see rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - virtual void - add_callback_group_to_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - virtual void - remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - /// Check for executable in ready state and populate union structure. /** * \param[out] any_executable populated union structure of ready executable @@ -574,33 +510,6 @@ class Executor bool get_next_ready_executable(AnyExecutable & any_executable); - /// Check for executable in ready state and populate union structure. - /** - * This is the implementation of get_next_ready_executable that takes into - * account the current state of callback groups' association with nodes and - * executors. - * - * This checks in a particular order for available work: - * * Timers - * * Subscriptions - * * Services - * * Clients - * * Waitable - * - * If the next executable is not associated with this executor/node pair, - * then this method will return false. - * - * \param[out] any_executable populated union structure of ready executable - * \param[in] weak_groups_to_nodes mapping of callback groups to nodes - * \return true if an executable was ready and any_executable was populated, - * otherwise false - */ - RCLCPP_PUBLIC - bool - get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - /// Wait for executable in ready state and populate union structure. /** * If an executable is ready, it will return immediately, otherwise @@ -618,21 +527,6 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Add all callback groups that can be automatically added from associated nodes. - /** - * The executor, before collecting entities, verifies if any callback group from - * nodes associated with the executor, which is not already associated to an executor, - * can be automatically added to this executor. - * This takes care of any callback group that has been added to a node but not explicitly added - * to the executor. - * It is important to note that in order for the callback groups to be automatically added to an - * executor through this function, the node of the callback groups needs to have been added - * through the `add_node` method. - */ - RCLCPP_PUBLIC - virtual void - add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_); - /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; @@ -642,16 +536,8 @@ class Executor /// Guard condition for signaling the rmw layer to wake up for system shutdown. std::shared_ptr shutdown_guard_condition_; - /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); - - // Mutex to protect the subsequent memory_strategy_. mutable std::mutex mutex_; - /// The memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr - memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_); - /// The context associated with this executor. std::shared_ptr context_; @@ -661,39 +547,14 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); - typedef std::map> - WeakNodesToGuardConditionsMap; - - typedef std::map> - WeakCallbackGroupsToGuardConditionsMap; - - /// maps nodes to guard conditions - WeakNodesToGuardConditionsMap - weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups to guard conditions - WeakCallbackGroupsToGuardConditionsMap - weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups associated to nodes - WeakCallbackGroupsToNodesMap - weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups to nodes associated with executor - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::shared_ptr notify_waitable_; + rclcpp::executors::ExecutorEntitiesCollector collector_; - /// maps all callback groups to nodes - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + rclcpp::executors::ExecutorEntitiesCollection current_collection_; + std::shared_ptr current_notify_waitable_; - /// nodes that are associated with the executor - std::list - weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::shared_ptr wait_set_; + std::deque ready_executables_; /// shutdown callback handle registered to Context rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_; diff --git a/rclcpp/include/rclcpp/executors.hpp b/rclcpp/include/rclcpp/executors.hpp index 36fb0d63cf..85220a8899 100644 --- a/rclcpp/include/rclcpp/executors.hpp +++ b/rclcpp/include/rclcpp/executors.hpp @@ -20,7 +20,6 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" -#include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/include/rclcpp/wait_set_policies/detail/storage_policy_common.hpp b/rclcpp/include/rclcpp/wait_set_policies/detail/storage_policy_common.hpp index 2db3ff630a..69b744877c 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/detail/storage_policy_common.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/detail/storage_policy_common.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__WAIT_SET_POLICIES__DETAIL__STORAGE_POLICY_COMMON_HPP_ #define RCLCPP__WAIT_SET_POLICIES__DETAIL__STORAGE_POLICY_COMMON_HPP_ +#include #include #include #include @@ -79,6 +80,7 @@ class StoragePolicyCommon throw std::runtime_error("unexpected condition, fixed storage policy needs pruning"); } // Flag for pruning. + std::cout << "waitable needs pruning" << std::endl; needs_pruning_ = true; continue; } @@ -86,6 +88,8 @@ class StoragePolicyCommon rclcpp::Waitable & waitable = *waitable_ptr_pair.second; subscriptions_from_waitables += waitable.get_number_of_ready_subscriptions(); guard_conditions_from_waitables += waitable.get_number_of_ready_guard_conditions(); + std::cout << "guard_conditions_from_waitables: " << guard_conditions_from_waitables << + std::endl; timers_from_waitables += waitable.get_number_of_ready_timers(); clients_from_waitables += waitable.get_number_of_ready_clients(); services_from_waitables += waitable.get_number_of_ready_services(); diff --git a/rclcpp/include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp b/rclcpp/include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp index 4a4fb16547..a7bd5ecf80 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp @@ -279,9 +279,11 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon // storage wait sets since they cannot be changed after construction. // This will also clear the wait set and re-add all the entities, which // prepares it to be waited on again. + std::cout << "rebuild_rcl_wait_set" << std::endl; rebuild_rcl_wait_set(); } + std::cout << "get_rcl_wait_set" << std::endl; rcl_wait_set_t & rcl_wait_set = get_rcl_wait_set(); // Wait unconditionally until timeout condition occurs since we assume @@ -297,6 +299,7 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon // It is ok to wait while not having the lock acquired, because the state // in the rcl wait set will not be updated until this method calls // rebuild_rcl_wait_set(). + std::cout << "rcl_wait" << std::endl; rcl_ret_t ret = rcl_wait(&rcl_wait_set, time_left_to_wait_ns.count()); if (RCL_RET_OK == ret) { // Something has become ready in the wait set, first check if it was diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index b3f41f8ed5..c139913be8 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -661,6 +661,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_acquire_ownerships(); RCPPUTILS_SCOPE_EXIT({this->storage_release_ownerships();}); + // this method comes from the SynchronizationPolicy return this->template sync_wait>( // pass along the time_to_wait duration as nanoseconds diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 747b784878..29a30d64e2 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -22,12 +23,13 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" +#include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/guard_condition.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" @@ -37,14 +39,21 @@ using namespace std::chrono_literals; -using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::Executor; +static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false}; + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), shutdown_guard_condition_(std::make_shared(options.context)), - memory_strategy_(options.memory_strategy) + notify_waitable_(std::make_shared( + [this]() { + this->collect_entities(); + })), + collector_(notify_waitable_), + current_notify_waitable_(notify_waitable_), + wait_set_(std::make_shared()) { // Store the context for later use. context_ = options.context; @@ -57,74 +66,50 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond, - // and one for the executor's guard cond (interrupt_guard_condition_) - memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get()); - - // Put the executor's guard condition in - memory_strategy_->add_guard_condition(*interrupt_guard_condition_.get()); - rcl_allocator_t allocator = memory_strategy_->get_allocator(); - - rcl_ret_t ret = rcl_wait_set_init( - &wait_set_, - 0, 2, 0, 0, 0, 0, - context_->get_rcl_context().get(), - allocator); - if (RCL_RET_OK != ret) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to create wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor"); - } + notify_waitable_->add_guard_condition(interrupt_guard_condition_); + notify_waitable_->add_guard_condition(shutdown_guard_condition_); } Executor::~Executor() { - // Disassociate all callback groups. - for (auto & pair : weak_groups_to_nodes_) { - auto group = pair.first.lock(); - if (group) { - std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - // Disassociate all nodes. - std::for_each( - weak_nodes_.begin(), weak_nodes_.end(), [] - (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) { - auto shared_node_ptr = weak_node_ptr.lock(); - if (shared_node_ptr) { - std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } + std::lock_guard lock(mutex_); + + notify_waitable_->remove_guard_condition(interrupt_guard_condition_); + notify_waitable_->remove_guard_condition(shutdown_guard_condition_); + + current_collection_.timers.update( + {}, + [this](auto timer) {wait_set_->add_timer(timer);}, + [this](auto timer) {wait_set_->remove_timer(timer);}); + + current_collection_.subscriptions.update( + {}, + [this](auto subscription) { + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask); + }, + [this](auto subscription) { + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask); }); - weak_nodes_.clear(); - weak_groups_associated_with_executor_to_nodes_.clear(); - weak_groups_to_nodes_associated_with_executor_.clear(); - weak_groups_to_nodes_.clear(); - for (const auto & pair : weak_groups_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_guard_conditions_.clear(); - for (const auto & pair : weak_nodes_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_nodes_to_guard_conditions_.clear(); + current_collection_.clients.update( + {}, + [this](auto client) {wait_set_->add_client(client);}, + [this](auto client) {wait_set_->remove_client(client);}); - // Finalize the wait set. - if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to destroy wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - } - // Remove and release the sigint guard condition - memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get()); - memory_strategy_->remove_guard_condition(interrupt_guard_condition_.get()); + current_collection_.services.update( + {}, + [this](auto service) {wait_set_->add_service(service);}, + [this](auto service) {wait_set_->remove_service(service);}); + + current_collection_.guard_conditions.update( + {}, + [this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);}); + + current_collection_.waitables.update( + {}, + [this](auto waitable) {wait_set_->add_waitable(waitable);}, + [this](auto waitable) {wait_set_->remove_waitable(waitable);}); // Remove shutdown callback handle registered to Context if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { @@ -138,95 +123,38 @@ Executor::~Executor() std::vector Executor::get_all_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; + std::lock_guard lock(mutex_); + return this->collector_.get_all_callback_groups(); } std::vector Executor::get_manually_added_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - return groups; + std::lock_guard lock(mutex_); + return this->collector_.get_manually_added_callback_groups(); } std::vector Executor::get_automatically_added_callback_groups_from_nodes() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; -} - -void -Executor::add_callback_groups_from_nodes_associated_to_executor() -{ - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - node->for_each_callback_group( - [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) - { - if ( - shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) - { - this->add_callback_group_to_map( - shared_group_ptr, - node, - weak_groups_to_nodes_associated_with_executor_, - true); - } - }); - } - } + std::lock_guard lock(mutex_); + return this->collector_.get_automatically_added_callback_groups(); } void -Executor::add_callback_group_to_map( +Executor::add_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, bool notify) { - // If the callback_group already has an executor - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Callback group has already been added to an executor."); - } - - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto insert_info = - weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); - bool was_inserted = insert_info.second; - if (!was_inserted) { - throw std::runtime_error("Callback group was already added to executor."); - } - // Also add to the map that contains all callback groups - weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); - - if (node_ptr->get_context()->is_valid()) { - auto callback_group_guard_condition = group_ptr->get_notify_guard_condition(); - weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); - // Add the callback_group's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(*callback_group_guard_condition); + (void) node_ptr; + { + std::lock_guard lock(mutex_); + this->collector_.add_callback_group(group_ptr); } if (notify) { - // Interrupt waiting to handle new node + // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { @@ -237,91 +165,22 @@ Executor::add_callback_group_to_map( } } -void -Executor::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify) -{ - std::lock_guard guard{mutex_}; - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); -} - void Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - // If the node already has an executor - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error( - std::string("Node '") + node_ptr->get_fully_qualified_name() + - "' has already been added to an executor."); + { + std::lock_guard lock(mutex_); + this->collector_.add_node(node_ptr); } - std::lock_guard guard{mutex_}; - node_ptr->for_each_callback_group( - [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - if (!group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) - { - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); - } - }); - const auto gc = node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = gc.get(); - // Add the node's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(*gc); - weak_nodes_.push_back(node_ptr); -} - -void -Executor::remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify) -{ - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter != weak_groups_to_nodes.end()) { - node_ptr = iter->second.lock(); - if (node_ptr == nullptr) { - throw std::runtime_error("Node must not be deleted before its callback group(s)."); - } - weak_groups_to_nodes.erase(iter); - weak_groups_to_nodes_.erase(group_ptr); - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } else { - throw std::runtime_error("Callback group needs to be associated with executor."); - } - // If the node was matched and removed, interrupt waiting. - if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) && - !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_)) - { - auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr); - if (iter != weak_groups_to_guard_conditions_.end()) { - memory_strategy_->remove_guard_condition(iter->second); - } - weak_groups_to_guard_conditions_.erase(weak_group_ptr); - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } + if (notify) { + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); } } } @@ -331,11 +190,21 @@ Executor::remove_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) { - std::lock_guard guard{mutex_}; - this->remove_callback_group_from_map( - group_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); + { + std::lock_guard lock(mutex_); + this->collector_.remove_callback_group(group_ptr); + } + + if (notify) { + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); + } + } } void @@ -347,48 +216,21 @@ Executor::add_node(std::shared_ptr node_ptr, bool notify) void Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - if (!node_ptr->get_associated_with_executor_atomic().load()) { - throw std::runtime_error("Node needs to be associated with an executor."); - } - - std::lock_guard guard{mutex_}; - bool found_node = false; - auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { - bool matched = (node_it->lock() == node_ptr); - if (matched) { - found_node = true; - node_it = weak_nodes_.erase(node_it); - } else { - ++node_it; - } - } - if (!found_node) { - throw std::runtime_error("Node needs to be associated with this executor."); + { + std::lock_guard lock(mutex_); + this->collector_.remove_node(node_ptr); } - for (auto it = weak_groups_to_nodes_associated_with_executor_.begin(); - it != weak_groups_to_nodes_associated_with_executor_.end(); ) - { - auto weak_node_ptr = it->second; - auto shared_node_ptr = weak_node_ptr.lock(); - auto group_ptr = it->first.lock(); - - // Increment iterator before removing in case it's invalidated - it++; - if (shared_node_ptr == node_ptr) { - remove_callback_group_from_map( - group_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); + if (notify) { + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } - - memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition().get()); - weak_nodes_to_guard_conditions_.erase(node_ptr); - - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); } void @@ -504,22 +346,13 @@ Executor::cancel() } } -void -Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) -{ - if (memory_strategy == nullptr) { - throw std::runtime_error("Received NULL memory strategy in executor."); - } - std::lock_guard guard{mutex_}; - memory_strategy_ = memory_strategy; -} - void Executor::execute_any_executable(AnyExecutable & any_exec) { if (!spinning.load()) { return; } + if (any_exec.timer) { TRACEPOINT( rclcpp_executor_execute, @@ -541,16 +374,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec) if (any_exec.waitable) { any_exec.waitable->execute(any_exec.data); } + // Reset the callback_group, regardless of type - any_exec.callback_group->can_be_taken_from().store(true); - // Wake the wait, because it may need to be recalculated or work that - // was previously blocked is now available. - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + if (any_exec.callback_group) { + any_exec.callback_group->can_be_taken_from().store(true); } } @@ -691,227 +518,99 @@ Executor::execute_client( } void -Executor::wait_for_work(std::chrono::nanoseconds timeout) +Executor::collect_entities() { - TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); - { - std::lock_guard guard(mutex_); - - // Check weak_nodes_ to find any callback group that is not owned - // by an executor and add it to the list of callbackgroups for - // collect entities. Also exchange to false so it is not - // allowed to add to another executor - add_callback_groups_from_nodes_associated_to_executor(); - - // Collect the subscriptions and timers to be waited on - memory_strategy_->clear_handles(); - bool has_invalid_weak_groups_or_nodes = - memory_strategy_->collect_entities(weak_groups_to_nodes_); - - if (has_invalid_weak_groups_or_nodes) { - std::vector invalid_group_ptrs; - for (auto pair : weak_groups_to_nodes_) { - auto weak_group_ptr = pair.first; - auto weak_node_ptr = pair.second; - if (weak_group_ptr.expired() || weak_node_ptr.expired()) { - invalid_group_ptrs.push_back(weak_group_ptr); - auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr); - if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) { - auto guard_condition = node_guard_pair->second; - weak_nodes_to_guard_conditions_.erase(weak_node_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - } - } - std::for_each( - invalid_group_ptrs.begin(), invalid_group_ptrs.end(), - [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { - if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) != - weak_groups_to_nodes_associated_with_executor_.end()) - { - weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); - } - if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) != - weak_groups_associated_with_executor_to_nodes_.end()) - { - weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); - } - auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr); - if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) { - auto guard_condition = callback_guard_pair->second; - weak_groups_to_guard_conditions_.erase(group_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_nodes_.erase(group_ptr); - }); - } - - // clear wait set - rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); - if (ret != RCL_RET_OK) { - throw_from_rcl_error(ret, "Couldn't clear wait set"); - } + std::lock_guard guard(mutex_); + rclcpp::executors::ExecutorEntitiesCollection collection; + this->collector_.update_collections(); + auto callback_groups = this->collector_.get_all_callback_groups(); + rclcpp::executors::build_entities_collection(callback_groups, collection); + + if (notify_waitable_) { + // Make a copy of notify waitable so we can continue to mutate the original + // one outside of the execute loop. + *current_notify_waitable_ = *notify_waitable_; + auto notify_waitable = std::static_pointer_cast(current_notify_waitable_); + collection.waitables.insert({notify_waitable.get(), {notify_waitable, {}}}); + } + + current_collection_.timers.update( + collection.timers, + [this](auto timer) {wait_set_->add_timer(timer);}, + [this](auto timer) {wait_set_->remove_timer(timer);}); + + current_collection_.subscriptions.update( + collection.subscriptions, + [this](auto subscription) { + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask); + }, + [this](auto subscription) { + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask); + }); - // The size of waitables are accounted for in size of the other entities - ret = rcl_wait_set_resize( - &wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - if (RCL_RET_OK != ret) { - throw_from_rcl_error(ret, "Couldn't resize the wait set"); - } + current_collection_.clients.update( + collection.clients, + [this](auto client) {wait_set_->add_client(client);}, + [this](auto client) {wait_set_->remove_client(client);}); - if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); - } - } + current_collection_.services.update( + collection.services, + [this](auto service) {wait_set_->add_service(service);}, + [this](auto service) {wait_set_->remove_service(service);}); - rcl_ret_t status = - rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); - if (status == RCL_RET_WAIT_SET_EMPTY) { - RCUTILS_LOG_WARN_NAMED( - "rclcpp", - "empty wait set received in rcl_wait(). This should never happen."); - } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(status, "rcl_wait() failed"); - } + current_collection_.guard_conditions.update( + collection.guard_conditions, + [this](auto guard_condition) {wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition) {wait_set_->remove_guard_condition(guard_condition);}); - // check the null handles in the wait set and remove them from the handles in memory strategy - // for callback-based entities - std::lock_guard guard(mutex_); - memory_strategy_->remove_null_handles(&wait_set_); + current_collection_.waitables.update( + collection.waitables, + [this](auto waitable) {wait_set_->add_waitable(waitable);}, + [this](auto waitable) {wait_set_->remove_waitable(waitable);}); } -rclcpp::node_interfaces::NodeBaseInterface::SharedPtr -Executor::get_node_by_group( - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group) +void +Executor::wait_for_work(std::chrono::nanoseconds timeout) { - if (!group) { - return nullptr; - } - rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group); - const auto finder = weak_groups_to_nodes.find(weak_group_ptr); - if (finder != weak_groups_to_nodes.end()) { - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock(); - return node_ptr; - } - return nullptr; -} + TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count()); -rclcpp::CallbackGroup::SharedPtr -Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) -{ - std::lock_guard guard{mutex_}; - for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } + if (current_collection_.empty() || this->collector_.has_pending()) { + this->collect_entities(); } - for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } + auto wait_result = wait_set_->wait(timeout); + + if (wait_result.kind() == WaitResultKind::Empty) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); } - return nullptr; -} -bool -Executor::get_next_ready_executable(AnyExecutable & any_executable) -{ - bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_); - return success; + std::lock_guard guard{mutex_}; + ready_executables_ = rclcpp::executors::ready_executables(current_collection_, wait_result); } bool -Executor::get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) +Executor::get_next_ready_executable(AnyExecutable & any_executable) { TRACEPOINT(rclcpp_executor_get_next_ready); - bool success = false; std::lock_guard guard{mutex_}; - // Check the timers to see if there are any that are ready - memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); - if (any_executable.timer) { - success = true; - } - if (!success) { - // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes); - if (any_executable.subscription) { - success = true; - } - } - if (!success) { - // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes); - if (any_executable.service) { - success = true; - } - } - if (!success) { - // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes); - if (any_executable.client) { - success = true; - } - } - if (!success) { - // Check the waitables to see if there are any that are ready - memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); - if (any_executable.waitable) { - any_executable.data = any_executable.waitable->take_data(); - success = true; - } - } - // At this point any_executable should be valid with either a valid subscription - // or a valid timer, or it should be a null shared_ptr - if (success) { - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter == weak_groups_to_nodes.end()) { - success = false; - } + + if (ready_executables_.size() == 0) { + return false; } - if (success) { - // If it is valid, check to see if the group is mutually exclusive or - // not, then mark it accordingly ..Check if the callback_group belongs to this executor - if (any_executable.callback_group && any_executable.callback_group->type() == \ - CallbackGroupType::MutuallyExclusive) - { - // It should not have been taken otherwise - assert(any_executable.callback_group->can_be_taken_from().load()); - // Set to false to indicate something is being run from this group - // This is reset to true either when the any_executable is executed or when the - // any_executable is destructued - any_executable.callback_group->can_be_taken_from().store(false); - } + any_executable = ready_executables_.front(); + ready_executables_.pop_front(); + + if (any_executable.callback_group && + any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + assert(any_executable.callback_group->can_be_taken_from().load()); + any_executable.callback_group->can_be_taken_from().store(false); } - // If there is no ready executable, return false - return success; + + return true; } bool @@ -934,22 +633,6 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos return success; } -// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. -bool -Executor::has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) const -{ - return std::find_if( - weak_groups_to_nodes.begin(), - weak_groups_to_nodes.end(), - [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { - auto other_ptr = other.second.lock(); - return other_ptr == node_ptr; - }) != weak_groups_to_nodes.end(); -} - bool Executor::is_spinning() { diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 2dce369b08..a10e8d694f 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -121,6 +121,7 @@ void check_ready( exec.callback_group = callback_group; if (fill_executable(exec, entity)) { executables.push_back(exec); + } else { } } } diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp index c0ad8a25a4..6d9fc01361 100644 --- a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -67,6 +67,7 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) bool ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) { + std::cout << "ExecutorNotifyWaitable::is_ready" << std::endl; std::lock_guard lock(guard_condition_mutex_); bool any_ready = false; @@ -89,6 +90,7 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) void ExecutorNotifyWaitable::execute(std::shared_ptr & data) { + std::cout << "ExecutorNotifyWaitable::execute" << std::endl; (void) data; this->execute_callback_(); } diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp index 652007b589..c2b42752f2 100644 --- a/rclcpp/test/benchmark/benchmark_executor.cpp +++ b/rclcpp/test/benchmark/benchmark_executor.cpp @@ -189,71 +189,6 @@ BENCHMARK_F(PerformanceTestExecutorSimple, multi_thread_executor_remove_node)(be } } -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_add_node)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - executor.add_node(node); - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } -} - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_remove_node)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - executor.remove_node(node); - } -} - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_spin_until_future_complete)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - auto ret = executor.spin_until_future_complete(shared_future, 100ms); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - } - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - // static_single_thread_executor has a special design. We need to add/remove the node each - // time you call spin - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - - ret = executor.spin_until_future_complete(shared_future, 100ms); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } -} - BENCHMARK_F( PerformanceTestExecutorSimple, single_thread_executor_spin_node_until_future_complete)(benchmark::State & st) @@ -314,29 +249,6 @@ BENCHMARK_F( } } -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_spin_node_until_future_complete)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - auto ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - } -} BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark::State & st) { @@ -362,42 +274,3 @@ BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark } } } - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_executor_entities_collector_execute)(benchmark::State & st) -{ - rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_ = - std::make_shared(); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - rcl_ret_t ret = rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - RCPPUTILS_SCOPE_EXIT( - { - rcl_ret_t ret = rcl_wait_set_fini(&wait_set); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - }); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - std::shared_ptr data = entities_collector_->take_data(); - entities_collector_->execute(data); - } -} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ddb219b558..b822e6e24f 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -663,14 +663,6 @@ if(TARGET test_executors) target_link_libraries(test_executors ${PROJECT_NAME}) endif() -ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp - APPEND_LIBRARY_DIRS "${append_library_dirs}") -if(TARGET test_static_single_threaded_executor) - ament_target_dependencies(test_static_single_threaded_executor - "test_msgs") - target_link_libraries(test_static_single_threaded_executor ${PROJECT_NAME} mimick) -endif() - ament_add_gtest(test_multi_threaded_executor executors/test_multi_threaded_executor.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_multi_threaded_executor) @@ -679,15 +671,6 @@ if(TARGET test_multi_threaded_executor) target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME}) endif() -ament_add_gtest(test_static_executor_entities_collector executors/test_static_executor_entities_collector.cpp - APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) -if(TARGET test_static_executor_entities_collector) - ament_target_dependencies(test_static_executor_entities_collector - "rcl" - "test_msgs") - target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) -endif() - ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) if(TARGET test_entities_collector) diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..1c22b17c24 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -91,8 +91,7 @@ class TestExecutorsStable : public TestExecutors {}; using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor>; class ExecutorTypeNames { @@ -109,10 +108,6 @@ class ExecutorTypeNames return "MultiThreadedExecutor"; } - if (std::is_same()) { - return "StaticSingleThreadedExecutor"; - } - return ""; } }; @@ -135,6 +130,7 @@ TYPED_TEST(TestExecutors, detachOnDestruction) { { ExecutorType executor; executor.add_node(this->node); + std::cout << "here" << std::endl; } { ExecutorType executor; @@ -156,7 +152,7 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) { } // Sleep for a short time to verify executor.spin() is going, and didn't throw. - std::thread spinner([&]() {EXPECT_NO_THROW(executor.spin());}); + std::thread spinner([&]() {executor.spin();}); std::this_thread::sleep_for(50ms); executor.cancel(); diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 02fa0b7a94..b53f47ffd5 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -49,8 +49,7 @@ class TestAddCallbackGroupsToExecutor : public ::testing::Test using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor>; class ExecutorTypeNames { @@ -67,10 +66,6 @@ class ExecutorTypeNames return "MultiThreadedExecutor"; } - if (std::is_same()) { - return "StaticSingleThreadedExecutor"; - } - return ""; } }; @@ -137,9 +132,11 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { 2s, timer_callback, cb_grp); ExecutorType executor; executor.add_callback_group(cb_grp, node->get_node_base_interface()); + const rclcpp::QoS qos(10); auto options = rclcpp::SubscriptionOptions(); auto callback = [](test_msgs::msg::Empty::ConstSharedPtr) {}; + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); options.callback_group = cb_grp2; @@ -147,6 +144,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { node->create_subscription("topic_name", qos, callback, options); executor.add_callback_group(cb_grp2, node->get_node_base_interface()); + executor.remove_callback_group(cb_grp); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); executor.remove_callback_group(cb_grp2); diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index bdbb0a1079..e36b060d0b 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -46,23 +46,6 @@ class DummyExecutor : public rclcpp::Executor { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100)); } - - rclcpp::memory_strategy::MemoryStrategy * memory_strategy_ptr() - { - return memory_strategy_.get(); - } - - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( - rclcpp::CallbackGroup::SharedPtr group) - { - std::lock_guard guard_{mutex_}; // only to make the TSA happy - return get_node_by_group(weak_groups_to_nodes_, group); - } - - rclcpp::CallbackGroup::SharedPtr local_get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) - { - return get_group_by_timer(timer); - } }; class TestExecutor : public ::testing::Test @@ -126,13 +109,6 @@ TEST_F(TestExecutor, constructor_bad_guard_condition_init) { rclcpp::exceptions::RCLError); } -TEST_F(TestExecutor, constructor_bad_wait_set_init) { - auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_init, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - static_cast(std::make_unique()), - std::runtime_error("Failed to create wait set in Executor constructor: error not set")); -} - TEST_F(TestExecutor, add_callback_group_twice) { DummyExecutor dummy; auto node = std::make_shared("node", "ns"); @@ -142,7 +118,7 @@ TEST_F(TestExecutor, add_callback_group_twice) { cb_group->get_associated_with_executor_atomic().exchange(false); RCLCPP_EXPECT_THROW_EQ( dummy.add_callback_group(cb_group, node->get_node_base_interface(), false), - std::runtime_error("Callback group was already added to executor.")); + std::runtime_error("Callback group has already been added to this executor.")); } TEST_F(TestExecutor, add_callback_group_failed_trigger_guard_condition) { @@ -328,24 +304,6 @@ TEST_F(TestExecutor, cancel_failed_trigger_guard_condition) { std::runtime_error("Failed to trigger guard condition in cancel: error not set")); } -TEST_F(TestExecutor, set_memory_strategy_nullptr) { - DummyExecutor dummy; - - RCLCPP_EXPECT_THROW_EQ( - dummy.set_memory_strategy(nullptr), - std::runtime_error("Received NULL memory strategy in executor.")); -} - -TEST_F(TestExecutor, set_memory_strategy) { - DummyExecutor dummy; - rclcpp::memory_strategy::MemoryStrategy::SharedPtr strategy = - std::make_shared< - rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); - - dummy.set_memory_strategy(strategy); - EXPECT_EQ(dummy.memory_strategy_ptr(), strategy.get()); -} - TEST_F(TestExecutor, spin_once_failed_trigger_guard_condition) { DummyExecutor dummy; auto node = std::make_shared("node", "ns"); @@ -417,71 +375,6 @@ TEST_F(TestExecutor, spin_some_fail_wait) { std::runtime_error("rcl_wait() failed: error not set")); } -TEST_F(TestExecutor, get_node_by_group_null_group) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(nullptr)); -} - -TEST_F(TestExecutor, get_node_by_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - ASSERT_EQ(node->get_node_base_interface().get(), dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_node_by_group_not_found) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_nullptr) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(nullptr)); -} - -TEST_F(TestExecutor, get_group_by_timer) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_with_deleted_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - cb_group.reset(); - - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_add_callback_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - TEST_F(TestExecutor, spin_until_future_complete_in_spin_until_future_complete) { DummyExecutor dummy; auto node = std::make_shared("node", "ns");