Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Oct 9, 2025

PIP-442

Motivation

See PIP-442.
Apache Pulsar brokers currently lack memory limits for topic listing operations (CommandGetTopicsOfNamespace and CommandWatchTopicList), creating a significant gap in the broker's otherwise comprehensive memory management framework. This can lead to:

  • Broker/Proxy Memory Exhaustion: Large namespaces with thousands of topics can trigger OutOfMemoryErrors when multiple clients simultaneously request topic lists
  • Unpredictable Resource Usage: Operators cannot reliably predict or limit total memory consumption due to this unbounded allocation path
  • Performance Degradation: Large topic list operations cause GC pressure and latency spikes affecting all broker operations

While Pulsar has robust memory limits for message publishing (maxMessagePublishBufferSizeInMB), managed ledger operations (managedLedgerMaxReadsInFlightSizeInMB, managedLedgerCacheSizeMB), and concurrent requests (maxConcurrentLookupRequest), topic listing operations remain unbounded.

Modifications

This PR implements PIP-442 by introducing memory-aware flow control for topic listing operations:

Core Components

  1. AsyncSemaphore Interface & Implementation

    • Generic asynchronous semaphore abstraction with configurable timeouts and queue limits
    • Supports permit acquisition, updates, and cancellation
    • Tracks queue size and wait times for monitoring
  2. AsyncDualMemoryLimiter Interface & Implementation

    • Dual memory pool tracking for heap memory (topic list assembly) and direct memory (network buffers)
    • Separate limits and queues for each memory type
    • Helper methods for common patterns (acquire-execute-release)
  3. AsyncDualMemoryLimiterUtil

    • Utility methods for acquiring permits and writing responses
    • Handles serialization and network buffer management
    • Ensures permits are released after write completion or failure
  4. TopicListMemoryLimiter

    • Extends AsyncDualMemoryLimiterImpl with metrics integration
    • Exposes both Prometheus and OpenTelemetry metrics
    • Tracks memory usage, queue sizes, wait times, and timeouts

Integration Points

Broker Integration:

  • Modified ServerCnx.handleGetTopicsOfNamespace() to acquire initial heap permits (1KB estimate), update to actual size after topic retrieval, then acquire direct memory permits for response serialization
  • Modified PulsarCommandSenderImpl to acquire direct memory permits before serialization and release after write completion
  • Updated TopicListService for watch command memory control
  • Created TopicListMemoryLimiter in BrokerService with configured limits

Proxy Integration:

  • Modified LookupProxyHandler.handleGetTopicsOfNamespace() with similar permit acquisition flow
  • Handles both heap memory (for topic list from broker) and direct memory (for response to client)
  • Created TopicListMemoryLimiter in ProxyService with configured limits

Configuration

Added six new configuration parameters to broker.conf, proxy.conf, and standalone.conf:

maxTopicListInFlightHeapMemSizeMB=100
maxTopicListInFlightDirectMemSizeMB=100
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000

Metrics

Added 12 new metrics per instance (broker/proxy) with both Prometheus and OpenTelemetry support:

  • Memory usage and limits (heap and direct)
  • Queue sizes and limits
  • Wait time summaries (P50, P95, P99, Max)
  • Timeout counters

Verifying this change

  • Make sure that the change passes the CI checks

This change added tests and can be verified as follows:

  • Added PatternConsumerBackPressureMultipleConsumersTest: Integration test that creates 8,192 partitioned topics and sends 500 concurrent getTopicsOfNamespace requests from 200 different client connections. The test intentionally reduces available direct memory to reproduce memory pressure scenarios and verifies all requests complete successfully with memory limiting in place.

  • Added ProxyPatternConsumerBackPressureMultipleConsumersTest: Extended version of the broker test that routes requests through the Pulsar Proxy to verify memory limiting works correctly in proxy scenarios.

  • Added AsyncSemaphoreImplTest: Comprehensive unit tests for the async semaphore implementation covering:

    • Basic acquire/release operations
    • Queueing behavior when permits unavailable
    • Timeout and queue full exceptions
    • Cancellation support
    • Permit updates (increase/decrease)
    • Concurrent operations
  • Added AsyncDualMemoryLimiterImplTest: Unit tests for the dual memory limiter covering:

    • Independent heap and direct memory limiting
    • Permit acquisition and updates for both types
    • Queue behavior and timeout handling
    • Concurrent operations with mixed memory types
    • Proper resource cleanup
  • Added AsyncDualMemoryLimiterUtilTest: Tests for utility methods covering:

    • Successful permit acquisition and response writing
    • Error handling for permit acquisition failures
    • Write failures and proper permit release
    • Cancellation scenarios
    • Concurrent operations
  • Verified metrics: Tests validate that all 26 metrics are properly registered and report correct values.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.2.0 milestone Oct 9, 2025
@lhotari lhotari self-assigned this Oct 9, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Oct 9, 2025
@lhotari lhotari changed the title [feat][broker/proxy] PIP-442: Add memory limits for CommandGetTopicsOfNamespace/CommandWatchTopicList [feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace/CommandWatchTopicList Oct 9, 2025
@lhotari lhotari closed this Oct 9, 2025
@lhotari lhotari reopened this Oct 9, 2025
@codecov-commenter
Copy link

codecov-commenter commented Oct 9, 2025

Codecov Report

❌ Patch coverage is 87.71121% with 80 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.31%. Comparing base (676ba07) to head (e6d137e).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
...apache/pulsar/proxy/server/LookupProxyHandler.java 45.45% 22 Missing and 2 partials ⚠️
...va/org/apache/pulsar/broker/service/ServerCnx.java 73.33% 14 Missing and 2 partials ⚠️
...he/pulsar/common/semaphore/AsyncSemaphoreImpl.java 92.10% 5 Missing and 7 partials ⚠️
...pache/pulsar/broker/admin/impl/NamespacesBase.java 76.47% 5 Missing and 3 partials ⚠️
.../broker/topiclistlimit/TopicListMemoryLimiter.java 95.67% 4 Missing and 3 partials ⚠️
...apache/pulsar/broker/service/TopicListService.java 82.85% 3 Missing and 3 partials ⚠️
...pulsar/broker/service/PulsarCommandSenderImpl.java 50.00% 0 Missing and 3 partials ⚠️
...roker/topiclistlimit/TopicListSizeResultCache.java 93.93% 0 Missing and 2 partials ⚠️
...r/common/semaphore/AsyncDualMemoryLimiterImpl.java 95.34% 1 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #24833       +/-   ##
=============================================
+ Coverage     38.56%   74.31%   +35.75%     
- Complexity    13262    33995    +20733     
=============================================
  Files          1856     1920       +64     
  Lines        145287   150039     +4752     
  Branches      16877    17399      +522     
=============================================
+ Hits          56025   111502    +55477     
+ Misses        81696    29628    -52068     
- Partials       7566     8909     +1343     
Flag Coverage Δ
inttests 26.28% <29.49%> (+0.09%) ⬆️
systests 22.88% <50.07%> (+0.11%) ⬆️
unittests 73.84% <87.71%> (+39.10%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...org/apache/pulsar/broker/ServiceConfiguration.java 98.22% <100.00%> (+3.11%) ⬆️
.../org/apache/pulsar/broker/admin/v1/Namespaces.java 52.77% <100.00%> (+46.90%) ⬆️
.../org/apache/pulsar/broker/admin/v2/Namespaces.java 90.88% <100.00%> (+70.48%) ⬆️
...rg/apache/pulsar/broker/service/BrokerService.java 83.53% <100.00%> (+24.74%) ⬆️
...va/org/apache/pulsar/common/protocol/Commands.java 90.99% <100.00%> (+24.39%) ⬆️
...ulsar/common/semaphore/AsyncDualMemoryLimiter.java 100.00% <100.00%> (ø)
...r/common/semaphore/AsyncDualMemoryLimiterUtil.java 100.00% <100.00%> (ø)
...apache/pulsar/common/semaphore/AsyncSemaphore.java 100.00% <100.00%> (ø)
...apache/pulsar/proxy/server/ProxyConfiguration.java 95.67% <100.00%> (+16.19%) ⬆️
...a/org/apache/pulsar/proxy/server/ProxyService.java 80.52% <100.00%> (+31.49%) ⬆️
... and 9 more

... and 1399 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions github-actions bot added the PIP label Oct 10, 2025
@lhotari lhotari requested a review from Copilot October 10, 2025 18:24
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements PIP-442 to add memory limits for topic listing operations (CommandGetTopicsOfNamespace and CommandWatchTopicList). It introduces an asynchronous dual memory limiter that tracks separate limits for heap memory (topic list assembly) and direct memory (network buffers) to prevent broker/proxy memory exhaustion when handling large namespaces with thousands of topics.

  • Introduces AsyncSemaphore and AsyncDualMemoryLimiter interfaces with comprehensive implementations
  • Integrates memory limiting into broker and proxy topic listing operations with cancellation support
  • Adds 6 new configuration parameters for heap/direct memory limits, timeouts, and queue sizes
  • Exposes 26 new metrics (13 Prometheus + 13 OpenTelemetry) for monitoring memory usage and queue behavior

Reviewed Changes

Copilot reviewed 40 out of 40 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/ Core semaphore and dual memory limiter implementations with utility methods
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ Broker-side integration for topic listing memory control
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ Proxy-side integration for topic listing memory control
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/ TopicListMemoryLimiter with Prometheus and OpenTelemetry metrics
conf/*.conf Configuration files with new memory limiting parameters
Test files Comprehensive unit and integration tests for the new functionality

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@lhotari lhotari requested a review from codelipenghui October 17, 2025 19:14
@lhotari
Copy link
Member Author

lhotari commented Oct 17, 2025

I noticed a separate heap memory issue related to topic listing operations: the childrenCache in AbstractMetadataStore. There's a separate PR to address the unbounded cache: #24868.

private final Backoff retryBackoff;


public TopicListService(PulsarService pulsar, ServerCnx connection,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is a notification for the client, to let the client know which topic was changed; it should not be limited by the new feature. the change https://github.com/apache/pulsar/pull/24833/files#diff-5cb94e51fda5445d6fc829fda24eeccda65bf166643a7bf6957fa195516bca28L363-R392 is also related

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is a notification for the client, to let the client know which topic was changed; it should not be limited by the new feature.

I disagree. To properly limit memory usage, it's also necessary to handle CommandWatchTopicListSuccess / CommandWatchTopicUpdate as explained in https://github.com/apache/pulsar/blob/master/pip/pip-442.md .

Copy link
Contributor

@poorbarcode poorbarcode Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't close this comment for now. I will review this piece of code again. I will solve it as I have completed the review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari The command should not be limited, because MultiTopicsConsumer and MultiTopicsProducer need to receive this command once the topics' partition is changed. The regexp consumer is also relies on the command.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari The command should not be limited, because MultiTopicsConsumer and MultiTopicsProducer need to receive this command once the topics' partition is changed. The regexp consumer is also relies on the command.

@poorbarcode There's perhaps some misunderstanding. Memory limiting won't prevent this command from being executed unless it times out or the queue fills up. It is absolutely necessary to handle also CommandWatchTopicListSuccess / CommandWatchTopicUpdate related commands so that the broker doesn't run out of memory and that's also explained in PIP-442. Handling CommandWatchTopicListSuccess / CommandWatchTopicUpdate related commands with the memory limiter will help the broker remain stable regardless of a large amount of regex consumers connecting at once. When the limit is reached, it will queue up the requests until other requests have completed.
Does this make sense?

Copy link
Contributor

@poorbarcode poorbarcode Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless it times out or the queue fills up

That situation is what I want to say. It will cause consumers to be unable to receive messages that were sent to the new partitions until the client application restarts.

@lhotari lhotari requested a review from poorbarcode October 27, 2025 15:19
@lhotari
Copy link
Member Author

lhotari commented Oct 27, 2025

@codelipenghui @poorbarcode I've addressed the review comments. PTAL

@lhotari lhotari dismissed Denovo1998’s stale review October 27, 2025 16:43

Review comments were addressed earlier.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants