Skip to content

Commit 2c6bd7a

Browse files
mjcarrollAlberto Soragna
authored and
Alberto Soragna
committed
Trigger the intraprocess guard condition with data (ros2#2164)
If the intraprocess buffer still has data after taking, re-trigger the guard condition to ensure that the executor will continue to service it, even if incoming publications stop. Signed-off-by: Michael Carroll <[email protected]>
1 parent 6aa4ffc commit 2c6bd7a

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

Diff for: rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

+7
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ class SubscriptionIntraProcess
118118
return nullptr;
119119
}
120120
}
121+
122+
if (this->buffer_->has_data()) {
123+
// If there is data still to be processed, indicate to the
124+
// executor or waitset by triggering the guard condition.
125+
this->trigger_guard_condition();
126+
}
127+
121128
return std::static_pointer_cast<void>(
122129
std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
123130
std::pair<ConstMessageSharedPtr, MessageUniquePtr>(

Diff for: rclcpp/test/rclcpp/executors/test_executors.cpp

+103
Original file line numberDiff line numberDiff line change
@@ -593,3 +593,106 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) {
593593

594594
rclcpp::shutdown();
595595
}
596+
597+
template<typename T>
598+
class TestIntraprocessExecutors : public ::testing::Test
599+
{
600+
public:
601+
static void SetUpTestCase()
602+
{
603+
rclcpp::init(0, nullptr);
604+
}
605+
606+
static void TearDownTestCase()
607+
{
608+
rclcpp::shutdown();
609+
}
610+
611+
void SetUp()
612+
{
613+
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
614+
std::stringstream test_name;
615+
test_name << test_info->test_case_name() << "_" << test_info->name();
616+
node = std::make_shared<rclcpp::Node>("node", test_name.str());
617+
618+
callback_count = 0;
619+
620+
const std::string topic_name = std::string("topic_") + test_name.str();
621+
622+
rclcpp::PublisherOptions po;
623+
po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
624+
publisher = node->create_publisher<test_msgs::msg::Empty>(topic_name, rclcpp::QoS(1), po);
625+
626+
auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {
627+
this->callback_count.fetch_add(1);
628+
};
629+
630+
rclcpp::SubscriptionOptions so;
631+
so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
632+
subscription =
633+
node->create_subscription<test_msgs::msg::Empty>(
634+
topic_name, rclcpp::QoS(kNumMessages), std::move(callback), so);
635+
}
636+
637+
void TearDown()
638+
{
639+
publisher.reset();
640+
subscription.reset();
641+
node.reset();
642+
}
643+
644+
const size_t kNumMessages = 100;
645+
646+
rclcpp::Node::SharedPtr node;
647+
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher;
648+
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription;
649+
std::atomic_int callback_count;
650+
};
651+
652+
TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames);
653+
654+
TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
655+
// This tests that executors will continue to service intraprocess subscriptions in the case
656+
// that publishers aren't continuing to publish.
657+
// This was previously broken in that intraprocess guard conditions were only triggered on
658+
// publish and the test was added to prevent future regressions.
659+
const size_t kNumMessages = 100;
660+
661+
using ExecutorType = TypeParam;
662+
ExecutorType executor;
663+
executor.add_node(this->node);
664+
665+
EXPECT_EQ(0, this->callback_count.load());
666+
this->publisher->publish(test_msgs::msg::Empty());
667+
668+
// Wait for up to 5 seconds for the first message to come available.
669+
const std::chrono::milliseconds sleep_per_loop(10);
670+
int loops = 0;
671+
while (1u != this->callback_count.load() && loops < 500) {
672+
rclcpp::sleep_for(sleep_per_loop);
673+
executor.spin_some();
674+
loops++;
675+
}
676+
EXPECT_EQ(1u, this->callback_count.load());
677+
678+
// reset counter
679+
this->callback_count.store(0);
680+
681+
for (size_t ii = 0; ii < kNumMessages; ++ii) {
682+
this->publisher->publish(test_msgs::msg::Empty());
683+
}
684+
685+
// Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read.
686+
loops = 0;
687+
auto timer = this->node->create_wall_timer(
688+
std::chrono::milliseconds(10), [this, &executor, &loops, &kNumMessages]() {
689+
loops++;
690+
if (kNumMessages == this->callback_count.load() ||
691+
loops == 500)
692+
{
693+
executor.cancel();
694+
}
695+
});
696+
executor.spin();
697+
EXPECT_EQ(kNumMessages, this->callback_count.load());
698+
}

0 commit comments

Comments
 (0)