Skip to content
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
8697e78
[feat][broker/proxy] PIP-442: Add memory limits for CommandGetTopicsO…
lhotari Sep 22, 2025
72e15e4
Allow acquiring zero permits to simplify logic when payload is empty
lhotari Oct 9, 2025
aad9a6f
Fix javadoc
lhotari Oct 9, 2025
fa4b3a7
Add test for the proxy lookups
lhotari Oct 10, 2025
8ad197f
Implement metrics
lhotari Oct 10, 2025
98a45e8
Update PIP-442 based on implementation
lhotari Oct 10, 2025
a76e16c
Remove dynamic configuration for topic list configuration
lhotari Oct 10, 2025
22728d8
Ignore registration and unregistration exceptions in tests
lhotari Oct 10, 2025
93bd9d3
Log at debug level
lhotari Oct 10, 2025
d9d06cc
Remove extra register
lhotari Oct 10, 2025
a8d17d9
Fix test
lhotari Oct 10, 2025
0d74565
Fix test
lhotari Oct 10, 2025
225cb75
Update pulsar-broker-common/src/main/java/org/apache/pulsar/broker/to…
lhotari Oct 10, 2025
74c09ef
Update pulsar-common/src/test/java/org/apache/pulsar/common/semaphore…
lhotari Oct 10, 2025
723ccc6
Update pulsar-common/src/test/java/org/apache/pulsar/common/semaphore…
lhotari Oct 10, 2025
858e6dd
Remove redundant files
lhotari Oct 13, 2025
64590d5
Address review comment: release lookupSemaphore
lhotari Oct 13, 2025
aa8d57b
Improve javadoc
lhotari Oct 13, 2025
5c1ff14
Synchronize relevant details in pip-442.md
lhotari Oct 13, 2025
a754edb
Add retries to watch topic list success and update
lhotari Oct 14, 2025
e3c59de
Replace ConcurrentLinkedQueue with ArrayBlockingQueue
lhotari Oct 14, 2025
b40a73d
Fix exception class name typos in javadocs
lhotari Oct 14, 2025
fec104a
Address review feedback
lhotari Oct 14, 2025
db594d9
Address review feedback
lhotari Oct 14, 2025
a77f6da
Fix otel metric names for wait time
lhotari Oct 14, 2025
6154983
Use closer approximation of size by using ByteBufUtil.utf8Bytes to ca…
lhotari Oct 14, 2025
fbee63b
Address review feedback
lhotari Oct 14, 2025
3f7fffa
Address review feedback
lhotari Oct 15, 2025
654fe7d
Address review feedback about race condition in internalProcessQueue
lhotari Oct 15, 2025
05b789e
Add check for maxPermits when acquiring permits
lhotari Oct 15, 2025
7f5b461
Add javadoc for AsyncDualMemoryLimiterImpl constructors
lhotari Oct 15, 2025
ea5a7e0
Add topic list size cache to address heap limit with proxy
lhotari Oct 15, 2025
a2bb806
Don't release the heap permit before direct memory permit has been ac…
lhotari Oct 15, 2025
db17938
Increase concurrency in test
lhotari Oct 15, 2025
591b8d2
Add solution that performs a single request for getting the size of t…
lhotari Oct 15, 2025
7b93184
Checkstyle
lhotari Oct 15, 2025
0640095
Add javadoc for TopicListSizeResultCache
lhotari Oct 15, 2025
06fff3a
Fix test
lhotari Oct 15, 2025
e009d06
Add unit tests for TopicListSizeResultCache
lhotari Oct 15, 2025
6f03a59
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Oct 15, 2025
1b48093
Support making the semaphore unbounded by setting maxPermits to <=0
lhotari Oct 16, 2025
1b07870
Support disabling max queue size
lhotari Oct 16, 2025
6163c93
Add javadoc for disabling limits
lhotari Oct 16, 2025
27f2e9a
Refactor
lhotari Oct 17, 2025
7f11f03
Extract estimateTopicListSize utility method
lhotari Oct 17, 2025
5992f21
Add limits for Admin API for listing topics
lhotari Oct 17, 2025
373fe1a
Add http lookup test
lhotari Oct 17, 2025
ea516ec
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Oct 17, 2025
8ccd8b9
Add comment about overhead
lhotari Oct 17, 2025
829ad5f
Address review comment about permits validation
lhotari Oct 27, 2025
e9ebebb
Add comments to address review comment
lhotari Oct 27, 2025
0ddfe52
Don't use CompletableFuture.join so that the logic is more clear
lhotari Oct 27, 2025
9cb73b3
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Oct 27, 2025
498c442
Add more tests for validating updating of permits
lhotari Oct 30, 2025
e5e51ea
Refactor the code for easier readability
lhotari Oct 30, 2025
8e853b1
No need to add permits if the permits were already released
lhotari Oct 30, 2025
2f7f3b4
Add javadoc about the usage of update
lhotari Oct 30, 2025
d9a9e94
Don't release original permits unless permits were successfully acquired
lhotari Oct 30, 2025
34a5704
Add logic for handling CompletableFuture.cancel
lhotari Oct 30, 2025
da594b7
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Oct 30, 2025
6e96e93
Add test case to concern about permit leak or extra permits
lhotari Oct 30, 2025
578f027
Add more test cases for leaks when there are exceptions
lhotari Nov 4, 2025
e6d137e
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Nov 4, 2025
c71cd3c
Retry indefinitely in TopicListService
lhotari Nov 5, 2025
587bf80
Improve closing in TopicListService
lhotari Nov 5, 2025
0eb8163
Handle topic list sending and updates in order
lhotari Nov 5, 2025
508c99b
Improve method name and comment
lhotari Nov 5, 2025
380a8e3
Add heap limit handling to listing topics
lhotari Nov 5, 2025
ffe72b5
Update comment
lhotari Nov 5, 2025
a3190cc
Fix tests
lhotari Nov 5, 2025
a73d04d
Fix tests
lhotari Nov 6, 2025
d209d54
Refactor
lhotari Nov 6, 2025
baec893
Use Awaitility in the test
lhotari Nov 6, 2025
32d28fb
Fix retrying in initializeTopicsListWatcher
lhotari Nov 6, 2025
ca665ad
Test permit acquiring retries
lhotari Nov 6, 2025
c3bfade
Remove possibly registered watcher future
lhotari Nov 6, 2025
71db086
Remove unnecessary logic which removed previous watcher with same id
lhotari Nov 6, 2025
7c64540
Simplify TopicListSizeResultCache: use latest value
lhotari Nov 7, 2025
896a8d2
Fix typo
lhotari Nov 7, 2025
635601c
Merge remote-tracking branch 'origin/master' into lh-implement-PIP-442
lhotari Nov 7, 2025
09cadc2
Revert changes to topic list watcher
lhotari Nov 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,30 @@ allowOverrideEntryFilters=false
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

# Maximum heap memory for inflight topic list operations (MB).
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
maxTopicListInFlightHeapMemSizeMB=100

# Maximum direct memory for inflight topic list responses (MB).
# Default: 100 MB (network buffers for serialized responses)
maxTopicListInFlightDirectMemSizeMB=100

# Timeout for acquiring heap memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for heap memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000

# Timeout for acquiring direct memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for direct memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000

# Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000

Expand Down
24 changes: 24 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ maxConcurrentInboundConnectionsPerIp=0
# Max concurrent outbound connections. The proxy will error out requests beyond that.
maxConcurrentLookupRequests=50000

# Maximum heap memory for inflight topic list operations (MB).
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
maxTopicListInFlightHeapMemSizeMB=100

# Maximum direct memory for inflight topic list responses (MB).
# Default: 100 MB (network buffers for serialized responses)
maxTopicListInFlightDirectMemSizeMB=100

# Timeout for acquiring heap memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for heap memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000

# Timeout for acquiring direct memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for direct memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000

##### --- TLS --- #####

# Deprecated - use servicePortTls and webServicePortTls instead
Expand Down
24 changes: 24 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,30 @@ preciseDispatcherFlowControl=false
# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

# Maximum heap memory for inflight topic list operations (MB).
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
maxTopicListInFlightHeapMemSizeMB=100

# Maximum direct memory for inflight topic list responses (MB).
# Default: 100 MB (network buffers for serialized responses)
maxTopicListInFlightDirectMemSizeMB=100

# Timeout for acquiring heap memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for heap memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000

# Timeout for acquiring direct memory permits (milliseconds).
# Default: 25000 (25 seconds)
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for direct memory permit requests.
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000

# Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000

Expand Down
600 changes: 401 additions & 199 deletions pip/pip-442.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,42 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic")
private int maxConcurrentLookupRequest = 50000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum heap memory for inflight topic list operations (MB).\n"
+ "Default: 100 MB (supports ~1M topic names assuming 100 bytes each)")
private int maxTopicListInFlightHeapMemSizeMB = 100;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum direct memory for inflight topic list responses (MB).\n"
+ "Default: 100 MB (network buffers for serialized responses)")
private int maxTopicListInFlightDirectMemSizeMB = 100;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout for acquiring heap memory permits (milliseconds).\n"
+ "Default: 25000 (25 seconds)")
private int maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis = 25000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum queue size for heap memory permit requests.\n"
+ "Default: 10000 (prevent unbounded queueing)")
private int maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize = 10000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Timeout for acquiring direct memory permits (milliseconds).\n"
+ "Default: 25000 (25 seconds)")
private int maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis = 25000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum queue size for direct memory permit requests.\n"
+ "Default: 10000 (prevent unbounded queueing)")
private int maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize = 10000;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Loading
Loading