From ad3f3e5b90101a35c55bb988cb3e5c8921193542 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Wed, 29 Mar 2023 17:59:14 +0000 Subject: [PATCH] Utilize rclcpp::WaitSet as part of the executors Signed-off-by: Michael Carroll --- rclcpp/CMakeLists.txt | 4 +- rclcpp/include/rclcpp/callback_group.hpp | 4 + rclcpp/include/rclcpp/executor.hpp | 163 +---- rclcpp/include/rclcpp/executors.hpp | 1 - .../executors/executor_entities_collector.hpp | 2 +- .../static_executor_entities_collector.hpp | 357 ---------- .../static_single_threaded_executor.hpp | 223 ------- rclcpp/src/rclcpp/executor.cpp | 615 ++++------------- .../static_executor_entities_collector.cpp | 524 --------------- .../static_single_threaded_executor.cpp | 280 -------- rclcpp/test/benchmark/benchmark_executor.cpp | 127 ---- rclcpp/test/rclcpp/CMakeLists.txt | 16 +- .../executors/test_entities_collector.cpp | 185 ++++++ .../test/rclcpp/executors/test_executors.cpp | 7 +- ...est_static_executor_entities_collector.cpp | 625 ------------------ .../test_static_single_threaded_executor.cpp | 142 ---- .../test_add_callback_groups_to_executor.cpp | 7 +- rclcpp/test/rclcpp/test_executor.cpp | 102 +-- 18 files changed, 337 insertions(+), 3047 deletions(-) delete mode 100644 rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp delete mode 100644 rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp delete mode 100644 rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp delete mode 100644 rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp create mode 100644 rclcpp/test/rclcpp/executors/test_entities_collector.cpp delete mode 100644 rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp delete mode 100644 rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 324a53c518..e080c00c79 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -55,12 +55,10 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/executors/executor_notify_waitable.cpp - src/rclcpp/executors/executor_entities_collector.cpp src/rclcpp/executors/executor_entities_collection.cpp + src/rclcpp/executors/executor_entities_collector.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp - src/rclcpp/executors/static_executor_entities_collector.cpp - src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/future_return_code.cpp src/rclcpp/generic_publisher.cpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 279c642c0e..7899a35233 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -152,6 +152,10 @@ class CallbackGroup bool has_valid_node(); + RCLCPP_PUBLIC + bool + has_valid_node(); + RCLCPP_PUBLIC std::atomic_bool & can_be_taken_from(); diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 94a8488557..3a4af267ec 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -35,20 +36,17 @@ #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/executor_options.hpp" +#include "rclcpp/executors/executor_entities_collection.hpp" +#include "rclcpp/executors/executor_entities_collector.hpp" #include "rclcpp/future_return_code.hpp" -#include "rclcpp/memory_strategies.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/wait_set.hpp" namespace rclcpp { -typedef std::map> WeakCallbackGroupsToNodesMap; - // Forward declaration is used in convenience method signature. class Node; @@ -402,17 +400,6 @@ class Executor void cancel(); - /// Support dynamic switching of the memory strategy. - /** - * Switching the memory strategy while the executor is spinning in another threading could have - * unintended consequences. - * \param[in] memory_strategy Shared pointer to the memory strategy to set. - * \throws std::runtime_error if memory_strategy is null - */ - RCLCPP_PUBLIC - void - set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy); - /// Returns true if the executor is currently spinning. /** * This function can be called asynchronously from any thread. @@ -508,62 +495,6 @@ class Executor void wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Find node associated with a callback group - /** - * \param[in] weak_groups_to_nodes map of callback groups to nodes - * \param[in] group callback group to find assocatiated node - * \return Pointer to associated node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr - get_node_by_group( - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group); - - /// Return true if the node has been added to this executor. - /** - * \param[in] node_ptr a shared pointer that points to a node base interface - * \param[in] weak_groups_to_nodes map to nodes to lookup - * \return true if the node is associated with the executor, otherwise false - */ - RCLCPP_PUBLIC - bool - has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; - - /// Find the callback group associated with a timer - /** - * \param[in] timer Timer to find associated callback group - * \return Pointer to callback group node if found, else nullptr - */ - RCLCPP_PUBLIC - rclcpp::CallbackGroup::SharedPtr - get_group_by_timer(rclcpp::TimerBase::SharedPtr timer); - - /// Add a callback group to an executor - /** - * \see rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - virtual void - add_callback_group_to_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - virtual void - remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); - /// Check for executable in ready state and populate union structure. /** * \param[out] any_executable populated union structure of ready executable @@ -574,33 +505,6 @@ class Executor bool get_next_ready_executable(AnyExecutable & any_executable); - /// Check for executable in ready state and populate union structure. - /** - * This is the implementation of get_next_ready_executable that takes into - * account the current state of callback groups' association with nodes and - * executors. - * - * This checks in a particular order for available work: - * * Timers - * * Subscriptions - * * Services - * * Clients - * * Waitable - * - * If the next executable is not associated with this executor/node pair, - * then this method will return false. - * - * \param[out] any_executable populated union structure of ready executable - * \param[in] weak_groups_to_nodes mapping of callback groups to nodes - * \return true if an executable was ready and any_executable was populated, - * otherwise false - */ - RCLCPP_PUBLIC - bool - get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - /// Wait for executable in ready state and populate union structure. /** * If an executable is ready, it will return immediately, otherwise @@ -618,21 +522,6 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - /// Add all callback groups that can be automatically added from associated nodes. - /** - * The executor, before collecting entities, verifies if any callback group from - * nodes associated with the executor, which is not already associated to an executor, - * can be automatically added to this executor. - * This takes care of any callback group that has been added to a node but not explicitly added - * to the executor. - * It is important to note that in order for the callback groups to be automatically added to an - * executor through this function, the node of the callback groups needs to have been added - * through the `add_node` method. - */ - RCLCPP_PUBLIC - virtual void - add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_); - /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; @@ -641,16 +530,8 @@ class Executor std::shared_ptr shutdown_guard_condition_; - /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); - - // Mutex to protect the subsequent memory_strategy_. mutable std::mutex mutex_; - /// The memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr - memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_); - /// The context associated with this executor. std::shared_ptr context_; @@ -660,39 +541,11 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); - typedef std::map> - WeakNodesToGuardConditionsMap; - - typedef std::map> - WeakCallbackGroupsToGuardConditionsMap; - - /// maps nodes to guard conditions - WeakNodesToGuardConditionsMap - weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups to guard conditions - WeakCallbackGroupsToGuardConditionsMap - weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups associated to nodes - WeakCallbackGroupsToNodesMap - weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps callback groups to nodes associated with executor - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_); - - /// maps all callback groups to nodes - WeakCallbackGroupsToNodesMap - weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + rclcpp::executors::ExecutorEntitiesCollector collector_; + rclcpp::executors::ExecutorEntitiesCollection current_collection_; - /// nodes that are associated with the executor - std::list - weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::shared_ptr wait_set_; + std::deque ready_executables_; /// shutdown callback handle registered to Context rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_; diff --git a/rclcpp/include/rclcpp/executors.hpp b/rclcpp/include/rclcpp/executors.hpp index 36fb0d63cf..85220a8899 100644 --- a/rclcpp/include/rclcpp/executors.hpp +++ b/rclcpp/include/rclcpp/executors.hpp @@ -20,7 +20,6 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" -#include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp index 623ec64ac5..c8e5f2b306 100644 --- a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp @@ -147,5 +147,5 @@ class ExecutorEntitiesCollector }; } // namespace executors } // namespace rclcpp - +// #endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp deleted file mode 100644 index f9fd2ff672..0000000000 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ -#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ - -#include -#include -#include -#include -#include - -#include "rcl/guard_condition.h" -#include "rcl/wait.h" - -#include "rclcpp/experimental/executable_list.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/memory_strategy.hpp" -#include "rclcpp/visibility_control.hpp" -#include "rclcpp/waitable.hpp" - -namespace rclcpp -{ -namespace executors -{ -typedef std::map> WeakCallbackGroupsToNodesMap; - -class StaticExecutorEntitiesCollector final - : public rclcpp::Waitable, - public std::enable_shared_from_this -{ -public: - RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) - - // Constructor - RCLCPP_PUBLIC - StaticExecutorEntitiesCollector() = default; - - // Destructor - RCLCPP_PUBLIC - ~StaticExecutorEntitiesCollector(); - - /// Initialize StaticExecutorEntitiesCollector - /** - * \param p_wait_set A reference to the wait set to be used in the executor - * \param memory_strategy Shared pointer to the memory strategy to set. - * \throws std::runtime_error if memory strategy is null - */ - RCLCPP_PUBLIC - void - init( - rcl_wait_set_t * p_wait_set, - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy); - - /// Finalize StaticExecutorEntitiesCollector to clear resources - RCLCPP_PUBLIC - bool - is_init() {return initialized_;} - - RCLCPP_PUBLIC - void - fini(); - - /// Execute the waitable. - RCLCPP_PUBLIC - void - execute(std::shared_ptr & data) override; - - /// Take the data so that it can be consumed with `execute`. - /** - * For `StaticExecutorEntitiesCollector`, this always return `nullptr`. - * \sa rclcpp::Waitable::take_data() - */ - RCLCPP_PUBLIC - std::shared_ptr - take_data() override; - - /// Function to add_handles_to_wait_set and wait for work and - /** - * block until the wait set is ready or until the timeout has been exceeded. - * \throws std::runtime_error if wait set couldn't be cleared or filled. - * \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error() - */ - RCLCPP_PUBLIC - void - refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - - /** - * \throws std::runtime_error if it couldn't add guard condition to wait set - */ - RCLCPP_PUBLIC - void - add_to_wait_set(rcl_wait_set_t * wait_set) override; - - RCLCPP_PUBLIC - size_t - get_number_of_ready_guard_conditions() override; - - /// Add a callback group to an executor. - /** - * \see rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - bool - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - /// Add a callback group to an executor. - /** - * \see rclcpp::Executor::add_callback_group - * \return boolean whether the node from the callback group is new - */ - RCLCPP_PUBLIC - bool - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - bool - remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr); - - /// Remove a callback group from the executor. - /** - * \see rclcpp::Executor::remove_callback_group_from_map - */ - RCLCPP_PUBLIC - bool - remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /** - * \see rclcpp::Executor::add_node() - * \throw std::runtime_error if node was already added - */ - RCLCPP_PUBLIC - bool - add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - /** - * \see rclcpp::Executor::remove_node() - * \throw std::runtime_error if no guard condition is associated with node. - */ - RCLCPP_PUBLIC - bool - remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - - RCLCPP_PUBLIC - std::vector - get_all_callback_groups(); - - /// Get callback groups that belong to executor. - /** - * \see rclcpp::Executor::get_manually_added_callback_groups() - */ - RCLCPP_PUBLIC - std::vector - get_manually_added_callback_groups(); - - /// Get callback groups that belong to executor. - /** - * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() - */ - RCLCPP_PUBLIC - std::vector - get_automatically_added_callback_groups_from_nodes(); - - /// Complete all available queued work without blocking. - /** - * This function checks if after the guard condition was triggered - * (or a spurious wakeup happened) we are really ready to execute - * i.e. re-collect entities - */ - RCLCPP_PUBLIC - bool - is_ready(rcl_wait_set_t * wait_set) override; - - /// Return number of timers - /** - * \return number of timers - */ - RCLCPP_PUBLIC - size_t - get_number_of_timers() {return exec_list_.number_of_timers;} - - /// Return number of subscriptions - /** - * \return number of subscriptions - */ - RCLCPP_PUBLIC - size_t - get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;} - - /// Return number of services - /** - * \return number of services - */ - RCLCPP_PUBLIC - size_t - get_number_of_services() {return exec_list_.number_of_services;} - - /// Return number of clients - /** - * \return number of clients - */ - RCLCPP_PUBLIC - size_t - get_number_of_clients() {return exec_list_.number_of_clients;} - - /// Return number of waitables - /** - * \return number of waitables - */ - RCLCPP_PUBLIC - size_t - get_number_of_waitables() {return exec_list_.number_of_waitables;} - - /** Return a SubscritionBase Sharedptr by index. - * \param[in] i The index of the SubscritionBase - * \return a SubscritionBase shared pointer - * \throws std::out_of_range if the argument is higher than the size of the structrue. - */ - RCLCPP_PUBLIC - rclcpp::SubscriptionBase::SharedPtr - get_subscription(size_t i) {return exec_list_.subscription[i];} - - /** Return a TimerBase Sharedptr by index. - * \param[in] i The index of the TimerBase - * \return a TimerBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::TimerBase::SharedPtr - get_timer(size_t i) {return exec_list_.timer[i];} - - /** Return a ServiceBase Sharedptr by index. - * \param[in] i The index of the ServiceBase - * \return a ServiceBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::ServiceBase::SharedPtr - get_service(size_t i) {return exec_list_.service[i];} - - /** Return a ClientBase Sharedptr by index - * \param[in] i The index of the ClientBase - * \return a ClientBase shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::ClientBase::SharedPtr - get_client(size_t i) {return exec_list_.client[i];} - - /** Return a Waitable Sharedptr by index - * \param[in] i The index of the Waitable - * \return a Waitable shared pointer - * \throws std::out_of_range if the argument is higher than the size. - */ - RCLCPP_PUBLIC - rclcpp::Waitable::SharedPtr - get_waitable(size_t i) {return exec_list_.waitable[i];} - -private: - /// Function to reallocate space for entities in the wait set. - /** - * \throws std::runtime_error if wait set couldn't be cleared or resized. - */ - void - prepare_wait_set(); - - void - fill_executable_list(); - - void - fill_memory_strategy(); - - /// Return true if the node belongs to the collector - /** - * \param[in] node_ptr a node base interface shared pointer - * \param[in] weak_groups_to_nodes map to nodes to lookup - * \return boolean whether a node belongs the collector - */ - bool - has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; - - /// Add all callback groups that can be automatically added by any executor - /// and is not already associated with an executor from nodes - /// that are associated with executor - /** - * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() - */ - void - add_callback_groups_from_nodes_associated_to_executor(); - - void - fill_executable_list_from_map(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); - - /// Memory strategy: an interface for handling user-defined memory allocation strategies. - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; - - // maps callback groups to nodes. - WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; - // maps callback groups to nodes. - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; - - typedef std::map> - WeakNodesToGuardConditionsMap; - WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; - - /// List of weak nodes registered in the static executor - std::list weak_nodes_; - - // Mutex to protect vector of new nodes. - std::mutex new_nodes_mutex_; - std::vector new_nodes_; - - /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t * p_wait_set_ = nullptr; - - /// Executable list: timers, subscribers, clients, services and waitables - rclcpp::experimental::ExecutableList exec_list_; - - /// Bool to check if the entities collector has been initialized - bool initialized_ = false; -}; - -} // namespace executors -} // namespace rclcpp - -#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp deleted file mode 100644 index 5294605eaf..0000000000 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2019 Nobleo Technology -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ -#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ - -#include -#include -#include -#include -#include -#include - -#include "rmw/rmw.h" - -#include "rclcpp/executor.hpp" -#include "rclcpp/executors/static_executor_entities_collector.hpp" -#include "rclcpp/experimental/executable_list.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/memory_strategies.hpp" -#include "rclcpp/node.hpp" -#include "rclcpp/rate.hpp" -#include "rclcpp/utilities.hpp" -#include "rclcpp/visibility_control.hpp" - -namespace rclcpp -{ -namespace executors -{ - -/// Static executor implementation -/** - * This executor is a static version of the original single threaded executor. - * It's static because it doesn't reconstruct the executable list for every iteration. - * All nodes, callbackgroups, timers, subscriptions etc. are created before - * spin() is called, and modified only when an entity is added/removed to/from a node. - * - * To run this executor instead of SingleThreadedExecutor replace: - * rclcpp::executors::SingleThreadedExecutor exec; - * by - * rclcpp::executors::StaticSingleThreadedExecutor exec; - * in your source code and spin node(s) in the following way: - * exec.add_node(node); - * exec.spin(); - * exec.remove_node(node); - */ -class StaticSingleThreadedExecutor : public rclcpp::Executor -{ -public: - RCLCPP_SMART_PTR_DEFINITIONS(StaticSingleThreadedExecutor) - - /// Default constructor. See the default constructor for Executor. - RCLCPP_PUBLIC - explicit StaticSingleThreadedExecutor( - const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); - - /// Default destrcutor. - RCLCPP_PUBLIC - virtual ~StaticSingleThreadedExecutor(); - - /// Static executor implementation of spin. - /** - * This function will block until work comes in, execute it, and keep blocking. - * It will only be interrupted by a CTRL-C (managed by the global signal handler). - * \throws std::runtime_error when spin() called while already spinning - */ - RCLCPP_PUBLIC - void - spin() override; - - /// Static executor implementation of spin some - /** - * This non-blocking function will execute entities that - * were ready when this API was called, until timeout or no - * more work available. Entities that got ready while - * executing work, won't be taken into account here. - * - * Example: - * while(condition) { - * spin_some(); - * sleep(); // User should have some sync work or - * // sleep to avoid a 100% CPU usage - * } - */ - RCLCPP_PUBLIC - void - spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; - - /// Static executor implementation of spin all - /** - * This non-blocking function will execute entities until timeout (must be >= 0) - * or no more work available. - * If timeout is `0`, potentially it blocks forever until no more work is available. - * If new entities get ready while executing work available, they will be executed - * as long as the timeout hasn't expired. - * - * Example: - * while(condition) { - * spin_all(); - * sleep(); // User should have some sync work or - * // sleep to avoid a 100% CPU usage - * } - */ - RCLCPP_PUBLIC - void - spin_all(std::chrono::nanoseconds max_duration) override; - - /// Add a callback group to an executor. - /** - * \sa rclcpp::Executor::add_callback_group - */ - RCLCPP_PUBLIC - void - add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Remove callback group from the executor - /** - * \sa rclcpp::Executor::remove_callback_group - */ - RCLCPP_PUBLIC - void - remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - bool notify = true) override; - - /// Add a node to the executor. - /** - * \sa rclcpp::Executor::add_node - */ - RCLCPP_PUBLIC - void - add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Convenience function which takes Node and forwards NodeBaseInterface. - /** - * \sa rclcpp::StaticSingleThreadedExecutor::add_node - */ - RCLCPP_PUBLIC - void - add_node(std::shared_ptr node_ptr, bool notify = true) override; - - /// Remove a node from the executor. - /** - * \sa rclcpp::Executor::remove_node - */ - RCLCPP_PUBLIC - void - remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify = true) override; - - /// Convenience function which takes Node and forwards NodeBaseInterface. - /** - * \sa rclcpp::Executor::remove_node - */ - RCLCPP_PUBLIC - void - remove_node(std::shared_ptr node_ptr, bool notify = true) override; - - RCLCPP_PUBLIC - std::vector - get_all_callback_groups() override; - - /// Get callback groups that belong to executor. - /** - * \sa rclcpp::Executor::get_manually_added_callback_groups() - */ - RCLCPP_PUBLIC - std::vector - get_manually_added_callback_groups() override; - - /// Get callback groups that belong to executor. - /** - * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() - */ - RCLCPP_PUBLIC - std::vector - get_automatically_added_callback_groups_from_nodes() override; - -protected: - /** - * @brief Executes ready executables from wait set. - * @param spin_once if true executes only the first ready executable. - * @return true if any executable was ready. - */ - RCLCPP_PUBLIC - bool - execute_ready_executables(bool spin_once = false); - - RCLCPP_PUBLIC - void - spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); - - RCLCPP_PUBLIC - void - spin_once_impl(std::chrono::nanoseconds timeout) override; - -private: - RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) - - StaticExecutorEntitiesCollector::SharedPtr entities_collector_; -}; - -} // namespace executors -} // namespace rclcpp - -#endif // RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 197822e087..23d5695c89 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -22,12 +23,12 @@ #include "rcl/allocator.h" #include "rcl/error_handling.h" +#include "rclcpp/subscription_wait_set_mask.hpp" #include "rcpputils/scope_exit.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/executor.hpp" #include "rclcpp/guard_condition.hpp" -#include "rclcpp/memory_strategy.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" @@ -37,17 +38,15 @@ using namespace std::chrono_literals; -using rclcpp::exceptions::throw_from_rcl_error; -using rclcpp::AnyExecutable; using rclcpp::Executor; -using rclcpp::ExecutorOptions; -using rclcpp::FutureReturnCode; + +static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {true, false, false}; Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(options.context), shutdown_guard_condition_(std::make_shared(options.context)), - memory_strategy_(options.memory_strategy) + wait_set_(std::make_shared()) { // Store the context for later use. context_ = options.context; @@ -60,74 +59,39 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond, - // and one for the executor's guard cond (interrupt_guard_condition_) - memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get()); - - // Put the executor's guard condition in - memory_strategy_->add_guard_condition(interrupt_guard_condition_); - rcl_allocator_t allocator = memory_strategy_->get_allocator(); - - rcl_ret_t ret = rcl_wait_set_init( - &wait_set_, - 0, 2, 0, 0, 0, 0, - context_->get_rcl_context().get(), - allocator); - if (RCL_RET_OK != ret) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to create wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor"); - } + this->collector_.get_executor_notify_waitable()->add_guard_condition( + &interrupt_guard_condition_); + this->collector_.get_executor_notify_waitable()->add_guard_condition( + shutdown_guard_condition_.get()); } Executor::~Executor() { - // Disassociate all callback groups. - for (auto & pair : weak_groups_to_nodes_) { - auto group = pair.first.lock(); - if (group) { - std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - // Disassociate all nodes. - std::for_each( - weak_nodes_.begin(), weak_nodes_.end(), [] - (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) { - auto shared_node_ptr = weak_node_ptr.lock(); - if (shared_node_ptr) { - std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } - }); - weak_nodes_.clear(); - weak_groups_associated_with_executor_to_nodes_.clear(); - weak_groups_to_nodes_associated_with_executor_.clear(); - weak_groups_to_nodes_.clear(); - for (const auto & pair : weak_groups_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_guard_conditions_.clear(); + current_collection_.update_timers({}, + [this](auto timer){wait_set_->add_timer(timer);}, + [this](auto timer){wait_set_->remove_timer(timer);}); - for (const auto & pair : weak_nodes_to_guard_conditions_) { - auto guard_condition = pair.second; - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_nodes_to_guard_conditions_.clear(); + current_collection_.update_subscriptions({}, + [this](auto subscription){ + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);}, + [this](auto subscription){ + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);}); - // Finalize the wait set. - if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to destroy wait set: %s", rcl_get_error_string().str); - rcl_reset_error(); - } - // Remove and release the sigint guard condition - memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get()); - memory_strategy_->remove_guard_condition(&interrupt_guard_condition_); + current_collection_.update_clients({}, + [this](auto client){wait_set_->add_client(client);}, + [this](auto client){wait_set_->remove_client(client);}); + + current_collection_.update_services({}, + [this](auto service){wait_set_->add_service(service);}, + [this](auto service){wait_set_->remove_service(service);}); + + current_collection_.update_guard_conditions({}, + [this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);}); + + current_collection_.update_waitables({}, + [this](auto waitable){wait_set_->add_waitable(waitable);}, + [this](auto waitable){wait_set_->remove_waitable(waitable);}); // Remove shutdown callback handle registered to Context if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { @@ -141,190 +105,56 @@ Executor::~Executor() std::vector Executor::get_all_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; + return this->collector_.get_all_callback_groups(); } std::vector Executor::get_manually_added_callback_groups() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - return groups; + return this->collector_.get_manually_added_callback_groups(); } std::vector Executor::get_automatically_added_callback_groups_from_nodes() { - std::vector groups; - std::lock_guard guard{mutex_}; - for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; -} - -void -Executor::add_callback_groups_from_nodes_associated_to_executor() -{ - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - node->for_each_callback_group( - [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) - { - if ( - shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) - { - this->add_callback_group_to_map( - shared_group_ptr, - node, - weak_groups_to_nodes_associated_with_executor_, - true); - } - }); - } - } + return this->collector_.get_automatically_added_callback_groups(); } void -Executor::add_callback_group_to_map( +Executor::add_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, bool notify) { - // If the callback_group already has an executor - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Callback group has already been added to an executor."); - } - - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto insert_info = - weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); - bool was_inserted = insert_info.second; - if (!was_inserted) { - throw std::runtime_error("Callback group was already added to executor."); - } - // Also add to the map that contains all callback groups - weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); - - if (node_ptr->get_context()->is_valid()) { - auto callback_group_guard_condition = group_ptr->get_notify_guard_condition(); - weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); - // Add the callback_group's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(*callback_group_guard_condition); - } + (void) node_ptr; + this->collector_.add_callback_group(group_ptr); - if (notify) { - // Interrupt waiting to handle new node + if (notify) + { + // Interrupt waiting to handle removed callback group try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } } -void -Executor::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify) -{ - std::lock_guard guard{mutex_}; - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); -} - void Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - // If the node already has an executor - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error( - std::string("Node '") + node_ptr->get_fully_qualified_name() + - "' has already been added to an executor."); - } - std::lock_guard guard{mutex_}; - node_ptr->for_each_callback_group( - [this, node_ptr, notify](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - if (!group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) - { - this->add_callback_group_to_map( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); - } - }); - - const auto & gc = node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = &gc; - // Add the node's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(gc); - weak_nodes_.push_back(node_ptr); -} - -void -Executor::remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify) -{ - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter != weak_groups_to_nodes.end()) { - node_ptr = iter->second.lock(); - if (node_ptr == nullptr) { - throw std::runtime_error("Node must not be deleted before its callback group(s)."); - } - weak_groups_to_nodes.erase(iter); - weak_groups_to_nodes_.erase(group_ptr); - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - } else { - throw std::runtime_error("Callback group needs to be associated with executor."); - } - // If the node was matched and removed, interrupt waiting. - if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) && - !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_)) + this->collector_.add_node(node_ptr); + if (notify) { - auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr); - if (iter != weak_groups_to_guard_conditions_.end()) { - memory_strategy_->remove_guard_condition(iter->second); - } - weak_groups_to_guard_conditions_.erase(weak_group_ptr); - - if (notify) { - try { - interrupt_guard_condition_.trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_.trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } } } } @@ -334,64 +164,55 @@ Executor::remove_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) { - std::lock_guard guard{mutex_}; - this->remove_callback_group_from_map( - group_ptr, - weak_groups_associated_with_executor_to_nodes_, - notify); + this->collector_.remove_callback_group(group_ptr); + + if (notify) + { + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_.trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); + } + } } void Executor::add_node(std::shared_ptr node_ptr, bool notify) { this->add_node(node_ptr->get_node_base_interface(), notify); + + if (notify) + { + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_.trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on node add: ") + ex.what()); + } + } } void Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - if (!node_ptr->get_associated_with_executor_atomic().load()) { - throw std::runtime_error("Node needs to be associated with an executor."); - } - - std::lock_guard guard{mutex_}; - bool found_node = false; - auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { - bool matched = (node_it->lock() == node_ptr); - if (matched) { - found_node = true; - node_it = weak_nodes_.erase(node_it); - } else { - ++node_it; - } - } - if (!found_node) { - throw std::runtime_error("Node needs to be associated with this executor."); - } + this->collector_.remove_node(node_ptr); - for (auto it = weak_groups_to_nodes_associated_with_executor_.begin(); - it != weak_groups_to_nodes_associated_with_executor_.end(); ) + if (notify) { - auto weak_node_ptr = it->second; - auto shared_node_ptr = weak_node_ptr.lock(); - auto group_ptr = it->first.lock(); - - // Increment iterator before removing in case it's invalidated - it++; - if (shared_node_ptr == node_ptr) { - remove_callback_group_from_map( - group_ptr, - weak_groups_to_nodes_associated_with_executor_, - notify); + // Interrupt waiting to handle removed callback group + try { + interrupt_guard_condition_.trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } - - memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition()); - weak_nodes_to_guard_conditions_.erase(node_ptr); - - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); } void @@ -507,22 +328,13 @@ Executor::cancel() } } -void -Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) -{ - if (memory_strategy == nullptr) { - throw std::runtime_error("Received NULL memory strategy in executor."); - } - std::lock_guard guard{mutex_}; - memory_strategy_ = memory_strategy; -} - void Executor::execute_any_executable(AnyExecutable & any_exec) { if (!spinning.load()) { return; } + if (any_exec.timer) { TRACEPOINT( rclcpp_executor_execute, @@ -700,221 +512,72 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) { std::lock_guard guard(mutex_); - // Check weak_nodes_ to find any callback group that is not owned - // by an executor and add it to the list of callbackgroups for - // collect entities. Also exchange to false so it is not - // allowed to add to another executor - add_callback_groups_from_nodes_associated_to_executor(); - - // Collect the subscriptions and timers to be waited on - memory_strategy_->clear_handles(); - bool has_invalid_weak_groups_or_nodes = - memory_strategy_->collect_entities(weak_groups_to_nodes_); - - if (has_invalid_weak_groups_or_nodes) { - std::vector invalid_group_ptrs; - for (auto pair : weak_groups_to_nodes_) { - auto weak_group_ptr = pair.first; - auto weak_node_ptr = pair.second; - if (weak_group_ptr.expired() || weak_node_ptr.expired()) { - invalid_group_ptrs.push_back(weak_group_ptr); - auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr); - if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) { - auto guard_condition = node_guard_pair->second; - weak_nodes_to_guard_conditions_.erase(weak_node_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - } - } - std::for_each( - invalid_group_ptrs.begin(), invalid_group_ptrs.end(), - [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { - if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) != - weak_groups_to_nodes_associated_with_executor_.end()) - { - weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); - } - if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) != - weak_groups_associated_with_executor_to_nodes_.end()) - { - weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); - } - auto callback_guard_pair = weak_groups_to_guard_conditions_.find(group_ptr); - if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) { - auto guard_condition = callback_guard_pair->second; - weak_groups_to_guard_conditions_.erase(group_ptr); - memory_strategy_->remove_guard_condition(guard_condition); - } - weak_groups_to_nodes_.erase(group_ptr); - }); - } + rclcpp::executors::ExecutorEntitiesCollection collection; + auto callback_groups = this->collector_.get_all_callback_groups(); + rclcpp::executors::build_entities_collection(callback_groups, collection); - // clear wait set - rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); - if (ret != RCL_RET_OK) { - throw_from_rcl_error(ret, "Couldn't clear wait set"); - } + current_collection_.update_timers(collection.timers, + [this](auto timer){wait_set_->add_timer(timer);}, + [this](auto timer){wait_set_->remove_timer(timer);}); - // The size of waitables are accounted for in size of the other entities - ret = rcl_wait_set_resize( - &wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - if (RCL_RET_OK != ret) { - throw_from_rcl_error(ret, "Couldn't resize the wait set"); - } + current_collection_.update_subscriptions(collection.subscriptions, + [this](auto subscription){ + wait_set_->add_subscription(subscription, kDefaultSubscriptionMask);}, + [this](auto subscription){ + wait_set_->remove_subscription(subscription, kDefaultSubscriptionMask);}); - if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); - } - } + current_collection_.update_clients(collection.clients, + [this](auto client){wait_set_->add_client(client);}, + [this](auto client){wait_set_->remove_client(client);}); - rcl_ret_t status = - rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); - if (status == RCL_RET_WAIT_SET_EMPTY) { - RCUTILS_LOG_WARN_NAMED( - "rclcpp", - "empty wait set received in rcl_wait(). This should never happen."); - } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(status, "rcl_wait() failed"); - } + current_collection_.update_services(collection.services, + [this](auto service){wait_set_->add_service(service);}, + [this](auto service){wait_set_->remove_service(service);}); - // check the null handles in the wait set and remove them from the handles in memory strategy - // for callback-based entities - std::lock_guard guard(mutex_); - memory_strategy_->remove_null_handles(&wait_set_); -} + current_collection_.update_guard_conditions(collection.guard_conditions, + [this](auto guard_condition){wait_set_->add_guard_condition(guard_condition);}, + [this](auto guard_condition){wait_set_->remove_guard_condition(guard_condition);}); -rclcpp::node_interfaces::NodeBaseInterface::SharedPtr -Executor::get_node_by_group( - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes, - rclcpp::CallbackGroup::SharedPtr group) -{ - if (!group) { - return nullptr; - } - rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group); - const auto finder = weak_groups_to_nodes.find(weak_group_ptr); - if (finder != weak_groups_to_nodes.end()) { - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock(); - return node_ptr; + current_collection_.update_waitables(collection.waitables, + [this](auto waitable){wait_set_->add_waitable(waitable);}, + [this](auto waitable){wait_set_->remove_waitable(waitable);}); } - return nullptr; -} -rclcpp::CallbackGroup::SharedPtr -Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) -{ - std::lock_guard guard{mutex_}; - for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } - } + auto wait_result = wait_set_->wait(timeout); - for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { - auto group = pair.first.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } + if (wait_result.kind() == WaitResultKind::Empty) + { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in wait(). This should never happen."); } - return nullptr; -} -bool -Executor::get_next_ready_executable(AnyExecutable & any_executable) -{ - bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_); - return success; + std::lock_guard guard{mutex_}; + ready_executables_ = rclcpp::executors::ready_executables(current_collection_, wait_result); } bool -Executor::get_next_ready_executable_from_map( - AnyExecutable & any_executable, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) +Executor::get_next_ready_executable(AnyExecutable & any_executable) { TRACEPOINT(rclcpp_executor_get_next_ready); - bool success = false; std::lock_guard guard{mutex_}; - // Check the timers to see if there are any that are ready - memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); - if (any_executable.timer) { - success = true; - } - if (!success) { - // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes); - if (any_executable.subscription) { - success = true; - } - } - if (!success) { - // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes); - if (any_executable.service) { - success = true; - } - } - if (!success) { - // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes); - if (any_executable.client) { - success = true; - } - } - if (!success) { - // Check the waitables to see if there are any that are ready - memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); - if (any_executable.waitable) { - any_executable.data = any_executable.waitable->take_data(); - success = true; - } - } - // At this point any_executable should be valid with either a valid subscription - // or a valid timer, or it should be a null shared_ptr - if (success) { - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter == weak_groups_to_nodes.end()) { - success = false; - } + + if (ready_executables_.size() == 0) + { + return false; } - if (success) { - // If it is valid, check to see if the group is mutually exclusive or - // not, then mark it accordingly ..Check if the callback_group belongs to this executor - if (any_executable.callback_group && any_executable.callback_group->type() == \ - CallbackGroupType::MutuallyExclusive) - { - // It should not have been taken otherwise - assert(any_executable.callback_group->can_be_taken_from().load()); - // Set to false to indicate something is being run from this group - // This is reset to true either when the any_executable is executed or when the - // any_executable is destructued - any_executable.callback_group->can_be_taken_from().store(false); - } + any_executable = ready_executables_.front(); + ready_executables_.pop_front(); + + if (any_executable.callback_group && + any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) + { + assert(any_executable.callback_group->can_be_taken_from().load()); + any_executable.callback_group->can_be_taken_from().store(false); } - // If there is no ready executable, return false - return success; + + return true; } bool @@ -937,22 +600,6 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos return success; } -// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. -bool -Executor::has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) const -{ - return std::find_if( - weak_groups_to_nodes.begin(), - weak_groups_to_nodes.end(), - [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { - auto other_ptr = other.second.lock(); - return other_ptr == node_ptr; - }) != weak_groups_to_nodes.end(); -} - bool Executor::is_spinning() { diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp deleted file mode 100644 index bf50e062f3..0000000000 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ /dev/null @@ -1,524 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/executors/static_executor_entities_collector.hpp" - -#include -#include -#include -#include -#include -#include - -#include "rclcpp/memory_strategy.hpp" -#include "rclcpp/executors/static_single_threaded_executor.hpp" -#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" - -using rclcpp::executors::StaticExecutorEntitiesCollector; - -StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() -{ - // Disassociate all callback groups and thus nodes. - for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { - auto group = pair.first.lock(); - if (group) { - std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { - auto group = pair.first.lock(); - if (group) { - std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - // Disassociate all nodes - for (const auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); - has_executor.store(false); - } - } - weak_groups_associated_with_executor_to_nodes_.clear(); - weak_groups_to_nodes_associated_with_executor_.clear(); - exec_list_.clear(); - weak_nodes_.clear(); - weak_nodes_to_guard_conditions_.clear(); -} - -void -StaticExecutorEntitiesCollector::init( - rcl_wait_set_t * p_wait_set, - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) -{ - // Empty initialize executable list - exec_list_ = rclcpp::experimental::ExecutableList(); - // Get executor's wait_set_ pointer - p_wait_set_ = p_wait_set; - // Get executor's memory strategy ptr - if (memory_strategy == nullptr) { - throw std::runtime_error("Received NULL memory strategy in executor waitable."); - } - memory_strategy_ = memory_strategy; - - // Get memory strategy and executable list. Prepare wait_set_ - std::shared_ptr shared_ptr; - execute(shared_ptr); - - // The entities collector is now initialized - initialized_ = true; -} - -void -StaticExecutorEntitiesCollector::fini() -{ - memory_strategy_->clear_handles(); - exec_list_.clear(); -} - -std::shared_ptr -StaticExecutorEntitiesCollector::take_data() -{ - return nullptr; -} - -void -StaticExecutorEntitiesCollector::execute(std::shared_ptr & data) -{ - (void) data; - // Fill memory strategy with entities coming from weak_nodes_ - fill_memory_strategy(); - // Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy) - fill_executable_list(); - // Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize) - prepare_wait_set(); - // Add new nodes guard conditions to map - std::lock_guard guard{new_nodes_mutex_}; - for (const auto & weak_node : new_nodes_) { - if (auto node_ptr = weak_node.lock()) { - const auto & gc = node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = &gc; - } - } - new_nodes_.clear(); -} - -void -StaticExecutorEntitiesCollector::fill_memory_strategy() -{ - memory_strategy_->clear_handles(); - bool has_invalid_weak_groups_or_nodes = - memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_); - // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_groups_or_nodes) { - std::vector invalid_group_ptrs; - for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { - auto & weak_group_ptr = pair.first; - auto & weak_node_ptr = pair.second; - if (weak_group_ptr.expired() || weak_node_ptr.expired()) { - invalid_group_ptrs.push_back(weak_group_ptr); - } - } - std::for_each( - invalid_group_ptrs.begin(), invalid_group_ptrs.end(), - [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { - weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); - }); - } - has_invalid_weak_groups_or_nodes = - memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_); - // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_groups_or_nodes) { - std::vector invalid_group_ptrs; - for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { - auto & weak_group_ptr = pair.first; - const auto & weak_node_ptr = pair.second; - if (weak_group_ptr.expired() || weak_node_ptr.expired()) { - invalid_group_ptrs.push_back(weak_group_ptr); - } - } - std::for_each( - invalid_group_ptrs.begin(), invalid_group_ptrs.end(), - [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { - weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); - }); - } - - // Add the static executor waitable to the memory strategy - memory_strategy_->add_waitable_handle(this->shared_from_this()); -} - -void -StaticExecutorEntitiesCollector::fill_executable_list() -{ - exec_list_.clear(); - add_callback_groups_from_nodes_associated_to_executor(); - fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_); - fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_); - // Add the executor's waitable to the executable list - exec_list_.add_waitable(shared_from_this()); -} -void -StaticExecutorEntitiesCollector::fill_executable_list_from_map( - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) -{ - for (const auto & pair : weak_groups_to_nodes) { - auto group = pair.first.lock(); - auto node = pair.second.lock(); - if (!node || !group || !group->can_be_taken_from().load()) { - continue; - } - group->find_timer_ptrs_if( - [this](const rclcpp::TimerBase::SharedPtr & timer) { - if (timer) { - exec_list_.add_timer(timer); - } - return false; - }); - group->find_subscription_ptrs_if( - [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - if (subscription) { - exec_list_.add_subscription(subscription); - } - return false; - }); - group->find_service_ptrs_if( - [this](const rclcpp::ServiceBase::SharedPtr & service) { - if (service) { - exec_list_.add_service(service); - } - return false; - }); - group->find_client_ptrs_if( - [this](const rclcpp::ClientBase::SharedPtr & client) { - if (client) { - exec_list_.add_client(client); - } - return false; - }); - group->find_waitable_ptrs_if( - [this](const rclcpp::Waitable::SharedPtr & waitable) { - if (waitable) { - exec_list_.add_waitable(waitable); - } - return false; - }); - } -} - -void -StaticExecutorEntitiesCollector::prepare_wait_set() -{ - // clear wait set - if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { - throw std::runtime_error("Couldn't clear wait set"); - } - - // The size of waitables are accounted for in size of the other entities - rcl_ret_t ret = rcl_wait_set_resize( - p_wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - - if (RCL_RET_OK != ret) { - throw std::runtime_error( - std::string("Couldn't resize the wait set: ") + rcl_get_error_string().str); - } -} - -void -StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout) -{ - // clear wait set (memset to '0' all wait_set_ entities - // but keeps the wait_set_ number of entities) - if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { - throw std::runtime_error("Couldn't clear wait set"); - } - - if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); - } - - rcl_ret_t status = - rcl_wait(p_wait_set_, std::chrono::duration_cast(timeout).count()); - - if (status == RCL_RET_WAIT_SET_EMPTY) { - RCUTILS_LOG_WARN_NAMED( - "rclcpp", - "empty wait set received in rcl_wait(). This should never happen."); - } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(status, "rcl_wait() failed"); - } -} - -void -StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) -{ - // Add waitable guard conditions (one for each registered node) into the wait set. - for (const auto & pair : weak_nodes_to_guard_conditions_) { - auto & gc = pair.second; - detail::add_guard_condition_to_rcl_wait_set(*wait_set, *gc); - } -} - -size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions() -{ - std::lock_guard guard{new_nodes_mutex_}; - return weak_nodes_to_guard_conditions_.size() + new_nodes_.size(); -} - -bool -StaticExecutorEntitiesCollector::add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) -{ - bool is_new_node = false; - // If the node already has an executor - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Node has already been added to an executor."); - } - node_ptr->for_each_callback_group( - [this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr) - { - if ( - !group_ptr->get_associated_with_executor_atomic().load() && - group_ptr->automatically_add_to_executor_with_node()) - { - is_new_node = (add_callback_group( - group_ptr, - node_ptr, - weak_groups_to_nodes_associated_with_executor_) || - is_new_node); - } - }); - weak_nodes_.push_back(node_ptr); - return is_new_node; -} - -bool -StaticExecutorEntitiesCollector::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) -{ - // If the callback_group already has an executor - std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Callback group has already been added to an executor."); - } - bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && - !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto insert_info = weak_groups_to_nodes.insert( - std::make_pair(weak_group_ptr, node_ptr)); - bool was_inserted = insert_info.second; - if (!was_inserted) { - throw std::runtime_error("Callback group was already added to executor."); - } - if (is_new_node) { - std::lock_guard guard{new_nodes_mutex_}; - new_nodes_.push_back(node_ptr); - return true; - } - return false; -} - -bool -StaticExecutorEntitiesCollector::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) -{ - return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); -} - -bool -StaticExecutorEntitiesCollector::remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr) -{ - return this->remove_callback_group_from_map( - group_ptr, - weak_groups_associated_with_executor_to_nodes_); -} - -bool -StaticExecutorEntitiesCollector::remove_callback_group_from_map( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) -{ - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; - rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; - auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter != weak_groups_to_nodes.end()) { - node_ptr = iter->second.lock(); - if (node_ptr == nullptr) { - throw std::runtime_error("Node must not be deleted before its callback group(s)."); - } - weak_groups_to_nodes.erase(iter); - } else { - throw std::runtime_error("Callback group needs to be associated with executor."); - } - // If the node was matched and removed, interrupt waiting. - if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && - !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_)) - { - rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); - weak_nodes_to_guard_conditions_.erase(node_weak_ptr); - return true; - } - return false; -} - -bool -StaticExecutorEntitiesCollector::remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) -{ - if (!node_ptr->get_associated_with_executor_atomic().load()) { - return false; - } - bool node_found = false; - auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { - bool matched = (node_it->lock() == node_ptr); - if (matched) { - weak_nodes_.erase(node_it); - node_found = true; - break; - } - ++node_it; - } - if (!node_found) { - return false; - } - std::vector found_group_ptrs; - std::for_each( - weak_groups_to_nodes_associated_with_executor_.begin(), - weak_groups_to_nodes_associated_with_executor_.end(), - [&found_group_ptrs, node_ptr](std::pair key_value_pair) { - auto & weak_node_ptr = key_value_pair.second; - auto shared_node_ptr = weak_node_ptr.lock(); - auto group_ptr = key_value_pair.first.lock(); - if (shared_node_ptr == node_ptr) { - found_group_ptrs.push_back(group_ptr); - } - }); - std::for_each( - found_group_ptrs.begin(), found_group_ptrs.end(), [this] - (rclcpp::CallbackGroup::SharedPtr group_ptr) { - this->remove_callback_group_from_map( - group_ptr, - weak_groups_to_nodes_associated_with_executor_); - }); - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); - return true; -} - -bool -StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) -{ - // Check wait_set guard_conditions for added/removed entities to/from a node - for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { - if (p_wait_set->guard_conditions[i] != NULL) { - auto found_guard_condition = std::find_if( - weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(), - [&](std::pair pair) -> bool { - const rcl_guard_condition_t & rcl_gc = pair.second->get_rcl_guard_condition(); - return &rcl_gc == p_wait_set->guard_conditions[i]; - }); - if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) { - return true; - } - } - } - // None of the guard conditions triggered belong to a registered node - return false; -} - -// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. -bool -StaticExecutorEntitiesCollector::has_node( - const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & - weak_groups_to_nodes) const -{ - return std::find_if( - weak_groups_to_nodes.begin(), - weak_groups_to_nodes.end(), - [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { - auto other_ptr = other.second.lock(); - return other_ptr == node_ptr; - }) != weak_groups_to_nodes.end(); -} - -void -StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() -{ - for (const auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - node->for_each_callback_group( - [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) - { - if (shared_group_ptr->automatically_add_to_executor_with_node() && - !shared_group_ptr->get_associated_with_executor_atomic().load()) - { - add_callback_group( - shared_group_ptr, - node, - weak_groups_to_nodes_associated_with_executor_); - } - }); - } - } -} - -std::vector -StaticExecutorEntitiesCollector::get_all_callback_groups() -{ - std::vector groups; - for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; -} - -std::vector -StaticExecutorEntitiesCollector::get_manually_added_callback_groups() -{ - std::vector groups; - for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { - groups.push_back(group_node_ptr.first); - } - return groups; -} - -std::vector -StaticExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() -{ - std::vector groups; - for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { - groups.push_back(group_node_ptr.first); - } - return groups; -} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp deleted file mode 100644 index 209fcde556..0000000000 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2019 Nobleo Technology -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/executors/static_single_threaded_executor.hpp" - -#include -#include -#include -#include - -#include "rcpputils/scope_exit.hpp" - -using rclcpp::executors::StaticSingleThreadedExecutor; -using rclcpp::experimental::ExecutableList; - -StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( - const rclcpp::ExecutorOptions & options) -: rclcpp::Executor(options) -{ - entities_collector_ = std::make_shared(); -} - -StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() -{ - if (entities_collector_->is_init()) { - entities_collector_->fini(); - } -} - -void -StaticSingleThreadedExecutor::spin() -{ - if (spinning.exchange(true)) { - throw std::runtime_error("spin() called while already spinning"); - } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - - // Set memory_strategy_ and exec_list_ based on weak_nodes_ - // Prepare wait_set_ based on memory_strategy_ - entities_collector_->init(&wait_set_, memory_strategy_); - - while (rclcpp::ok(this->context_) && spinning.load()) { - // Refresh wait set and wait for work - entities_collector_->refresh_wait_set(); - execute_ready_executables(); - } -} - -void -StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) -{ - // In this context a 0 input max_duration means no duration limit - if (std::chrono::nanoseconds(0) == max_duration) { - max_duration = std::chrono::nanoseconds::max(); - } - - return this->spin_some_impl(max_duration, false); -} - -void -StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration) -{ - if (max_duration < std::chrono::nanoseconds(0)) { - throw std::invalid_argument("max_duration must be greater than or equal to 0"); - } - return this->spin_some_impl(max_duration, true); -} - -void -StaticSingleThreadedExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) -{ - // Make sure the entities collector has been initialized - if (!entities_collector_->is_init()) { - entities_collector_->init(&wait_set_, memory_strategy_); - } - - auto start = std::chrono::steady_clock::now(); - auto max_duration_not_elapsed = [max_duration, start]() { - if (std::chrono::nanoseconds(0) == max_duration) { - // told to spin forever if need be - return true; - } else if (std::chrono::steady_clock::now() - start < max_duration) { - // told to spin only for some maximum amount of time - return true; - } - // spun too long - return false; - }; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_some() called while already spinning"); - } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - - while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { - // Get executables that are ready now - entities_collector_->refresh_wait_set(std::chrono::milliseconds::zero()); - // Execute ready executables - bool work_available = execute_ready_executables(); - if (!work_available || !exhaustive) { - break; - } - } -} - -void -StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) -{ - // Make sure the entities collector has been initialized - if (!entities_collector_->is_init()) { - entities_collector_->init(&wait_set_, memory_strategy_); - } - - if (rclcpp::ok(context_) && spinning.load()) { - // Wait until we have a ready entity or timeout expired - entities_collector_->refresh_wait_set(timeout); - // Execute ready executables - execute_ready_executables(true); - } -} - -void -StaticSingleThreadedExecutor::add_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - bool notify) -{ - bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr); - if (is_new_node && notify) { - // Interrupt waiting to handle new node - interrupt_guard_condition_.trigger(); - } -} - -void -StaticSingleThreadedExecutor::add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) -{ - bool is_new_node = entities_collector_->add_node(node_ptr); - if (is_new_node && notify) { - // Interrupt waiting to handle new node - interrupt_guard_condition_.trigger(); - } -} - -void -StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, bool notify) -{ - this->add_node(node_ptr->get_node_base_interface(), notify); -} - -void -StaticSingleThreadedExecutor::remove_callback_group( - rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) -{ - bool node_removed = entities_collector_->remove_callback_group(group_ptr); - // If the node was matched and removed, interrupt waiting - if (node_removed && notify) { - interrupt_guard_condition_.trigger(); - } -} - -void -StaticSingleThreadedExecutor::remove_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) -{ - bool node_removed = entities_collector_->remove_node(node_ptr); - if (!node_removed) { - throw std::runtime_error("Node needs to be associated with this executor."); - } - // If the node was matched and removed, interrupt waiting - if (notify) { - interrupt_guard_condition_.trigger(); - } -} - -std::vector -StaticSingleThreadedExecutor::get_all_callback_groups() -{ - return entities_collector_->get_all_callback_groups(); -} - -std::vector -StaticSingleThreadedExecutor::get_manually_added_callback_groups() -{ - return entities_collector_->get_manually_added_callback_groups(); -} - -std::vector -StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes() -{ - return entities_collector_->get_automatically_added_callback_groups_from_nodes(); -} - -void -StaticSingleThreadedExecutor::remove_node(std::shared_ptr node_ptr, bool notify) -{ - this->remove_node(node_ptr->get_node_base_interface(), notify); -} - -bool -StaticSingleThreadedExecutor::execute_ready_executables(bool spin_once) -{ - bool any_ready_executable = false; - - // Execute all the ready subscriptions - for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { - if (i < entities_collector_->get_number_of_subscriptions()) { - if (wait_set_.subscriptions[i]) { - execute_subscription(entities_collector_->get_subscription(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } - } - } - // Execute all the ready timers - for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { - if (i < entities_collector_->get_number_of_timers()) { - if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) { - auto timer = entities_collector_->get_timer(i); - timer->call(); - execute_timer(std::move(timer)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } - } - } - // Execute all the ready services - for (size_t i = 0; i < wait_set_.size_of_services; ++i) { - if (i < entities_collector_->get_number_of_services()) { - if (wait_set_.services[i]) { - execute_service(entities_collector_->get_service(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } - } - } - // Execute all the ready clients - for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { - if (i < entities_collector_->get_number_of_clients()) { - if (wait_set_.clients[i]) { - execute_client(entities_collector_->get_client(i)); - if (spin_once) { - return true; - } - any_ready_executable = true; - } - } - } - // Execute all the ready waitables - for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) { - auto waitable = entities_collector_->get_waitable(i); - if (waitable->is_ready(&wait_set_)) { - auto data = waitable->take_data(); - waitable->execute(data); - if (spin_once) { - return true; - } - any_ready_executable = true; - } - } - return any_ready_executable; -} diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp index 652007b589..c2b42752f2 100644 --- a/rclcpp/test/benchmark/benchmark_executor.cpp +++ b/rclcpp/test/benchmark/benchmark_executor.cpp @@ -189,71 +189,6 @@ BENCHMARK_F(PerformanceTestExecutorSimple, multi_thread_executor_remove_node)(be } } -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_add_node)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - executor.add_node(node); - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } -} - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_remove_node)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - for (auto _ : st) { - (void)_; - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - executor.remove_node(node); - } -} - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_spin_until_future_complete)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - auto ret = executor.spin_until_future_complete(shared_future, 100ms); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - } - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - // static_single_thread_executor has a special design. We need to add/remove the node each - // time you call spin - st.PauseTiming(); - executor.add_node(node); - st.ResumeTiming(); - - ret = executor.spin_until_future_complete(shared_future, 100ms); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - st.PauseTiming(); - executor.remove_node(node); - st.ResumeTiming(); - } -} - BENCHMARK_F( PerformanceTestExecutorSimple, single_thread_executor_spin_node_until_future_complete)(benchmark::State & st) @@ -314,29 +249,6 @@ BENCHMARK_F( } } -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_single_thread_executor_spin_node_until_future_complete)(benchmark::State & st) -{ - rclcpp::executors::StaticSingleThreadedExecutor executor; - // test success of an immediately finishing future - std::promise promise; - std::future future = promise.get_future(); - promise.set_value(true); - auto shared_future = future.share(); - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - auto ret = rclcpp::executors::spin_node_until_future_complete( - executor, node, shared_future, 1s); - if (ret != rclcpp::FutureReturnCode::SUCCESS) { - st.SkipWithError(rcutils_get_error_string().str); - break; - } - } -} BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark::State & st) { @@ -362,42 +274,3 @@ BENCHMARK_F(PerformanceTestExecutorSimple, spin_until_future_complete)(benchmark } } } - -BENCHMARK_F( - PerformanceTestExecutorSimple, - static_executor_entities_collector_execute)(benchmark::State & st) -{ - rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_ = - std::make_shared(); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - rcl_ret_t ret = rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - RCPPUTILS_SCOPE_EXIT( - { - rcl_ret_t ret = rcl_wait_set_fini(&wait_set); - if (ret != RCL_RET_OK) { - st.SkipWithError(rcutils_get_error_string().str); - } - }); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - reset_heap_counters(); - - for (auto _ : st) { - (void)_; - std::shared_ptr data = entities_collector_->take_data(); - entities_collector_->execute(data); - } -} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 256ddc65eb..b822e6e24f 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -663,14 +663,6 @@ if(TARGET test_executors) target_link_libraries(test_executors ${PROJECT_NAME}) endif() -ament_add_gtest(test_static_single_threaded_executor executors/test_static_single_threaded_executor.cpp - APPEND_LIBRARY_DIRS "${append_library_dirs}") -if(TARGET test_static_single_threaded_executor) - ament_target_dependencies(test_static_single_threaded_executor - "test_msgs") - target_link_libraries(test_static_single_threaded_executor ${PROJECT_NAME} mimick) -endif() - ament_add_gtest(test_multi_threaded_executor executors/test_multi_threaded_executor.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_multi_threaded_executor) @@ -679,13 +671,13 @@ if(TARGET test_multi_threaded_executor) target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME}) endif() -ament_add_gtest(test_static_executor_entities_collector executors/test_static_executor_entities_collector.cpp +ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) -if(TARGET test_static_executor_entities_collector) - ament_target_dependencies(test_static_executor_entities_collector +if(TARGET test_entities_collector) + ament_target_dependencies(test_entities_collector "rcl" "test_msgs") - target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) + target_link_libraries(test_entities_collector ${PROJECT_NAME} mimick) endif() ament_add_gtest(test_executor_notify_waitable executors/test_executor_notify_waitable.cpp diff --git a/rclcpp/test/rclcpp/executors/test_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_entities_collector.cpp new file mode 100644 index 0000000000..defa647999 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_entities_collector.cpp @@ -0,0 +1,185 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/executor_entities_collector.hpp" + +#include "../../utils/rclcpp_gtest_macros.hpp" + +class TestExecutorEntitiesCollector : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestExecutorEntitiesCollector, add_remove_node) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node1 = std::make_shared("node1", "ns"); + auto node2 = std::make_shared("node2", "ns"); + + // Add a node + EXPECT_NO_THROW(entities_collector.add_node(node1->get_node_base_interface())); + + // Add the same node a second time + RCLCPP_EXPECT_THROW_EQ( + entities_collector.add_node(node1->get_node_base_interface()), + std::runtime_error("Node '/ns/node1' has already been added to an executor.")); + + // Remove a node before adding + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_node(node2->get_node_base_interface()), + std::runtime_error("Node needs to be associated with an executor.")); + + // Simulate node being associated somewhere else + auto & has_executor = node2->get_node_base_interface()->get_associated_with_executor_atomic(); + has_executor.store(true); + + // Add an already-associated node + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_node(node2->get_node_base_interface()), + std::runtime_error("Node needs to be associated with this executor.")); + + has_executor.store(false); + + // Add the now-disassociated node + EXPECT_NO_THROW(entities_collector.add_node(node2->get_node_base_interface())); + + // Remove an existing node + EXPECT_NO_THROW(entities_collector.remove_node(node1->get_node_base_interface())); + // Remove the same node a second time + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_node(node1->get_node_base_interface()), + std::runtime_error("Node needs to be associated with an executor.")); + + // Remove an existing node + EXPECT_NO_THROW(entities_collector.remove_node(node2->get_node_base_interface())); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); +} + +TEST_F(TestExecutorEntitiesCollector, add_node_default_callback_group) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + entities_collector.add_node(node->get_node_base_interface()); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 1u); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group_after_add_node) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_node(node->get_node_base_interface()); + RCLCPP_EXPECT_THROW_EQ( + entities_collector.add_callback_group(cb_group), + std::runtime_error("Callback group has already been added to an executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group_twice) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + cb_group->get_associated_with_executor_atomic().exchange(false); + RCLCPP_EXPECT_THROW_EQ( + entities_collector.add_callback_group(cb_group), + std::runtime_error("Callback group has already been added to this executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, remove_callback_group_after_node) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + node.reset(); + + ASSERT_FALSE(cb_group->has_valid_node()); + + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_callback_group(cb_group), + std::runtime_error("Node must not be deleted before its callback group(s).")); +} + +TEST_F(TestExecutorEntitiesCollector, remove_callback_group_twice) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.remove_callback_group(cb_group); + + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_callback_group(cb_group), + std::runtime_error("Callback group needs to be associated with executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, remove_node_opposite_order) { + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(); + + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector.add_node(node1->get_node_base_interface())); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_NO_THROW(entities_collector.add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector.remove_node(node2->get_node_base_interface())); +} diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..54fb9af6fc 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -91,8 +91,7 @@ class TestExecutorsStable : public TestExecutors {}; using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor>; class ExecutorTypeNames { @@ -109,10 +108,6 @@ class ExecutorTypeNames return "MultiThreadedExecutor"; } - if (std::is_same()) { - return "StaticSingleThreadedExecutor"; - } - return ""; } }; diff --git a/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp deleted file mode 100644 index 1ea91029f4..0000000000 --- a/rclcpp/test/rclcpp/executors/test_static_executor_entities_collector.cpp +++ /dev/null @@ -1,625 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include - -#include "rclcpp/rclcpp.hpp" - -#include "rcpputils/scope_exit.hpp" - -#include "test_msgs/msg/empty.hpp" -#include "test_msgs/srv/empty.hpp" - -#include "../../mocking_utils/patch.hpp" -#include "../../utils/rclcpp_gtest_macros.hpp" - -namespace -{ - -struct NumberOfEntities -{ - size_t subscriptions = 0; - size_t timers = 0; - size_t services = 0; - size_t clients = 0; - size_t waitables = 0; -}; - -std::unique_ptr get_number_of_default_entities(rclcpp::Node::SharedPtr node) -{ - auto number_of_entities = std::make_unique(); - node->for_each_callback_group( - [&number_of_entities](rclcpp::CallbackGroup::SharedPtr group) - { - if (!group->can_be_taken_from().load()) { - return; - } - group->find_subscription_ptrs_if( - [&number_of_entities](rclcpp::SubscriptionBase::SharedPtr &) - { - number_of_entities->subscriptions++; return false; - }); - group->find_timer_ptrs_if( - [&number_of_entities](rclcpp::TimerBase::SharedPtr &) - { - number_of_entities->timers++; return false; - }); - group->find_service_ptrs_if( - [&number_of_entities](rclcpp::ServiceBase::SharedPtr &) - { - number_of_entities->services++; return false; - }); - group->find_client_ptrs_if( - [&number_of_entities](rclcpp::ClientBase::SharedPtr &) - { - number_of_entities->clients++; return false; - }); - group->find_waitable_ptrs_if( - [&number_of_entities](rclcpp::Waitable::SharedPtr &) - { - number_of_entities->waitables++; return false; - }); - }); - - return number_of_entities; -} - -} // namespace - -class TestStaticExecutorEntitiesCollector : public ::testing::Test -{ -public: - void SetUp() - { - rclcpp::init(0, nullptr); - entities_collector_ = - std::make_shared(); - } - - void TearDown() - { - rclcpp::shutdown(); - } - - rclcpp::executors::StaticExecutorEntitiesCollector::SharedPtr entities_collector_; -}; - -TEST_F(TestStaticExecutorEntitiesCollector, construct_destruct) { - EXPECT_EQ(0u, entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(0u, entities_collector_->get_number_of_timers()); - EXPECT_EQ(0u, entities_collector_->get_number_of_services()); - EXPECT_EQ(0u, entities_collector_->get_number_of_clients()); - EXPECT_EQ(0u, entities_collector_->get_number_of_waitables()); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_remove_node) { - auto node1 = std::make_shared("node1", "ns"); - EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); - - // Check adding second time - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->add_node(node1->get_node_base_interface()), - std::runtime_error("Node has already been added to an executor.")); - - auto node2 = std::make_shared("node2", "ns"); - EXPECT_FALSE(entities_collector_->remove_node(node2->get_node_base_interface())); - EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); - - EXPECT_TRUE(entities_collector_->remove_node(node1->get_node_base_interface())); - EXPECT_FALSE(entities_collector_->remove_node(node1->get_node_base_interface())); - EXPECT_TRUE(entities_collector_->remove_node(node2->get_node_base_interface())); - - auto node3 = std::make_shared("node3", "ns"); - node3->get_node_base_interface()->get_associated_with_executor_atomic().exchange(true); - EXPECT_FALSE(entities_collector_->remove_node(node3->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, init_bad_arguments) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - rclcpp::GuardCondition guard_condition(shared_context); - - // Check memory strategy is nullptr - rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy = nullptr; - EXPECT_THROW( - entities_collector_->init(&wait_set, memory_strategy), - std::runtime_error); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_remove_basic_node) { - auto node = std::make_shared("node", "ns"); - const auto expected_number_of_entities = get_number_of_default_entities(node); - EXPECT_NE(nullptr, expected_number_of_entities); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - EXPECT_EQ( - expected_number_of_entities->subscriptions, - entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(expected_number_of_entities->timers, entities_collector_->get_number_of_timers()); - EXPECT_EQ(expected_number_of_entities->services, entities_collector_->get_number_of_services()); - EXPECT_EQ(expected_number_of_entities->clients, entities_collector_->get_number_of_clients()); - // One extra for the executor - EXPECT_EQ( - 1u + expected_number_of_entities->waitables, - entities_collector_->get_number_of_waitables()); - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); - entities_collector_->init(&wait_set, memory_strategy); - EXPECT_EQ(0u, entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(0u, entities_collector_->get_number_of_timers()); - EXPECT_EQ(0u, entities_collector_->get_number_of_services()); - EXPECT_EQ(0u, entities_collector_->get_number_of_clients()); - - // Still one for the executor - EXPECT_EQ(1u, entities_collector_->get_number_of_waitables()); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_remove_node_out_of_scope) { - rclcpp::Context::SharedPtr shared_context = nullptr; - { - auto node1 = std::make_shared("node1", "ns"); - auto node2 = std::make_shared("node2", "ns"); - auto node3 = std::make_shared("node3", "ns"); - entities_collector_->add_node(node1->get_node_base_interface()); - entities_collector_->add_node(node2->get_node_base_interface()); - entities_collector_->add_node(node3->get_node_base_interface()); - shared_context = node1->get_node_base_interface()->get_context(); - } - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - // Expect weak_node pointers to be cleaned up and used - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - EXPECT_EQ(0u, entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(0u, entities_collector_->get_number_of_timers()); - EXPECT_EQ(0u, entities_collector_->get_number_of_services()); - EXPECT_EQ(0u, entities_collector_->get_number_of_clients()); - - // Still one for the executor - EXPECT_EQ(1u, entities_collector_->get_number_of_waitables()); -} - -class TestWaitable : public rclcpp::Waitable -{ -public: - void add_to_wait_set(rcl_wait_set_t *) override {} - - bool is_ready(rcl_wait_set_t *) override {return true;} - - std::shared_ptr - take_data() override - { - return nullptr; - } - void - execute(std::shared_ptr & data) override - { - (void) data; - } -}; - -TEST_F(TestStaticExecutorEntitiesCollector, add_remove_node_with_entities) { - auto node = std::make_shared("node", "ns"); - auto expected_number_of_entities = get_number_of_default_entities(node); - EXPECT_NE(nullptr, expected_number_of_entities); - - // Create 1 of each entity type - auto subscription = - node->create_subscription( - "topic", rclcpp::QoS(10), [](test_msgs::msg::Empty::ConstSharedPtr) {}); - auto timer = - node->create_wall_timer(std::chrono::seconds(60), []() {}); - auto service = - node->create_service( - "service", - []( - const test_msgs::srv::Empty::Request::SharedPtr, - test_msgs::srv::Empty::Response::SharedPtr) {}); - auto client = node->create_client("service"); - auto waitable = std::make_shared(); - - // Adding a subscription could add another waitable, so we need to get the - // current number of waitables just before adding the new waitable. - expected_number_of_entities->waitables = get_number_of_default_entities(node)->waitables; - node->get_node_waitables_interface()->add_waitable(waitable, nullptr); - - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - EXPECT_EQ( - 1u + expected_number_of_entities->subscriptions, - entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(1u + expected_number_of_entities->timers, entities_collector_->get_number_of_timers()); - EXPECT_EQ( - 1u + expected_number_of_entities->services, - entities_collector_->get_number_of_services()); - EXPECT_EQ( - 1u + expected_number_of_entities->clients, - entities_collector_->get_number_of_clients()); - - // One extra for the executor - EXPECT_EQ( - 2u + expected_number_of_entities->waitables, - entities_collector_->get_number_of_waitables()); - - entities_collector_->remove_node(node->get_node_base_interface()); - entities_collector_->init(&wait_set, memory_strategy); - EXPECT_EQ(0u, entities_collector_->get_number_of_subscriptions()); - EXPECT_EQ(0u, entities_collector_->get_number_of_timers()); - EXPECT_EQ(0u, entities_collector_->get_number_of_services()); - EXPECT_EQ(0u, entities_collector_->get_number_of_clients()); - // Still one for the executor - EXPECT_EQ(1u, entities_collector_->get_number_of_waitables()); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_callback_group) { - auto node = std::make_shared("node1", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_callback_group_after_add_node) { - auto node = std::make_shared("node1", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_node(node->get_node_base_interface()); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), - std::runtime_error("Callback group has already been added to an executor.")); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_callback_group_twice) { - auto node = std::make_shared("node1", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); - cb_group->get_associated_with_executor_atomic().exchange(false); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()), - std::runtime_error("Callback group was already added to executor.")); -} - -TEST_F(TestStaticExecutorEntitiesCollector, prepare_wait_set_rcl_wait_set_clear_error) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - { - auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_clear, RCL_RET_ERROR); - std::shared_ptr data = entities_collector_->take_data(); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->execute(data), - std::runtime_error("Couldn't clear wait set")); - } - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, prepare_wait_set_rcl_wait_set_resize_error) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - { - auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait_set_resize, RCL_RET_ERROR); - std::shared_ptr data = entities_collector_->take_data(); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->execute(data), - std::runtime_error("Couldn't resize the wait set: error not set")); - } - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, refresh_wait_set_not_initialized) { - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->refresh_wait_set(std::chrono::nanoseconds(1000)), - std::runtime_error("Couldn't clear wait set")); - rcl_reset_error(); -} - -TEST_F(TestStaticExecutorEntitiesCollector, refresh_wait_set_rcl_wait_failed) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - { - auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_wait, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->refresh_wait_set(std::chrono::nanoseconds(1000)), - std::runtime_error("rcl_wait() failed: error not set")); - } - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, refresh_wait_set_add_handles_to_wait_set_failed) { - auto node = std::make_shared("node", "ns"); - - // Create 1 of each entity type - auto subscription = - node->create_subscription( - "topic", rclcpp::QoS(10), [](test_msgs::msg::Empty::ConstSharedPtr) {}); - auto timer = - node->create_wall_timer(std::chrono::seconds(60), []() {}); - auto service = - node->create_service( - "service", - []( - const test_msgs::srv::Empty::Request::SharedPtr, - test_msgs::srv::Empty::Response::SharedPtr) {}); - auto client = node->create_client("service"); - auto waitable = std::make_shared(); - - node->get_node_waitables_interface()->add_waitable(waitable, nullptr); - - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_wait_set_add_subscription, - RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->refresh_wait_set(std::chrono::nanoseconds(1000)), - std::runtime_error("Couldn't fill wait set")); - } - - entities_collector_->remove_node(node->get_node_base_interface()); -} - -TEST_F(TestStaticExecutorEntitiesCollector, add_to_wait_set_nullptr) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - EXPECT_THROW( - entities_collector_->add_to_wait_set(nullptr), - std::invalid_argument); - rcl_reset_error(); - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, fill_memory_strategy_invalid_group) { - auto node = std::make_shared("node", "ns"); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 2u); - - cb_group.reset(); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} - -TEST_F(TestStaticExecutorEntitiesCollector, remove_callback_group_after_node) { - auto node = std::make_shared("node1", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); - - node.reset(); - - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->remove_callback_group(cb_group), - std::runtime_error("Node must not be deleted before its callback group(s).")); -} - -TEST_F(TestStaticExecutorEntitiesCollector, remove_callback_group_twice) { - auto node = std::make_shared("node1", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - ASSERT_EQ(entities_collector_->get_all_callback_groups().size(), 1u); - - entities_collector_->remove_callback_group(cb_group); - - RCLCPP_EXPECT_THROW_EQ( - entities_collector_->remove_callback_group(cb_group), - std::runtime_error("Callback group needs to be associated with executor.")); -} - -TEST_F(TestStaticExecutorEntitiesCollector, remove_node_opposite_order) { - auto node1 = std::make_shared("node1", "ns"); - EXPECT_NO_THROW(entities_collector_->add_node(node1->get_node_base_interface())); - - auto node2 = std::make_shared("node2", "ns"); - EXPECT_NO_THROW(entities_collector_->add_node(node2->get_node_base_interface())); - - EXPECT_TRUE(entities_collector_->remove_node(node2->get_node_base_interface())); -} - -TEST_F( - TestStaticExecutorEntitiesCollector, - add_callback_groups_from_nodes_associated_to_executor_add) { - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - entities_collector_->add_callback_group(cb_group, node->get_node_base_interface()); - entities_collector_->add_node(node->get_node_base_interface()); - - rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); - rcl_allocator_t allocator = rcl_get_default_allocator(); - auto shared_context = node->get_node_base_interface()->get_context(); - rcl_context_t * context = shared_context->get_rcl_context().get(); - EXPECT_EQ( - RCL_RET_OK, - rcl_wait_set_init(&wait_set, 100, 100, 100, 100, 100, 100, context, allocator)); - RCPPUTILS_SCOPE_EXIT({EXPECT_EQ(RCL_RET_OK, rcl_wait_set_fini(&wait_set));}); - - auto memory_strategy = rclcpp::memory_strategies::create_default_strategy(); - rclcpp::GuardCondition guard_condition(shared_context); - - entities_collector_->init(&wait_set, memory_strategy); - RCPPUTILS_SCOPE_EXIT(entities_collector_->fini()); - - cb_group->get_associated_with_executor_atomic().exchange(false); - std::shared_ptr data = entities_collector_->take_data(); - entities_collector_->execute(data); - - EXPECT_TRUE(entities_collector_->remove_node(node->get_node_base_interface())); -} diff --git a/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp b/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp deleted file mode 100644 index 5ca6c1c25a..0000000000 --- a/rclcpp/test/rclcpp/executors/test_static_single_threaded_executor.cpp +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include -#include -#include - -#include "rclcpp/exceptions.hpp" -#include "rclcpp/node.hpp" -#include "rclcpp/rclcpp.hpp" -#include "rclcpp/executors.hpp" - -#include "test_msgs/srv/empty.hpp" - -#include "../../mocking_utils/patch.hpp" -#include "../../utils/rclcpp_gtest_macros.hpp" - -using namespace std::chrono_literals; - -class TestStaticSingleThreadedExecutor : public ::testing::Test -{ -public: - void SetUp() - { - rclcpp::init(0, nullptr); - } - - void TearDown() - { - rclcpp::shutdown(); - } -}; - -TEST_F(TestStaticSingleThreadedExecutor, add_callback_group_trigger_guard_failed) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - executor.add_callback_group(cb_group, node->get_node_base_interface(), true), - std::runtime_error("error not set")); - } -} - -TEST_F(TestStaticSingleThreadedExecutor, add_node_trigger_guard_failed) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - executor.add_node(node), - std::runtime_error("error not set")); - } -} - -TEST_F(TestStaticSingleThreadedExecutor, remove_callback_group_trigger_guard_failed) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - - executor.add_callback_group(cb_group, node->get_node_base_interface(), true); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - executor.remove_callback_group(cb_group, true), - std::runtime_error("error not set")); - } -} - -TEST_F(TestStaticSingleThreadedExecutor, remove_node_failed) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - executor.remove_node(node, true), - std::runtime_error("Node needs to be associated with this executor.")); - } -} - -TEST_F(TestStaticSingleThreadedExecutor, remove_node_trigger_guard_failed) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - - executor.add_node(node); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_trigger_guard_condition, RCL_RET_ERROR); - RCLCPP_EXPECT_THROW_EQ( - executor.remove_node(node, true), - std::runtime_error("error not set")); - } -} - -TEST_F(TestStaticSingleThreadedExecutor, execute_service) { - rclcpp::executors::StaticSingleThreadedExecutor executor; - auto node = std::make_shared("node", "ns"); - executor.add_node(node); - - auto service = - node->create_service( - "service", - []( - const test_msgs::srv::Empty::Request::SharedPtr, - test_msgs::srv::Empty::Response::SharedPtr) {}); - auto client = node->create_client("service"); - - std::promise promise; - std::future future = promise.get_future(); - EXPECT_EQ( - rclcpp::FutureReturnCode::TIMEOUT, - executor.spin_until_future_complete(future, std::chrono::milliseconds(1))); - - executor.remove_node(node); - executor.spin_until_future_complete(future, std::chrono::milliseconds(1)); -} diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 02fa0b7a94..c76f6e8d28 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -49,8 +49,7 @@ class TestAddCallbackGroupsToExecutor : public ::testing::Test using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor>; class ExecutorTypeNames { @@ -67,10 +66,6 @@ class ExecutorTypeNames return "MultiThreadedExecutor"; } - if (std::is_same()) { - return "StaticSingleThreadedExecutor"; - } - return ""; } }; diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index bdbb0a1079..9bcff7d855 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -46,23 +46,6 @@ class DummyExecutor : public rclcpp::Executor { spin_node_once_nanoseconds(node, std::chrono::milliseconds(100)); } - - rclcpp::memory_strategy::MemoryStrategy * memory_strategy_ptr() - { - return memory_strategy_.get(); - } - - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( - rclcpp::CallbackGroup::SharedPtr group) - { - std::lock_guard guard_{mutex_}; // only to make the TSA happy - return get_node_by_group(weak_groups_to_nodes_, group); - } - - rclcpp::CallbackGroup::SharedPtr local_get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) - { - return get_group_by_timer(timer); - } }; class TestExecutor : public ::testing::Test @@ -142,7 +125,7 @@ TEST_F(TestExecutor, add_callback_group_twice) { cb_group->get_associated_with_executor_atomic().exchange(false); RCLCPP_EXPECT_THROW_EQ( dummy.add_callback_group(cb_group, node->get_node_base_interface(), false), - std::runtime_error("Callback group was already added to executor.")); + std::runtime_error("Callback group has already been added to this executor.")); } TEST_F(TestExecutor, add_callback_group_failed_trigger_guard_condition) { @@ -328,24 +311,6 @@ TEST_F(TestExecutor, cancel_failed_trigger_guard_condition) { std::runtime_error("Failed to trigger guard condition in cancel: error not set")); } -TEST_F(TestExecutor, set_memory_strategy_nullptr) { - DummyExecutor dummy; - - RCLCPP_EXPECT_THROW_EQ( - dummy.set_memory_strategy(nullptr), - std::runtime_error("Received NULL memory strategy in executor.")); -} - -TEST_F(TestExecutor, set_memory_strategy) { - DummyExecutor dummy; - rclcpp::memory_strategy::MemoryStrategy::SharedPtr strategy = - std::make_shared< - rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); - - dummy.set_memory_strategy(strategy); - EXPECT_EQ(dummy.memory_strategy_ptr(), strategy.get()); -} - TEST_F(TestExecutor, spin_once_failed_trigger_guard_condition) { DummyExecutor dummy; auto node = std::make_shared("node", "ns"); @@ -417,71 +382,6 @@ TEST_F(TestExecutor, spin_some_fail_wait) { std::runtime_error("rcl_wait() failed: error not set")); } -TEST_F(TestExecutor, get_node_by_group_null_group) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(nullptr)); -} - -TEST_F(TestExecutor, get_node_by_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - ASSERT_EQ(node->get_node_base_interface().get(), dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_node_by_group_not_found) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - ASSERT_EQ(nullptr, dummy.local_get_node_by_group(cb_group).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_nullptr) { - DummyExecutor dummy; - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(nullptr)); -} - -TEST_F(TestExecutor, get_group_by_timer) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_with_deleted_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_node(node); - - cb_group.reset(); - - ASSERT_EQ(nullptr, dummy.local_get_group_by_timer(timer).get()); -} - -TEST_F(TestExecutor, get_group_by_timer_add_callback_group) { - DummyExecutor dummy; - auto node = std::make_shared("node", "ns"); - rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = - node->create_wall_timer(std::chrono::milliseconds(1), [&]() {}, cb_group); - dummy.add_callback_group(cb_group, node->get_node_base_interface(), false); - - ASSERT_EQ(cb_group.get(), dummy.local_get_group_by_timer(timer).get()); -} - TEST_F(TestExecutor, spin_until_future_complete_in_spin_until_future_complete) { DummyExecutor dummy; auto node = std::make_shared("node", "ns");