diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..36997d85fd 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -242,42 +242,30 @@ class Executor * spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this * function to be non-blocking. */ - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node, - std::chrono::duration_cast(timeout) - ); - } + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Convenience function which takes Node and forwards NodeBaseInterface. - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - std::shared_ptr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node->get_node_base_interface(), - std::chrono::duration_cast(timeout) - ); - } + const std::shared_ptr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Add a node, complete all immediately available work, and remove the node. /** * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_some(std::shared_ptr node); /// Collect work once and execute all available work, optionally within a max duration. @@ -307,14 +295,14 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_all( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_all(std::shared_ptr node, std::chrono::nanoseconds max_duration); /// Collect and execute work repeatedly within a duration or until no more work is available. @@ -366,52 +354,11 @@ class Executor const FutureT & future, std::chrono::duration timeout = std::chrono::duration(-1)) { - // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete - // inside a callback executed by an executor. - - // Check the future before entering the while loop. - // If the future is already complete, don't try to spin. - std::future_status status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - - auto end_time = std::chrono::steady_clock::now(); - std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( - timeout); - if (timeout_ns > std::chrono::nanoseconds::zero()) { - end_time += timeout_ns; - } - std::chrono::nanoseconds timeout_left = timeout_ns; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_until_future_complete() called while already spinning"); + return spin_until_future_complete_impl(std::chrono::duration_cast( + timeout), [&future] () { + return future.wait_for(std::chrono::seconds(0)); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - while (rclcpp::ok(this->context_) && spinning.load()) { - // Do one item of work. - spin_once_impl(timeout_left); - - // Check if the future is set, return SUCCESS if it is. - status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout_ns < std::chrono::nanoseconds::zero()) { - continue; - } - // Otherwise check if we still have time to wait, return TIMEOUT if not. - auto now = std::chrono::steady_clock::now(); - if (now >= end_time) { - return FutureReturnCode::TIMEOUT; - } - // Subtract the elapsed time from the original timeout. - timeout_left = std::chrono::duration_cast(end_time - now); - } - - // The future did not complete before ok() returned false, return INTERRUPTED. - return FutureReturnCode::INTERRUPTED; + ); } /// Cancel any running spin* function, causing it to return. @@ -420,7 +367,7 @@ class Executor * \throws std::runtime_error if there is an issue triggering the guard condition */ RCLCPP_PUBLIC - void + virtual void cancel(); /// Returns true if the executor is currently spinning. @@ -429,10 +376,19 @@ class Executor * \return True if the executor is currently spinning. */ RCLCPP_PUBLIC - bool + virtual bool is_spinning(); protected: + // constructor that will not setup any internals. + /** + * This constructor is intended to be used by any derived executor, + * that explicitly does not want to use the default implementation provided + * by this class. This constructor is guaranteed, to not modify the system + * state. + * */ + explicit Executor(const std::shared_ptr & context); + /// Add a node to executor, execute the next available unit of work, and remove the node. /** * Implementation of spin_node_once using std::chrono::nanoseconds @@ -447,6 +403,10 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + virtual FutureReturnCode spin_until_future_complete_impl( + std::chrono::nanoseconds max_duration, + const std::function & get_future_status); + /// Collect work and execute available work, optionally within a duration. /** * Implementation of spin_some and spin_all. diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5df70a9465..553b7db9b7 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru class rclcpp::ExecutorImplementation {}; +Executor::Executor(const std::shared_ptr & context) +: spinning(false), + entities_need_rebuild_(true), + collector_(nullptr), + wait_set_({}, {}, {}, {}, {}, {}, context) +{ +} + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), @@ -229,6 +237,28 @@ Executor::remove_node(std::shared_ptr node_ptr, bool notify) this->remove_node(node_ptr->get_node_base_interface(), notify); } +void +Executor::spin_node_once( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node, + std::chrono::duration_cast(timeout) + ); +} + +void +Executor::spin_node_once( + const std::shared_ptr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node->get_node_base_interface(), + timeout + ); +} + void Executor::spin_node_once_nanoseconds( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, @@ -240,6 +270,58 @@ Executor::spin_node_once_nanoseconds( this->remove_node(node, false); } +rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & get_future_status) +{ + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_impl(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; +} + void Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index b1075d7590..26c439bdde 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -26,14 +26,6 @@ using namespace std::chrono_literals; -enum class ClockType -{ - RCL_STEADY_TIME, - RCL_SYSTEM_TIME, - RCL_ROS_TIME, -}; - - class TestClockWakeup : public ::testing::TestWithParam { public: