Skip to content

Commit 2f1b739

Browse files
committed
Introduce executors new spin_for method, replace spin_until_future_complete
with spin_until_complete. (ros2#1821) * Introduce spin_for method. * Introduce spin_until_complete. * Deprecate spin_until_future_complete. * Replace usage of deprecated method. * Update unit-tests. Signed-off-by: Hubert Liberacki <[email protected]>
1 parent eac0063 commit 2f1b739

25 files changed

+407
-247
lines changed

rclcpp/include/rclcpp/client.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -579,11 +579,11 @@ class Client : public ClientBase
579579
/// Send a request to the service server.
580580
/**
581581
* This method returns a `FutureAndRequestId` instance
582-
* that can be passed to Executor::spin_until_future_complete() to
582+
* that can be passed to Executor::spin_until_complete() to
583583
* wait until it has been completed.
584584
*
585585
* If the future never completes,
586-
* e.g. the call to Executor::spin_until_future_complete() times out,
586+
* e.g. the call to Executor::spin_until_complete() times out,
587587
* Client::remove_pending_request() must be called to clean the client internal state.
588588
* Not doing so will make the `Client` instance to use more memory each time a response is not
589589
* received from the service server.
@@ -592,7 +592,7 @@ class Client : public ClientBase
592592
* auto future = client->async_send_request(my_request);
593593
* if (
594594
* rclcpp::FutureReturnCode::TIMEOUT ==
595-
* executor->spin_until_future_complete(future, timeout))
595+
* executor->spin_until_complete(future, timeout))
596596
* {
597597
* client->remove_pending_request(future);
598598
* // handle timeout

rclcpp/include/rclcpp/executor.hpp

+86-46
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <mutex>
2727
#include <string>
2828
#include <vector>
29+
#include <type_traits>
2930

3031
#include "rcl/guard_condition.h"
3132
#include "rcl/wait.h"
@@ -319,6 +320,47 @@ class Executor
319320
virtual void
320321
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
321322

323+
/// Spin (blocking) until the condition is complete, it times out waiting, or rclcpp is interrupted.
324+
/**
325+
* \param[in] future The condition which can be callable or future type to wait on. If this function returns SUCCESS, the future can be
326+
* accessed without blocking (though it may still throw an exception).
327+
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
328+
* `-1` is block forever, `0` is non-blocking.
329+
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
330+
* code.
331+
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
332+
*/
333+
template<typename Condition, typename DurationT = std::chrono::milliseconds>
334+
FutureReturnCode spin_until_complete(
335+
const Condition & condition,
336+
DurationT timeout = DurationT(-1))
337+
{
338+
if constexpr (std::is_invocable_v<Condition>) {
339+
using RetT = std::invoke_result_t<Condition>;
340+
static_assert(
341+
std::is_same_v<bool, RetT>,
342+
"Conditional callable has to return boolean type");
343+
return spin_until_complete_impl(condition, timeout);
344+
} else {
345+
auto checkFuture = [&condition]() {
346+
return condition.wait_for(std::chrono::seconds(0)) ==
347+
std::future_status::ready;
348+
};
349+
return spin_until_complete_impl(checkFuture, timeout);
350+
}
351+
}
352+
353+
/// spin (blocking) for given amount of duration.
354+
/**
355+
* \param[in] duration gets passed to Executor::spin_node_once, and spins the executor for given duration.
356+
*/
357+
template<typename DurationT>
358+
void
359+
spin_for(DurationT duration)
360+
{
361+
(void)spin_until_complete([]() {return false;}, duration);
362+
}
363+
322364
/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
323365
/**
324366
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
@@ -330,57 +372,13 @@ class Executor
330372
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
331373
*/
332374
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
375+
[[deprecated("use spin_until_complete(const Condition & condition, DurationT timeout) instead")]]
333376
FutureReturnCode
334377
spin_until_future_complete(
335378
const FutureT & future,
336379
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
337380
{
338-
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
339-
// inside a callback executed by an executor.
340-
341-
// Check the future before entering the while loop.
342-
// If the future is already complete, don't try to spin.
343-
std::future_status status = future.wait_for(std::chrono::seconds(0));
344-
if (status == std::future_status::ready) {
345-
return FutureReturnCode::SUCCESS;
346-
}
347-
348-
auto end_time = std::chrono::steady_clock::now();
349-
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
350-
timeout);
351-
if (timeout_ns > std::chrono::nanoseconds::zero()) {
352-
end_time += timeout_ns;
353-
}
354-
std::chrono::nanoseconds timeout_left = timeout_ns;
355-
356-
if (spinning.exchange(true)) {
357-
throw std::runtime_error("spin_until_future_complete() called while already spinning");
358-
}
359-
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
360-
while (rclcpp::ok(this->context_) && spinning.load()) {
361-
// Do one item of work.
362-
spin_once_impl(timeout_left);
363-
364-
// Check if the future is set, return SUCCESS if it is.
365-
status = future.wait_for(std::chrono::seconds(0));
366-
if (status == std::future_status::ready) {
367-
return FutureReturnCode::SUCCESS;
368-
}
369-
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
370-
if (timeout_ns < std::chrono::nanoseconds::zero()) {
371-
continue;
372-
}
373-
// Otherwise check if we still have time to wait, return TIMEOUT if not.
374-
auto now = std::chrono::steady_clock::now();
375-
if (now >= end_time) {
376-
return FutureReturnCode::TIMEOUT;
377-
}
378-
// Subtract the elapsed time from the original timeout.
379-
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
380-
}
381-
382-
// The future did not complete before ok() returned false, return INTERRUPTED.
383-
return FutureReturnCode::INTERRUPTED;
381+
return spin_until_complete(future, timeout);
384382
}
385383

386384
/// Cancel any running spin* function, causing it to return.
@@ -560,6 +558,48 @@ class Executor
560558
virtual void
561559
spin_once_impl(std::chrono::nanoseconds timeout);
562560

561+
protected:
562+
// Implementation details, used by spin_until_complete and spin_for.
563+
// Previouse implementation of spin_until_future_complete.
564+
template<typename Condition, typename DurationT>
565+
FutureReturnCode spin_until_complete_impl(Condition condition, DurationT timeout)
566+
{
567+
auto end_time = std::chrono::steady_clock::now();
568+
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
569+
timeout);
570+
if (timeout_ns > std::chrono::nanoseconds::zero()) {
571+
end_time += timeout_ns;
572+
}
573+
std::chrono::nanoseconds timeout_left = timeout_ns;
574+
if (spinning.exchange(true)) {
575+
throw std::runtime_error("spin_until_complete() called while already spinning");
576+
}
577+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
578+
while (rclcpp::ok(this->context_) && spinning.load()) {
579+
// Do one item of work.
580+
spin_once_impl(timeout_left);
581+
582+
if (condition()) {
583+
return FutureReturnCode::SUCCESS;
584+
}
585+
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
586+
if (timeout_ns < std::chrono::nanoseconds::zero()) {
587+
continue;
588+
}
589+
// Otherwise check if we still have time to wait, return TIMEOUT if not.
590+
auto now = std::chrono::steady_clock::now();
591+
if (now >= end_time) {
592+
return FutureReturnCode::TIMEOUT;
593+
}
594+
// Subtract the elapsed time from the original timeout.
595+
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
596+
}
597+
598+
// The condition did not pass before ok() returned false, return INTERRUPTED.
599+
return FutureReturnCode::INTERRUPTED;
600+
}
601+
602+
public:
563603
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
564604
const rclcpp::GuardCondition *,
565605
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>

rclcpp/include/rclcpp/executors.hpp

+74-9
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,50 @@ namespace executors
5454
using rclcpp::executors::MultiThreadedExecutor;
5555
using rclcpp::executors::SingleThreadedExecutor;
5656

57+
/// Spin (blocking) until the conditon is complete, it times out waiting, or rclcpp is interrupted.
58+
/**
59+
* \param[in] executor The executor which will spin the node.
60+
* \param[in] node_ptr The node to spin.
61+
* \param[in] condition The callable or future to wait on. If `SUCCESS`, the condition is safe to
62+
* access after this function
63+
* \param[in] timeout Optional timeout parameter, which gets passed to
64+
* Executor::spin_node_once.
65+
* `-1` is block forever, `0` is non-blocking.
66+
* If the time spent inside the blocking loop exceeds this timeout, return a `TIMEOUT` return code.
67+
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
68+
*/
69+
template<typename Condition, typename DurationT = std::chrono::milliseconds>
70+
rclcpp::FutureReturnCode
71+
spin_node_until_complete(
72+
rclcpp::Executor & executor,
73+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
74+
const Condition & condition,
75+
DurationT timeout = DurationT(-1))
76+
{
77+
// TODO(wjwwood): does not work recursively; can't call spin_node_until_complete
78+
// inside a callback executed by an executor.
79+
executor.add_node(node_ptr);
80+
auto retcode = executor.spin_until_complete(condition, timeout);
81+
executor.remove_node(node_ptr);
82+
return retcode;
83+
}
84+
85+
template<typename NodeT = rclcpp::Node, typename ConditionT,
86+
typename DurationT = std::chrono::milliseconds>
87+
rclcpp::FutureReturnCode
88+
spin_node_until_complete(
89+
rclcpp::Executor & executor,
90+
std::shared_ptr<NodeT> node_ptr,
91+
const ConditionT & condition,
92+
DurationT timeout = DurationT(-1))
93+
{
94+
return rclcpp::executors::spin_node_until_complete(
95+
executor,
96+
node_ptr->get_node_base_interface(),
97+
condition,
98+
timeout);
99+
}
100+
57101
/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
58102
/**
59103
* \param[in] executor The executor which will spin the node.
@@ -67,31 +111,28 @@ using rclcpp::executors::SingleThreadedExecutor;
67111
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
68112
*/
69113
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
114+
[[deprecated("use spin_node_until_complete(Executor &, node_interfaces::NodeBaseInterface::SharedPtr, const Condition &, DurationT) instead")]]
70115
rclcpp::FutureReturnCode
71116
spin_node_until_future_complete(
72117
rclcpp::Executor & executor,
73118
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
74119
const FutureT & future,
75120
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
76121
{
77-
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
78-
// inside a callback executed by an executor.
79-
executor.add_node(node_ptr);
80-
auto retcode = executor.spin_until_future_complete(future, timeout);
81-
executor.remove_node(node_ptr);
82-
return retcode;
122+
return spin_until_complete(executor, node_ptr, future, timeout);
83123
}
84124

85125
template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
86126
typename TimeT = std::milli>
127+
[[deprecated("use spin_node_until_complete(Executor &, std::shared_ptr<NodeT>, const Condition &, DurationT) instead")]]
87128
rclcpp::FutureReturnCode
88129
spin_node_until_future_complete(
89130
rclcpp::Executor & executor,
90131
std::shared_ptr<NodeT> node_ptr,
91132
const FutureT & future,
92133
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
93134
{
94-
return rclcpp::executors::spin_node_until_future_complete(
135+
return rclcpp::executors::spin_node_until_complete(
95136
executor,
96137
node_ptr->get_node_base_interface(),
97138
future,
@@ -100,26 +141,50 @@ spin_node_until_future_complete(
100141

101142
} // namespace executors
102143

144+
template<typename Condition, typename DurationT = std::chrono::milliseconds>
145+
rclcpp::FutureReturnCode
146+
spin_until_complete(
147+
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
148+
const Condition & condition,
149+
DurationT timeout = DurationT(-1))
150+
{
151+
rclcpp::executors::SingleThreadedExecutor executor;
152+
return executors::spin_node_until_complete<Condition>(executor, node_ptr, condition, timeout);
153+
}
154+
155+
template<typename NodeT = rclcpp::Node, typename Condition,
156+
typename DurationT = std::chrono::milliseconds>
157+
rclcpp::FutureReturnCode
158+
spin_until_complete(
159+
std::shared_ptr<NodeT> node_ptr,
160+
const Condition & condition,
161+
DurationT timeout = DurationT(-1))
162+
{
163+
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), condition, timeout);
164+
}
165+
103166
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
167+
[[deprecated("use spin_until_complete(node_interfaces::NodeBaseInterface::SharedPtr, const Condition &, DurationT) instead")]]
104168
rclcpp::FutureReturnCode
105169
spin_until_future_complete(
106170
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
107171
const FutureT & future,
108172
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
109173
{
110174
rclcpp::executors::SingleThreadedExecutor executor;
111-
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
175+
return executors::spin_node_until_complete<FutureT>(executor, node_ptr, future, timeout);
112176
}
113177

114178
template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
115179
typename TimeT = std::milli>
180+
[[deprecated("use spin_until_complete(std::shared_ptr<NodeT>, const Condition &, DurationT) instead")]]
116181
rclcpp::FutureReturnCode
117182
spin_until_future_complete(
118183
std::shared_ptr<NodeT> node_ptr,
119184
const FutureT & future,
120185
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
121186
{
122-
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
187+
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), future, timeout);
123188
}
124189

125190
} // namespace rclcpp

rclcpp/include/rclcpp/future_return_code.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
namespace rclcpp
2424
{
2525

26-
/// Return codes to be used with spin_until_future_complete.
26+
/// Return codes to be used with spin_until_complete.
2727
/**
2828
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
2929
* This does not indicate that the operation succeeded; "get" may still throw an exception.

rclcpp/include/rclcpp/rclcpp.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
* - Executors (responsible for execution of callbacks through a blocking spin):
6969
* - rclcpp::spin()
7070
* - rclcpp::spin_some()
71-
* - rclcpp::spin_until_future_complete()
71+
* - rclcpp::spin_until_complete()
7272
* - rclcpp::executors::SingleThreadedExecutor
7373
* - rclcpp::executors::SingleThreadedExecutor::add_node()
7474
* - rclcpp::executors::SingleThreadedExecutor::spin()

rclcpp/include/rclcpp/wait_set_policies/sequential_synchronization.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ class SequentialSynchronization : public detail::SynchronizationPolicyCommon
203203
std::shared_ptr<rclcpp::Waitable> && waitable,
204204
std::shared_ptr<void> && associated_entity,
205205
std::function<
206-
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
206+
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void>&&)
207207
> add_waitable_function)
208208
{
209209
// Explicitly no thread synchronization.

rclcpp/include/rclcpp/wait_set_policies/thread_safe_synchronization.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ class ThreadSafeSynchronization : public detail::SynchronizationPolicyCommon
216216
std::shared_ptr<rclcpp::Waitable> && waitable,
217217
std::shared_ptr<void> && associated_entity,
218218
std::function<
219-
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void> &&)
219+
void(std::shared_ptr<rclcpp::Waitable>&&, std::shared_ptr<void>&&)
220220
> add_waitable_function)
221221
{
222222
using rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock;

0 commit comments

Comments
 (0)