@Denovo1998
Contributor

@Denovo1998 Denovo1998 commented Jun 1, 2025

Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

  • Cancelling Scheduled Actions: A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely.
  • Removing Backlogs: A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting valid messages around it is a critical operational capability.
  • Manual Business Logic Correction: An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue.

The existing skip-messages API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces a more fine-grained administrative tool to acknowledge a single message by its unique MessageId. This provides a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

Implementation PR: #23907

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Denovo1998#8

@Denovo1998
Contributor Author

Currently, there are some issues with this implementation.
(Note that this PIP only discusses the implementation of BucketDelayedDeliveryTracker)

If a delayed message in the LastMutableBucket has not yet been flushed to the bookie and a failure occurs, the data in the LastMutableBucket is lost. This has no impact, however, because after a restart the messages are read again from the MarkDelete position and the bucket is rebuilt.

Our delayed message deletion command is different. If a LastMutableBucket has not yet been successfully persisted (sealBucketAndAsyncPersistent) and the broker crashes, the command is lost. We cannot hold back the success response for the cancel command until the seal-bucket condition is triggered, because we do not know how long that will take.

This is important and needs to be resolved. I will think about how to solve it, and everyone is welcome to discuss it with me.

I have some solutions, let me try testing them out first.

@Denovo1998
Contributor Author

The document has been updated. We will implement delayed message cancellation by acknowledging the message.

@Denovo1998 Denovo1998 changed the title from "[improve][pip] PIP-423: Add Support for Cancelling Individual Delayed Messages" to "[improve][pip] PIP-423: Add a new admin API to acknowledge a single message" on Jun 10, 2025

Comment on lines +146 to +147
* **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a string and values are `entryId` as a string.
* **Example Body:** `{"12345": "100", "12346": "200"}`
Member

The serialization format should be changed. The problem with this format is that it cannot be extended to cover batch messages.

besides ledgerId and entryId, there's also batchIndex, which is part of the message id.

One of the design principles has been to hide the implementation details of the message id in Pulsar and not leak it. That's why there are org.apache.pulsar.client.api.MessageId#toByteArray and org.apache.pulsar.client.api.MessageId#fromByteArray .

It would be useful to support multiple formats for the message id. For consistency with the MessageId.toByteArray() API, there should be support for accepting a list of base64 encoded byte array representations of the MessageId.
One way to solve this is to have the body map to an object that is mapped with Jackson using polymorphism configured with @JsonTypeInfo and @JsonSubTypes annotations. That would allow flexibility for adding various formats for the message id list.

the default format could be the byteArrays encoded as base64:

{"type": "byteArray", "messageIds": ["CLlgEAQwAA==", "CLlgEAYwAA==", "CLlgEJ4BMAA=", "CLlgEKQCMAA=", "CLlgEPQCMAA=", "CLlgECcwAA==", "CLlgEIQDMAA=", "CLlgEKEDMAA=", "CLlgEAQwAA==", "CLlgEI4EMAA="]}

since this could be made the default, the "type": "byteArray" could be omitted:

{"messageIds": ["CLlgEAQwAA==", "CLlgEAYwAA==", "CLlgEJ4BMAA=", "CLlgEKQCMAA=", "CLlgEPQCMAA=", "CLlgECcwAA==", "CLlgEIQDMAA=", "CLlgEKEDMAA=", "CLlgEAQwAA==", "CLlgEI4EMAA="]}

another could be the ledgerId:entryId pairs (without support for batchIndex) if this is absolutely necessary:

{"type": "map_of_ledgerId_entryId", "messageIds": {"12345": 100, "12346": 200}}

Instead of that, it could be useful to have a format that maps directly:

{"type": "messageId", "messageIds": [{"ledgerId": 12345, "entryId": 100}, {"ledgerId": 12346, "entryId": 200}]}

this format could also support the batch index.

{"type": "messageId", "messageIds": [{"ledgerId": 12345, "entryId": 100, "batchIndex": 5}, {"ledgerId": 12346, "entryId": 200, "batchIndex": 10}]}
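
For the base64 `byteArray` format, the server side would first have to turn each string back into raw bytes before handing them to `MessageId.fromByteArray`. A minimal sketch of that decoding step, using only the JDK, might look like the following. The class name `Base64MessageIds` and the method `decodeAll` are hypothetical and not part of Pulsar, and the call to `MessageId.fromByteArray` is left out so the snippet runs without the Pulsar client on the classpath.

```java
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical helper for illustration only; not a Pulsar class. */
final class Base64MessageIds {

    private Base64MessageIds() {
    }

    /**
     * Decodes every base64 string into its raw bytes.
     * Base64.Decoder#decode throws IllegalArgumentException on malformed input,
     * which a REST handler could map to a 4xx response.
     */
    static List<byte[]> decodeAll(List<String> encodedIds) {
        List<byte[]> decoded = new ArrayList<>(encodedIds.size());
        for (String encoded : encodedIds) {
            // Each byte[] would then be passed to MessageId.fromByteArray(...)
            decoded.add(Base64.getDecoder().decode(encoded));
        }
        return decoded;
    }
}
```

Because the bytes stay opaque until `MessageId.fromByteArray` interprets them, this format keeps the internal layout of the message id out of the REST contract.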

* **Usage:** `pulsar-admin topics skip-messages <topic-name> [options]`
* **Options:**
    * `-s, --subscription <subName>` (required): The subscription to skip messages on.
    * `-m, --messageId <ledgerId=entryId>` (required): The message ID to skip. This option can be specified multiple times.
Member

it should be possible to also specify the batch index.
A better format is ledgerId:entryId[:batchIndex] where the separator is : and the batch index is optional.
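
As a rough illustration of the `ledgerId:entryId[:batchIndex]` format, the CLI option value could be parsed along these lines. This is only a sketch under assumptions: the record `MessageIdSpec` and its `parse` method are hypothetical names introduced here, not existing Pulsar types, and the rejection of negative components is an assumed validation rule.

```java
import java.util.OptionalInt;

/** Hypothetical value type for one CLI-supplied message id; not a Pulsar class. */
record MessageIdSpec(long ledgerId, long entryId, OptionalInt batchIndex) {

    /** Parses "ledgerId:entryId" or "ledgerId:entryId:batchIndex". */
    static MessageIdSpec parse(String text) {
        // Limit -1 keeps trailing empty parts, so "1:2:" is rejected instead of silently accepted.
        String[] parts = text.split(":", -1);
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException(
                    "Expected ledgerId:entryId[:batchIndex], got: " + text);
        }
        try {
            long ledgerId = Long.parseLong(parts[0]);
            long entryId = Long.parseLong(parts[1]);
            OptionalInt batchIndex = parts.length == 3
                    ? OptionalInt.of(Integer.parseInt(parts[2]))
                    : OptionalInt.empty();
            // Assumed rule: no component may be negative.
            if (ledgerId < 0 || entryId < 0
                    || (batchIndex.isPresent() && batchIndex.getAsInt() < 0)) {
                throw new IllegalArgumentException("Negative component in message id: " + text);
            }
            return new MessageIdSpec(ledgerId, entryId, batchIndex);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Non-numeric component in message id: " + text, e);
        }
    }
}
```

With this shape, `MessageIdSpec.parse("12345:100")` yields an empty `batchIndex`, while `MessageIdSpec.parse("12345:100:5")` carries batch index 5.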

// ... existing methods
CompletableFuture<Void> skipMessages(int numMessagesToSkip);

CompletableFuture<Void> skipMessages(Map<String, String> messageIds);
Member

@lhotari lhotari Oct 13, 2025

should be List<MessageIdAdv> in this interface.

Member

I'm not actually sure if List<MessageIdAdv> is the best option here. I guess we'd need a new type (class) that captures Position and batchIndex.
client's MessageId and MessageIdAdv are slightly different and contain more state that's not relevant here.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@Override
public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
```

Member

should be List of message ids with (ledgerId, entryId and batchIndex information)

Comment on lines +95 to +108
List<Position> positions = new ArrayList<>();
for (Map.Entry<String, String> entry : messageIds.entrySet()) {
    try {
        long ledgerId = Long.parseLong(entry.getKey());
        long entryId = Long.parseLong(entry.getValue());
        Position position = PositionFactory.create(ledgerId, entryId);
        positions.add(position);
    } catch (Exception e) {
        return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID."));
    }
}

Map<String, Long> properties = Collections.emptyMap();
acknowledgeMessage(positions, AckType.Individual, properties);
Member

this logic should cover handling the case when the batchIndex is passed. If no batchIndex is passed, it should acknowledge the complete entry.
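
The requested behaviour can be illustrated with a toy in-memory model: with a batch index only that one message of the entry is acknowledged, and without one the whole entry is. `AckModel` and all of its methods are hypothetical names made up for this sketch; it does not reflect how the broker or the managed cursor actually stores acknowledgement state.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Toy model of per-entry acknowledgement state; illustration only, not broker code. */
final class AckModel {

    /** Key is "ledgerId:entryId"; set bits mark batch indexes that are still pending. */
    private final Map<String, BitSet> pending = new HashMap<>();

    /** Registers an entry whose batch holds batchSize messages, all pending. */
    void addEntry(long ledgerId, long entryId, int batchSize) {
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);
        pending.put(ledgerId + ":" + entryId, bits);
    }

    /** Acknowledges one batch index if present, otherwise the complete entry. */
    void acknowledge(long ledgerId, long entryId, OptionalInt batchIndex) {
        String key = ledgerId + ":" + entryId;
        BitSet bits = pending.get(key);
        if (bits == null) {
            return; // unknown entry or already fully acknowledged
        }
        if (batchIndex.isEmpty()) {
            pending.remove(key); // no batch index: acknowledge the complete entry
            return;
        }
        bits.clear(batchIndex.getAsInt());
        if (bits.isEmpty()) {
            pending.remove(key); // last pending message of the batch was acknowledged
        }
    }

    /** Number of messages of the entry that are still pending. */
    int pendingCount(long ledgerId, long entryId) {
        BitSet bits = pending.get(ledgerId + ":" + entryId);
        return bits == null ? 0 : bits.cardinality();
    }
}
```

In this model, acknowledging every batch index one by one ends in the same state as acknowledging the entry without a batch index.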
