255 changes: 255 additions & 0 deletions pip/pip-423.md
@@ -0,0 +1,255 @@
# PIP-423: Add a new admin API to acknowledge a single message

# Background knowledge

* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends.
* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`).
* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart.
* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time.
* **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal extends this existing API to add the ability to skip messages by their specific `MessageId`.
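The relationship between the `mark-delete position` and individually acknowledged messages can be illustrated with a toy model. This is a minimal sketch for a single ledger, not Pulsar's `ManagedCursor`; the class `ToyCursor` and its methods are hypothetical names introduced only for illustration.

```java
import java.util.TreeSet;

// Toy model of a subscription cursor over one ledger. NOT Pulsar's ManagedCursor;
// all names here are hypothetical and exist only to illustrate the concept.
final class ToyCursor {
    // Last entry of the contiguous acknowledged prefix (-1 means nothing acknowledged yet).
    private long markDelete = -1;
    // Entries acknowledged individually ahead of the mark-delete position.
    private final TreeSet<Long> ackedAhead = new TreeSet<>();

    void ackIndividual(long entryId) {
        if (entryId <= markDelete) {
            return; // already covered by the mark-delete position
        }
        ackedAhead.add(entryId);
        // Advance the mark-delete position while the next entry is already acknowledged.
        while (ackedAhead.remove(Long.valueOf(markDelete + 1))) {
            markDelete++;
        }
    }

    boolean isAcknowledged(long entryId) {
        return entryId <= markDelete || ackedAhead.contains(entryId);
    }

    long markDeletePosition() {
        return markDelete;
    }

    int individuallyAckedCount() {
        return ackedAhead.size();
    }
}
```

Acknowledging entry 5 first leaves the mark-delete position untouched and records 5 as acknowledged ahead of it. Once entries 0 through 4 are acknowledged, the mark-delete position moves to 5 and the individually tracked set becomes empty.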

# Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely.
* **Removing a Malformed Message:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting the valid messages around it is a critical operational capability.
* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue.

The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal enhances the `skipMessages` API to accept specific message IDs, providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

# Goals

## In Scope

* Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support skipping specific messages for a subscription.
* Introduce a new CLI command `pulsar-admin topics skip-messages` for this purpose.
* The target message(s) will be identified by their `ledgerId` and `entryId`.
* The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability.
* This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`).
* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription.

## Out of Scope
* Adding a new, separate Admin API endpoint. This feature enhances the existing `skip` endpoint.
* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation.

# High Level Design

The proposed solution extends the existing administrative `skipMessages` API to trigger Pulsar's individual acknowledgement capability on demand.

1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a map of `ledgerId` to `entryId` for the messages to be skipped.

2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` permission. The API call is an overload of the existing skip endpoint, where the number of messages to skip is specified as `0` in the URL path, and the message IDs are passed in the POST body.

3. **Delegate to Subscription:** The broker invokes a new `skipMessages(Map<String, String> messageIds)` method on the target `PersistentSubscription` object.

4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs:
    * It verifies that the subscription's type is compatible with individual acknowledgement (i.e., not cumulative).
    * It constructs `Position` objects from the provided ledger and entry IDs.
    * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to a consumer individually acknowledging the messages.

5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted.
    * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered.
    * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**.

This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while cleanly extending an existing administrative API.

# Detailed Design

## Design & Implementation Details

The core of the implementation involves adding a new method to the `Subscription` interface and implementing it in `PersistentSubscription` to leverage the existing individual acknowledgment mechanism.

1. **Subscription Interface Extension:**
The `Subscription` interface is extended with a new method to handle skipping by message IDs.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
public interface Subscription extends MessageExpirer {
    // ... existing methods
    CompletableFuture<Void> skipMessages(int numMessagesToSkip);

    // Review comment (@lhotari, Oct 13, 2025): should be List<MessageIdAdv> in this interface.
    // Review follow-up (Member): I'm not actually sure if List<MessageIdAdv> is the best option here.
    // I guess we'd need a new type (class) that captures Position and batchIndex.
    // The client's MessageId and MessageIdAdv are slightly different and contain more state
    // that's not relevant here.
    CompletableFuture<Void> skipMessages(Map<String, String> messageIds);

    // ... existing methods
}
```

2. **PersistentSubscription Implementation:**
The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
// Review comment (Member): the parameter should be a List of message ids
// (with ledgerId, entryId and batchIndex information).
@Override
public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
    if (log.isDebugEnabled()) {
        log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
                cursor.getNumberOfEntriesInBacklog(false));
    }

    // Skip-by-ID relies on individual acknowledgement, so cumulative-ack subscription types are rejected.
    if (Subscription.isCumulativeAckMode(getType())) {
        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
    }

    // Convert each (ledgerId, entryId) string pair into a Position.
    List<Position> positions = new ArrayList<>();
    for (Map.Entry<String, String> entry : messageIds.entrySet()) {
        try {
            long ledgerId = Long.parseLong(entry.getKey());
            long entryId = Long.parseLong(entry.getValue());
            Position position = PositionFactory.create(ledgerId, entryId);
            positions.add(position);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID."));
        }
    }

    Map<String, Long> properties = Collections.emptyMap();
    acknowledgeMessage(positions, AckType.Individual, properties);
    // Review comment (Member, on the conversion and acknowledgement logic above): this logic should
    // cover handling the case when the batchIndex is passed. If no batchIndex is passed, it should
    // acknowledge the complete entry.

    return CompletableFuture.completedFuture(null);
}
```

3. **Admin API Logic:**
The `PersistentTopicsBase` class is updated to handle the overloaded `skipMessages` request. When `numMessages` is 0 and the `messageIds` map is not empty, it routes the request to the new `subscription.skipMessages(Map)` method.

```java
// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
                                    boolean authoritative, Map<String, String> messageIds) {
    // ...
    // In the implementation logic for the topic:
    // ...
    // Skip by message ID when IDs are supplied and the count in the path is 0.
    if (!messageIds.isEmpty() && numMessages == 0) {
        return sub.skipMessages(messageIds).thenAccept(unused -> {
            log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName);
            asyncResponse.resume(Response.noContent().build());
        });
    }
    // Otherwise fall back to the existing skip-by-count behavior.
    return sub.skipMessages(numMessages).thenAccept(unused -> {
        log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
                topicName, subName);
        // ...
    });
    // ...
}
```

## Public-facing Changes
The existing `skipMessages` API is modified to accept a POST body containing message IDs.

### Public API
The REST endpoint for skipping messages is updated. To skip by message ID, a client must send a `POST` request with `numMessages` set to `0` in the path and provide a map of message IDs in the JSON body.

* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}`
* **Path Parameters:**
    * `numMessages`: Must be set to `0` to enable skipping by message ID.
* **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a string and values are `entryId` as a string. Because keys in a JSON object are expected to be unique, a map in this form can name only one `entryId` per `ledgerId` in a single request.
    * **Example Body:** `{"12345": "100", "12346": "7"}`
* **Response Codes:**
    * `204 No Content`: The operation was successful.
    * `400 Bad Request`: Invalid message ID format in the request body.
    * `403 Forbidden`: The client is not authorized to perform a `skip` operation on this topic.
    * `404 Not Found`: The topic or subscription does not exist.
    * `405 Method Not Allowed`: The subscription type does not support individual acknowledgement (e.g., `Exclusive`, `Failover`).
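As an illustration of the request shape described above, the following sketch builds (but does not send) the v2 `POST` request with the JDK's `java.net.http` API. The class `SkipByIdRequest`, its method names, and the base URL used in the usage note are assumptions made for this example. Authentication and URL-encoding of path segments are omitted.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Pulsar: builds the skip-by-ID request described in this section.
final class SkipByIdRequest {

    // Serializes ledgerId -> entryId pairs as a JSON object of string keys and string values.
    static String toJsonBody(Map<Long, Long> ledgerToEntry) {
        return new TreeMap<>(ledgerToEntry).entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    // Builds the POST request; numMessages is fixed to 0 in the path, as the proposal requires.
    // Path segments are used as-is (no URL-encoding) to keep the sketch short.
    static HttpRequest build(String adminBaseUrl, String tenant, String namespace,
                             String topic, String subName, Map<Long, Long> ledgerToEntry) {
        URI uri = URI.create(adminBaseUrl + "/admin/v2/persistent/" + tenant + "/" + namespace
                + "/" + topic + "/subscription/" + subName + "/skip/0");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJsonBody(ledgerToEntry)))
                .build();
    }
}
```

For example, `SkipByIdRequest.build("http://localhost:8080", "public", "default", "my-topic", "my-sub", Map.of(12345L, 100L))` yields a request that could be sent with `HttpClient.send`; a `204` status would then indicate success according to the response codes listed above.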

### Binary protocol

No changes are made to the Pulsar binary protocol.

### Configuration

**No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.**

**Administrators should be aware of these settings if they expect a high volume of message cancellations:**

```
# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Maximum number of partially acknowledged batch messages per subscription that will have their batch
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
# deleted indexes will be cleared while redelivering the messages to consumers.
managedLedgerMaxBatchDeletedIndexToPersist=10000

# When storing acknowledgement state, choose a more compact serialization format that stores
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
managedLedgerPersistIndividualAckAsLongArray=true

# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
managedLedgerUnackedRangesOpenCacheSetEnabled=true

# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# MetadataStore.
managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000

# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is NONE, then save the ManagedCursorInfo bytes data directly without compression.
# Using compression reduces the size of persistent cursor (subscription) metadata. This enables using a higher
# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the overall metadata stored in
# the metadata store such as ZooKeeper.
managedCursorInfoCompressionType=NONE

# ManagedCursorInfo compression size threshold (bytes), only compress metadata when origin size more than this value.
# 0 means compression will always apply.
managedCursorInfoCompressionThresholdInBytes=16384

# ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly without compression.
# Using compression reduces the size of the persistent topic metadata. When a topic contains a large number of
# individual ledgers in BookKeeper or tiered storage, compression helps prevent the metadata size from exceeding
# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also reduces the overall metadata stored
# in the metadata store such as ZooKeeper.
managedLedgerInfoCompressionType=NONE

# ManagedLedgerInfo compression size threshold (bytes), only compress metadata when origin size more than this value.
# 0 means compression will always apply.
managedLedgerInfoCompressionThresholdInBytes=16384
```
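To get a rough feel for how skipped messages relate to limits such as `managedLedgerMaxUnackedRangesToPersist`, the sketch below counts how many contiguous ranges a set of individually acknowledged entry IDs forms within a single ledger. It is a simplified estimate under stated assumptions, not the broker's actual accounting: the class `AckRangeEstimator` is hypothetical, and real cursors track ranges across ledgers.

```java
import java.util.TreeSet;

// Hypothetical estimator, not broker code: counts maximal runs of consecutive
// entry IDs, each of which would correspond to one acknowledged "range".
final class AckRangeEstimator {

    static int countRanges(Iterable<Long> ackedEntryIds) {
        // Sort and de-duplicate the entry IDs.
        TreeSet<Long> sorted = new TreeSet<>();
        ackedEntryIds.forEach(sorted::add);

        int ranges = 0;
        Long previous = null;
        for (long id : sorted) {
            // A new range starts whenever the ID does not directly follow the previous one.
            if (previous == null || id != previous + 1) {
                ranges++;
            }
            previous = id;
        }
        return ranges;
    }
}
```

Under this model, skipping entries 100, 101 and 102 produces one range, while skipping entries 100, 200 and 300 produces three. Widely scattered cancellations therefore approach the configured limits faster than clustered ones.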

### CLI

A new CLI command is added to `pulsar-admin topics`.

* **Command:** `skip-messages`
* **Description:** Skip some messages for a subscription by their message IDs.
* **Usage:** `pulsar-admin topics skip-messages <topic-name> [options]`
* **Options:**
    * `-s, --subscription <subName>` (required): The subscription to skip messages on.
    * `-m, --messageId <ledgerId=entryId>` (required): The message ID to skip. This option can be specified multiple times.
        * *Review comment (Member):* It should also be possible to specify the batch index. A better format is `ledgerId:entryId[:batchIndex]`, where the separator is `:` and the batch index is optional.

* **Example:**
```bash
# Skip a single message for subscription 'my-sub'
pulsar-admin topics skip-messages persistent://public/default/my-topic \
-s my-sub -m 12345=100

# Skip multiple messages
pulsar-admin topics skip-messages persistent://public/default/my-topic \
-s my-sub -m 12345=100 -m 12345=101
```
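The review feedback suggests the `ledgerId:entryId[:batchIndex]` format and a dedicated type that captures the position plus an optional batch index. A minimal sketch of such a type and its parser could look like the following. The class name `SkipTarget` and its API are hypothetical and are not part of the proposal or of Pulsar.

```java
import java.util.OptionalInt;

// Hypothetical value type for one skip target: a ledger/entry position with an optional batch index.
final class SkipTarget {
    final long ledgerId;
    final long entryId;
    final OptionalInt batchIndex; // empty means "the complete entry"

    private SkipTarget(long ledgerId, long entryId, OptionalInt batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    // Parses "ledgerId:entryId" or "ledgerId:entryId:batchIndex".
    static SkipTarget parse(String text) {
        String[] parts = text.split(":", -1);
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("Expected ledgerId:entryId[:batchIndex], got: " + text);
        }
        try {
            long ledgerId = Long.parseLong(parts[0]);
            long entryId = Long.parseLong(parts[1]);
            OptionalInt batchIndex = parts.length == 3
                    ? OptionalInt.of(Integer.parseInt(parts[2]))
                    : OptionalInt.empty();
            if (ledgerId < 0 || entryId < 0 || (batchIndex.isPresent() && batchIndex.getAsInt() < 0)) {
                throw new IllegalArgumentException("IDs must be non-negative: " + text);
            }
            return new SkipTarget(ledgerId, entryId, batchIndex);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Non-numeric component in: " + text, e);
        }
    }
}
```

With this sketch, `SkipTarget.parse("12345:100")` describes the complete entry, while `SkipTarget.parse("12345:100:3")` targets only batch index 3 within it. A list of such values would also allow several entries from the same ledger in one request.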

# Backward & Forward Compatibility

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

This operation is local to a cluster and a subscription's cursor. To acknowledge a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster. Geo-replication state is not affected.

# Alternatives

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w
* Mailing List voting thread: