Skip to content

Conversation

@Denovo1998
Copy link
Contributor

@Denovo1998 Denovo1998 commented Jan 29, 2025

Fixes #xyz

Main Issue: #23149

PIP: #24370

Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

  • Cancelling Scheduled Actions: A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively.
  • Removing Backlogs: A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch.
  • Manual Business Logic Correction: An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue.

The existing skip(numMessages) API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific MessageId (including ledgerId, entryId, and optional batchIndex), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Denovo1998#17

@github-actions
Copy link

@Denovo1998 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Jan 29, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @Denovo1998. Since there's a major impact to the core parts of Pulsar, more effort would have to be spent in design decisions.

All major project decisions (excluding new security issues) are made by the whole community in the Apache way on the [email protected] mailing list.

The most important thing about engaging with any Apache project is that everyone is equal. All project participants with an opinion can express that opinion and, where appropriate, have the community consider it.
Unless you already have, please join the dev mailing list using these instructions , and start a discussion about this proposal.

Pulsar project decisions are usually documented in the form of Pulsar Improvement Proposals (PIPs) and a new PIP is started with a discussion on the mailing list.

In this case, I believe that the use case is very clear, but the solution might be different from what the current pull request includes.

It's a useful starting point to have this PR in place before starting a PIP, however it won't be merged before we have concluded on a solution that meets our functional and non-functional requirements. For example, we are looking for maintainability (fits the current architecture), performance (doesn't have a negative performance impact on other workloads) and compatibility (doesn't break existing brokers and clients).

At a quick glance it seems that from the architecture side, that the current DelayedDeliveryTracker should be leveraged to support this use case. For encoding the marker messages, Pulsar contains marker messages. It should be considered how these cancellation events would be stored in the topic. Compatibility concerns are around possible limitations with geo-replication. It might not even make sense to store this information in the topic in the first place. When the cancellation messages are stored in the topic, cancellation would only work when the DelayedDeliveryTracker state is such that it has "indexed" the delayed messages and the cancellation messages. For example, the InMemoryDelayedDeliveryTracker keeps state only in memory. The impact of the cancellation commands in the topic would be that before delivering any scheduled message, the state would have to be caught up before delivering a scheduled message. This is just a first thought about the possible impact of supporting cancellation. Due to such performance impacting details, it's more likely that this type of cancellation support would have to be enabled for a namespace or topic explicitly.

I hope this comment helps in making further progress.

@lhotari
Copy link
Member

lhotari commented Jan 30, 2025

When the cancellation messages are stored in the topic, cancellation would only work when the DelayedDeliveryTracker state is such that it has "indexed" the delayed messages and the cancellation messages. For example, the InMemoryDelayedDeliveryTracker keeps state only in memory. The impact of the cancellation commands in the topic would be that before delivering any scheduled message, the state would have to be caught up before delivering a scheduled message. This is just a first thought about the possible impact of supporting cancellation. Due to such performance impacting details, it's more likely that this type of cancellation support would have to be enabled for a namespace or topic explicitly.

Not directly related, but contains some details about the current delayed delivery tracker solution: #23912 .
For the BucketDelayedDeliveryTracker, it could be feasible to add the cancellation support since it keeps persistent state. However, the challenge that the cancellation commands might not be processed while making delayed message delivery decisions remains. It seems that some sort of RPC would be needed and storing the cancellation commands directly in the BucketDelayedDeliveryTracker persistence solution could be feasible. For the InMemoryDelayedDeliveryTracker, I don't think that it would be feasible to add support for cancellation at all since based on my initial analysis, I don't think that it makes sense to store the cancellation state in the topic itself due to the problems with unprocessed cancellation commands while making delivery decisions.

@Denovo1998
Copy link
Contributor Author

@lhotari Thank you very much, this is very useful.

I initially implemented delayed message cancellation in the Dispatcher rather than the DelayedDeliveryTracker because using MARK messages appeared straightforward, and I assumed minimal storage overhead since not all delayed messages require cancellation. Determining the optimal timing for sending MARK messages also presented significant design challenges. However, as you rightly pointed out, storing MARK messages directly in topics was fundamentally flawed from the start, and I overlooked geo-replication implications.

Additionally, I realized the current PR's MARK messages affect the entire topic rather than specific subscriptions - cancellation commands should ideally be subscription-scoped. The cancellation command for delayed messages proposed in a subscription should only act on that one subscription.

I will now focus on implementing delayed message cancellation within the BucketDelayedDeliveryTracker and plan to propose a PIP in the near future.

@thetumbled
Copy link
Member

thetumbled commented Feb 18, 2025

First of all, what do you mean by cancelling a delayed message? do you want to delete such message in broker and don't dispatch it, or cancel the delayed time and dispatch it immediately?

As a message queue, pulsar do not support message modification, but if you do want to withdraw the message produced, you can try the transaction feature or topic compaction feature. Frankly speaking, we may meet some issues when combining delayed message feature with transaction feature or topic compaction feature.

It is also reasonable that pulsar do not support the modification of delayed time or cancellation of delayed message, just like we can't withdraw the message produced. There are too many features in pulsar, we can't integrate all features into pulsar, most of these so-called requirement should be implemented in user's side. And the complexity of Pulsar is pretty high, we need to balance between the benefit of this minority demand and the risk of breaking existing logic, adding the complexity of the project.
Combination of different features of pulsar can result into lots of problem. IMO, what pulsar mostly need is not adding edge features that only minority would use, but the reliability, stability, simplicity. We need to decrease the complexity of the project.

@thetumbled
Copy link
Member

thetumbled commented Feb 18, 2025

This is a big new feature, impacting the core logic, a pip may be needed to get more opinions from the community.

@Denovo1998
Copy link
Contributor Author

Denovo1998 commented Feb 21, 2025

This is a big new feature, impacting the core logic, a pip may be needed to get more opinions from the community.

Yes. It‘s very interesting and challenging for me. I will launch the PIP process later.

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
#	pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
#	pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java
Copilot AI review requested due to automatic review settings November 6, 2025 13:00
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds functionality to skip (acknowledge) specific messages by their message IDs on a topic subscription, primarily to support canceling delayed messages. The feature includes both API and CLI support.

Key Changes:

  • Added new skipMessages(Map<String, String> messageIds) method to skip specific messages by their ledger ID and entry ID
  • Implemented REST endpoints in both v1 and v2 admin APIs to support the new skip-by-message-IDs operation
  • Added CLI command skip-messages to invoke the new functionality from the command line

Reviewed Changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java Added API method signatures for skipping messages by message IDs
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java Implemented client methods to call the skip-by-message-IDs REST endpoint
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java Added CLI command to skip messages by message IDs
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java Added interface method for skipping messages by message IDs
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java Implemented logic to acknowledge specific messages by their positions
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java Added no-op implementation for non-persistent topics
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java Added v1 REST endpoint for skip-by-message-IDs
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java Added v2 REST endpoint for skip-by-message-IDs
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Implemented admin logic for both partitioned and non-partitioned topics, includes unrelated pattern matching refactorings
pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdTopics.java Added unit test for CLI command
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java Added integration test for delayed message cancellation

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@Denovo1998 Denovo1998 marked this pull request as draft November 6, 2025 13:18
@Denovo1998 Denovo1998 changed the title [improve][broker] Implementing delayed message cancellation in pulsar [improve][broker] PIP-423: Add a new admin API to acknowledge a single message Nov 9, 2025
@Denovo1998 Denovo1998 marked this pull request as ready for review November 9, 2025 10:02
@Denovo1998 Denovo1998 requested a review from Copilot November 9, 2025 10:05
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants