Skip to content

Commit

Permalink
add config: persistIndividualAckAsLongArray
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Dec 25, 2024
1 parent e5e3873 commit f2a2441
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private String shadowSourceName;
@Getter
private boolean persistIndividualAckAsLongArray;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand All @@ -103,6 +105,11 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
return this;
}

public ManagedLedgerConfig setPersistIndividualAckAsLongArray(boolean persistIndividualAckAsLongArray) {
this.persistIndividualAckAsLongArray = persistIndividualAckAsLongArray;
return this;
}

/**
* @return the lazyCursorRecovery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected volatile long messagesConsumedCounter;

// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
@VisibleForTesting
volatile LedgerHandle cursorLedger;

// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
Expand Down Expand Up @@ -3253,7 +3254,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
* Do not enable the feature that https://github.com/apache/pulsar/pull/9292 introduced, to avoid serialization
* and deserialization error.
*/
if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
if (getConfig().isUnackedRangesOpenCacheSetEnabled() && getConfig().isPersistIndividualAckAsLongArray()) {
try {
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -39,6 +40,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand All @@ -52,6 +54,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
Expand Down Expand Up @@ -650,4 +653,60 @@ public void testUnackmessagesAndRecoveryCompatibility(boolean enabled1, boolean
ledger2.close();
factory.shutdown();
}

@DataProvider(name = "booleans")
public Object[][] booleans() {
return new Object[][] {
{true},
{false},
};
}

@Test(dataProvider = "booleans")
public void testConfigPersistIndividualAckAsLongArray(boolean enable) throws Exception {
final String mlName = "ml" + UUID.randomUUID().toString().replaceAll("-", "");
final String cursorName = "c1";
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
final ManagedLedgerConfig config = new ManagedLedgerConfig()
.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1)
.setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1)
.setMaxUnackedRangesToPersistInMetadataStore(1)
.setUnackedRangesOpenCacheSetEnabled(true).setPersistIndividualAckAsLongArray(enable);

ManagedLedger ledger1 = factory.open(mlName, config);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger1.openCursor(cursorName);

// Write entries.
int totalEntries = 100;
List<Position> entries = new ArrayList<>();
for (int i = 0; i < totalEntries; i++) {
Position p = ledger1.addEntry("entry".getBytes());
entries.add(p);
}
// Make ack holes and trigger a mark deletion.
for (int i = totalEntries - 1; i >=0 ; i--) {
if (i % 2 == 0) {
cursor1.delete(entries.get(i));
}
}
cursor1.markDelete(entries.get(9));
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor1.pendingMarkDeleteOps.size(), 0);
});

// Verify: the config affects.
long cursorLedgerLac = cursor1.cursorLedger.getLastAddConfirmed();
LedgerEntry ledgerEntry = cursor1.cursorLedger.readEntries(cursorLedgerLac, cursorLedgerLac).nextElement();
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
if (enable) {
assertNotEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
} else {
assertEquals(positionInfo.getIndividualDeletedMessageRangesList().size(), 0);
}

// cleanup
ledger1.close();
factory.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2243,6 +2243,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " will only be tracked in memory and messages will be redelivered in case of"
+ " crashes.")
private int managedLedgerMaxUnackedRangesToPersist = 10000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
private boolean managedLedgerPersistIndividualAckAsLongArray = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "If enabled, the maximum \"acknowledgment holes\" will not be limited and \"acknowledgment holes\" "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,8 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T

managedLedgerConfig
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig
.setPersistIndividualAckAsLongArray(serviceConfig.isManagedLedgerPersistIndividualAckAsLongArray());
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class ManagedLedgerConfigTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "booleans")
public Object[][] booleans() {
return new Object[][] {
{true},
{false},
};
}

@Test(dataProvider = "booleans")
public void testConfigPersistIndividualAckAsLongArray(boolean enabled) throws Exception {
pulsar.getConfiguration().setManagedLedgerPersistIndividualAckAsLongArray(enabled);
final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, true).get().get();
ManagedLedgerConfig mlConf = topic.getManagedLedger().getConfig();
assertEquals(mlConf.isPersistIndividualAckAsLongArray(), enabled);

// cleanup.
admin.topics().delete(tpName);
}
}

0 comments on commit f2a2441

Please sign in to comment.