diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
index c3759a533a571..b1d777dbbf287 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -28,6 +28,7 @@
 public class ManagedLedgerAttributes {
 
     private final Attributes attributes;
+    private final Attributes attributesOnlyNamespace;
     private final Attributes attributesOperationSucceed;
     private final Attributes attributesOperationFailure;
 
@@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
                 OpenTelemetryAttributes.ML_NAME, mlName,
                 OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
         );
+        attributesOnlyNamespace = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
+        );
         attributesOperationSucceed = Attributes.builder()
                 .putAll(attributes)
                 .putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8a4f0fa3a3c92..a452c6682a53b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final MetadataStore metadataStore;
 
     private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+    @Getter
     private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
     private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 634afccf6ac39..92e3052588af8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
     public void addAddEntrySample(long size) {
         addEntryOps.recordEvent(size);
         entryStats.addValue(size);
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordEntrySize(size, managedLedger);
         addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize());
     }
 
@@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {
 
     public void addAddEntryLatencySample(long latency, TimeUnit unit) {
         addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordAddEntryLatency(latency, unit, managedLedger);
     }
 
     public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
         ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordLedgerAddEntryLatency(latency, unit, managedLedger);
     }
 
     public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
         ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordLedgerSwitchLatency(latency, unit, managedLedger);
     }
 
     public void addReadEntriesSample(int count, long totalSize) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
index 26c4b62cf7694..6e86532bdd864 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -20,12 +20,18 @@
 
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongHistogram;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.opentelemetry.Constants;
 
 public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
 
+    // ml-level metrics
+
     // Replaces pulsar_ml_AddEntryMessagesRate
     public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
     private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
 
     private final BatchCallback batchCallback;
 
+    // namespace-level metrics
+
+    // Histograms support only synchronous mode, so record measurements directly.
+    // Synchronous histograms currently do not support delete operations.
+    // Therefore, use only namespace-level attributes to avoid leaking high-cardinality attributes (e.g. topic name).
+    // See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md
+
+    // Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
+    //           'pulsar_storage_write_latency_*']
+    public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
+    private final DoubleHistogram addEntryLatencyHistogram;
+
+    // Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
+    //           'pulsar_storage_ledger_write_latency_*']
+    public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
+            "pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
+    private final DoubleHistogram ledgerAddEntryLatencyHistogram;
+
+    // Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
+    public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
+            "pulsar.broker.managed_ledger.ledger.switch.latency";
+    private final DoubleHistogram ledgerSwitchLatencyHistogram;
+
+    // Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
+    //           'pulsar_entry_size_*']
+    public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
+    private final LongHistogram entrySizeHistogram;
+
     public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
         var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
 
@@ -124,6 +158,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
                 bytesInCounter,
                 readEntryCacheMissCounter,
                 markDeleteCounter);
+
+        addEntryLatencyHistogram = meter
+                .histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
+                .setDescription("End-to-end write latency, including time spent in the executor queue.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        ledgerAddEntryLatencyHistogram = meter
+                .histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
+                .setDescription("End-to-end write latency.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        ledgerSwitchLatencyHistogram = meter
+                .histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
+                .setDescription("Time taken to switch to a new ledger.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        entrySizeHistogram = meter
+                .histogramBuilder(ENTRY_SIZE_HISTOGRAM)
+                .ofLongs()
+                .setDescription("Size of entries written to the ledger.")
+                .setUnit("By")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
+                        102_400L, 1_048_576L))
+                .build();
     }
 
     @Override
@@ -151,4 +218,39 @@ private void recordMetrics(ManagedLedger ml) {
         markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
         readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
     }
+
+    /** Records an add-entry latency sample against the namespace-only attributes of {@code ml}. */
+    void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.addEntryLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /** Records a ledger add-entry latency sample against the namespace-only attributes of {@code ml}. */
+    void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.ledgerAddEntryLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /** Records a ledger-switch latency sample against the namespace-only attributes of {@code ml}. */
+    void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.ledgerSwitchLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /** Records an entry size in bytes against the namespace-only attributes of {@code ml}. */
+    void recordEntrySize(long entrySize, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.entrySizeHistogram.record(entrySize, attributes);
+    }
+
+    /**
+     * Converts a duration to fractional seconds, the unit declared by the latency histograms.
+     *
+     * <p>The conversion goes through nanoseconds because {@code TimeUnit.toMillis} truncates to whole
+     * milliseconds: sub-millisecond latencies would be recorded as 0 and larger ones rounded down,
+     * distorting the histogram sum and the lowest buckets.
+     */
+    private static double toSeconds(long duration, TimeUnit unit) {
+        return unit.toNanos(duration) / 1_000_000_000.0;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 87e751fcc5952..394af363d99b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
 import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public void testManagedLedgerMetrics() throws Exception {
                 OpenTelemetryAttributes.ML_NAME, mlName,
                 OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
         );
+        final var attribOnlyNamespace = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
+        );
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
 
         Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public void testManagedLedgerMetrics() throws Exception {
                     value -> assertThat(value).isGreaterThanOrEqualTo(0));
             assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
                     attribCommon, value -> assertThat(value).isGreaterThanOrEqualTo(0));
+
+            assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
+                    attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
+            assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
+                    attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
+            assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
+                    attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
         });
     }
 