From 34e83b57261c46ee698887eeba5e5b0fe676e047 Mon Sep 17 00:00:00 2001 From: fujun Date: Wed, 23 Apr 2025 16:44:10 +0800 Subject: [PATCH 1/7] [feat][admin] add triggerOffload with sizeThreshold api --- .../broker/admin/AdminApiOffloadTest.java | 98 ++++++++++++++++++- .../apache/pulsar/client/admin/Topics.java | 17 ++++ .../client/admin/internal/TopicsImpl.java | 49 ++++++++++ 3 files changed, 159 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 9aa2dcc700c9b..fad1a3f0b44fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -118,7 +118,7 @@ public void cleanup() throws Exception { super.internalCleanup(); } - private void testOffload(String topicName, String mlName) throws Exception { + private void testOffloadWithMessageId(String topicName, String mlName) throws Exception { LedgerOffloader offloader = mock(LedgerOffloader.class); when(offloader.getOffloadDriverName()).thenReturn("mock"); @@ -194,19 +194,107 @@ private void testOffload(String topicName, String mlName) throws Exception { attributes, actual -> assertThat(actual).isPositive()); } + private void testOffloadWithSizeThreshold(String topicName, String mlName) throws Exception { + LedgerOffloader offloader = mock(LedgerOffloader.class); + when(offloader.getOffloadDriverName()).thenReturn("mock"); + + doReturn(offloader).when(pulsar).getManagedLedgerOffloader(any(), any()); + + CompletableFuture promise = new CompletableFuture<>(); + doReturn(promise).when(offloader).offload(any(), any(), any()); + doReturn(true).when(offloader).isAppendable(); + + try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) { + for (int i = 0; i < 15; i++) { + p.send("Foobar".getBytes()); + } + } + + ManagedLedgerInfo info = pulsar.getDefaultManagedLedgerFactory().getManagedLedgerInfo(mlName); + assertEquals(info.ledgers.size(), 2); + + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), LongRunningProcessStatus.Status.NOT_RUN); + var topicNameObject = TopicName.get(topicName); + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicNameObject.getDomain().toString()) + .put(OpenTelemetryAttributes.PULSAR_TENANT, topicNameObject.getTenant()) + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObject.getNamespace()) + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicNameObject.getPartitionedTopicName()) + .build(); + // Verify the respective metric is 0 before the offload begins. + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER, + attributes, actual -> assertThat(actual).isZero()); + + admin.topics().triggerOffload(topicName, 50L); + + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), + LongRunningProcessStatus.Status.RUNNING); + + try { + admin.topics().triggerOffload(topicName, 50L); + Assert.fail("Should have failed"); + } catch (ConflictException e) { + // expected + } + + // fail first time + promise.completeExceptionally(new Exception("Some random failure")); + + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), + LongRunningProcessStatus.Status.ERROR); + Assert.assertTrue(admin.topics().offloadStatus(topicName).getLastError().contains("Some random failure")); + + // Try again + doReturn(CompletableFuture.completedFuture(null)) + .when(offloader).offload(any(), any(), any()); + + admin.topics().triggerOffload(topicName, 30L); + + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().offloadStatus(topicName).getStatus(), + LongRunningProcessStatus.Status.SUCCESS)); + MessageId firstUnoffloaded = admin.topics().offloadStatus(topicName).getFirstUnoffloadedMessage(); + assertTrue(firstUnoffloaded instanceof MessageIdImpl); + MessageIdImpl firstUnoffloadedMessage = (MessageIdImpl) firstUnoffloaded; + // First unoffloaded is the first entry of current ledger + assertEquals(firstUnoffloadedMessage.getLedgerId(), info.ledgers.get(1).ledgerId); + assertEquals(firstUnoffloadedMessage.getEntryId(), 0); + + verify(offloader, times(2)).offload(any(), any(), any()); + + // Verify the metrics have been updated. + metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.STORAGE_OFFLOADED_COUNTER, + attributes, actual -> assertThat(actual).isPositive()); + } + + @Test + public void testOffloadWithMessageIdV2() throws Exception { + String topicName = "persistent://prop-xyz/ns1/topic1"; + String mlName = "prop-xyz/ns1/persistent/topic1"; + testOffloadWithMessageId(topicName, mlName); + } @Test - public void testOffloadV2() throws Exception { + public void testOffloadWithSizeThresholdV2() throws Exception { String topicName = "persistent://prop-xyz/ns1/topic1"; String mlName = "prop-xyz/ns1/persistent/topic1"; - testOffload(topicName, mlName); + testOffloadWithSizeThreshold(topicName, mlName); + } + + @Test + public void testOffloadWithMessageIdV1() throws Exception { + String topicName = "persistent://prop-xyz/test/ns1/topic2"; + String mlName = "prop-xyz/test/ns1/persistent/topic2"; + testOffloadWithMessageId(topicName, mlName); } @Test - public void testOffloadV1() throws Exception { + public void testOffloadWithSizeThresholdV1() throws Exception { String topicName = "persistent://prop-xyz/test/ns1/topic2"; String mlName = "prop-xyz/test/ns1/persistent/topic2"; - testOffload(topicName, mlName); + testOffloadWithSizeThreshold(topicName, mlName); } @Test diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index c681bd1a7bca1..3fc8c7e6eb3e4 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2108,6 +2108,14 @@ CompletableFuture updateSubscriptionPropertiesAsync(String topic, String */ void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException; + /** + * Trigger offloading messages in topic to longterm storage. + * + * @param topic the topic to offload + * @param sizeThreshold maximum amount of data (bytes) that should be retained in longterm storage + */ + void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException; + /** * Trigger offloading messages in topic to longterm storage asynchronously. * @@ -2116,6 +2124,15 @@ CompletableFuture updateSubscriptionPropertiesAsync(String topic, String */ CompletableFuture triggerOffloadAsync(String topic, MessageId messageId); + + /** + * Trigger offloading messages in topic to longterm storage asynchronously. + * + * @param topic the topic to offload + * @param sizeThreshold maximum amount of data (bytes) that should be retained in longterm storage + */ + CompletableFuture triggerOffloadAsync(String topic, long sizeThreshold); + /** * Check the status of an ongoing offloading operation for a topic. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9c4a6eef753de..339f8fae3dd4a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.admin.internal; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.Lists; import com.google.gson.Gson; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -27,6 +28,7 @@ import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -1207,6 +1209,11 @@ public void triggerOffload(String topic, MessageId messageId) throws PulsarAdmin sync(() -> triggerOffloadAsync(topic, messageId)); } + @Override + public void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException { + sync(() -> triggerOffloadAsync(topic, sizeThreshold)); + } + @Override public CompletableFuture triggerOffloadAsync(String topic, MessageId messageId) { TopicName tn = validateTopic(topic); @@ -1231,6 +1238,48 @@ public void failed(Throwable throwable) { return future; } + @Override + public CompletableFuture triggerOffloadAsync(String topic, long sizeThreshold) { + CompletableFuture future = new CompletableFuture<>(); + try { + validatePersistentTopic(topic); + PersistentTopicInternalStats stats = getInternalStats(topic); + if (stats.ledgers.size() < 1) { + throw new PulsarAdminException("Topic doesn't have any data"); + } + LinkedList ledgers = new LinkedList(stats.ledgers); + ledgers.get(ledgers.size() - 1).size = stats.currentLedgerSize; // doesn't get filled in now it seems + MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold); + future = triggerOffloadAsync(topic, messageId); + } catch (PulsarAdminException e) { + future.completeExceptionally(getApiException(e)); + } + return future; + } + + private MessageId findFirstLedgerWithinThreshold(List ledgers, + long sizeThreshold) { + long suffixSize = 0L; + + ledgers = Lists.reverse(ledgers); + long previousLedger = ledgers.get(0).ledgerId; + for (PersistentTopicInternalStats.LedgerInfo l : ledgers) { + suffixSize += l.size; + if (suffixSize > sizeThreshold) { + return new MessageIdImpl(previousLedger, 0L, -1); + } + previousLedger = l.ledgerId; + } + return null; + } + + private void validatePersistentTopic(String topic) throws PulsarAdminException { + TopicName topicName = TopicName.get(topic); + if (topicName.getDomain() != TopicDomain.persistent) { + throw new PulsarAdminException("Need to provide a persistent topic name"); + } + } + @Override public OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException { From a4fef99f93bd28b89373f1103e447e04e5892d63 Mon Sep 17 00:00:00 2001 From: fujun Date: Tue, 29 Apr 2025 16:37:05 +0800 Subject: [PATCH 2/7] [feat][admin] update triggerOffload with sizeThreshold api --- .../pulsar/client/admin/internal/TopicsImpl.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 339f8fae3dd4a..b8e436a7a5aa1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1242,14 +1242,15 @@ public void failed(Throwable throwable) { public CompletableFuture triggerOffloadAsync(String topic, long sizeThreshold) { CompletableFuture future = new CompletableFuture<>(); try { - validatePersistentTopic(topic); PersistentTopicInternalStats stats = getInternalStats(topic); if (stats.ledgers.size() < 1) { throw new PulsarAdminException("Topic doesn't have any data"); } LinkedList ledgers = new LinkedList(stats.ledgers); - ledgers.get(ledgers.size() - 1).size = stats.currentLedgerSize; // doesn't get filled in now it seems MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold); + if (messageId == null) { + return CompletableFuture.completedFuture(null); + } future = triggerOffloadAsync(topic, messageId); } catch (PulsarAdminException e) { future.completeExceptionally(getApiException(e)); @@ -1273,13 +1274,6 @@ private MessageId findFirstLedgerWithinThreshold(List Date: Thu, 19 Jun 2025 17:21:06 +0800 Subject: [PATCH 3/7] [feat][admin] Optimize API parameter explanation --- .../src/main/java/org/apache/pulsar/client/admin/Topics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 3fc8c7e6eb3e4..22fbdb0c5440f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2112,7 +2112,7 @@ CompletableFuture updateSubscriptionPropertiesAsync(String topic, String * Trigger offloading messages in topic to longterm storage. * * @param topic the topic to offload - * @param sizeThreshold maximum amount of data (bytes) that should be retained in longterm storage + * @param sizeThreshold maximum amount of data (bytes) that should be retained in bookkeeper */ void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException; @@ -2129,7 +2129,7 @@ CompletableFuture updateSubscriptionPropertiesAsync(String topic, String * Trigger offloading messages in topic to longterm storage asynchronously. * * @param topic the topic to offload - * @param sizeThreshold maximum amount of data (bytes) that should be retained in longterm storage + * @param sizeThreshold maximum amount of data (bytes) that should be retained in bookkeeper */ CompletableFuture triggerOffloadAsync(String topic, long sizeThreshold); From bc9d73554a195d38e5d5704bdbae7e1c9eb0cfc0 Mon Sep 17 00:00:00 2001 From: fujun Date: Thu, 19 Jun 2025 17:23:22 +0800 Subject: [PATCH 4/7] [feat][admin] use getInternalStatsAsync instead of getInternalStats --- .../client/admin/internal/TopicsImpl.java | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index b8e436a7a5aa1..c0bef67b7541b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1241,20 +1241,30 @@ public void failed(Throwable throwable) { @Override public CompletableFuture triggerOffloadAsync(String topic, long sizeThreshold) { CompletableFuture future = new CompletableFuture<>(); - try { - PersistentTopicInternalStats stats = getInternalStats(topic); - if (stats.ledgers.size() < 1) { - throw new PulsarAdminException("Topic doesn't have any data"); - } - LinkedList ledgers = new LinkedList(stats.ledgers); - MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold); - if (messageId == null) { - return CompletableFuture.completedFuture(null); - } - future = triggerOffloadAsync(topic, messageId); - } catch (PulsarAdminException e) { - future.completeExceptionally(getApiException(e)); - } + getInternalStatsAsync(topic) + .thenAccept(stats -> { + if (stats.ledgers.size() < 1) { + future.completeExceptionally(new PulsarAdminException("Topic doesn't have any data")); + return; + } + LinkedList ledgers = new LinkedList<>(stats.ledgers); + MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold); + if (messageId == null) { + future.complete(null); + } else { + triggerOffloadAsync(topic, messageId).whenComplete((v, ex) -> { + if (ex != null) { + future.completeExceptionally(ex); + } else { + future.complete(null); + } + }); + } + }) + .exceptionally(ex -> { + future.completeExceptionally(getApiException(ex)); + return null; + }); return future; } From ff002be87098f63bfe61121bfc81c5cf6a52b6b3 Mon Sep 17 00:00:00 2001 From: fujun Date: Thu, 19 Jun 2025 17:54:43 +0800 Subject: [PATCH 5/7] [feat][admin] Unify findFirstLedgerWithinThreshold logic in cli and admin --- .../client/admin/internal/TopicsImpl.java | 2 +- .../admin/internal/PulsarAdminImplTest.java | 34 +++++++++++++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 38 ++----------------- .../pulsar/admin/cli/TestCmdTopics.java | 23 ----------- 4 files changed, 38 insertions(+), 59 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c0bef67b7541b..5bfd0442231e4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1268,7 +1268,7 @@ public CompletableFuture triggerOffloadAsync(String topic, long sizeThresh return future; } - private MessageId findFirstLedgerWithinThreshold(List ledgers, + static MessageId findFirstLedgerWithinThreshold(List ledgers, long sizeThreshold) { long suffixSize = 0L; diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java index def05bb6da7f3..a07f43354cbd2 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminImplTest.java @@ -18,10 +18,15 @@ */ package org.apache.pulsar.client.admin.internal; +import java.util.ArrayList; +import java.util.List; import lombok.SneakyThrows; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.testng.Assert; import org.testng.annotations.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -46,6 +51,35 @@ public void testAuthFromConfUsedWhenConfHasAuth() { assertThat(createAdminAndGetAuth(conf)).isSameAs(auth); } + @Test + public void testFindFirstLedgerWithinThreshold() throws Exception { + List ledgers = new ArrayList<>(); + ledgers.add(newLedger(0, 10, 1000)); + ledgers.add(newLedger(1, 10, 2000)); + ledgers.add(newLedger(2, 10, 3000)); + + // test huge threshold + Assert.assertNull(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, Long.MAX_VALUE)); + + // test small threshold + Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 0), + new MessageIdImpl(2, 0, -1)); + + // test middling thresholds + Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 1000), + new MessageIdImpl(2, 0, -1)); + Assert.assertEquals(TopicsImpl.findFirstLedgerWithinThreshold(ledgers, 5000), + new MessageIdImpl(1, 0, -1)); + } + + private static ManagedLedgerInternalStats.LedgerInfo newLedger(long id, long entries, long size) { + ManagedLedgerInternalStats.LedgerInfo l = new ManagedLedgerInternalStats.LedgerInfo(); + l.ledgerId = id; + l.entries = entries; + l.size = size; + return l; + } + @SneakyThrows private Authentication createAdminAndGetAuth(ClientConfigurationData conf) { try (PulsarAdminImpl admin = new PulsarAdminImpl("http://localhost:8080", conf, null)) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index ca15e11139046..f212abb0618d5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -36,7 +36,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -79,7 +78,6 @@ import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; @@ -1320,22 +1318,6 @@ void run() throws PulsarAdminException { } } - static MessageId findFirstLedgerWithinThreshold(List ledgers, - long sizeThreshold) { - long suffixSize = 0L; - - ledgers = Lists.reverse(ledgers); - long previousLedger = ledgers.get(0).ledgerId; - for (PersistentTopicInternalStats.LedgerInfo l : ledgers) { - suffixSize += l.size; - if (suffixSize > sizeThreshold) { - return new MessageIdImpl(previousLedger, 0L, -1); - } - previousLedger = l.ledgerId; - } - return null; - } - public static void printMessages(List> messages, boolean showServerMarker, CliCommand cli) { if (messages == null) { return; @@ -1399,23 +1381,9 @@ private class Offload extends CliCommand { @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); - - PersistentTopicInternalStats stats = getTopics().getInternalStats(persistentTopic, false); - if (stats.ledgers.size() < 1) { - throw new PulsarAdminException("Topic doesn't have any data"); - } - - LinkedList ledgers = new LinkedList(stats.ledgers); - ledgers.get(ledgers.size() - 1).size = stats.currentLedgerSize; // doesn't get filled in now it seems - MessageId messageId = findFirstLedgerWithinThreshold(ledgers, sizeThreshold); - - if (messageId == null) { - System.out.println("Nothing to offload"); - return; - } - - getTopics().triggerOffload(persistentTopic, messageId); - System.out.println("Offload triggered for " + persistentTopic + " for messages before " + messageId); + getTopics().triggerOffload(persistentTopic, sizeThreshold); + System.out.println("Offload triggered for " + persistentTopic + " which keep " + + sizeThreshold + " bytes on bookkeeper"); } } diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java index bd926edc5a808..119413c6d0a43 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java @@ -38,7 +38,6 @@ import java.io.StringWriter; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.List; import lombok.Cleanup; import org.apache.pulsar.client.admin.ListTopicsOptions; @@ -47,7 +46,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Schemas; import org.apache.pulsar.client.admin.Topics; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo; import org.apache.pulsar.common.policies.data.RetentionPolicies; @@ -97,27 +95,6 @@ private static LedgerInfo newLedger(long id, long entries, long size) { return l; } - @Test - public void testFindFirstLedgerWithinThreshold() throws Exception { - List ledgers = new ArrayList<>(); - ledgers.add(newLedger(0, 10, 1000)); - ledgers.add(newLedger(1, 10, 2000)); - ledgers.add(newLedger(2, 10, 3000)); - - // test huge threshold - Assert.assertNull(CmdTopics.findFirstLedgerWithinThreshold(ledgers, Long.MAX_VALUE)); - - // test small threshold - Assert.assertEquals(CmdTopics.findFirstLedgerWithinThreshold(ledgers, 0), - new MessageIdImpl(2, 0, -1)); - - // test middling thresholds - Assert.assertEquals(CmdTopics.findFirstLedgerWithinThreshold(ledgers, 1000), - new MessageIdImpl(2, 0, -1)); - Assert.assertEquals(CmdTopics.findFirstLedgerWithinThreshold(ledgers, 5000), - new MessageIdImpl(1, 0, -1)); - } - @Test public void testListCmd() throws Exception { List topicList = Lists.newArrayList("persistent://public/default/t1", "persistent://public/default/t2", From 1cf79691ac1ebc4cb3f24f762d0fa7938768a903 Mon Sep 17 00:00:00 2001 From: fujun Date: Fri, 20 Jun 2025 10:15:42 +0800 Subject: [PATCH 6/7] =?UTF-8?q?[feat][admin]=20=E4=BF=AE=E5=A4=8Dci=20test?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pulsar/tests/integration/offload/TestBaseOffload.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index 7d38aa77bf027..6bd50fd679a31 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -97,7 +98,9 @@ protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String admi String output = pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload", "--size-threshold", "100G", topic).getStdout(); - Assert.assertTrue(output.contains("Nothing to offload")); + + System.out.println("******:" + output); + Assert.assertTrue(StringUtils.isEmpty(output)); output = pulsarCluster.runAdminCommandOnAnyBroker( "topics", "offload-status", topic).getStdout(); From f7c72ba6e5831a2b9ca6aac3354511695e876418 Mon Sep 17 00:00:00 2001 From: fujun Date: Fri, 20 Jun 2025 11:55:19 +0800 Subject: [PATCH 7/7] =?UTF-8?q?[feat][admin]=20=E4=BF=AE=E5=A4=8Dci=20test?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../pulsar/tests/integration/offload/TestBaseOffload.java | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5f8c9f49d65d1..96eb11a9360ba 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1996,7 +1996,7 @@ public boolean matches(Long timestamp) { stats.ledgers.add(newLedger(2, 10, 3000)); when(mockTopics.getInternalStats("persistent://myprop/clust/ns1/ds1", false)).thenReturn(stats); cmdTopics.run(split("offload persistent://myprop/clust/ns1/ds1 -s 1k")); - verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", new MessageIdImpl(2, 0, -1)); + verify(mockTopics).triggerOffload("persistent://myprop/clust/ns1/ds1", 1024); when(mockTopics.offloadStatus("persistent://myprop/clust/ns1/ds1")).thenReturn(new OffloadProcessStatusImpl()); cmdTopics.run(split("offload-status persistent://myprop/clust/ns1/ds1")); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index 6bd50fd679a31..735c483d7e544 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -98,9 +97,7 @@ protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String admi String output = pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload", "--size-threshold", "100G", topic).getStdout(); - - System.out.println("******:" + output); - Assert.assertTrue(StringUtils.isEmpty(output)); + Assert.assertTrue(output.contains("which keep 107374182400 bytes on bookkeeper")); output = pulsarCluster.runAdminCommandOnAnyBroker( "topics", "offload-status", topic).getStdout();