Contributor

@JunFu0814 JunFu0814 commented Jun 17, 2025

Main Issue: #24276

Motivation

For PIP-416 (#24276), add a new admin API to trigger offload with a size threshold.

Modifications

  1. Add new admin APIs to trigger offload with a size threshold.
  2. Optimize formatting.
  3. Enhance testing.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions

@JunFu0814 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@JunFu0814 JunFu0814 changed the title [feat][client] clinet apitrigger offload with size threshold [feat][admin] new admin api for trigger offload with size threshold Jun 17, 2025
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Jun 17, 2025
@codelipenghui codelipenghui added this to the 4.1.0 milestone Jun 17, 2025
@codelipenghui codelipenghui added type/feature The PR added a new feature or issue requested a new feature area/broker area/tieredstorage labels Jun 17, 2025
@JunFu0814 JunFu0814 removed their assignment Jun 18, 2025
public CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    try {
        PersistentTopicInternalStats stats = getInternalStats(topic);
Member

Please use getInternalStatsAsync instead of getInternalStats to avoid thread blocking.
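
The non-blocking shape requested here can be sketched as follows. This is a minimal illustration, not the PR's code: `AsyncTriggerSketch`, `fetchStatsAsync`, and `triggerAsync` are hypothetical stand-ins for the admin client's asynchronous stats lookup and offload trigger. The point is only that the second step is chained with `thenCompose`, so no thread waits on the stats call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncTriggerSketch {

    /**
     * Fetches stats asynchronously and chains the offload trigger onto the result.
     * Both function arguments are hypothetical hooks, not Pulsar APIs.
     */
    static <S> CompletableFuture<Void> triggerOffloadAsync(
            String topic,
            Function<String, CompletableFuture<S>> fetchStatsAsync,
            Function<S, CompletableFuture<Void>> triggerAsync) {
        // thenCompose runs triggerAsync once the stats arrive; a failed stats
        // lookup fails the returned future without any explicit try/catch.
        return fetchStatsAsync.apply(topic).thenCompose(triggerAsync);
    }

    public static void main(String[] args) {
        CompletableFuture<String> stats = new CompletableFuture<>();
        CompletableFuture<Void> done = AsyncTriggerSketch.<String>triggerOffloadAsync(
                "persistent://public/default/demo",
                topic -> stats,
                s -> {
                    System.out.println("trigger with " + s);
                    return CompletableFuture.completedFuture(null);
                });
        stats.complete("internal-stats"); // the trigger step runs only now
        done.join();
    }
}
```

The returned future stays incomplete until the stats future completes, which is the behavior a blocking `getInternalStats` call inside an `...Async` method would not give.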

Contributor Author

nice suggestion

Contributor Author

@nodece please review again bc9d735

@nodece nodece changed the title [feat][admin] new admin api for trigger offload with size threshold [improve][admin] PIP-416: Add a new topic method to implement trigger offload by size threshold Jun 18, 2025
Comment on lines 1261 to 1275
private MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
                                                 long sizeThreshold) {
    long suffixSize = 0L;

    ledgers = Lists.reverse(ledgers);
    long previousLedger = ledgers.get(0).ledgerId;
    for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
        suffixSize += l.size;
        if (suffixSize > sizeThreshold) {
            return new MessageIdImpl(previousLedger, 0L, -1);
        }
        previousLedger = l.ledgerId;
    }
    return null;
}
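
To make the walk in the snippet above concrete, here is a self-contained sketch of the same newest-to-oldest accumulation. It is an illustration under assumptions: `ThresholdWalkSketch` and its `Ledger` class are hypothetical stand-ins for `PersistentTopicInternalStats.LedgerInfo`, and the result is a plain ledger ID rather than a `MessageIdImpl`. The sketch also returns an empty result for an empty ledger list, a case in which `ledgers.get(0)` in the snippet would throw.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

final class ThresholdWalkSketch {

    /** Hypothetical stand-in for PersistentTopicInternalStats.LedgerInfo. */
    static final class Ledger {
        final long ledgerId;
        final long size;

        Ledger(long ledgerId, long size) {
            this.ledgerId = ledgerId;
            this.size = size;
        }
    }

    /**
     * Walks ledgers from newest to oldest, summing sizes. When the running sum
     * first exceeds the threshold, returns the ID of the ledger visited just
     * before (the oldest one that still fits). Input is ordered oldest first.
     */
    static OptionalLong firstLedgerWithinThreshold(List<Ledger> ledgers, long sizeThreshold) {
        if (ledgers.isEmpty()) {
            return OptionalLong.empty();
        }
        long suffixSize = 0L;
        long previousLedger = ledgers.get(ledgers.size() - 1).ledgerId;
        for (int i = ledgers.size() - 1; i >= 0; i--) {
            Ledger l = ledgers.get(i);
            suffixSize += l.size;
            if (suffixSize > sizeThreshold) {
                return OptionalLong.of(previousLedger);
            }
            previousLedger = l.ledgerId;
        }
        // Total size never exceeded the threshold: nothing to offload.
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Ledger> ledgers = Arrays.asList(
                new Ledger(1, 100), new Ledger(2, 100), new Ledger(3, 100), new Ledger(4, 50));
        // Newest first: 50, 150, 250. The sum exceeds 200 at ledger 2, so ledger 3 is returned.
        System.out.println(firstLedgerWithinThreshold(ledgers, 200));  // OptionalLong[3]
        System.out.println(firstLedgerWithinThreshold(ledgers, 1000)); // OptionalLong.empty
    }
}
```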
Contributor

It's better to move to the broker side which can provide consistent behavior from the admin CLI and the admin REST API.

Contributor Author

Hi @codelipenghui, moving this to the broker side is the better approach, since it ensures consistent behavior for the CLI and for clients in any language, but of course this will also bring more workload. In the current PIP, I will first ensure that the CLI and the Java client use the same findFirstLedgerWithinThreshold logic. Please help review ff002be.

Contributor

> but of course this will also bring more workload

While both approaches require iterating over ledgers, the scope of that iteration differs:

Client-side: You'd have to iterate over all ledgers to determine what needs offloading.

Broker-side: The broker only needs to iterate over the specific ledgers that are being offloaded, directly accessing data already in memory.

Even if the client pre-calculates message IDs and sends them to the broker, the broker still needs to iterate the ledger map and decide which ledgers should be offloaded. The key is that the broker can do this much more efficiently, leveraging its existing in-memory structures without the constant creation and disposal of new objects.

Contributor

BTW, the broker side already has most of the implementation:

private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
                          CompletableFuture<Position> finalPromise) {
    if (getOffloadPoliciesIfAppendable().isEmpty()) {
        String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
        finalPromise.completeExceptionally(new IllegalArgumentException(msg));
        return;
    }
    if (offloadThresholdInBytes < 0 && offloadThresholdInSeconds < 0) {
        String msg = String.format("[%s] Nothing to offload due to [managedLedgerOffloadThresholdInBytes] and "
                + "[managedLedgerOffloadThresholdInSeconds] less than 0.", name);
        finalPromise.completeExceptionally(new IllegalArgumentException(msg));
        return;
    }
    if (!offloadMutex.tryLock()) {
        scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
                100, TimeUnit.MILLISECONDS);
        return;
    }
    CompletableFuture<Position> unlockingPromise = new CompletableFuture<>();
    unlockingPromise.whenComplete((res, ex) -> {
        offloadMutex.unlock();
        if (ex != null) {
            finalPromise.completeExceptionally(ex);
        } else {
            finalPromise.complete(res);
        }
    });
    long sizeSummed = 0;
    long toOffloadSize = 0;
    long alreadyOffloadedSize = 0;
    ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();
    final long offloadTimeThresholdMillis = TimeUnit.SECONDS.toMillis(offloadThresholdInSeconds);
    for (Map.Entry<Long, LedgerInfo> e : ledgers.descendingMap().entrySet()) {
        final LedgerInfo info = e.getValue();
        // Skip current active ledger, an active ledger can't be offloaded.
        // Can't `info.getLedgerId() == currentLedger.getId()` here, trigger offloading is before create ledger.
        if (info.getTimestamp() == 0L) {
            continue;
        }
        final long size = info.getSize();
        final long timestamp = info.getTimestamp();
        final long now = System.currentTimeMillis();
        sizeSummed += size;
        final boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
        if (alreadyOffloaded) {
            alreadyOffloadedSize += size;
        } else {
            if ((offloadThresholdInBytes >= 0 && sizeSummed > offloadThresholdInBytes)
                    || (offloadTimeThresholdMillis >= 0 && now - timestamp >= offloadTimeThresholdMillis)) {
                toOffloadSize += size;
                toOffload.addFirst(info);
            }
        }
    }
    if (toOffload.size() > 0) {
        log.info("[{}] Going to automatically offload ledgers {}"
                        + ", total size = {}, already offloaded = {}, to offload = {}",
                name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                sizeSummed, alreadyOffloadedSize, toOffloadSize);
        offloadLoop(unlockingPromise, toOffload, PositionFactory.LATEST, Optional.empty());
    } else {
        // offloadLoop will complete immediately with an empty list to offload
        log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, "
                        + "threshold = [managedLedgerOffloadThresholdInBytes:{}, "
                        + "managedLedgerOffloadThresholdInSeconds:{}]",
                name, sizeSummed, alreadyOffloadedSize, offloadThresholdInBytes,
                TimeUnit.MILLISECONDS.toSeconds(offloadTimeThresholdMillis));
        unlockingPromise.complete(PositionFactory.LATEST);
    }
}

There is no need for duplicated code like findFirstLedgerWithinThreshold.
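
The size-based part of the selection loop in maybeOffload can be isolated in a few lines. The sketch below is a simplified illustration, not broker code: `OffloadSelectionSketch` and its `Ledger` class are hypothetical, the time threshold, locking, and promise handling are left out, and a `closed` flag stands in for the `info.getTimestamp() == 0L` check on the active ledger.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OffloadSelectionSketch {

    /** Hypothetical stand-in for the broker's LedgerInfo. */
    static final class Ledger {
        final long size;
        final boolean closed;    // false for the currently active ledger
        final boolean offloaded; // true if offload already completed

        Ledger(long size, boolean closed, boolean offloaded) {
            this.size = size;
            this.closed = closed;
            this.offloaded = offloaded;
        }
    }

    /**
     * Walks ledgers from newest to oldest and selects every closed,
     * not-yet-offloaded ledger once the running size exceeds the threshold.
     * Returns the selected ledger IDs ordered oldest first.
     */
    static List<Long> selectForOffload(TreeMap<Long, Ledger> ledgers, long thresholdBytes) {
        Deque<Long> toOffload = new ArrayDeque<>();
        long sizeSummed = 0;
        for (Map.Entry<Long, Ledger> e : ledgers.descendingMap().entrySet()) {
            Ledger info = e.getValue();
            if (!info.closed) {
                continue; // an active ledger cannot be offloaded
            }
            sizeSummed += info.size;
            if (!info.offloaded && thresholdBytes >= 0 && sizeSummed > thresholdBytes) {
                toOffload.addFirst(e.getKey());
            }
        }
        return new ArrayList<>(toOffload);
    }

    public static void main(String[] args) {
        TreeMap<Long, Ledger> ledgers = new TreeMap<>();
        ledgers.put(1L, new Ledger(100, true, true));
        ledgers.put(2L, new Ledger(100, true, false));
        ledgers.put(3L, new Ledger(100, true, false));
        ledgers.put(4L, new Ledger(50, true, false));
        ledgers.put(5L, new Ledger(10, false, false));
        System.out.println(selectForOffload(ledgers, 200)); // [2]
        System.out.println(selectForOffload(ledgers, 0));   // [2, 3, 4]
    }
}
```

With a 200-byte threshold the running sum over ledgers 4, 3, 2 is 50, 150, 250, so ledger 2 is the first one selected, and ledger 1 is skipped because it is already offloaded.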

Contributor Author

@codelipenghui Thank you for your suggestion. I will study the logic here in depth first.

Contributor Author

@JunFu0814 JunFu0814 Jul 1, 2025

Hi @codelipenghui, I now understand the offload-related logic. As you mentioned, whether offload runs automatically or is triggered manually, the broker ultimately executes it in the offloadLoop code, and offloadThresholdInBytes is a crucial condition. Therefore, we can provide a unified broker-side API based on offloadThresholdInBytes, so that the admin client and the CLI no longer need to care about the findFirstLedgerWithinThreshold logic. For that change, I suggest opening a new PIP to discuss and finalize the design. In PIP-416 (#24276), our goal is merely to let the Java admin client manually trigger topic offload based on the offloadThresholdInBytes condition (the CLI already offers this), thereby improving the user experience. I believe the two approaches do not conflict.

void offloadLoop(CompletableFuture<Position> promise, Queue<LedgerInfo> ledgersToOffload,
                 Position firstUnoffloaded, Optional<Throwable> firstError) {
    State currentState = getState();
    if (currentState == State.Closed) {
        promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
                String.format("managed ledger [%s] has already closed", name)));
        return;
    }
    if (currentState.isFenced()) {
        promise.completeExceptionally(new ManagedLedgerFencedException(
                String.format("managed ledger [%s] is fenced", name)));
        return;
    }
    LedgerInfo info = ledgersToOffload.poll();

Contributor Author

@nodece What suggestions do you have for this?

Member

The broker already supports automatic offload, and users can manually trigger offload using tools like the pulsar-admin CLI. Currently, @codelipenghui is proposing to enhance offload handling directly on the broker side, possibly by adding new methods or APIs to allow the broker to make offload decisions.

While I understand the motivation for this, adding such logic to the broker would increase complexity and require additional API surface. An alternative is to handle the logic on the client side. In this approach, the client would fetch relevant metadata from the broker, apply the offload decision logic, and trigger offload manually through existing interfaces. This avoids modifying core broker logic and offers more flexibility for customized behavior.

That said, we could still consider implementing manual triggering directly on the broker side in the future as a built-in capability. But for now, I think starting with the client-side logic is a simpler and more incremental step.

Contributor

@nodece

> adding such logic to the broker would increase complexity and require additional API surface

Could you please provide more details on where the complexity increases? Is it the new REST API?
As I understand it, moving the logic to the client side will definitely add complexity to the system:

  • Duplicated code is added for finding the ledger
  • The REST API and the admin CLI become inconsistent (if other people want to trigger the offload via the REST API, how can we support them?)
  • Pulsarctl and other CLI tools need to catch up on the feature by adding duplicated code

> This avoids modifying core broker logic and offers more flexibility for customized behavior.

And I don't think it needs to modify the core broker logic; it just exposes an existing internal capability (triggering data offloading) to users.

> That said, we could still consider implementing manual triggering directly on the broker side in the future as a built-in capability. But for now, I think starting with the client-side logic is a simpler and more incremental step.

And it is definitely not a more incremental step. Once we support a REST API to trigger offload by size threshold, all the CLI tools will need to remove their existing implementation and move to the new REST API.

@JunFu0814 JunFu0814 requested a review from nodece June 20, 2025 06:45
@nodece nodece closed this Jul 1, 2025
@nodece nodece reopened this Jul 1, 2025
@coderzc coderzc modified the milestones: 4.1.0, 4.2.0 Sep 1, 2025

Labels

area/broker area/tieredstorage doc-not-needed Your PR changes do not impact docs type/feature The PR added a new feature or issue requested a new feature


7 participants