Skip to content

Commit 218d86d

Browse files
author
Janosch Machowinski
committed
feat(clock): Added function to interrupt sleep
This is useful in case a second thread needs to wake up another thread, that is sleeping using a clock. Signed-off-by: Janosch Machowinski <[email protected]>
1 parent cb662f3 commit 218d86d

File tree

4 files changed

+233
-22
lines changed

4 files changed

+233
-22
lines changed

Diff for: rclcpp/include/rclcpp/clock.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ class Clock
193193
bool
194194
ros_time_is_active();
195195

196+
RCLCPP_PUBLIC
197+
void
198+
cancel_sleep_or_wait();
199+
200+
196201
/// Return the rcl_clock_t clock handle
197202
RCLCPP_PUBLIC
198203
rcl_clock_t *

Diff for: rclcpp/src/rclcpp/clock.cpp

+44-22
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ class Clock::Impl
4949

5050
rcl_clock_t rcl_clock_;
5151
rcl_allocator_t allocator_;
52+
bool stop_sleeping_ = false;
53+
std::condition_variable cv_;
54+
std::mutex wait_mutex_;
5255
std::mutex clock_mutex_;
5356
};
5457

@@ -79,8 +82,20 @@ Clock::now() const
7982
return now;
8083
}
8184

85+
void
86+
Clock::cancel_sleep_or_wait()
87+
{
88+
{
89+
std::unique_lock lock(impl_->wait_mutex_);
90+
impl_->stop_sleeping_ = true;
91+
}
92+
impl_->cv_.notify_one();
93+
}
94+
8295
bool
83-
Clock::sleep_until(Time until, Context::SharedPtr context)
96+
Clock::sleep_until(
97+
Time until,
98+
Context::SharedPtr context)
8499
{
85100
if (!context || !context->is_valid()) {
86101
throw std::runtime_error("context cannot be slept with because it's invalid");
@@ -91,12 +106,10 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
91106
}
92107
bool time_source_changed = false;
93108

94-
std::condition_variable cv;
95-
96109
// Wake this thread if the context is shutdown
97110
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
98-
[&cv]() {
99-
cv.notify_one();
111+
[this]() {
112+
cancel_sleep_or_wait();
100113
});
101114
// No longer need the shutdown callback when this function exits
102115
auto callback_remover = rcpputils::scope_exit(
@@ -112,22 +125,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
112125
const std::chrono::steady_clock::time_point chrono_until =
113126
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());
114127

115-
// loop over spurious wakeups but notice shutdown
116-
std::unique_lock lock(impl_->clock_mutex_);
117-
while (now() < until && context->is_valid()) {
118-
cv.wait_until(lock, chrono_until);
128+
// loop over spurious wakeups but notice shutdown or stop of sleep
129+
std::unique_lock lock(impl_->wait_mutex_);
130+
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
131+
impl_->cv_.wait_until(lock, chrono_until);
119132
}
133+
impl_->stop_sleeping_ = false;
120134
} else if (this_clock_type == RCL_SYSTEM_TIME) {
121135
auto system_time = std::chrono::system_clock::time_point(
122136
// Cast because system clock resolution is too big for nanoseconds on some systems
123137
std::chrono::duration_cast<std::chrono::system_clock::duration>(
124138
std::chrono::nanoseconds(until.nanoseconds())));
125139

126-
// loop over spurious wakeups but notice shutdown
127-
std::unique_lock lock(impl_->clock_mutex_);
128-
while (now() < until && context->is_valid()) {
129-
cv.wait_until(lock, system_time);
140+
// loop over spurious wakeups but notice shutdown or stop of sleep
141+
std::unique_lock lock(impl_->wait_mutex_);
142+
while (now() < until && !impl_->stop_sleeping_ && context->is_valid()) {
143+
impl_->cv_.wait_until(lock, system_time);
130144
}
145+
impl_->stop_sleeping_ = false;
131146
} else if (this_clock_type == RCL_ROS_TIME) {
132147
// Install jump handler for any amount of time change, for two purposes:
133148
// - if ROS time is active, check if time reached on each new clock sample
@@ -139,11 +154,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
139154
threshold.min_forward.nanoseconds = 1;
140155
auto clock_handler = create_jump_callback(
141156
nullptr,
142-
[&cv, &time_source_changed](const rcl_time_jump_t & jump) {
157+
[this, &time_source_changed](const rcl_time_jump_t & jump) {
143158
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
159+
std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
144160
time_source_changed = true;
145161
}
146-
cv.notify_one();
162+
impl_->cv_.notify_one();
147163
},
148164
threshold);
149165

@@ -153,19 +169,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
153169
std::chrono::duration_cast<std::chrono::system_clock::duration>(
154170
std::chrono::nanoseconds(until.nanoseconds())));
155171

156-
// loop over spurious wakeups but notice shutdown or time source change
157-
std::unique_lock lock(impl_->clock_mutex_);
158-
while (now() < until && context->is_valid() && !time_source_changed) {
159-
cv.wait_until(lock, system_time);
172+
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
173+
std::unique_lock lock(impl_->wait_mutex_);
174+
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
175+
!time_source_changed)
176+
{
177+
impl_->cv_.wait_until(lock, system_time);
160178
}
179+
impl_->stop_sleeping_ = false;
161180
} else {
162181
// RCL_ROS_TIME with ros_time_is_active.
163182
// Just wait without "until" because installed
164183
// jump callbacks wake the cv on every new sample.
165-
std::unique_lock lock(impl_->clock_mutex_);
166-
while (now() < until && context->is_valid() && !time_source_changed) {
167-
cv.wait(lock);
184+
std::unique_lock lock(impl_->wait_mutex_);
185+
while (now() < until && !impl_->stop_sleeping_ && context->is_valid() &&
186+
!time_source_changed)
187+
{
188+
impl_->cv_.wait(lock);
168189
}
190+
impl_->stop_sleeping_ = false;
169191
}
170192
}
171193

