Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

private void testOffload(String topicName, String mlName) throws Exception {
private void testOffloadWithMessageId(String topicName, String mlName) throws Exception {
LedgerOffloader offloader = mock(LedgerOffloader.class);
when(offloader.getOffloadDriverName()).thenReturn("mock");

Expand Down Expand Up @@ -194,19 +194,107 @@ private void testOffload(String topicName, String mlName) throws Exception {
attributes, actual -> assertThat(actual).isPositive());
}

private void testOffloadWithSizeThreshold(String topicName, String mlName) throws Exception {
LedgerOffloader offloader = mock(LedgerOffloader.class);
when(offloader.getOffloadDriverName()).thenReturn("mock");

doReturn(offloader).when(pulsar).getManagedLedgerOffloader(any(), any());

CompletableFuture<Void> promise = new CompletableFuture<>();
doReturn(promise).when(offloader).offload(any(), any(), any());
doReturn(true).when(offloader).isAppendable();

try (Producer<byte[]> p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
for (int i = 0; i < 15; i++) {
p.send("Foobar".getBytes());
}
}

ManagedLedgerInfo info = pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(mlName);
assertEquals(info.ledgers.size(), 2);

assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.NOT_RUN);
var topicNameObject = TopicName.get(topicName);
var attributes = Attributes.builder()
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicNameObject.getDomain().toString())
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicNameObject.getTenant())
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObject.getNamespace())
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicNameObject.getPartitionedTopicName())
.build();
// Verify the respective metric is 0 before the offload begins.
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER,
attributes, actual -> assertThat(actual).isZero());

admin.topics().triggerOffload(topicName, 50L);

assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.RUNNING);

try {
admin.topics().triggerOffload(topicName, 50L);
Assert.fail("Should have failed");
} catch (ConflictException e) {
// expected
}

// fail first time
promise.completeExceptionally(new Exception("Some random failure"));

assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.ERROR);
Assert.assertTrue(admin.topics().offloadStatus(topicName).getLastError().contains("Some random failure"));

// Try again
doReturn(CompletableFuture.completedFuture(null))
.when(offloader).offload(any(), any(), any());

admin.topics().triggerOffload(topicName, 30L);

Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().offloadStatus(topicName).getStatus(),
LongRunningProcessStatus.Status.SUCCESS));
MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).getFirstUnoffloadedMessage();
assertTrue(firstUnoffloaded instanceof MessageIdImpl);
MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded;
// First unoffloaded is the first entry of current ledger
assertEquals(firstUnoffloadedMessage.getLedgerId(), info.ledgers.get(1).ledgerId);
assertEquals(firstUnoffloadedMessage.getEntryId(), 0);

verify(offloader, times(2)).offload(any(), any(), any());

// Verify the metrics have been updated.
metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER,
attributes, actual -> assertThat(actual).isPositive());
}

@Test
public void testOffloadWithMessageIdV2() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
String mlName = "prop-xyz/ns1/persistent/topic1";
testOffloadWithMessageId(topicName, mlName);
}

@Test
public void testOffloadV2() throws Exception {
public void testOffloadWithSizeThresholdV2() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
String mlName = "prop-xyz/ns1/persistent/topic1";
testOffload(topicName, mlName);
testOffloadWithSizeThreshold(topicName, mlName);
}

@Test
public void testOffloadWithMessageIdV1() throws Exception {
String topicName = "persistent://prop-xyz/test/ns1/topic2";
String mlName = "prop-xyz/test/ns1/persistent/topic2";
testOffloadWithMessageId(topicName, mlName);
}

@Test
public void testOffloadV1() throws Exception {
public void testOffloadWithSizeThresholdV1() throws Exception {
String topicName = "persistent://prop-xyz/test/ns1/topic2";
String mlName = "prop-xyz/test/ns1/persistent/topic2";
testOffload(topicName, mlName);
testOffloadWithSizeThreshold(topicName, mlName);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,14 @@ CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String
*/
void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException;

/**
* Trigger offloading messages in topic to longterm storage.
*
* @param topic the topic to offload
* @param sizeThreshold maximum amount of data (bytes) that should be retained in bookkeeper
*/
void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException;

/**
* Trigger offloading messages in topic to longterm storage asynchronously.
*
Expand All @@ -2116,6 +2124,15 @@ CompletableFuture<Void> updateSubscriptionPropertiesAsync(String topic, String
*/
CompletableFuture<Void> triggerOffloadAsync(String topic, MessageId messageId);


/**
* Trigger offloading messages in topic to longterm storage asynchronously.
*
* @param topic the topic to offload
* @param sizeThreshold maximum amount of data (bytes) that should be retained in bookkeeper
*/
CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold);

/**
* Check the status of an ongoing offloading operation for a topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -27,6 +28,7 @@
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -1207,6 +1209,11 @@ public void triggerOffload(String topic, MessageId messageId) throws PulsarAdmin
sync(() -> triggerOffloadAsync(topic, messageId));
}

@Override
public void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException {
sync(() -> triggerOffloadAsync(topic, sizeThreshold));
}

@Override
public CompletableFuture<Void> triggerOffloadAsync(String topic, MessageId messageId) {
TopicName tn = validateTopic(topic);
Expand All @@ -1231,6 +1238,52 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold) {
CompletableFuture<Void> future = new CompletableFuture<>();
getInternalStatsAsync(topic)
.thenAccept(stats -> {
if (stats.ledgers.size() < 1) {
future.completeExceptionally(new PulsarAdminException("Topic doesn't have any data"));
return;
}
LinkedList<PersistentTopicInternalStats.LedgerInfo> ledgers = new LinkedList<>(stats.ledgers);
MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold);
if (messageId == null) {
future.complete(null);
} else {
triggerOffloadAsync(topic, messageId).whenComplete((v, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});
}
})
.exceptionally(ex -> {
future.completeExceptionally(getApiException(ex));
return null;
});
return future;
}

static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
long sizeThreshold) {
long suffixSize = 0L;

ledgers = Lists.reverse(ledgers);
long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
if (suffixSize > sizeThreshold) {
return new MessageIdImpl(previousLedger, 0L, -1);
}
previousLedger = l.ledgerId;
}
return null;
}

@Override
public OffloadProcessStatus offloadStatus(String topic)
throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
*/
package org.apache.pulsar.client.admin.internal;

import java.util.ArrayList;
import java.util.List;
import lombok.SneakyThrows;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -46,6 +51,35 @@ public void testAuthFromConfUsedWhenConfHasAuth() {
assertThat(createAdminAndGetAuth(conf)).isSameAs(auth);
}

@Test
public void testFindFirstLedgerWithinThreshold() throws Exception {
List<ManagedLedgerInternalStats.LedgerInfo> ledgers = new ArrayList<>();
ledgers.add(newLedger(0, 10, 1000));
ledgers.add(newLedger(1, 10, 2000));
ledgers.add(newLedger(2, 10, 3000));

// test huge threshold
Assert.assertNull(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, Long.MAX_VALUE));

// test small threshold
Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 0),
new MessageIdImpl(2, 0, -1));

// test middling thresholds
Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 1000),
new MessageIdImpl(2, 0, -1));
Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 5000),
new MessageIdImpl(1, 0, -1));
}

private static ManagedLedgerInternalStats.LedgerInfo newLedger(long id, long entries, long size) {
ManagedLedgerInternalStats.LedgerInfo l = new ManagedLedgerInternalStats.LedgerInfo();
l.ledgerId = id;
l.entries = entries;
l.size = size;
return l;
}

@SneakyThrows
private Authentication createAdminAndGetAuth(ClientConfigurationData conf) {
try (PulsarAdminImpl admin = new PulsarAdminImpl("http://localhost:8080", conf, null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1996,7 +1996,7 @@ public boolean matches(Long timestamp) {
stats.ledgers.add(newLedger(2, 10, 3000));
when(mockTopics.getInternalStats("persistent://myprop/clust/ns1/ds1", false)).thenReturn(stats);
cmdTopics.run(split("offload persistent://myprop/clust/ns1/ds1 -s 1k"));
verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", new MessageIdImpl(2, 0, -1));
verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", 1024);

when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatusImpl());
cmdTopics.run(split("offload-status persistent://myprop/clust/ns1/ds1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -79,7 +78,6 @@
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -1320,22 +1318,6 @@ void run() throws PulsarAdminException {
}
}

static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
long sizeThreshold) {
long suffixSize = 0L;

ledgers = Lists.reverse(ledgers);
long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
if (suffixSize > sizeThreshold) {
return new MessageIdImpl(previousLedger, 0L, -1);
}
previousLedger = l.ledgerId;
}
return null;
}

public static void printMessages(List<Message<byte[]>> messages, boolean showServerMarker, CliCommand cli) {
if (messages == null) {
return;
Expand Down Expand Up @@ -1399,23 +1381,9 @@ private class Offload extends CliCommand {
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);

PersistentTopicInternalStats stats = getTopics().getInternalStats(persistentTopic, false);
if (stats.ledgers.size() < 1) {
throw new PulsarAdminException("Topic doesn't have any data");
}

LinkedList<PersistentTopicInternalStats.LedgerInfo> ledgers = new LinkedList(stats.ledgers);
ledgers.get(ledgers.size() - 1).size = stats.currentLedgerSize; // doesn't get filled in now it seems
MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold);

if (messageId == null) {
System.out.println("Nothing to offload");
return;
}

getTopics().triggerOffload(persistentTopic, messageId);
System.out.println("Offload triggered for " + persistentTopic + " for messages before " + messageId);
getTopics().triggerOffload(persistentTopic, sizeThreshold);
System.out.println("Offload triggered for " + persistentTopic + " which keep "
+ sizeThreshold + " bytes on bookkeeper");
}
}

Expand Down
Loading
Loading