Skip to content

Commit

Permalink
[fix][broker] Ensure existing subscriptions use the initial replicati…
Browse files Browse the repository at this point in the history
…on policy

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Jan 10, 2025
1 parent 04e89fe commit 5958587
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class PersistentSubscription extends AbstractSubscription {
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;

protected static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
public static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";

// Map of properties that is used to mark this subscription as "replicated".
// Since this is the only field at this point, we can just keep a static
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,10 +1115,6 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
return;
}
}
if (replicated != null && replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}

if (startMessageRollbackDurationSec > 0) {
resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand All @@ -29,6 +30,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -93,4 +95,43 @@ public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
return true;
});
}

@Test(dataProvider = "replicateSubscriptionState")
public void testExistingSubscriptionWithReplicateSubscriptionState(Boolean replicateSubscriptionState)
throws Exception {
String subName = "my-sub-" + System.nanoTime();
String topic = "persistent://my-property/my-ns/" + System.nanoTime();

ConsumerBuilder<byte[]> consumer1Builder = pulsarClient.newConsumer().topic(topic).subscriptionName(subName);
if (replicateSubscriptionState != null) {
consumer1Builder.replicateSubscriptionState(replicateSubscriptionState);
}
@Cleanup
Consumer<byte[]> consumer1 = consumer1Builder.subscribe();
assertReplicatedSubscriptionStatus(topic, subName, replicateSubscriptionState);
consumer1.close();

admin.topics().unload(topic);

ConsumerBuilder<byte[]> consumer2Builder = pulsarClient.newConsumer().topic(topic).subscriptionName(subName);
if (replicateSubscriptionState != null) {
// Reverse
consumer2Builder.replicateSubscriptionState(!replicateSubscriptionState);
}
@Cleanup
Consumer<byte[]> consumer2 = consumer2Builder.subscribe();
assertReplicatedSubscriptionStatus(topic, subName, replicateSubscriptionState);
consumer2.close();
}

private void assertReplicatedSubscriptionStatus(String topic, String subName, Boolean expected)
throws PulsarAdminException {
assertThat(admin.topics().getInternalStats(topic).cursors.get(subName)).isNotNull().matches(n -> {
Long property = n.properties.get(REPLICATED_SUBSCRIPTION_PROPERTY);
assertThat(property).isEqualTo(expected == null || !expected ? null : 1L);
return true;
});
assertThat(admin.topics().getReplicatedSubscriptionStatus(topic, subName)).containsEntry(topic,
expected != null && expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> maxAcknowledgmentGroupSize(int messageNum);

/**
* Configures initial replicated subscription state for a new subscription.
* This setting does not affect existing subscription. Default is `null`.
*
* @param replicateSubscriptionState
* @param replicateSubscriptionState If true, the subscription state will be replicated
* across GEO-replicated clusters. If false, replication
* is disabled.
*/
ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState);

Expand Down

0 comments on commit 5958587

Please sign in to comment.