Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,37 @@ maxTopicsPerNamespace=0
# The maximum number of connections in the broker. If it exceeds, new connections are rejected.
brokerMaxConnections=0

# It relates to configuration "WriteBufferHighWaterMark" of Netty Channel Config. If the number of bytes
# queued in the write buffer exceeds this value, channel writable state will start to return "false".
pulsarChannelWriteBufferHighWaterMark=65536
Copy link
Contributor

@Technoboy- Technoboy- Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this field not named nettyChannelWriteBufferHighWaterMark?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the pip-434 defined it, which is different from bk_client params


# It relates to configuration "WriteBufferLowWaterMark" of Netty Channel Config. If the number of bytes"
# queued in the write buffer is smaller than this value, channel writable state will start to return "true".
pulsarChannelWriteBufferLowWaterMark=32768

# If enabled, the broker will pause reading from the channel to deal with new request once the writer
# buffer is full, until it is changed to writable.
pulsarChannelPauseReceivingRequestsIfUnwritable=false

# After the connection is recovered from a pause receiving state, the channel will be rate-limited
# for a time window to avoid overwhelming due to the backlog of requests. This parameter defines
# how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out
# reach the "pulsarChannelWriteBufferHighWaterMark"? the timer will be reset. Setting a negative
# value will disable the rate limiting.
pulsarChannelPauseReceivingCooldownMs=5000

# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a
# period of time to avoid overwhelming due to the backlog of requests. This parameter defines how
# many requests should be allowed in the rate limiting period.
pulsarChannelPauseReceivingCooldownRateLimitPermits=5

# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a
# period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the
# backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate
# limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit
# is requests per every 10ms.
pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=10

# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,8 +951,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece

@FieldContext(
category = CATEGORY_POLICIES,
doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+ " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited"
+ " for a time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+ " how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out"
+ " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+ " value will disable the rate limiting."
Expand All @@ -973,7 +973,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+ " period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the"
+ " backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate"
+ " limit period is set to 1000, then the unit is requests per 1000 milli seconds. When it's 10, the unit"
+ " limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit"
+ " is requests per every 10ms."

)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
ctx.channel().config().setAutoRead(true);
pausedDueToRateLimitation = false;
}
}, 1, TimeUnit.SECONDS);
}, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,11 @@ public void testInitialize() throws Exception {
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200);
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true);
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferHighWaterMark(), 60000);
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferLowWaterMark(), 120000);
assertEquals(standalone.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(), true);
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownMs(), 10_000);
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100);
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public void testInit() throws Exception {
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false);
assertEquals(config.getPulsarChannelWriteBufferHighWaterMark(), 60000);
assertEquals(config.getPulsarChannelWriteBufferLowWaterMark(), 120000);
assertEquals(config.isPulsarChannelPauseReceivingRequestsIfUnwritable(), true);
assertEquals(config.getPulsarChannelPauseReceivingCooldownMs(), 10_000);
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100);
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200);
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true

pulsarChannelWriteBufferHighWaterMark=60000
pulsarChannelWriteBufferLowWaterMark=120000
pulsarChannelPauseReceivingRequestsIfUnwritable=true
pulsarChannelPauseReceivingCooldownMs=10000
pulsarChannelPauseReceivingCooldownRateLimitPermits=100
pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200

### --- Transaction config variables --- ###
transactionLogBatchedWriteEnabled=true
transactionLogBatchedWriteMaxRecords=11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true

pulsarChannelWriteBufferHighWaterMark=60000
pulsarChannelWriteBufferLowWaterMark=120000
pulsarChannelPauseReceivingRequestsIfUnwritable=true
pulsarChannelPauseReceivingCooldownMs=10000
pulsarChannelPauseReceivingCooldownRateLimitPermits=100
pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200

topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
createTopicToRemoteClusterForReplication=true
Loading