From 50779742a8a496655453fec8452ea82962fb3685 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Tue, 26 Mar 2024 15:17:01 +0100 Subject: [PATCH 01/10] feat(clock): Added function to interrupt sleep This is useful in case a second thread needs to wake up another thread, that is sleeping using a clock. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 5 + rclcpp/src/rclcpp/clock.cpp | 66 +++++++---- rclcpp/test/rclcpp/CMakeLists.txt | 4 + rclcpp/test/rclcpp/test_clock.cpp | 180 ++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 22 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_clock.cpp diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 702b224d53..b714bf59b8 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,6 +193,11 @@ class Clock bool ros_time_is_active(); + RCLCPP_PUBLIC + void + cancel_sleep_or_wait(); + + /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index f46649a77d..addf3628c3 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -49,6 +49,9 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; + bool stop_sleeping_ = false; + std::condition_variable cv_; + std::mutex wait_mutex_; std::mutex clock_mutex_; }; @@ -79,8 +82,20 @@ Clock::now() const return now; } +void +Clock::cancel_sleep_or_wait() +{ + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->stop_sleeping_ = true; + } + impl_->cv_.notify_one(); +} + bool -Clock::sleep_until(Time until, Context::SharedPtr context) +Clock::sleep_until( + Time until, + Context::SharedPtr context) { if (!context || !context->is_valid()) { throw std::runtime_error("context cannot be slept with because it's invalid"); @@ -91,12 +106,10 @@ Clock::sleep_until(Time until, Context::SharedPtr context) } bool time_source_changed = false; - std::condition_variable cv; - // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( - [&cv]() { - cv.notify_one(); + [this]() { + cancel_sleep_or_wait(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -112,22 +125,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context) const std::chrono::steady_clock::time_point chrono_until = chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, chrono_until); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + impl_->cv_.wait_until(lock, chrono_until); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_SYSTEM_TIME) { auto system_time = std::chrono::system_clock::time_point( // Cast because system clock resolution is too big for nanoseconds on some systems std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_ROS_TIME) { // Install jump handler for any amount of time change, for two purposes: // - if ROS time is active, check if time reached on each new clock sample @@ -139,11 +154,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context) threshold.min_forward.nanoseconds = 1; auto clock_handler = create_jump_callback( nullptr, - [&cv, &time_source_changed](const rcl_time_jump_t & jump) { + [this, &time_source_changed](const rcl_time_jump_t & jump) { if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(impl_->wait_mutex_); time_source_changed = true; } - cv.notify_one(); + impl_->cv_.notify_one(); }, threshold); @@ -153,19 +169,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context) std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown or time source change - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown, stop of sleep or time source change + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else { // RCL_ROS_TIME with ros_time_is_active. // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait(lock); + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait(lock); } + impl_->stop_sleeping_ = false; } } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ea29f7a288..844ca630ac 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp) if(TARGET test_client) target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_clock test_clock.cpp) +if(TARGET test_clock) + target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) +endif() ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp) if(TARGET test_copy_all_parameter_values) target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME}) diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp new file mode 100644 index 0000000000..08dcbe632a --- /dev/null +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -0,0 +1,180 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rcl/error_handling.h" +#include "rcl/time.h" +#include "rclcpp/clock.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" + +#include "../utils/rclcpp_gtest_macros.hpp" + +using namespace std::chrono_literals; +class TestClockWakeup : public ::testing::Test +{ +public: + void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + + void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // make sure the thread is already sleeping before we send the cancel + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("my_node"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +TEST_F(TestClockWakeup, wakeup_sleep_steay_time) { + auto clock = std::make_shared(RCL_STEADY_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_system_time) { + auto clock = std::make_shared(RCL_SYSTEM_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) { + auto clock = std::make_shared(RCL_ROS_TIME); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::milliseconds(10)); + thread_finished = true; + }); + + // make sure, that the sim time clock does not wakeup, as no clock is provided + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + EXPECT_FALSE(thread_finished); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +} From 03e5f9a626287fd5ec3e7ccd5d323049e7e5fda8 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 8 Apr 2024 17:34:31 +0200 Subject: [PATCH 02/10] feat: Added support for multi thread wait / shutdown This adds support for multiple threads waiting on the same clock, while an shutdown is invoked. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/clock.hpp | 8 ++- rclcpp/src/rclcpp/clock.cpp | 15 ++++-- rclcpp/test/rclcpp/test_clock.cpp | 84 +++++++++++++++++++++++++------ 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index b714bf59b8..6efb8f31a4 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,11 +193,17 @@ class Clock bool ros_time_is_active(); + /** + * Cancels an ongoing or future sleep operation of one thread. + * + * This function is intended for multi threaded signaling. It can + * be used by one thread, to wakeup another thread, using any + * of the sleep_ or wait_ methods. + */ RCLCPP_PUBLIC void cancel_sleep_or_wait(); - /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index addf3628c3..5c13f19d13 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -50,6 +50,7 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; bool stop_sleeping_ = false; + bool shutdown_ = false; std::condition_variable cv_; std::mutex wait_mutex_; std::mutex clock_mutex_; @@ -109,7 +110,11 @@ Clock::sleep_until( // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( [this]() { - cancel_sleep_or_wait(); + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->shutdown_ = true; + } + impl_->cv_.notify_one(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -127,7 +132,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown or stop of sleep std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { impl_->cv_.wait_until(lock, chrono_until); } impl_->stop_sleeping_ = false; @@ -139,7 +144,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown or stop of sleep std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) { + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { impl_->cv_.wait_until(lock, system_time); } impl_->stop_sleeping_ = false; @@ -171,7 +176,7 @@ Clock::sleep_until( // loop over spurious wakeups but notice shutdown, stop of sleep or time source change std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && !time_source_changed) { impl_->cv_.wait_until(lock, system_time); @@ -182,7 +187,7 @@ Clock::sleep_until( // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. std::unique_lock lock(impl_->wait_mutex_); - while (now() < until && !impl_->stop_sleeping_ && context->is_valid() && + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && !time_source_changed) { impl_->cv_.wait(lock); diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index 08dcbe632a..b1075d7590 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -25,7 +25,16 @@ #include "../utils/rclcpp_gtest_macros.hpp" using namespace std::chrono_literals; -class TestClockWakeup : public ::testing::Test + +enum class ClockType +{ + RCL_STEADY_TIME, + RCL_SYSTEM_TIME, + RCL_ROS_TIME, +}; + + +class TestClockWakeup : public ::testing::TestWithParam { public: void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) @@ -111,20 +120,15 @@ class TestClockWakeup : public ::testing::Test rclcpp::Node::SharedPtr node; }; -TEST_F(TestClockWakeup, wakeup_sleep_steay_time) { - auto clock = std::make_shared(RCL_STEADY_TIME); - test_wakeup_after_sleep(clock); - test_wakeup_before_sleep(clock); -} - -TEST_F(TestClockWakeup, wakeup_sleep_system_time) { - auto clock = std::make_shared(RCL_SYSTEM_TIME); - test_wakeup_after_sleep(clock); - test_wakeup_before_sleep(clock); -} +INSTANTIATE_TEST_SUITE_P( + Clocks, + TestClockWakeup, + ::testing::Values( + RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME +)); -TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) { - auto clock = std::make_shared(RCL_ROS_TIME); +TEST_P(TestClockWakeup, wakeup_sleep) { + auto clock = std::make_shared(GetParam()); test_wakeup_after_sleep(clock); test_wakeup_before_sleep(clock); } @@ -178,3 +182,55 @@ TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { EXPECT_TRUE(thread_finished); EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); } + +TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) { + auto clock = std::make_shared(RCL_ROS_TIME); + + std::vector thread_finished(10, false); + + std::vector threads; + + for(size_t nr = 0; nr < thread_finished.size(); nr++) { + threads.push_back(std::thread( + [&clock, &thread_finished, nr]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::seconds(10)); + thread_finished[nr] = true; + })); + } + + // wait a bit so all threads can execute the sleep_until + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + for(const bool & finished : thread_finished) { + EXPECT_FALSE(finished); + } + + rclcpp::shutdown(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + bool threads_finished = false; + while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) { + threads_finished = true; + for(const bool finished : thread_finished) { + if(!finished) { + threads_finished = false; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + for(const bool finished : thread_finished) { + EXPECT_TRUE(finished); + } + + for(auto & thread : threads) { + thread.join(); + } + + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +} From 34e2a802666f03b4248a094f59daed809be60bbe Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Sun, 14 Apr 2024 14:31:24 +0200 Subject: [PATCH 03/10] chore: Made Executor fully overloadable This commit makes every public funciton virtual, and adds virtual impl function for the existing template functions. The goal of this commit is to be able to fully control the everything from a derived class. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 102 +++++++++-------------------- rclcpp/src/rclcpp/executor.cpp | 82 +++++++++++++++++++++++ rclcpp/test/rclcpp/test_clock.cpp | 8 --- 3 files changed, 113 insertions(+), 79 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..36997d85fd 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -242,42 +242,30 @@ class Executor * spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this * function to be non-blocking. */ - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node, - std::chrono::duration_cast(timeout) - ); - } + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Convenience function which takes Node and forwards NodeBaseInterface. - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - std::shared_ptr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node->get_node_base_interface(), - std::chrono::duration_cast(timeout) - ); - } + const std::shared_ptr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Add a node, complete all immediately available work, and remove the node. /** * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_some(std::shared_ptr node); /// Collect work once and execute all available work, optionally within a max duration. @@ -307,14 +295,14 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_all( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_all(std::shared_ptr node, std::chrono::nanoseconds max_duration); /// Collect and execute work repeatedly within a duration or until no more work is available. @@ -366,52 +354,11 @@ class Executor const FutureT & future, std::chrono::duration timeout = std::chrono::duration(-1)) { - // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete - // inside a callback executed by an executor. - - // Check the future before entering the while loop. - // If the future is already complete, don't try to spin. - std::future_status status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - - auto end_time = std::chrono::steady_clock::now(); - std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( - timeout); - if (timeout_ns > std::chrono::nanoseconds::zero()) { - end_time += timeout_ns; - } - std::chrono::nanoseconds timeout_left = timeout_ns; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_until_future_complete() called while already spinning"); + return spin_until_future_complete_impl(std::chrono::duration_cast( + timeout), [&future] () { + return future.wait_for(std::chrono::seconds(0)); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - while (rclcpp::ok(this->context_) && spinning.load()) { - // Do one item of work. - spin_once_impl(timeout_left); - - // Check if the future is set, return SUCCESS if it is. - status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout_ns < std::chrono::nanoseconds::zero()) { - continue; - } - // Otherwise check if we still have time to wait, return TIMEOUT if not. - auto now = std::chrono::steady_clock::now(); - if (now >= end_time) { - return FutureReturnCode::TIMEOUT; - } - // Subtract the elapsed time from the original timeout. - timeout_left = std::chrono::duration_cast(end_time - now); - } - - // The future did not complete before ok() returned false, return INTERRUPTED. - return FutureReturnCode::INTERRUPTED; + ); } /// Cancel any running spin* function, causing it to return. @@ -420,7 +367,7 @@ class Executor * \throws std::runtime_error if there is an issue triggering the guard condition */ RCLCPP_PUBLIC - void + virtual void cancel(); /// Returns true if the executor is currently spinning. @@ -429,10 +376,19 @@ class Executor * \return True if the executor is currently spinning. */ RCLCPP_PUBLIC - bool + virtual bool is_spinning(); protected: + // constructor that will not setup any internals. + /** + * This constructor is intended to be used by any derived executor, + * that explicitly does not want to use the default implementation provided + * by this class. This constructor is guaranteed, to not modify the system + * state. + * */ + explicit Executor(const std::shared_ptr & context); + /// Add a node to executor, execute the next available unit of work, and remove the node. /** * Implementation of spin_node_once using std::chrono::nanoseconds @@ -447,6 +403,10 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + virtual FutureReturnCode spin_until_future_complete_impl( + std::chrono::nanoseconds max_duration, + const std::function & get_future_status); + /// Collect work and execute available work, optionally within a duration. /** * Implementation of spin_some and spin_all. diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5df70a9465..553b7db9b7 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru class rclcpp::ExecutorImplementation {}; +Executor::Executor(const std::shared_ptr & context) +: spinning(false), + entities_need_rebuild_(true), + collector_(nullptr), + wait_set_({}, {}, {}, {}, {}, {}, context) +{ +} + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), @@ -229,6 +237,28 @@ Executor::remove_node(std::shared_ptr node_ptr, bool notify) this->remove_node(node_ptr->get_node_base_interface(), notify); } +void +Executor::spin_node_once( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node, + std::chrono::duration_cast(timeout) + ); +} + +void +Executor::spin_node_once( + const std::shared_ptr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node->get_node_base_interface(), + timeout + ); +} + void Executor::spin_node_once_nanoseconds( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, @@ -240,6 +270,58 @@ Executor::spin_node_once_nanoseconds( this->remove_node(node, false); } +rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & get_future_status) +{ + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_impl(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = get_future_status(); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; +} + void Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp index b1075d7590..26c439bdde 100644 --- a/rclcpp/test/rclcpp/test_clock.cpp +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -26,14 +26,6 @@ using namespace std::chrono_literals; -enum class ClockType -{ - RCL_STEADY_TIME, - RCL_SYSTEM_TIME, - RCL_ROS_TIME, -}; - - class TestClockWakeup : public ::testing::TestWithParam { public: From 77799c9aa0f49d735b58889458bcb9358db4c7de Mon Sep 17 00:00:00 2001 From: jmachowinski Date: Mon, 15 Apr 2024 15:47:47 +0200 Subject: [PATCH 04/10] Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski --- rclcpp/include/rclcpp/executor.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 36997d85fd..8edf082e15 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -384,8 +384,8 @@ class Executor /** * This constructor is intended to be used by any derived executor, * that explicitly does not want to use the default implementation provided - * by this class. This constructor is guaranteed, to not modify the system - * state. + * by this class. + * This constructor is guaranteed to not modify the system state. * */ explicit Executor(const std::shared_ptr & context); From f85ad12ef0601c359bd5c0f8016c963bef17621b Mon Sep 17 00:00:00 2001 From: jmachowinski Date: Mon, 15 Apr 2024 15:49:25 +0200 Subject: [PATCH 05/10] Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski --- rclcpp/include/rclcpp/executor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 8edf082e15..25534724db 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -380,7 +380,7 @@ class Executor is_spinning(); protected: - // constructor that will not setup any internals. + /// Constructor that will not initialize any non-trivial members. /** * This constructor is intended to be used by any derived executor, * that explicitly does not want to use the default implementation provided From c72e601e49a67fe5b0288e166ee083d265b48f44 Mon Sep 17 00:00:00 2001 From: jmachowinski Date: Mon, 15 Apr 2024 15:49:41 +0200 Subject: [PATCH 06/10] Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski --- rclcpp/include/rclcpp/executor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 25534724db..35f4649acc 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -386,7 +386,7 @@ class Executor * that explicitly does not want to use the default implementation provided * by this class. * This constructor is guaranteed to not modify the system state. - * */ + */ explicit Executor(const std::shared_ptr & context); /// Add a node to executor, execute the next available unit of work, and remove the node. From 2ebcb49db7b1fac2e91d87e15656cbca2b86c3d6 Mon Sep 17 00:00:00 2001 From: jmachowinski Date: Mon, 15 Apr 2024 15:50:14 +0200 Subject: [PATCH 07/10] Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski --- rclcpp/include/rclcpp/executor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 35f4649acc..be22159a75 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -382,7 +382,7 @@ class Executor protected: /// Constructor that will not initialize any non-trivial members. /** - * This constructor is intended to be used by any derived executor, + * This constructor is intended to be used by any derived executor * that explicitly does not want to use the default implementation provided * by this class. * This constructor is guaranteed to not modify the system state. From da85eb0ab9e1320c7a6fa60a1135747101282e99 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 15 Apr 2024 16:16:43 +0200 Subject: [PATCH 08/10] docs: added doc string for spin_until_future_complete_impl Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index be22159a75..4a4d7b20af 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -403,6 +403,17 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + /// Spin (blocking) until the get_future_status returns ready, max_duration is reached, + /// or rclcpp is interrupted. + /** + * \param[in] get_future_status A function returning the status of a future that is been waited for. + * \param[in] max_duration Optional duration parameter, which gets passed to Executor::spin_node_once. + * `-1` is block forever, `0` is non-blocking. + * If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return + * code. + * \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`. + */ + RCLCPP_PUBLIC virtual FutureReturnCode spin_until_future_complete_impl( std::chrono::nanoseconds max_duration, const std::function & get_future_status); From 3b8c5587d301530aedc8997359ef8f189f300609 Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 15 Apr 2024 16:16:59 +0200 Subject: [PATCH 09/10] made is_spinning not virtual Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 4a4d7b20af..58e6fc5b9c 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -376,7 +376,7 @@ class Executor * \return True if the executor is currently spinning. */ RCLCPP_PUBLIC - virtual bool + bool is_spinning(); protected: From 163f727aa0b493e71e9d5ae6de36da87c6046a4a Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Mon, 15 Apr 2024 16:29:48 +0200 Subject: [PATCH 10/10] feat: Changed interface of spin_until_future_complete_impl This change allows it to use a second thread to wait for the future to become ready. Signed-off-by: Janosch Machowinski --- rclcpp/include/rclcpp/executor.hpp | 8 ++++---- rclcpp/src/rclcpp/executor.cpp | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 58e6fc5b9c..39f92378b2 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -355,8 +355,8 @@ class Executor std::chrono::duration timeout = std::chrono::duration(-1)) { return spin_until_future_complete_impl(std::chrono::duration_cast( - timeout), [&future] () { - return future.wait_for(std::chrono::seconds(0)); + timeout), [&future] (std::chrono::nanoseconds wait_time) { + return future.wait_for(wait_time); } ); } @@ -406,17 +406,17 @@ class Executor /// Spin (blocking) until the get_future_status returns ready, max_duration is reached, /// or rclcpp is interrupted. /** - * \param[in] get_future_status A function returning the status of a future that is been waited for. * \param[in] max_duration Optional duration parameter, which gets passed to Executor::spin_node_once. * `-1` is block forever, `0` is non-blocking. * If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return * code. + * \param[in] wait_for_future A function waiting for a future to become ready. * \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`. */ RCLCPP_PUBLIC virtual FutureReturnCode spin_until_future_complete_impl( std::chrono::nanoseconds max_duration, - const std::function & get_future_status); + const std::function & wait_for_future); /// Collect work and execute available work, optionally within a duration. /** diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 553b7db9b7..39c03994c3 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -272,14 +272,14 @@ Executor::spin_node_once_nanoseconds( rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( std::chrono::nanoseconds timeout, - const std::function & get_future_status) + const std::function & get_future_status) { // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete // inside a callback executed by an executor. // Check the future before entering the while loop. // If the future is already complete, don't try to spin. - std::future_status status = get_future_status(); + std::future_status status = get_future_status(std::chrono::seconds(0)); if (status == std::future_status::ready) { return FutureReturnCode::SUCCESS; } @@ -301,7 +301,7 @@ rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( spin_once_impl(timeout_left); // Check if the future is set, return SUCCESS if it is. - status = get_future_status(); + status = get_future_status(std::chrono::seconds(0)); if (status == std::future_status::ready) { return FutureReturnCode::SUCCESS; }