Skip to content

Commit 6bccbc4

Browse files
Add AsyncFunctionHandlerParams initialization through ROS parameters (backport #402) (#413)
Add AsyncFunctionHandlerParams initialization through ROS parameters (#402) (cherry picked from commit 37a17bc) Co-authored-by: Sai Kishor Kothakota <[email protected]>
1 parent 471295c commit 6bccbc4

File tree

2 files changed

+125
-32
lines changed

2 files changed

+125
-32
lines changed

realtime_tools/include/realtime_tools/async_function_handler.hpp

Lines changed: 115 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,14 @@
3838

3939
namespace realtime_tools
4040
{
41-
41+
/**
42+
* @brief Enum class to define the scheduling policy for the async worker thread.
43+
* SYNCHRONIZED: The async worker thread will be synchronized with the main thread, as the main
44+
* thread will be triggering the async callback method.
45+
* DETACHED: The async worker thread will be detached from the main thread and will have its own
46+
* execution cycle.
47+
* UNKNOWN: The scheduling policy is unknown.
48+
*/
4249
class AsyncSchedulingPolicy
4350
{
4451
public:
@@ -92,9 +99,38 @@ class AsyncSchedulingPolicy
9299
Value value_ = UNKNOWN;
93100
};
94101

102+
/**
103+
* @brief The AsyncFunctionHandlerParams struct is used to configure the AsyncFunctionHandler.
104+
* If the type is SYNCHRONIZED, the async worker thread will be synchronized with the main
105+
* thread, as the main thread will be triggering the async callback method.
106+
* If the type is DETACHED, the async worker thread will be detached from the main thread and
107+
* will have its own execution cycle.
108+
*
109+
* @param thread_priority Priority of the async worker thread. Should be between 0 and 99.
110+
* @param cpu_affinity_cores CPU cores to which the async worker thread should be pinned.
111+
* If empty, the thread will not be pinned to any CPU core.
112+
* @param scheduling_policy Scheduling policy for the async worker thread. Can be either
113+
* SYNCHRONIZED or DETACHED.
114+
* @param exec_rate Execution rate of the async worker thread in Hz. Only used if the
115+
* scheduling_policy is DETACHED. Must be a positive integer.
116+
* @param clock Clock to be used for the async worker thread. Only used if the scheduling_policy
117+
* is DETACHED.
118+
* @param logger Logger to be used for the async worker thread. If not set, a default logger will be used.
119+
* @param trigger_predicate Predicate function to check if the async callback method should be triggered or not.
120+
* If not set, the async callback method will be triggered every time.
121+
* @param wait_until_initial_trigger Whether to wait until the initial trigger predicate is true before starting
122+
* the async callback method. If true, the async callback method will not be called until the trigger predicate
123+
* returns true for the first time. Very useful when the type is DETACHED.
124+
* @param print_warnings Whether to print warnings when the async callback method is not triggered due to any reason.
125+
*/
95126
struct AsyncFunctionHandlerParams
96127
{
97-
bool validate()
128+
/**
129+
* @brief Validate the parameters.
130+
* @return true if the parameters are valid, false otherwise.
131+
* @throws std::runtime_error if the scheduling policy is UNKNOWN.
132+
*/
133+
bool validate() const
98134
{
99135
if (thread_priority < 0 || thread_priority > 99) {
100136
RCLCPP_ERROR(
@@ -120,28 +156,77 @@ struct AsyncFunctionHandlerParams
120156
RCLCPP_ERROR(logger, "The parsed trigger predicate is not valid!");
121157
return false;
122158
}
159+
for (const int & core : cpu_affinity_cores) {
160+
if (core < 0) {
161+
RCLCPP_ERROR(logger, "Invalid CPU core id: %d. It should be a non-negative integer.", core);
162+
return false;
163+
}
164+
}
123165
return true;
124166
}
125167

126-
/// @brief The AsyncFunctionHandlerParams struct is used to configure the AsyncFunctionHandler.
127-
/// If the type is SYNCHRONIZED, the async worker thread will be synchronized with the main
128-
/// thread, as the main thread will be triggering the async callback method.
129-
/// If the type is DETACHED, the async worker thread will be detached from the main thread and
130-
/// will have its own execution cycle.
131-
int thread_priority = 50; /// Thread priority for the async worker thread
132-
std::vector<int> cpu_affinity_cores =
133-
{}; /// CPU cores to which the async worker thread should be pinned
134-
AsyncSchedulingPolicy scheduling_policy =
135-
AsyncSchedulingPolicy::SYNCHRONIZED; /// Scheduling policy for the async worker thread
136-
unsigned int exec_rate = 0u; /// Execution rate of the async worker thread in Hz
137-
rclcpp::Clock::SharedPtr clock = nullptr; /// Clock to be used for the async worker thread
138-
rclcpp::Logger logger =
139-
rclcpp::get_logger("AsyncFunctionHandler"); /// Logger to be used for the async worker thread
140-
std::function<bool()> trigger_predicate = []() {
141-
return true;
142-
}; /// Predicate function to check if the async callback method should be triggered or not
143-
bool wait_until_initial_trigger =
144-
true; /// Whether to wait until the initial trigger predicate is true
168+
/**
169+
* @brief Initialize the parameters from a node's parameters.
170+
* The node should have the following parameters:
171+
* - thread_priority (int): Priority of the async worker thread. Default is 50.
172+
* - cpu_affinity (int[]): CPU cores to which the async worker thread should be pinned.
173+
* Default is empty, which means the thread will not be pinned to any CPU core.
174+
* - scheduling_policy (string): Scheduling policy for the async worker thread. Can be either
175+
* "synchronized" or "detached". Default is "synchronized".
176+
* - execution_rate (int): Execution rate of the async worker thread in Hz.
177+
* - wait_until_initial_trigger (bool): Whether to wait until the initial trigger predicate is true
178+
* before starting the async callback method. Default is true.
179+
* - print_warnings (bool): Whether to print warnings when the async callback method is not triggered
180+
* due to any reason. Default is true.
181+
* @param node The node that is used to get the parameters.
182+
* @param prefix Parameter prefix to use when accessing node parameters.
183+
*/
184+
template <typename NodeT>
185+
void initialize(NodeT & node, const std::string & prefix)
186+
{
187+
if (node->has_parameter(prefix + "thread_priority")) {
188+
thread_priority = static_cast<int>(node->get_parameter(prefix + "thread_priority").as_int());
189+
}
190+
if (node->has_parameter(prefix + "cpu_affinity")) {
191+
const auto cpu_affinity_param =
192+
node->get_parameter(prefix + "cpu_affinity").as_integer_array();
193+
for (const auto & core : cpu_affinity_param) {
194+
cpu_affinity_cores.push_back(static_cast<int>(core));
195+
}
196+
}
197+
if (node->has_parameter(prefix + "scheduling_policy")) {
198+
scheduling_policy =
199+
AsyncSchedulingPolicy(node->get_parameter(prefix + "scheduling_policy").as_string());
200+
}
201+
if (
202+
scheduling_policy == AsyncSchedulingPolicy::DETACHED &&
203+
node->has_parameter(prefix + "execution_rate")) {
204+
const int execution_rate =
205+
static_cast<int>(node->get_parameter(prefix + "execution_rate").as_int());
206+
if (execution_rate <= 0) {
207+
throw std::runtime_error(
208+
"AsyncFunctionHandler: execution_rate parameter must be positive.");
209+
}
210+
exec_rate = static_cast<unsigned int>(execution_rate);
211+
}
212+
if (node->has_parameter(prefix + "wait_until_initial_trigger")) {
213+
wait_until_initial_trigger =
214+
node->get_parameter(prefix + "wait_until_initial_trigger").as_bool();
215+
}
216+
if (node->has_parameter(prefix + "print_warnings")) {
217+
print_warnings = node->get_parameter(prefix + "print_warnings").as_bool();
218+
}
219+
}
220+
221+
int thread_priority = 50;
222+
std::vector<int> cpu_affinity_cores = {};
223+
AsyncSchedulingPolicy scheduling_policy = AsyncSchedulingPolicy::SYNCHRONIZED;
224+
unsigned int exec_rate = 0u;
225+
rclcpp::Clock::SharedPtr clock = nullptr;
226+
rclcpp::Logger logger = rclcpp::get_logger("AsyncFunctionHandler");
227+
std::function<bool()> trigger_predicate = []() { return true; };
228+
bool wait_until_initial_trigger = true;
229+
bool print_warnings = true;
145230
};
146231

147232
/**
@@ -210,6 +295,7 @@ class AsyncFunctionHandler
210295
std::function<T(const rclcpp::Time &, const rclcpp::Duration &)> callback,
211296
const AsyncFunctionHandlerParams & params)
212297
{
298+
params.validate();
213299
init(callback, params.trigger_predicate, params.thread_priority);
214300
params_ = params;
215301
pause_thread_ = params.wait_until_initial_trigger;
@@ -250,7 +336,7 @@ class AsyncFunctionHandler
250336
RCLCPP_WARN_ONCE(
251337
params_.logger,
252338
"AsyncFunctionHandler is configured with DETACHED scheduling policy. "
253-
"This means that the async callback will not be synchronized with the main thread. ");
339+
"This means that the async callback may not be synchronized with the main thread. ");
254340
if (pause_thread_.load(std::memory_order_relaxed)) {
255341
{
256342
std::unique_lock<std::mutex> lock(async_mtx_);
@@ -560,11 +646,13 @@ class AsyncFunctionHandler
560646
std::chrono::duration<double, std::milli>(time_now - next_iteration_time).count();
561647
const double cm_period = 1.e3 / static_cast<double>(params_.exec_rate);
562648
const int overrun_count = static_cast<int>(std::ceil(time_diff / cm_period));
563-
RCLCPP_WARN_THROTTLE(
564-
params_.logger, *params_.clock, 1000,
565-
"Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
566-
"took %f ms (missed cycles : %d).",
567-
params_.exec_rate, time_diff + cm_period, overrun_count + 1);
649+
if (params_.print_warnings) {
650+
RCLCPP_WARN_THROTTLE(
651+
params_.logger, *params_.clock, 1000,
652+
"Overrun detected! The async callback missed its desired rate of %d Hz. The loop "
653+
"took %f ms (missed cycles : %d).",
654+
params_.exec_rate, time_diff + cm_period, overrun_count + 1);
655+
}
568656
next_iteration_time += (overrun_count * period);
569657
}
570658
std::this_thread::sleep_until(next_iteration_time);

realtime_tools/test/test_async_function_handler.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <memory>
1717

1818
#include "gmock/gmock.h"
19+
#include "rclcpp/node.hpp"
1920
#include "rclcpp/rclcpp.hpp"
2021
#include "test_async_function_handler.hpp"
2122

@@ -432,12 +433,16 @@ TEST_F(AsyncFunctionHandlerTest, trigger_for_several_cycles_in_detached_scheduli
432433
{
433434
realtime_tools::TestAsyncFunctionHandler async_class;
434435

435-
rclcpp::Clock::SharedPtr clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
436+
rclcpp::NodeOptions node_options;
437+
node_options.arguments(
438+
{"--ros-args", "-p", "scheduling_policy:=detached", "-p", "wait_until_initial_trigger:=false",
439+
"-p", "execution_rate:=500", "-p", "thread_priority:=60", "-p", "cpu_affinity:=[0]"});
440+
node_options.allow_undeclared_parameters(true);
441+
node_options.automatically_declare_parameters_from_overrides(true);
442+
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>("test_node", node_options);
436443
realtime_tools::AsyncFunctionHandlerParams params;
437-
params.scheduling_policy = realtime_tools::AsyncSchedulingPolicy::DETACHED;
438-
params.clock = clock;
439-
params.exec_rate = 500u; // 500 Hz
440-
params.wait_until_initial_trigger = false;
444+
params.clock = node->get_clock();
445+
params.initialize(node, "");
441446
async_class.initialize(params);
442447
ASSERT_TRUE(async_class.get_handler().is_initialized());
443448
ASSERT_FALSE(async_class.get_handler().is_running());

0 commit comments

Comments
 (0)