From b06687b08fe12dfb30f9c7618f23bb8b67256dec Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Sep 2025 15:21:18 +0800 Subject: [PATCH 1/3] [improve][broker]add configurations for PIP-434 --- conf/broker.conf | 31 +++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 6 ++-- .../pulsar/broker/service/ServerCnx.java | 2 +- .../pulsar/broker/service/StandaloneTest.java | 7 +++++ .../naming/ServiceConfigurationTest.java | 7 ++++- .../configurations/pulsar_broker_test.conf | 7 +++++ .../pulsar_broker_test_standalone.conf | 8 +++++ 7 files changed, 63 insertions(+), 5 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index a1f59bd3eed00..d98b20bd5a4a0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -330,6 +330,37 @@ maxTopicsPerNamespace=0 # The maximum number of connections in the broker. If it exceeds, new connections are rejected. brokerMaxConnections=0 +# It relates to configuration "WriteBufferHighWaterMark" of Netty Channel Config. If the number of bytes +# queued in the write buffer exceeds this value, channel writable state will start to return "false". +pulsarChannelWriteBufferHighWaterMark=65536 + +# It relates to configuration "WriteBufferLowWaterMark" of Netty Channel Config. If the number of bytes" +# queued in the write buffer is smaller than this value, channel writable state will start to return "true". +pulsarChannelWriteBufferLowWaterMark=131072 + +# If enabled, the broker will pause reading from the channel to deal with new request once the writer +# buffer is full, until it is changed to writable. +pulsarChannelPauseReceivingRequestsIfUnwritable=false + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited +# for a time window to avoid overwhelming due to the backlog of requests. This parameter defines +# how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out +# reach the "pulsarChannelWriteBufferHighWaterMark"? the timer will be reset. Setting a negative +# value will disable the rate limiting. +pulsarChannelPauseReceivingCooldownMs=5000 + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a +# period of time to avoid overwhelming due to the backlog of requests. This parameter defines how +# many requests should be allowed in the rate limiting period. +pulsarChannelPauseReceivingCooldownRateLimitPermits=5 + +# After the connection is recovered from a pause receiving state, the channel will be rate-limited for a +# period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the +# backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate +# limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit +# is requests per every 10ms. +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=10 + # The maximum number of connections per IP. If it exceeds, new connections are rejected. brokerMaxConnectionsPerIp=0 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a59cf07075ab8..5ca0db944a417 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -951,8 +951,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_POLICIES, - doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited" - + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines" + doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited" + + " for a time window to avoid overwhelming due to the backlog of requests. This parameter defines" + " how long the rate limiting should last, in millis. Once the bytes that are waiting to be sent out" + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative" + " value will disable the rate limiting." @@ -973,7 +973,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a" + " period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the" + " backlog of requests. This parameter defines the period of the rate limiter in milliseconds. If the rate" - + " limit period is set to 1000, then the unit is requests per 1000 milli seconds. When it's 10, the unit" + + " limit period is set to 1000, then the unit is requests per 1000 milliseconds. When it's 10, the unit" + " is requests per every 10ms." ) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 011b54c0b0a59..4d21b2810cd3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -489,7 +489,7 @@ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) { ctx.channel().config().setAutoRead(true); pausedDueToRateLimitation = false; } - }, 1, TimeUnit.SECONDS); + }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index dc35e2d382d0a..4b1d7d2a088f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -22,6 +22,7 @@ import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import org.apache.pulsar.PulsarStandaloneStarter; +import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = "broker") @@ -65,5 +66,11 @@ public void testInitialize() throws Exception { assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1); assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 200); assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(), true); + assertEquals(standalone.getConfig().getPulsarChannelWriteBufferHighWaterMark(), 60000); + assertEquals(standalone.getConfig().getPulsarChannelWriteBufferLowWaterMark(), 120000); + assertEquals(standalone.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(), true); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownMs(), 10_000); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100); + assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 1802bd6f59c8b..ed551c8c1bb20 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -78,7 +78,12 @@ public void testInit() throws Exception { assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(), true); assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1); assertEquals(config.getTopicNameCacheMaxCapacity(), 200); - assertEquals(config.isCreateTopicToRemoteClusterForReplication(), false); + assertEquals(config.getPulsarChannelWriteBufferHighWaterMark(), 60000); + assertEquals(config.getPulsarChannelWriteBufferLowWaterMark(), 120000); + assertEquals(config.isPulsarChannelPauseReceivingRequestsIfUnwritable(), true); + assertEquals(config.getPulsarChannelPauseReceivingCooldownMs(), 10_000); + assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPermits(), 100); + assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(), 200); OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(config.getProperties()); assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(), "bookkeeper-first"); } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 0fdb29e06866f..5ce477550e529 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -95,6 +95,13 @@ defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true +pulsarChannelWriteBufferHighWaterMark=60000 +pulsarChannelWriteBufferLowWaterMark=120000 +pulsarChannelPauseReceivingRequestsIfUnwritable=true +pulsarChannelPauseReceivingCooldownMs=10000 +pulsarChannelPauseReceivingCooldownRateLimitPermits=100 +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200 + ### --- Transaction config variables --- ### transactionLogBatchedWriteEnabled=true transactionLogBatchedWriteMaxRecords=11 diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf index d3f9430f29b48..0c393b1db62bb 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf @@ -95,6 +95,14 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide maxMessagePublishBufferSizeInMB=-1 dispatcherPauseOnAckStatePersistentEnabled=true + +pulsarChannelWriteBufferHighWaterMark=60000 +pulsarChannelWriteBufferLowWaterMark=120000 +pulsarChannelPauseReceivingRequestsIfUnwritable=true +pulsarChannelPauseReceivingCooldownMs=10000 +pulsarChannelPauseReceivingCooldownRateLimitPermits=100 +pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200 + topicNameCacheMaxCapacity=200 maxSecondsToClearTopicNameCache=1 createTopicToRemoteClusterForReplication=true From 3a3277d6bd59f2289e3216742ebd9d60dcad5b1d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Sep 2025 15:50:49 +0800 Subject: [PATCH 2/3] checkstyle --- .../java/org/apache/pulsar/broker/service/StandaloneTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java index 4b1d7d2a088f6..9ab972e6be519 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java @@ -22,7 +22,6 @@ import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import org.apache.pulsar.PulsarStandaloneStarter; -import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = "broker") From 43a84ed5366fcefafdc443161170e03624032538 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 30 Sep 2025 00:17:14 +0800 Subject: [PATCH 3/3] correct the default value --- conf/broker.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index d98b20bd5a4a0..8fd0e18af3696 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -336,7 +336,7 @@ pulsarChannelWriteBufferHighWaterMark=65536 # It relates to configuration "WriteBufferLowWaterMark" of Netty Channel Config. If the number of bytes" # queued in the write buffer is smaller than this value, channel writable state will start to return "true". -pulsarChannelWriteBufferLowWaterMark=131072 +pulsarChannelWriteBufferLowWaterMark=32768 # If enabled, the broker will pause reading from the channel to deal with new request once the writer # buffer is full, until it is changed to writable.