Skip to content

Commit

Permalink
[improve][broker] Reduce unnecessary REPLICATED_SUBSCRIPTION_SNAPSHOT…
Browse files Browse the repository at this point in the history
…_REQUEST (apache#23839)

(cherry picked from commit d3707c5)
(cherry picked from commit 7c7fadd)
  • Loading branch information
shibd authored and nikhil-ctds committed Jan 13, 2025
1 parent 531b5fe commit 846d0b0
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.io.IOException;
Expand Down Expand Up @@ -206,6 +207,23 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
private void startNewSnapshot() {
cleanupTimedOutSnapshots();

if (lastCompletedSnapshotStartTime == 0 && !pendingSnapshots.isEmpty()) {
// 1. If the remote cluster has disabled subscription replication or there's an incorrect config,
// it will not respond to SNAPSHOT_REQUEST. Therefore, lastCompletedSnapshotStartTime will remain 0,
// making it unnecessary to resend the request.
// 2. This approach prevents sending additional SNAPSHOT_REQUEST to both local_topic and remote_topic.
// 3. Since it's uncertain when the remote cluster will enable subscription replication,
// the timeout mechanism of pendingSnapshots is used to ensure retries.
//
// In other words, when hit this case, The frequency of sending SNAPSHOT_REQUEST
// will use `replicatedSubscriptionsSnapshotTimeoutSeconds`.
if (log.isDebugEnabled()) {
log.debug("[{}] PendingSnapshot exists but has never succeeded. "
+ "Skipping snapshot creation until pending snapshot timeout.", topic.getName());
}
return;
}

if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can skip creating a new snapshot
Expand Down Expand Up @@ -302,6 +320,11 @@ String localCluster() {
return localCluster;
}

@VisibleForTesting
public ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots() {
return pendingSnapshots;
}

@Override
public boolean isMarkerMessage() {
// Everything published by this controller will be a marker a message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -987,6 +991,92 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
Assert.assertEquals(result, List.of("V2"));
}

@Test
public void testReplicatedSubscriptionOneWay() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar-r4/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/one-way";
int defaultSubscriptionsSnapshotFrequency = config1.getReplicatedSubscriptionsSnapshotFrequencyMillis();
int defaultSubscriptionsSnapshotTimeout = config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds();
config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(2);
config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(100);

// cluster4 disabled ReplicatedSubscriptions
admin1.tenants().createTenant("pulsar-r4",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4)));
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4));

String subscriptionName = "cluster-subscription";
boolean replicateSubscriptionState = true;

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();

@Cleanup
final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString())
.statsInterval(0, TimeUnit.SECONDS)
.build();

// create subscription in cluster4
createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
// create subscription in cluster4
createReplicatedSubscription(client4, topicName, subscriptionName, replicateSubscriptionState);

// send messages in cluster1
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
int numMessages = 6;
for (int i = 0; i < numMessages; i++) {
String body = "message" + i;
producer.send(body.getBytes(StandardCharsets.UTF_8));
}
producer.close();

// wait for snapshot marker request to be replicated
Thread.sleep(3 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

// Assert just have 1 pending snapshot in cluster1
final PersistentTopic topic1 =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
ReplicatedSubscriptionsController r1Controller =
topic1.getReplicatedSubscriptionController().get();
assertEquals(r1Controller.pendingSnapshots().size(), 1);

// Assert cluster4 just receive 1 snapshot request msg
int numSnapshotRequest = 0;
List<Message<byte[]>> r4Messages = admin4.topics()
.peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED);
for (Message<byte[]> r4Message : r4Messages) {
MessageMetadata msgMetadata = ((MessageImpl<byte[]>) r4Message).getMessageBuilder();
if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
numSnapshotRequest++;
}
}
Assert.assertEquals(numSnapshotRequest, 1);

// Wait pending snapshot timeout
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
numSnapshotRequest = 0;
r4Messages = admin4.topics()
.peekMessages(topicName, subscriptionName, 100, true, TransactionIsolationLevel.READ_UNCOMMITTED);
for (Message<byte[]> r4Message : r4Messages) {
MessageMetadata msgMetadata = ((MessageImpl<byte[]>) r4Message).getMessageBuilder();
if (msgMetadata.hasMarkerType() && msgMetadata.getMarkerType() == MarkerType.REPLICATED_SUBSCRIPTION_SNAPSHOT_REQUEST_VALUE) {
numSnapshotRequest++;
}
}
Assert.assertEquals(numSnapshotRequest, 2);

// Set back to default config.
config1.setReplicatedSubscriptionsSnapshotTimeoutSeconds(defaultSubscriptionsSnapshotTimeout);
config1.setReplicatedSubscriptionsSnapshotFrequencyMillis(defaultSubscriptionsSnapshotFrequency);
}

/**
* Disable replication subscription.
* Test scheduled task case.
Expand Down

0 comments on commit 846d0b0

Please sign in to comment.