Diff for: rclcpp/test/rclcpp/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp)
5555
if(TARGET test_client)
5656
target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
5757
endif()
58+
ament_add_gtest(test_clock test_clock.cpp)
59+
if(TARGET test_clock)
60+
target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
61+
endif()
5862
ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp)
5963
if(TARGET test_copy_all_parameter_values)
6064
target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME})

Diff for: rclcpp/test/rclcpp/test_clock.cpp

+180
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Copyright 2024 Cellumation GmbH
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <gtest/gtest.h>
16+
17+
#include <chrono>
18+
19+
#include "rcl/error_handling.h"
20+
#include "rcl/time.h"
21+
#include "rclcpp/clock.hpp"
22+
#include "rclcpp/rclcpp.hpp"
23+
#include "rclcpp/time_source.hpp"
24+
25+
#include "../utils/rclcpp_gtest_macros.hpp"
26+
27+
using namespace std::chrono_literals;
28+
class TestClockWakeup : public ::testing::Test
29+
{
30+
public:
31+
void test_wakeup_before_sleep(const rclcpp::Clock::SharedPtr & clock)
32+
{
33+
std::atomic_bool thread_finished = false;
34+
35+
std::thread wait_thread = std::thread(
36+
[&clock, &thread_finished]()
37+
{
38+
// make sure the thread starts sleeping late
39+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
40+
clock->sleep_until(clock->now() + std::chrono::seconds(3));
41+
thread_finished = true;
42+
});
43+
44+
// notify the clock, that the sleep shall be interrupted
45+
clock->cancel_sleep_or_wait();
46+
47+
auto start_time = std::chrono::steady_clock::now();
48+
auto cur_time = start_time;
49+
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
50+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
51+
cur_time = std::chrono::steady_clock::now();
52+
}
53+
54+
wait_thread.join();
55+
56+
EXPECT_TRUE(thread_finished);
57+
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
58+
}
59+
60+
void test_wakeup_after_sleep(const rclcpp::Clock::SharedPtr & clock)
61+
{
62+
std::atomic_bool thread_finished = false;
63+
64+
std::thread wait_thread = std::thread(
65+
[&clock, &thread_finished]()
66+
{
67+
clock->sleep_until(clock->now() + std::chrono::seconds(3));
68+
thread_finished = true;
69+
});
70+
71+
// make sure the thread is already sleeping before we send the cancel
72+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
73+
74+
// notify the clock, that the sleep shall be interrupted
75+
clock->cancel_sleep_or_wait();
76+
77+
auto start_time = std::chrono::steady_clock::now();
78+
auto cur_time = start_time;
79+
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
80+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
81+
cur_time = std::chrono::steady_clock::now();
82+
}
83+
84+
wait_thread.join();
85+
86+
EXPECT_TRUE(thread_finished);
87+
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
88+
}
89+
90+
protected:
91+
static void SetUpTestCase()
92+
{
93+
rclcpp::init(0, nullptr);
94+
}
95+
96+
static void TearDownTestCase()
97+
{
98+
rclcpp::shutdown();
99+
}
100+
101+
void SetUp()
102+
{
103+
node = std::make_shared<rclcpp::Node>("my_node");
104+
}
105+
106+
void TearDown()
107+
{
108+
node.reset();
109+
}
110+
111+
rclcpp::Node::SharedPtr node;
112+
};
113+
114+
TEST_F(TestClockWakeup, wakeup_sleep_steay_time) {
115+
auto clock = std::make_shared<rclcpp::Clock>(RCL_STEADY_TIME);
116+
test_wakeup_after_sleep(clock);
117+
test_wakeup_before_sleep(clock);
118+
}
119+
120+
TEST_F(TestClockWakeup, wakeup_sleep_system_time) {
121+
auto clock = std::make_shared<rclcpp::Clock>(RCL_SYSTEM_TIME);
122+
test_wakeup_after_sleep(clock);
123+
test_wakeup_before_sleep(clock);
124+
}
125+
126+
TEST_F(TestClockWakeup, wakeup_sleep_ros_time_not_active) {
127+
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
128+
test_wakeup_after_sleep(clock);
129+
test_wakeup_before_sleep(clock);
130+
}
131+
132+
TEST_F(TestClockWakeup, wakeup_sleep_ros_time_active) {
133+
node->set_parameter({"use_sim_time", true});
134+
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
135+
rclcpp::TimeSource time_source(node);
136+
time_source.attachClock(clock);
137+
138+
EXPECT_TRUE(clock->ros_time_is_active());
139+
140+
test_wakeup_after_sleep(clock);
141+
test_wakeup_before_sleep(clock);
142+
}
143+
144+
TEST_F(TestClockWakeup, no_wakeup_on_sim_time) {
145+
node->set_parameter({"use_sim_time", true});
146+
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
147+
rclcpp::TimeSource time_source(node);
148+
time_source.attachClock(clock);
149+
150+
EXPECT_TRUE(clock->ros_time_is_active());
151+
152+
std::atomic_bool thread_finished = false;
153+
154+
std::thread wait_thread = std::thread(
155+
[&clock, &thread_finished]()
156+
{
157+
// make sure the thread starts sleeping late
158+
clock->sleep_until(clock->now() + std::chrono::milliseconds(10));
159+
thread_finished = true;
160+
});
161+
162+
// make sure, that the sim time clock does not wakeup, as no clock is provided
163+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
164+
EXPECT_FALSE(thread_finished);
165+
166+
// notify the clock, that the sleep shall be interrupted
167+
clock->cancel_sleep_or_wait();
168+
169+
auto start_time = std::chrono::steady_clock::now();
170+
auto cur_time = start_time;
171+
while (!thread_finished && start_time + std::chrono::seconds(1) > cur_time) {
172+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
173+
cur_time = std::chrono::steady_clock::now();
174+
}
175+
176+
wait_thread.join();
177+
178+
EXPECT_TRUE(thread_finished);
179+
EXPECT_LT(cur_time, start_time + std::chrono::seconds(1));
180+
}

0 commit comments

Comments
 (0)