diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f9e593345d85f..2fc76dbf49fba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -60,11 +60,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Response;
+import io.prometheus.client.Gauge;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -250,6 +252,39 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     private final long connectionLivenessCheckTimeoutMillis;
 
+    private static final LongAdder totalPendingBytes = new LongAdder();
+    private static volatile Gauge pendingPublishBufferUsage;
+
+    /**
+     * Registers the {@code pulsar_pending_publish_buffer_usage} gauge the first time it is called.
+     *
+     * <p>The gauge reads its value from {@code totalPendingBytes}. The null check is repeated under the
+     * class lock so that threads racing through the first calls cannot both build and register the
+     * collector.
+     *
+     * @param pulsar supplies the configured cluster name used as the value of the {@code cluster} label
+     */
+    private static void initializePendingPublishBufferUsageGauge(PulsarService pulsar) {
+        if (pendingPublishBufferUsage == null) {
+            synchronized (ServerCnx.class) {
+                if (pendingPublishBufferUsage == null) {
+                    pendingPublishBufferUsage = Gauge.build()
+                            .name("pulsar_pending_publish_buffer_usage")
+                            .help("The usage of pending publish buffer")
+                            .unit("bytes")
+                            .labelNames("cluster")
+                            .register()
+                            .setChild(new Gauge.Child() {
+                                @Override
+                                public double get() {
+                                    return totalPendingBytes.longValue();
+                                }
+                            }, pulsar.getConfiguration().getClusterName());
+                }
+            }
+        }
+    }
+
     // Tracks and limits number of bytes pending to be published from a single specific IO thread.
     static final class PendingBytesPerThreadTracker {
         private static final FastThreadLocal pendingBytesPerThread =
@@ -269,6 +304,7 @@ public static PendingBytesPerThreadTracker getInstance() {
 
         public void incrementPublishBytes(long bytes, long maxPendingBytesPerThread) {
             pendingBytes += bytes;
+            totalPendingBytes.add(bytes);
             // when the limit is exceeded we throttle all connections that are sharing the same thread
             if (maxPendingBytesPerThread > 0 && pendingBytes > maxPendingBytesPerThread
                     && !limitExceeded) {
@@ -279,6 +315,7 @@ public void incrementPublishBytes(long bytes, long maxPendingBytesPerThread) {
 
         public void decrementPublishBytes(long bytes, long resumeThresholdPendingBytesPerThread) {
             pendingBytes -= bytes;
+            totalPendingBytes.add(-bytes);
             // when the limit has been exceeded, and we are below the resume threshold
             // we resume all connections sharing the same thread
             if (limitExceeded && pendingBytes <= resumeThresholdPendingBytesPerThread) {
@@ -312,6 +349,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
         // the null check is a workaround for #13620
         super(pulsar.getBrokerService() != null ? pulsar.getBrokerService().getKeepAliveIntervalSeconds() : 0,
                 TimeUnit.SECONDS);
+        initializePendingPublishBufferUsageGauge(pulsar);
         this.service = pulsar.getBrokerService();
         this.schemaService = pulsar.getSchemaRegistryService();
         this.listenerName = listenerName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index fa073d3694b26..63647833540f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1969,6 +1969,26 @@ public void testSplitTopicAndPartitionLabel() throws Exception {
         consumer2.close();
     }
 
+    @Test
+    public void testPendingPublishBufferUsageMetric() throws Exception {
+        @Cleanup
+        Producer producer = pulsarClient.newProducer()
+                .topic("persistent://my-property/use/my-ns/testPendingPublishBufferUsageMetric")
+                .create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap metrics = parseMetrics(metricsStr);
+        List cm = (List) metrics.get("pulsar_pending_publish_buffer_usage_bytes");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), pulsar.getConfiguration().getClusterName());
+        assertTrue(cm.get(0).value >= 0);
+    }
+
     @Test
     public void testMetricsGroupedByTypeDefinitions() throws Exception {
         Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();