Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure waitables handle guard condition retriggering #2420

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class EventHandlerBase : public Waitable
/// Check if the Waitable is ready.
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
is_ready(const rcl_wait_set_t * wait_set) override;

/// Set a callback to be called when each new event instance occurs.
/**
Expand Down Expand Up @@ -294,7 +294,7 @@ class EventHandler : public EventHandlerBase

/// Execute any entities of the Waitable that are ready.
void
execute(std::shared_ptr<void> & data) override
execute(const std::shared_ptr<void> & data) override
{
if (!data) {
throw std::runtime_error("'data' is empty");
Expand Down
5 changes: 2 additions & 3 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
RCLCPP_PUBLIC
ExecutorNotifyWaitable(ExecutorNotifyWaitable & other);


RCLCPP_PUBLIC
ExecutorNotifyWaitable & operator=(ExecutorNotifyWaitable & other);

Expand All @@ -69,7 +68,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
bool
is_ready(rcl_wait_set_t * wait_set) override;
is_ready(const rcl_wait_set_t * wait_set) override;

/// Perform work associated with the waitable.
/**
Expand All @@ -78,7 +77,7 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
*/
RCLCPP_PUBLIC
void
execute(std::shared_ptr<void> & data) override;
execute(const std::shared_ptr<void> & data) override;

/// Retrieve data to be used in the next execute call.
/**
Expand Down
26 changes: 22 additions & 4 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ class SubscriptionIntraProcess

virtual ~SubscriptionIntraProcess() = default;

RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set) override
{
// This block is necessary when the guard condition wakes the wait set, but
// the intra process waitable was not handled before the wait set is waited
// on again.
// Basically we're keeping the guard condition triggered so long as there is
// data in the buffer.
if (this->buffer_->has_data()) {
// If there is data still to be processed, indicate to the
// executor or waitset by triggering the guard condition.
this->trigger_guard_condition();
}
// Let the parent classes handle the rest of the work:
return SubscriptionIntraProcessBufferT::add_to_wait_set(wait_set);
}

std::shared_ptr<void>
take_data() override
{
Expand Down Expand Up @@ -132,25 +150,25 @@ class SubscriptionIntraProcess
);
}

void execute(std::shared_ptr<void> & data) override
void execute(const std::shared_ptr<void> & data) override
{
execute_impl<SubscribedType>(data);
}

protected:
template<typename T>
typename std::enable_if<std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(std::shared_ptr<void> & data)
execute_impl(const std::shared_ptr<void> & data)
{
(void)data;
throw std::runtime_error("Subscription intra-process can't handle serialized messages");
}

template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl(std::shared_ptr<void> & data)
execute_impl(const std::shared_ptr<void> & data)
{
if (!data) {
if (nullptr == data) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
is_durability_transient_local() const;

bool
is_ready(rcl_wait_set_t * wait_set) override = 0;
is_ready(const rcl_wait_set_t * wait_set) override = 0;

std::shared_ptr<void>
take_data() override = 0;
Expand All @@ -85,7 +85,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
}

void
execute(std::shared_ptr<void> & data) override = 0;
execute(const std::shared_ptr<void> & data) override = 0;

virtual
bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
}

bool
is_ready(rcl_wait_set_t * wait_set) override
is_ready(const rcl_wait_set_t * wait_set) override
{
(void) wait_set;
return buffer_->has_data();
Expand Down
23 changes: 11 additions & 12 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <memory>
#include <vector>
#include <utility>

#include "rcl/allocator.h"

Expand Down Expand Up @@ -121,8 +120,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
}
for (size_t i = 0; i < waitable_handles_.size(); ++i) {
if (waitable_handles_[i]->is_ready(wait_set)) {
waitable_triggered_handles_.emplace_back(std::move(waitable_handles_[i]));
if (!waitable_handles_[i]->is_ready(wait_set)) {
waitable_handles_[i].reset();
}
}

Expand All @@ -146,7 +145,10 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
timer_handles_.end()
);

waitable_handles_.clear();
waitable_handles_.erase(
std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
waitable_handles_.end()
);
}

bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
Expand Down Expand Up @@ -390,17 +392,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
rclcpp::AnyExecutable & any_exec,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override
{
auto & waitable_handles = waitable_triggered_handles_;
auto it = waitable_handles.begin();
while (it != waitable_handles.end()) {
auto it = waitable_handles_.begin();
while (it != waitable_handles_.end()) {
std::shared_ptr<Waitable> & waitable = *it;
if (waitable) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_waitable(waitable, weak_groups_to_nodes);
if (!group) {
// Group was not found, meaning the waitable is not valid...
// Remove it from the ready list and continue looking
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
Expand All @@ -413,11 +414,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
any_exec.waitable = waitable;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes);
waitable_handles.erase(it);
waitable_handles_.erase(it);
return;
}
// Else, the waitable is no longer valid, remove it and continue
it = waitable_handles.erase(it);
it = waitable_handles_.erase(it);
}
}

Expand Down Expand Up @@ -498,8 +499,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;

VectorRebind<std::shared_ptr<Waitable>> waitable_triggered_handles_;

std::shared_ptr<VoidAlloc> allocator_;
};

Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class Waitable
RCLCPP_PUBLIC
virtual
bool
is_ready(rcl_wait_set_t * wait_set) = 0;
is_ready(const rcl_wait_set_t * wait_set) = 0;

/// Take the data so that it can be consumed with `execute`.
/**
Expand Down Expand Up @@ -203,7 +203,7 @@ class Waitable
RCLCPP_PUBLIC
virtual
void
execute(std::shared_ptr<void> & data) = 0;
execute(const std::shared_ptr<void> & data) = 0;

/// Exchange the "in use by wait set" state for this timer.
/**
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/src/rclcpp/event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <stdexcept>
#include <string>

#include "rcl/error_handling.h"
#include "rcl/event.h"

#include "rclcpp/event_handler.hpp"
Expand Down Expand Up @@ -66,7 +67,7 @@ EventHandlerBase::add_to_wait_set(rcl_wait_set_t * wait_set)

/// Check if the Waitable is ready.
bool
EventHandlerBase::is_ready(rcl_wait_set_t * wait_set)
EventHandlerBase::is_ready(const rcl_wait_set_t * wait_set)
{
return wait_set->events[wait_set_event_index_] == &event_handle_;
}
Expand Down
9 changes: 7 additions & 2 deletions rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void
ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

// Note: no guard conditions need to be re-triggered, since the guard
// conditions in this class are not tracking a stateful condition, but instead
// only serve to interrupt the wait set when new information is available to
// consider.
for (auto weak_guard_condition : this->notify_guard_conditions_) {
auto guard_condition = weak_guard_condition.lock();
if (!guard_condition) {continue;}
Expand All @@ -64,7 +69,7 @@ ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set)
}

bool
ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
ExecutorNotifyWaitable::is_ready(const rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(guard_condition_mutex_);

Expand All @@ -87,7 +92,7 @@ ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set)
}

void
ExecutorNotifyWaitable::execute(std::shared_ptr<void> & data)
ExecutorNotifyWaitable::execute(const std::shared_ptr<void> & data)
{
(void) data;
this->execute_callback_();
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ if(TARGET test_thread_safe_synchronization)
target_link_libraries(test_thread_safe_synchronization ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_intra_process_waitable waitables/test_intra_process_waitable.cpp)
if(TARGET test_intra_process_waitable)
target_link_libraries(test_intra_process_waitable ${PROJECT_NAME} ${test_msgs_TARGETS})
endif()

ament_add_gtest(test_rosout_qos test_rosout_qos.cpp)
if(TARGET test_rosout_qos)
target_link_libraries(test_rosout_qos ${PROJECT_NAME} rcl::rcl rmw::rmw)
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ class TestWaitable : public rclcpp::Waitable
}

bool
is_ready(rcl_wait_set_t * wait_set) override
is_ready(const rcl_wait_set_t * wait_set) override
{
(void)wait_set;
return true;
Expand All @@ -375,7 +375,7 @@ class TestWaitable : public rclcpp::Waitable
}

void
execute(std::shared_ptr<void> & data) override
execute(const std::shared_ptr<void> & data) override
{
(void) data;
trigger_count_--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,15 @@ class TestWaitable : public rclcpp::Waitable
public:
void add_to_wait_set(rcl_wait_set_t *) override {}

bool is_ready(rcl_wait_set_t *) override {return true;}
bool is_ready(const rcl_wait_set_t *) override {return true;}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}
void
execute(std::shared_ptr<void> & data) override
execute(const std::shared_ptr<void> & data) override
{
(void) data;
}
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/test/rclcpp/node_interfaces/test_node_waitables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ class TestWaitable : public rclcpp::Waitable
{
public:
void add_to_wait_set(rcl_wait_set_t *) override {}
bool is_ready(rcl_wait_set_t *) override {return false;}
bool is_ready(const rcl_wait_set_t *) override {return false;}

std::shared_ptr<void>
take_data() override
{
return nullptr;
}

void execute(std::shared_ptr<void> & data) override
void execute(const std::shared_ptr<void> & data) override
{
(void) data;
}
Expand Down
Loading