From 6bb9407b9079eeaaef005613cef3a5a1224f1a36 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 16 Apr 2024 08:35:56 -0700 Subject: [PATCH] Reduce overhead for inheriting from rclcpp::Executor when base functionality is not reused (#2506) * feat(clock): Added function to interrupt sleep This is useful in case a second thread needs to wake up another thread, that is sleeping using a clock. Signed-off-by: Janosch Machowinski * feat: Added support for multi thread wait / shutdown This adds support for multiple threads waiting on the same clock, while an shutdown is invoked. Signed-off-by: Janosch Machowinski * chore: Made Executor fully overloadable This commit makes every public funciton virtual, and adds virtual impl function for the existing template functions. The goal of this commit is to be able to fully control the everything from a derived class. Signed-off-by: Janosch Machowinski * Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski * Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski * Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski * Update rclcpp/include/rclcpp/executor.hpp Co-authored-by: William Woodall Signed-off-by: jmachowinski * docs: added doc string for spin_until_future_complete_impl Signed-off-by: Janosch Machowinski * made is_spinning not virtual Signed-off-by: Janosch Machowinski * feat: Changed interface of spin_until_future_complete_impl This change allows it to use a second thread to wait for the future to become ready. Signed-off-by: Janosch Machowinski * doc fixup Signed-off-by: William Woodall * undo template->implicit conversion change Signed-off-by: William Woodall * style Signed-off-by: William Woodall --------- Signed-off-by: Janosch Machowinski Signed-off-by: Janosch Machowinski Signed-off-by: jmachowinski Signed-off-by: William Woodall Co-authored-by: Janosch Machowinski Co-authored-by: jmachowinski --- rclcpp/include/rclcpp/clock.hpp | 10 ++ rclcpp/include/rclcpp/executor.hpp | 85 +++++------ rclcpp/src/rclcpp/clock.cpp | 71 ++++++--- rclcpp/src/rclcpp/executor.cpp | 64 +++++++- rclcpp/test/rclcpp/CMakeLists.txt | 4 + rclcpp/test/rclcpp/test_clock.cpp | 229 +++++++++++++++++++++++++++++ 6 files changed, 390 insertions(+), 73 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_clock.cpp diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 702b224d53..e73f3849d3 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,6 +193,16 @@ class Clock bool ros_time_is_active(); + /** + * Cancels an ongoing or future sleep operation of one thread. + * + * This function can be used by one thread, to wakeup another thread that is + * blocked using any of the sleep_ or wait_ methods of this class. + */ + RCLCPP_PUBLIC + void + cancel_sleep_or_wait(); + /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..5e5a054814 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -272,12 +272,12 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_some(std::shared_ptr node); /// Collect work once and execute all available work, optionally within a max duration. @@ -307,14 +307,14 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_all( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_all(std::shared_ptr node, std::chrono::nanoseconds max_duration); /// Collect and execute work repeatedly within a duration or until no more work is available. @@ -366,52 +366,12 @@ class Executor const FutureT & future, std::chrono::duration timeout = std::chrono::duration(-1)) { - // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete - // inside a callback executed by an executor. - - // Check the future before entering the while loop. - // If the future is already complete, don't try to spin. - std::future_status status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - - auto end_time = std::chrono::steady_clock::now(); - std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( - timeout); - if (timeout_ns > std::chrono::nanoseconds::zero()) { - end_time += timeout_ns; - } - std::chrono::nanoseconds timeout_left = timeout_ns; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_until_future_complete() called while already spinning"); - } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - while (rclcpp::ok(this->context_) && spinning.load()) { - // Do one item of work. - spin_once_impl(timeout_left); - - // Check if the future is set, return SUCCESS if it is. - status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout_ns < std::chrono::nanoseconds::zero()) { - continue; + return spin_until_future_complete_impl( + std::chrono::duration_cast(timeout), + [&future](std::chrono::nanoseconds wait_time) { + return future.wait_for(wait_time); } - // Otherwise check if we still have time to wait, return TIMEOUT if not. - auto now = std::chrono::steady_clock::now(); - if (now >= end_time) { - return FutureReturnCode::TIMEOUT; - } - // Subtract the elapsed time from the original timeout. - timeout_left = std::chrono::duration_cast(end_time - now); - } - - // The future did not complete before ok() returned false, return INTERRUPTED. - return FutureReturnCode::INTERRUPTED; + ); } /// Cancel any running spin* function, causing it to return. @@ -420,7 +380,7 @@ class Executor * \throws std::runtime_error if there is an issue triggering the guard condition */ RCLCPP_PUBLIC - void + virtual void cancel(); /// Returns true if the executor is currently spinning. @@ -433,6 +393,14 @@ class Executor is_spinning(); protected: + /// Constructor that will not initialize any non-trivial members. + /** + * This constructor is intended to be used by any derived executor + * that explicitly does not want to use the default implementation provided + * by this class. + */ + explicit Executor(const std::shared_ptr & context); + /// Add a node to executor, execute the next available unit of work, and remove the node. /** * Implementation of spin_node_once using std::chrono::nanoseconds @@ -447,6 +415,23 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + /// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted. + /** + * \sa spin_until_future_complete() + * The only difference with spin_until_future_complete() is that the future's + * type is obscured through a std::function which lets you wait on it + * reguardless of type. + * + * \param[in] timeout see spin_until_future_complete() for details + * \param[in] wait_for_future function to wait on the future and get the + * status after waiting + */ + RCLCPP_PUBLIC + virtual FutureReturnCode + spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & wait_for_future); + /// Collect work and execute available work, optionally within a duration. /** * Implementation of spin_some and spin_all. diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index f46649a77d..5c13f19d13 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -49,6 +49,10 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; + bool stop_sleeping_ = false; + bool shutdown_ = false; + std::condition_variable cv_; + std::mutex wait_mutex_; std::mutex clock_mutex_; }; @@ -79,8 +83,20 @@ Clock::now() const return now; } +void +Clock::cancel_sleep_or_wait() +{ + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->stop_sleeping_ = true; + } + impl_->cv_.notify_one(); +} + bool -Clock::sleep_until(Time until, Context::SharedPtr context) +Clock::sleep_until( + Time until, + Context::SharedPtr context) { if (!context || !context->is_valid()) { throw std::runtime_error("context cannot be slept with because it's invalid"); @@ -91,12 +107,14 @@ Clock::sleep_until(Time until, Context::SharedPtr context) } bool time_source_changed = false; - std::condition_variable cv; - // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( - [&cv]() { - cv.notify_one(); + [this]() { + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->shutdown_ = true; + } + impl_->cv_.notify_one(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -112,22 +130,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context) const std::chrono::steady_clock::time_point chrono_until = chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, chrono_until); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { + impl_->cv_.wait_until(lock, chrono_until); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_SYSTEM_TIME) { auto system_time = std::chrono::system_clock::time_point( // Cast because system clock resolution is too big for nanoseconds on some systems std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_ROS_TIME) { // Install jump handler for any amount of time change, for two purposes: // - if ROS time is active, check if time reached on each new clock sample @@ -139,11 +159,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context) threshold.min_forward.nanoseconds = 1; auto clock_handler = create_jump_callback( nullptr, - [&cv, &time_source_changed](const rcl_time_jump_t & jump) { + [this, &time_source_changed](const rcl_time_jump_t & jump) { if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(impl_->wait_mutex_); time_source_changed = true; } - cv.notify_one(); + impl_->cv_.notify_one(); }, threshold); @@ -153,19 +174,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context) std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown or time source change - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown, stop of sleep or time source change + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else { // RCL_ROS_TIME with ros_time_is_active. // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait(lock); + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait(lock); } + impl_->stop_sleeping_ = false; } } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5df70a9465..42a4d274dc 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru class rclcpp::ExecutorImplementation {}; +Executor::Executor(const std::shared_ptr & context) +: spinning(false), + entities_need_rebuild_(true), + collector_(nullptr), + wait_set_({}, {}, {}, {}, {}, {}, context) +{ +} + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), @@ -120,7 +128,8 @@ Executor::~Executor() } } -void Executor::trigger_entity_recollect(bool notify) +void +Executor::trigger_entity_recollect(bool notify) { this->entities_need_rebuild_.store(true); @@ -240,6 +249,59 @@ Executor::spin_node_once_nanoseconds( this->remove_node(node, false); } +rclcpp::FutureReturnCode +Executor::spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & wait_for_future) +{ + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = wait_for_future(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_impl(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = wait_for_future(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; +} + void Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ea29f7a288..844ca630ac 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp) if(TARGET test_client) target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_clock test_clock.cpp) +if(TARGET test_clock) + target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) +endif() ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp) if(TARGET test_copy_all_parameter_values) target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME}) diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp new file mode 100644 index 0000000000..2ddd775707 --- /dev/null +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -0,0 +1,229 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rcl/error_handling.h" +#include "rcl/time.h" +#include "rclcpp/clock.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" + +#include "../utils/rclcpp_gtest_macros.hpp" + +using namespace std::chrono_literals; + +class TestClockWakeup : public ::testing::TestWithParam +{ +public: + void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + + void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // make sure the thread is already sleeping before we send the cancel + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("my_node"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +INSTANTIATE_TEST_SUITE_P( + Clocks, + TestClockWakeup, + ::testing::Values( + RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME +)); + +TEST_P(TestClockWakeup, wakeup_sleep) { + auto clock = std::make_shared(GetParam()); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::milliseconds(10)); + thread_finished = true; + }); + + // make sure, that the sim time clock does not wakeup, as no clock is provided + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + EXPECT_FALSE(thread_finished); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +} + +TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) { + auto clock = std::make_shared(RCL_ROS_TIME); + + std::vector thread_finished(10, false); + + std::vector threads; + + for (size_t nr = 0; nr < thread_finished.size(); nr++) { + threads.push_back( + std::thread( + [&clock, &thread_finished, nr]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::seconds(10)); + thread_finished[nr] = true; + })); + } + + // wait a bit so all threads can execute the sleep_until + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + for (const bool & finished : thread_finished) { + EXPECT_FALSE(finished); + } + + rclcpp::shutdown(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + bool threads_finished = false; + while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) { + threads_finished = true; + for (const bool finished : thread_finished) { + if (!finished) { + threads_finished = false; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + for (const bool finished : thread_finished) { + EXPECT_TRUE(finished); + } + + for (auto & thread : threads) { + thread.join(); + } + + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +}