Skip to content

Conversation

@Denovo1998
Copy link
Owner

@Denovo1998 Denovo1998 commented Nov 9, 2025

Fixes #xyz

Main Issue: #23149

PIP: apache#24370

Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

  • Cancelling Scheduled Actions: A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively.
  • Removing Backlogs: A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch.
  • Manual Business Logic Correction: An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue.

The existing skip(numMessages) API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific MessageId (including ledgerId, entryId, and optional batchIndex), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: apache#23907

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
#	pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
#	pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 80 to +83

CompletableFuture<Void> skipMessages(int numMessagesToSkip);

CompletableFuture<Void> skipMessages(List<SkipEntry> entries);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Badge Provide SkipEntry implementation referenced by new APIs

The new skip-by-message-id flow adds a skipMessages(List<SkipEntry>) method to Subscription and imports org.apache.pulsar.broker.service.SkipEntry from several classes, but no such type exists anywhere in the project. This results in a compilation failure before any of the new code can run. Introduce the missing DTO (ledgerId, entryId, optional batch indexes) or reuse an existing type so the code builds.

Useful? React with 👍 / 👎.

Comment on lines +1988 to +2014
private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata partitionMetadata,
SkipMessageIdsRequest messageIds,
String subName) {
final List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
// Rebuild an Admin API request using the parsed items to avoid legacy-map format
List<org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem> items = new ArrayList<>();
for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
items.add(new org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem(
it.getLedgerId(), it.getEntryId(), it.getBatchIndex()));
}
org.apache.pulsar.client.admin.SkipMessageIdsRequest req =
org.apache.pulsar.client.admin.SkipMessageIdsRequest.forMessageIds(items);

futures.add(admin
.topics()
.skipMessagesAsync(topicNamePartition.toString(), subName, req));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route message IDs to their owning partitions instead of broadcasting

The partitioned-topic handler sends the entire list of message IDs to every partition (for (int i …) calling skipMessagesAsync on each). When the request contains IDs that belong to only one partition (the normal case), the other partitions try to ack ledger/entry pairs that are not in their backlog and the whole operation fails once any partition returns an error. The admin API therefore cannot successfully skip a message on a multi-partition topic. Group message IDs by partition (e.g. via TopicName partition parsing or managed ledger ownership) and only invoke skipMessagesAsync for the partitions that actually contain each ID.

Useful? React with 👍 / 👎.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 13 out of 13 changed files in this pull request and generated 15 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

void skipMessages(String topic, String subName, SkipMessageIdsRequest request) throws PulsarAdminException;

/**
* Async version of {@link #skipMessages(String, String, SkipMessageIdsRequest)}.
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The javadoc for the async method is incomplete. It should include parameter and return documentation for consistency with other methods in this file. Consider:

/**
 * Skip messages by specifying a list of message IDs in various formats asynchronously.
 * Supports base64-encoded MessageId byte arrays and structured messageId objects with batchIndex.
 *
 * @param topic topic name (persistent://tenant/namespace/name)
 * @param subName subscription name
 * @param request request payload supporting multiple messageId formats
 * @return a future that can be used to track when the messages are skipped
 */
Suggested change
* Async version of {@link #skipMessages(String, String, SkipMessageIdsRequest)}.
* Skip messages by specifying a list of message IDs in various formats asynchronously.
* Supports base64-encoded MessageId byte arrays and structured messageId objects with batchIndex.
*
* @param topic topic name (persistent://tenant/namespace/name)
* @param subName subscription name
* @param request request payload supporting multiple messageId formats
* @return a future that can be used to track when the messages are skipped

Copilot uses AI. Check for mistakes.
Comment on lines +541 to +544
assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
|| receivedMessages2.contains("msg-" + cancelMessage))
&& (receivedMessages1.size() + receivedMessages2.size() == 99),
"msg-" + cancelMessage + " should have been cancelled but was received");
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion logic is incorrect. The current logic assertFalse(A && B) passes if either the message was received OR the count is wrong, which is not the intended behavior. The test should verify:

  1. The cancelled message was NOT received
  2. Exactly 99 messages were received

Replace with:

assertFalse(receivedMessages1.contains("msg-" + cancelMessage) 
    || receivedMessages2.contains("msg-" + cancelMessage), 
    "msg-" + cancelMessage + " should have been cancelled but was received");
assertEquals(99, receivedMessages1.size() + receivedMessages2.size(), 
    "Should have received exactly 99 messages");
Suggested change
assertFalse((receivedMessages1.contains("msg-" + cancelMessage)
|| receivedMessages2.contains("msg-" + cancelMessage))
&& (receivedMessages1.size() + receivedMessages2.size() == 99),
"msg-" + cancelMessage + " should have been cancelled but was received");
assertFalse(receivedMessages1.contains("msg-" + cancelMessage)
|| receivedMessages2.contains("msg-" + cancelMessage),
"msg-" + cancelMessage + " should have been cancelled but was received");
assertEquals(99, receivedMessages1.size() + receivedMessages2.size(),
"Should have received exactly 99 messages");

Copilot uses AI. Check for mistakes.
throw new ParameterException("Invalid --messageId-triplet: " + s + ", " + e.getMessage());
}
items.add(new SkipMessageIdsRequest.MessageIdItem(ledgerId, entryId, batchIndex));
}
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After filtering null/empty triplets, the items list could be empty. Consider validating that at least one valid message ID was provided before making the API call:

if (items.isEmpty()) {
    throw new ParameterException("No valid message IDs provided");
}
Suggested change
}
}
if (items.isEmpty()) {
throw new ParameterException("No valid message IDs provided");
}

Copilot uses AI. Check for mistakes.
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 400, message = "Bad Request: invalid messageIds format"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namesapce or topic or subscription does not exist") })
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in error message: "Namesapce" should be "Namespace"

Copilot uses AI. Check for mistakes.
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.SkipEntry;
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imported class SkipEntry does not exist in the codebase and needs to be created. See the comment on PersistentSubscription.java for the required class structure.

Copilot uses AI. Check for mistakes.
private String topicName;

@Option(names = { "-s",
"--subscription" }, description = "Subscription to be skip messages on", required = true)
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar issue: "Subscription to be skip messages on" should be "Subscription to skip messages on"

Copilot uses AI. Check for mistakes.
Comment on lines +2000 to +2014
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
// Rebuild an Admin API request using the parsed items to avoid legacy-map format
List<org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem> items = new ArrayList<>();
for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
items.add(new org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem(
it.getLedgerId(), it.getEntryId(), it.getBatchIndex()));
}
org.apache.pulsar.client.admin.SkipMessageIdsRequest req =
org.apache.pulsar.client.admin.SkipMessageIdsRequest.forMessageIds(items);

futures.add(admin
.topics()
.skipMessagesAsync(topicNamePartition.toString(), subName, req));
}
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for handling partitioned topics appears incorrect. The same message IDs are sent to all partitions (loop on lines 2000-2014), but message IDs are partition-specific - a message ID from partition-0 won't exist in partition-1.

This will likely result in errors or no-op behavior on most partitions. Consider either:

  1. Requiring users to specify the partition explicitly for this operation (document that it doesn't work on partitioned topic names)
  2. Implementing logic to extract partition information from the message IDs if available
  3. Changing the API to accept partition-specific message ID mappings

Copilot uses AI. Check for mistakes.
Comment on lines +52 to +62
public long getLedgerId() {
return ledgerId;
}

public long getEntryId() {
return entryId;
}

public Integer getBatchIndex() {
return batchIndex;
}
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getter methods (getLedgerId, getEntryId, getBatchIndex) are redundant for a Java record, as records automatically generate these methods. These explicit definitions can be removed.

Suggested change
public long getLedgerId() {
return ledgerId;
}
public long getEntryId() {
return entryId;
}
public Integer getBatchIndex() {
return batchIndex;
}

Copilot uses AI. Check for mistakes.
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a partitioned topic is not allowed"),
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message "Skipping messages on a partitioned topic is not allowed" is inaccurate. The implementation supports partitioned topics by iterating over all partitions (see internalSkipByMessageIdsForPartitionedTopic). This ApiResponse should be removed or the message should be updated to reflect actual limitations.

Suggested change
@ApiResponse(code = 405, message = "Skipping messages on a partitioned topic is not allowed"),

Copilot uses AI. Check for mistakes.
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.SkipEntry;
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imported class SkipEntry does not exist in the codebase. This class needs to be created with the following structure based on its usage:

package org.apache.pulsar.broker.service;

import java.util.List;

public class SkipEntry {
    private final long ledgerId;
    private final long entryId;
    private final List<Integer> batchIndexes;
    
    public SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndexes = batchIndexes;
    }
    
    public long getLedgerId() {
        return ledgerId;
    }
    
    public long getEntryId() {
        return entryId;
    }
    
    public List<Integer> getBatchIndexes() {
        return batchIndexes;
    }
}

Alternatively, it could be defined as a record.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants