diff --git a/rclcpp/include/rclcpp/clock.hpp b/rclcpp/include/rclcpp/clock.hpp index 702b224d53..6efb8f31a4 100644 --- a/rclcpp/include/rclcpp/clock.hpp +++ b/rclcpp/include/rclcpp/clock.hpp @@ -193,6 +193,17 @@ class Clock bool ros_time_is_active(); + /** + * Cancels an ongoing or future sleep operation of one thread. + * + * This function is intended for multi threaded signaling. It can + * be used by one thread, to wakeup another thread, using any + * of the sleep_ or wait_ methods. + */ + RCLCPP_PUBLIC + void + cancel_sleep_or_wait(); + /// Return the rcl_clock_t clock handle RCLCPP_PUBLIC rcl_clock_t * diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7bdc53d251..39f92378b2 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -242,42 +242,30 @@ class Executor * spin_node_once to block indefinitely (the default behavior). A timeout of 0 causes this * function to be non-blocking. */ - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node, - std::chrono::duration_cast(timeout) - ); - } + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Convenience function which takes Node and forwards NodeBaseInterface. - template - void + RCLCPP_PUBLIC + virtual void spin_node_once( - std::shared_ptr node, - std::chrono::duration timeout = std::chrono::duration(-1)) - { - return spin_node_once_nanoseconds( - node->get_node_base_interface(), - std::chrono::duration_cast(timeout) - ); - } + const std::shared_ptr & node, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Add a node, complete all immediately available work, and remove the node. /** * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_some(std::shared_ptr node); /// Collect work once and execute all available work, optionally within a max duration. @@ -307,14 +295,14 @@ class Executor * \param[in] node Shared pointer to the node to add. */ RCLCPP_PUBLIC - void + virtual void spin_node_all( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds max_duration); /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC - void + virtual void spin_node_all(std::shared_ptr node, std::chrono::nanoseconds max_duration); /// Collect and execute work repeatedly within a duration or until no more work is available. @@ -366,52 +354,11 @@ class Executor const FutureT & future, std::chrono::duration timeout = std::chrono::duration(-1)) { - // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete - // inside a callback executed by an executor. - - // Check the future before entering the while loop. - // If the future is already complete, don't try to spin. - std::future_status status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - - auto end_time = std::chrono::steady_clock::now(); - std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( - timeout); - if (timeout_ns > std::chrono::nanoseconds::zero()) { - end_time += timeout_ns; - } - std::chrono::nanoseconds timeout_left = timeout_ns; - - if (spinning.exchange(true)) { - throw std::runtime_error("spin_until_future_complete() called while already spinning"); + return spin_until_future_complete_impl(std::chrono::duration_cast( + timeout), [&future] (std::chrono::nanoseconds wait_time) { + return future.wait_for(wait_time); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - while (rclcpp::ok(this->context_) && spinning.load()) { - // Do one item of work. - spin_once_impl(timeout_left); - - // Check if the future is set, return SUCCESS if it is. - status = future.wait_for(std::chrono::seconds(0)); - if (status == std::future_status::ready) { - return FutureReturnCode::SUCCESS; - } - // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout_ns < std::chrono::nanoseconds::zero()) { - continue; - } - // Otherwise check if we still have time to wait, return TIMEOUT if not. - auto now = std::chrono::steady_clock::now(); - if (now >= end_time) { - return FutureReturnCode::TIMEOUT; - } - // Subtract the elapsed time from the original timeout. - timeout_left = std::chrono::duration_cast(end_time - now); - } - - // The future did not complete before ok() returned false, return INTERRUPTED. - return FutureReturnCode::INTERRUPTED; + ); } /// Cancel any running spin* function, causing it to return. @@ -420,7 +367,7 @@ class Executor * \throws std::runtime_error if there is an issue triggering the guard condition */ RCLCPP_PUBLIC - void + virtual void cancel(); /// Returns true if the executor is currently spinning. @@ -433,6 +380,15 @@ class Executor is_spinning(); protected: + /// Constructor that will not initialize any non-trivial members. + /** + * This constructor is intended to be used by any derived executor + * that explicitly does not want to use the default implementation provided + * by this class. + * This constructor is guaranteed to not modify the system state. + */ + explicit Executor(const std::shared_ptr & context); + /// Add a node to executor, execute the next available unit of work, and remove the node. /** * Implementation of spin_node_once using std::chrono::nanoseconds @@ -447,6 +403,21 @@ class Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout); + /// Spin (blocking) until the get_future_status returns ready, max_duration is reached, + /// or rclcpp is interrupted. + /** + * \param[in] max_duration Optional duration parameter, which gets passed to Executor::spin_node_once. + * `-1` is block forever, `0` is non-blocking. + * If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return + * code. + * \param[in] wait_for_future A function waiting for a future to become ready. + * \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`. + */ + RCLCPP_PUBLIC + virtual FutureReturnCode spin_until_future_complete_impl( + std::chrono::nanoseconds max_duration, + const std::function & wait_for_future); + /// Collect work and execute available work, optionally within a duration. /** * Implementation of spin_some and spin_all. diff --git a/rclcpp/src/rclcpp/clock.cpp b/rclcpp/src/rclcpp/clock.cpp index f46649a77d..5c13f19d13 100644 --- a/rclcpp/src/rclcpp/clock.cpp +++ b/rclcpp/src/rclcpp/clock.cpp @@ -49,6 +49,10 @@ class Clock::Impl rcl_clock_t rcl_clock_; rcl_allocator_t allocator_; + bool stop_sleeping_ = false; + bool shutdown_ = false; + std::condition_variable cv_; + std::mutex wait_mutex_; std::mutex clock_mutex_; }; @@ -79,8 +83,20 @@ Clock::now() const return now; } +void +Clock::cancel_sleep_or_wait() +{ + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->stop_sleeping_ = true; + } + impl_->cv_.notify_one(); +} + bool -Clock::sleep_until(Time until, Context::SharedPtr context) +Clock::sleep_until( + Time until, + Context::SharedPtr context) { if (!context || !context->is_valid()) { throw std::runtime_error("context cannot be slept with because it's invalid"); @@ -91,12 +107,14 @@ Clock::sleep_until(Time until, Context::SharedPtr context) } bool time_source_changed = false; - std::condition_variable cv; - // Wake this thread if the context is shutdown rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback( - [&cv]() { - cv.notify_one(); + [this]() { + { + std::unique_lock lock(impl_->wait_mutex_); + impl_->shutdown_ = true; + } + impl_->cv_.notify_one(); }); // No longer need the shutdown callback when this function exits auto callback_remover = rcpputils::scope_exit( @@ -112,22 +130,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context) const std::chrono::steady_clock::time_point chrono_until = chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds()); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, chrono_until); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { + impl_->cv_.wait_until(lock, chrono_until); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_SYSTEM_TIME) { auto system_time = std::chrono::system_clock::time_point( // Cast because system clock resolution is too big for nanoseconds on some systems std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid()) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown or stop of sleep + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else if (this_clock_type == RCL_ROS_TIME) { // Install jump handler for any amount of time change, for two purposes: // - if ROS time is active, check if time reached on each new clock sample @@ -139,11 +159,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context) threshold.min_forward.nanoseconds = 1; auto clock_handler = create_jump_callback( nullptr, - [&cv, &time_source_changed](const rcl_time_jump_t & jump) { + [this, &time_source_changed](const rcl_time_jump_t & jump) { if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) { + std::lock_guard lk(impl_->wait_mutex_); time_source_changed = true; } - cv.notify_one(); + impl_->cv_.notify_one(); }, threshold); @@ -153,19 +174,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context) std::chrono::duration_cast( std::chrono::nanoseconds(until.nanoseconds()))); - // loop over spurious wakeups but notice shutdown or time source change - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait_until(lock, system_time); + // loop over spurious wakeups but notice shutdown, stop of sleep or time source change + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait_until(lock, system_time); } + impl_->stop_sleeping_ = false; } else { // RCL_ROS_TIME with ros_time_is_active. // Just wait without "until" because installed // jump callbacks wake the cv on every new sample. - std::unique_lock lock(impl_->clock_mutex_); - while (now() < until && context->is_valid() && !time_source_changed) { - cv.wait(lock); + std::unique_lock lock(impl_->wait_mutex_); + while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() && + !time_source_changed) + { + impl_->cv_.wait(lock); } + impl_->stop_sleeping_ = false; } } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5df70a9465..39c03994c3 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru class rclcpp::ExecutorImplementation {}; +Executor::Executor(const std::shared_ptr & context) +: spinning(false), + entities_need_rebuild_(true), + collector_(nullptr), + wait_set_({}, {}, {}, {}, {}, {}, context) +{ +} + Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), interrupt_guard_condition_(std::make_shared(options.context)), @@ -229,6 +237,28 @@ Executor::remove_node(std::shared_ptr node_ptr, bool notify) this->remove_node(node_ptr->get_node_base_interface(), notify); } +void +Executor::spin_node_once( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node, + std::chrono::duration_cast(timeout) + ); +} + +void +Executor::spin_node_once( + const std::shared_ptr & node, + std::chrono::nanoseconds timeout) +{ + return spin_node_once_nanoseconds( + node->get_node_base_interface(), + timeout + ); +} + void Executor::spin_node_once_nanoseconds( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, @@ -240,6 +270,58 @@ Executor::spin_node_once_nanoseconds( this->remove_node(node, false); } +rclcpp::FutureReturnCode Executor::spin_until_future_complete_impl( + std::chrono::nanoseconds timeout, + const std::function & get_future_status) +{ + // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete + // inside a callback executed by an executor. + + // Check the future before entering the while loop. + // If the future is already complete, don't try to spin. + std::future_status status = get_future_status(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_impl(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = get_future_status(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; +} + void Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ea29f7a288..844ca630ac 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp) if(TARGET test_client) target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) endif() +ament_add_gtest(test_clock test_clock.cpp) +if(TARGET test_clock) + target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS}) +endif() ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp) if(TARGET test_copy_all_parameter_values) target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME}) diff --git a/rclcpp/test/rclcpp/test_clock.cpp b/rclcpp/test/rclcpp/test_clock.cpp new file mode 100644 index 0000000000..26c439bdde --- /dev/null +++ b/rclcpp/test/rclcpp/test_clock.cpp @@ -0,0 +1,228 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rcl/error_handling.h" +#include "rcl/time.h" +#include "rclcpp/clock.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/time_source.hpp" + +#include "../utils/rclcpp_gtest_macros.hpp" + +using namespace std::chrono_literals; + +class TestClockWakeup : public ::testing::TestWithParam +{ +public: + void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + + void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock) + { + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + clock->sleep_until(clock->now() + std::chrono::seconds(3)); + thread_finished = true; + }); + + // make sure the thread is already sleeping before we send the cancel + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); + } + +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("my_node"); + } + + void TearDown() + { + node.reset(); + } + + rclcpp::Node::SharedPtr node; +}; + +INSTANTIATE_TEST_SUITE_P( + Clocks, + TestClockWakeup, + ::testing::Values( + RCL_SYSTEM_TIME, RCL_ROS_TIME, RCL_STEADY_TIME +)); + +TEST_P(TestClockWakeup, wakeup_sleep) { + auto clock = std::make_shared(GetParam()); + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + test_wakeup_after_sleep(clock); + test_wakeup_before_sleep(clock); +} + +TEST_F(TestClockWakeup, no_wakeup_on_sim_time) { + node->set_parameter({"use_sim_time", true}); + auto clock = std::make_shared(RCL_ROS_TIME); + rclcpp::TimeSource time_source(node); + time_source.attachClock(clock); + + EXPECT_TRUE(clock->ros_time_is_active()); + + std::atomic_bool thread_finished = false; + + std::thread wait_thread = std::thread( + [&clock, &thread_finished]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::milliseconds(10)); + thread_finished = true; + }); + + // make sure, that the sim time clock does not wakeup, as no clock is provided + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + EXPECT_FALSE(thread_finished); + + // notify the clock, that the sleep shall be interrupted + clock->cancel_sleep_or_wait(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + wait_thread.join(); + + EXPECT_TRUE(thread_finished); + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +} + +TEST_F(TestClockWakeup, multiple_threads_wait_on_one_clock) { + auto clock = std::make_shared(RCL_ROS_TIME); + + std::vector thread_finished(10, false); + + std::vector threads; + + for(size_t nr = 0; nr < thread_finished.size(); nr++) { + threads.push_back(std::thread( + [&clock, &thread_finished, nr]() + { + // make sure the thread starts sleeping late + clock->sleep_until(clock->now() + std::chrono::seconds(10)); + thread_finished[nr] = true; + })); + } + + // wait a bit so all threads can execute the sleep_until + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + for(const bool & finished : thread_finished) { + EXPECT_FALSE(finished); + } + + rclcpp::shutdown(); + + auto start_time = std::chrono::steady_clock::now(); + auto cur_time = start_time; + bool threads_finished = false; + while (!threads_finished && start_time + std::chrono::seconds(1) > cur_time) { + threads_finished = true; + for(const bool finished : thread_finished) { + if(!finished) { + threads_finished = false; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cur_time = std::chrono::steady_clock::now(); + } + + for(const bool finished : thread_finished) { + EXPECT_TRUE(finished); + } + + for(auto & thread : threads) { + thread.join(); + } + + EXPECT_LT(cur_time, start_time + std::chrono::seconds(1)); +}