Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Fix : ConsumerStatsTest.testAsyncCallOnPartitionedTopic is flaky (#5066)
Browse files Browse the repository at this point in the history
fix #4894

### Motivation

Due to the concurrent execution of the program, when checking whether the ‘BrokerConsumerStats’'s cache is in effect, there is no guarantee that the cache time will not be exceeded.

### Modifications

- Check the cache in an inexact way.
- Increase waiting time
  • Loading branch information
fxbing authored and jiazhai committed Aug 30, 2019
1 parent e78beaa commit bc76613
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions pulsar-client-cpp/tests/ConsumerStatsTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";

void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog,
Latch& latch, int index) {
Latch& latch, int index, bool accurate) {
ASSERT_EQ(result, ResultOk);
PartitionedBrokerConsumerStatsImpl* statsPtr =
(PartitionedBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get());
LOG_DEBUG(statsPtr);
ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog());
if (accurate) {
ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog());
} else {
ASSERT_LE(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog());
}
latch.countdown();
}

Expand Down Expand Up @@ -283,8 +287,8 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {

// Expecting return from 4 callbacks
Latch latch(4);
consumer.getBrokerConsumerStatsAsync(
std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 5, latch, 0));
consumer.getBrokerConsumerStatsAsync(std::bind(partitionedCallbackFunction, std::placeholders::_1,
std::placeholders::_2, 5, latch, 0, true));

// Now we have 10 messages per partition
for (int i = numOfMessages; i < (numOfMessages * 2); i++) {
Expand All @@ -294,23 +298,25 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
}

// Expecting cached result
consumer.getBrokerConsumerStatsAsync(
std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 5, latch, 0));
// Inaccurate judgment is used because it cannot guarantee that the above operations are completed within
// cache time.
consumer.getBrokerConsumerStatsAsync(std::bind(partitionedCallbackFunction, std::placeholders::_1,
std::placeholders::_2, 5, latch, 0, false));

std::this_thread::sleep_for(std::chrono::milliseconds(4500));
// Expecting fresh results
consumer.getBrokerConsumerStatsAsync(
std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 10, latch, 2));
consumer.getBrokerConsumerStatsAsync(std::bind(partitionedCallbackFunction, std::placeholders::_1,
std::placeholders::_2, 10, latch, 2, true));

Message msg;
while (consumer.receive(msg)) {
// Do nothing
}

// Expecting the backlog to be the same since we didn't acknowledge the messages
consumer.getBrokerConsumerStatsAsync(
std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 10, latch, 3));
consumer.getBrokerConsumerStatsAsync(std::bind(partitionedCallbackFunction, std::placeholders::_1,
std::placeholders::_2, 10, latch, 3, true));

// Wait for ten seconds only
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
}

0 comments on commit bc76613

Please sign in to comment.