Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class ManagedLedgerAttributes {

private final Attributes attributes;
private final Attributes attributesOnlyNamespace;
private final Attributes attributesOperationSucceed;
private final Attributes attributesOperationFailure;

Expand All @@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
attributesOnlyNamespace = Attributes.of(
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
attributesOperationSucceed = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetadataStore metadataStore;

private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
@Getter
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
/**
 * Records one add-entry sample of the given size.
 *
 * <p>Feeds the add-entry rate, the entry-size stats, the OpenTelemetry entry-size
 * histogram obtained from the ledger's factory, and the add-entry-with-replicas rate
 * (the size multiplied by the configured write quorum size).
 *
 * @param size the value reported for this add-entry sample
 */
public void addAddEntrySample(long size) {
    addEntryOps.recordEvent(size);
    entryStats.addValue(size);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordEntrySize(size, managedLedger);

    final long sizeWithReplicas = size * managedLedger.getConfig().getWriteQuorumSize();
    addEntryWithReplicasOps.recordEvent(sizeWithReplicas);
}

Expand All @@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {

/**
 * Records one add-entry latency sample.
 *
 * <p>The value is added to the microsecond latency stats and forwarded, with its
 * original unit, to the OpenTelemetry stats obtained from the ledger's factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 */
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
    final long latencyUsec = unit.toMicros(latency);
    addEntryLatencyStatsUsec.addValue(latencyUsec);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordAddEntryLatency(latency, unit, managedLedger);
}

/**
 * Records one ledger add-entry latency sample.
 *
 * <p>The value is added to the microsecond ledger add-entry latency stats and forwarded,
 * with its original unit, to the OpenTelemetry stats obtained from the ledger's factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 */
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
    final long latencyUsec = unit.toMicros(latency);
    ledgerAddEntryLatencyStatsUsec.addValue(latencyUsec);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordLedgerAddEntryLatency(latency, unit, managedLedger);
}

/**
 * Records one ledger-switch latency sample.
 *
 * <p>The value is added to the microsecond ledger-switch latency stats and forwarded,
 * with its original unit, to the OpenTelemetry stats obtained from the ledger's factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 */
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
    final long latencyUsec = unit.toMicros(latency);
    ledgerSwitchLatencyStatsUsec.addValue(latencyUsec);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordLedgerSwitchLatency(latency, unit, managedLedger);
}

public void addReadEntriesSample(int count, long totalSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.opentelemetry.Constants;

public class OpenTelemetryManagedLedgerStats implements AutoCloseable {

// ml-level metrics

// Replaces pulsar_ml_AddEntryMessagesRate
public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
private final ObservableLongMeasurement addEntryCounter;
Expand Down Expand Up @@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {

private final BatchCallback batchCallback;

// namespace-level metrics

// Histograms are synchronous-only instruments, so measurements are recorded directly at the call site.
// Synchronous instruments currently offer no way to delete an attribute set once it has been recorded.
// Therefore, only namespace-level attributes are used, to avoid leaking high-cardinality attributes (e.g. topic name).
// See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md

// Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
// 'pulsar_storage_write_latency_*']
public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
private final DoubleHistogram addEntryLatencyHistogram;

// Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
// 'pulsar_storage_ledger_write_latency_*']
public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
"pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
private final DoubleHistogram ledgerAddEntryLatencyHistogram;

// Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
"pulsar.broker.managed_ledger.ledger.switch.latency";
private final DoubleHistogram ledgerSwitchLatencyHistogram;

// Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
// 'pulsar_entry_size_*']
public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
private final LongHistogram entrySizeHistogram;

public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);

Expand Down Expand Up @@ -124,6 +158,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
bytesInCounter,
readEntryCacheMissCounter,
markDeleteCounter);

addEntryLatencyHistogram = meter
.histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
.setDescription("End-to-end write latency, including time spent in the executor queue.")
.setUnit("s")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();

ledgerAddEntryLatencyHistogram = meter
.histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
.setDescription("End-to end write latency.")
.setUnit("s")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();

ledgerSwitchLatencyHistogram = meter
.histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
.setDescription("Time taken to switch to a new ledger.")
.setUnit("s")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();

entrySizeHistogram = meter
.histogramBuilder(ENTRY_SIZE_HISTOGRAM)
.ofLongs()
.setDescription("Size of entries written to the ledger.")
.setUnit("By")
.setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
102_400L, 1_048_576L))
.build();
}

@Override
Expand Down Expand Up @@ -151,4 +218,24 @@ private void recordMetrics(ManagedLedger ml) {
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
}

/**
 * Records an add-entry latency measurement, in seconds, into the add-entry latency
 * histogram using the managed ledger's namespace-only attributes.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 * @param ml      the managed ledger whose namespace-only attributes tag the measurement
 */
void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    // Convert through nanoseconds rather than milliseconds: TimeUnit.toMillis truncates,
    // which would turn every sub-millisecond latency into 0.0 s and make the lowest
    // histogram bucket indistinguishable from "no latency at all".
    final double latencySeconds = unit.toNanos(latency) / 1_000_000_000.0;
    this.addEntryLatencyHistogram.record(latencySeconds, attributes);
}

/**
 * Records a ledger add-entry latency measurement, in seconds, into the ledger add-entry
 * latency histogram using the managed ledger's namespace-only attributes.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 * @param ml      the managed ledger whose namespace-only attributes tag the measurement
 */
void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    // Convert through nanoseconds rather than milliseconds: TimeUnit.toMillis truncates,
    // which would turn every sub-millisecond latency into 0.0 s.
    final double latencySeconds = unit.toNanos(latency) / 1_000_000_000.0;
    this.ledgerAddEntryLatencyHistogram.record(latencySeconds, attributes);
}

/**
 * Records a ledger-switch latency measurement, in seconds, into the ledger-switch
 * latency histogram using the managed ledger's namespace-only attributes.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit    the time unit of {@code latency}
 * @param ml      the managed ledger whose namespace-only attributes tag the measurement
 */
void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    // Convert through nanoseconds rather than milliseconds: TimeUnit.toMillis truncates,
    // which would turn every sub-millisecond latency into 0.0 s.
    final double latencySeconds = unit.toNanos(latency) / 1_000_000_000.0;
    this.ledgerSwitchLatencyHistogram.record(latencySeconds, attributes);
}

/**
 * Records an entry-size measurement into the entry-size histogram, tagged with the
 * managed ledger's namespace-only attributes.
 *
 * @param entrySize the entry size to record
 * @param ml        the managed ledger whose namespace-only attributes tag the measurement
 */
void recordEntrySize(long entrySize, ManagedLedger ml) {
    entrySizeHistogram.record(
            entrySize,
            ml.getManagedLedgerAttributes().getAttributesOnlyNamespace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -135,6 +136,9 @@ public void testManagedLedgerMetrics() throws Exception {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
);
final var attribOnlyNamespace = Attributes.of(
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
);
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();

Awaitility.await().untilAsserted(() -> {
Expand Down Expand Up @@ -189,6 +193,16 @@ public void testManagedLedgerMetrics() throws Exception {
value -> assertThat(value).isGreaterThanOrEqualTo(0));
assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
attribCommon, value -> assertThat(value).isGreaterThanOrEqualTo(0));

assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
sum -> assertThat(sum).isGreaterThan(0.0));
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
sum -> assertThat(sum).isGreaterThan(0.0));
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
sum -> assertThat(sum).isGreaterThan(0.0));
});
}

Expand Down
Loading