From 34e2a802666f03b4248a094f59daed809be60bbe Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Sun, 14 Apr 2024 14:31:24 +0200 Subject: [PATCH] chore: Made Executor fully overloadable This commit makes every public funciton virtual, and adds virtual impl function for the existing template functions. The goal of this commit is to be able to fully control the everything from a derived class. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 102 +++++++++-------------------- rclcpp/src/rclcpp/executor.cpp | 82 +++++++++++++++++++++++ rclcpp/test/rclcpp/test_clock.cpp | 8 --- 3 files changed, 113 insertions(+), 79 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..36997d85fd 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -242,42 +242,30 @@ class Executor * spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this * function to be non-blocking. */ - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node, - std::chrono::duration_cast(timeout) - ); - } + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Convenience function which takes Node and forwards NodeBaseInterface. - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - std::shared_ptr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node->get_node_base_interface(), - std::chrono::duration_cast(timeout) - ); - } + const std::shared_ptr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Add a node, complete all immediately available work, and remove the node. /** * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_some(std::shared_ptr node); /// Collect work once and execute all available work, optionally within a max duration. @@ -307,14 +295,14 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_all( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_all(std::shared_ptr node, std::chrono::nanoseconds max_duration); /// Collect and execute work repeatedly within a duration or until no more work is available. @@ -366,52 +354,11 @@ class Executor const FutureT & future, std::chrono::duration timeout = std::chrono::duration(-1)) { - // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete - // inside a callback executed by an executor. - - // Check the future before entering the while loop. - // If the future is already complete, don't try to spin. - std::future_status status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - - auto end_time = std::chrono::steady_clock::now(); - std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( - timeout); - if (timeout_ns > std::chrono::nanoseconds::zero()) { - end_time += timeout_ns; - } - std::chrono::nanoseconds timeout_left = timeout_ns; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_until_future_complete() called while already spinning"); + return spin_until_future_complete_impl(std::chrono::duration_cast( + timeout), [&future] () { + return future.wait_for(std::chrono::seconds(0)); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - while (rclcpp::ok(this->context_) && spinning.load()) { - // Do one item of work. - spin_once_impl(timeout_left); - - // Check if the future is set, return SUCCESS if it is. - status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout_ns < std::chrono::nanoseconds::zero()) { - continue; - } - // Otherwise check if we still have time to wait, return TIMEOUT if not. - auto now = std::chrono::steady_clock::now(); - if (now >= end_time) { - return FutureReturnCode::TIMEOUT; - } - // Subtract the elapsed time from the original timeout. - timeout_left = std::chrono::duration_cast(end_time - now); - } - - // The future did not complete before ok() returned false, return INTERRUPTED. - return FutureReturnCode::INTERRUPTED; + ); } /// Cancel any running spin* function, causing it to return. @@ -420,7 +367,7 @@ class Executor * \throws std::runtime_error if there is an issue triggering the guard condition */ RCLCPP_PUBLIC - void + virtual void cancel(); /// Returns true if the executor is currently spinning. @@ -429,10 +376,19 @@ class Executor * \return True if the executor is currently spinning. */ RCLCPP_PUBLIC - bool + virtual bool is_spinning(); protected: + // constructor that will not setup any internals. + /** + * This constructor is intended to be used by any derived executor, + * that explicitly does not want to use the default implementation provided + * by this class. This constructor is guaranteed, to not modify the system + * state. + * */ + explicit Executor(const std::shared_ptr & context); + /// Add a node to executor, execute the next available unit of work, and remove the node. /** * Implementation of spin_node_once using std::chrono::nanoseconds @@ -447,6 +403,10 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + virtual FutureReturnCode spin_until_future_complete_impl( + std::chrono::nanoseconds max_duration, + const std::function & get_future_status); + /// Collect work and execute available work, optionally within a duration. /** * Implementation of spin_some and spin_all. diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5df70a9465..553b7db9b7 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru class rclcpp::ExecutorImplementation {}; +Executor::Executor(const std::shared_ptr & context) +: spinning(false), + entities_need_rebuild_(true), + collector_(nullptr), + wait_set_({}, {}, {}, {}, {}, {}, context) +{ +} + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), @@ -229,6 +237,28 @@ Executor::remove_node(std::shared_ptr node_ptr, bool notify) this->remove_node(node_ptr->get_node_base_interface(), notify); } +void +Executor::spin_node_once( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node, + std::chrono::duration_cast(timeout) + ); +} + +void +Executor::spin_node_once( + const std::shared_ptr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node->get_node_base_interface(), + timeout + ); +} + void Executor::spin_node_once_nanoseconds( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, @@ -240,6 +270,58 @@ Executor::spin_node_once_nanoseconds( this->remove_node(node, false); } +rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & get_future_status) +{ + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_impl(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; +} + void Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index b1075d7590..26c439bdde 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -26,14 +26,6 @@ using namespace std::chrono_literals; -enum class ClockType -{ - RCL_STEADY_TIME, - RCL_SYSTEM_TIME, - RCL_ROS_TIME, -}; - - class TestClockWakeup : public ::testing::TestWithParam { public: