Skip to content

Commit

Permalink
[improve][broker] PIP-392: Add configuration to enable consistent has…
Browse files Browse the repository at this point in the history
…hing to select active consumer for partitioned topic (#23584)
  • Loading branch information
shibd authored Nov 21, 2024
1 parent 5338dc9 commit 49aa308
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 1 deletion.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ maxMessageSizeCheckIntervalInSeconds=60
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
Expand Down
4 changes: 4 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
# For non-partitioned topics, consistent hashing is used by default.
activeConsumerFailoverConsistentHashing=false

# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
)
private int activeConsumerFailoverDelayTimeMillis = 1000;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable consistent hashing for selecting the active consumer in partitioned "
+ "topics with Failover subscription type."
+ "For non-partitioned topics, consistent hashing is used by default."
)
private boolean activeConsumerFailoverConsistentHashing = false;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected boolean pickAndScheduleActiveConsumer() {
}
}
}
int index = partitionIndex >= 0
int index = partitionIndex >= 0 && !serviceConfig.isActiveConsumerFailoverConsistentHashing()
? partitionIndex % consumersSize
: peekConsumerIndexFromHashRing(makeHashRing(consumersSize));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,39 @@ public void testTopicsDistribution() throws Exception {
}
}

@Test
public void testPartitionedTopicDistribution() throws Exception {
this.conf.setActiveConsumerFailoverConsistentHashing(true);
final String topic = "partitioned-topics-distribution";
final int topicCount = 100;
final int consumers = 10;

for (int i = 0; i < topicCount; i++) {
admin.topics().createPartitionedTopic(topic + "-" + i, 1);
}

CustomizedConsumerEventListener eventListener = new CustomizedConsumerEventListener();

List<Consumer<?>> consumerList = new ArrayList<>(consumers);
for (int i = 0; i < consumers; i++) {
consumerList.add(pulsarClient.newConsumer()
.topics(IntStream.range(0, topicCount).mapToObj(j -> topic + "-" + j).toList())
.subscriptionType(SubscriptionType.Failover)
.subscriptionName("my-sub")
.consumerName("consumer-" + i)
.consumerEventListener(eventListener)
.subscribe());
}

log.info("Topics are distributed to consumers as {}", eventListener.getActiveConsumers());
Map<String, Integer> assigned = new HashMap<>();
eventListener.getActiveConsumers().forEach((k, v) -> assigned.compute(v, (t, c) -> c == null ? 1 : ++ c));
assertEquals(assigned.size(), consumers);
for (Consumer<?> consumer : consumerList) {
consumer.close();
}
}

private static class CustomizedConsumerEventListener implements ConsumerEventListener {

private final Map<String, String> activeConsumers = new HashMap<>();
Expand Down

0 comments on commit 49aa308

Please sign in to comment.