Skip to content

Commit

Permalink
[fix] [broker] Fix config replicationStartAt does not work when set i…
Browse files Browse the repository at this point in the history
…t to earliest (#23719)

Co-authored-by: Lari Hotari <[email protected]>
(cherry picked from commit 39f4ccd)
  • Loading branch information
poorbarcode committed Dec 13, 2024
1 parent 2e7356c commit 529b3f5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2066,9 +2066,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
String replicationStartAt = getBrokerService().getPulsar().getConfiguration().getReplicationStartAt();
final InitialPosition initialPosition;
if (MessageId.earliest.toString()
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
// "MessageId.earliest.toString()" is "-1:-1:-1", which is not suggested, just guarantee compatibility with the
// previous version.
// "InitialPosition.Earliest.name()" is "Earliest", which is suggested.
if (MessageId.earliest.toString().equalsIgnoreCase(replicationStartAt)
|| InitialPosition.Earliest.name().equalsIgnoreCase(replicationStartAt)) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,9 +1023,9 @@ public void testConfigReplicationStartAt() throws Exception {
disableReplication(topic1);

// 2.Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
admin1.brokers().updateDynamicConfiguration("replicationStartAt", "earliest");
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
assertEquals(pulsar1.getConfiguration().getReplicationStartAt(), "earliest");
});

final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
Expand Down

0 comments on commit 529b3f5

Please sign in to comment.