From dfe6e1f57e69e7b00a59aa87242ace148686d29f Mon Sep 17 00:00:00 2001
From: "daowu.hzy"
Date: Tue, 10 Dec 2024 10:54:37 +0800
Subject: [PATCH] [connector/flink] Report pendingRecords. This closes #138

---
 .../client/metrics/ScannerMetricGroup.java    | 45 ++++++++++
 .../client/scanner/log/FlussLogScanner.java   |  6 ++
 .../fluss/client/scanner/log/LogFetcher.java  |  3 +
 .../fluss/client/scanner/log/LogScanner.java  |  5 ++
 .../metrics/groups/AbstractMetricGroup.java   |  4 +
 .../metrics/FlinkSourceReaderMetrics.java     | 58 +++++++++++++
 .../source/reader/FlinkSourceSplitReader.java |  1 +
 .../reader/FlinkSourceSplitReaderTest.java    | 83 +++++++++++++++++++
 .../metrics/group/BucketMetricGroup.java      |  2 +-
 9 files changed, 206 insertions(+), 1 deletion(-)

diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java
index 999383eb..e8670d31 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java
@@ -23,13 +23,17 @@
 import com.alibaba.fluss.metrics.CharacterFilter;
 import com.alibaba.fluss.metrics.Counter;
 import com.alibaba.fluss.metrics.DescriptiveStatisticsHistogram;
+import com.alibaba.fluss.metrics.Gauge;
 import com.alibaba.fluss.metrics.Histogram;
 import com.alibaba.fluss.metrics.MeterView;
+import com.alibaba.fluss.metrics.Metric;
 import com.alibaba.fluss.metrics.MetricNames;
 import com.alibaba.fluss.metrics.ThreadSafeSimpleCounter;
 import com.alibaba.fluss.metrics.groups.AbstractMetricGroup;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
+import com.alibaba.fluss.server.metrics.group.BucketMetricGroup;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static com.alibaba.fluss.metrics.utils.MetricGroupUtils.makeScope;
@@ -55,6 +59,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup {
     private volatile double pollIdleRatio;
     private volatile long lastPollMs;
     private volatile long pollStartMs;
+    private final Map<Integer, BucketMetricGroup> buckets = new HashMap<>();
 
     public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) {
         super(parent.getMetricRegistry(), makeScope(parent, name), parent);
@@ -120,6 +125,18 @@ private long lastPollSecondsAgo() {
         return (System.currentTimeMillis() - lastPollMs) / 1000;
     }
 
+    public void recordBucketLag(int bucketId, long lag) {
+        BucketMetricGroup bucketGroup =
+                buckets.computeIfAbsent(
+                        bucketId, (bucket) -> new BucketMetricGroup(registry, bucketId, this));
+        Metric metric = bucketGroup.metrics().get(bucketRecordsLagMetricName(bucketId));
+        if (metric == null) {
+            bucketGroup.gauge(bucketRecordsLagMetricName(bucketId), new SimpleGauge(lag));
+        } else {
+            ((SimpleGauge) metric).setValue(lag);
+        }
+    }
+
     @Override
     protected String getGroupName(CharacterFilter filter) {
         return name;
@@ -130,4 +147,32 @@ protected final void putVariables(Map<String, String> variables) {
         variables.put("database", tablePath.getDatabaseName());
         variables.put("table", tablePath.getTableName());
     }
+
+    private static String bucketRecordsLagMetricName(int bucketId) {
+        return bucketId + ".records-lag";
+    }
+
+    @Override
+    public Map<String, Metric> metrics() {
+        Map<String, Metric> allMetrics = new HashMap<>(super.metrics());
+        buckets.forEach((bucketId, group) -> allMetrics.putAll(group.metrics()));
+        return allMetrics;
+    }
+
+    private static class SimpleGauge implements Gauge<Long> {
+        private volatile long value;
+
+        public SimpleGauge(long value) {
+            this.value = value;
+        }

+
+        @Override
+        public Long getValue() {
+            return value;
+        }
+
+        public void setValue(long value) {
+            this.value = value;
+        }
+    }
 }
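The per-bucket gauge is registered once and then mutated in place, so its
metric identity stays stable while the fetcher updates it; `value` is volatile
because reporter threads may read it while the fetcher thread writes. A minimal
read-side sketch, assuming a ScannerMetricGroup instance named `group` (the
variable name and the bucket id are illustrative):

    // "0.records-lag" follows bucketRecordsLagMetricName(0) above
    Metric lag = group.metrics().get("0.records-lag");
    if (lag instanceof Gauge) {
        long pending = ((Gauge<Long>) lag).getValue(); // unchecked cast; sketch only
    }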
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java
index 70fa10a8..32862a7d 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/FlussLogScanner.java
@@ -26,6 +26,7 @@
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metrics.Metric;
 import com.alibaba.fluss.rpc.RpcClient;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.types.RowType;
@@ -304,4 +305,9 @@ public void close() {
             release();
         }
     }
+
+    @Override
+    public Map<String, Metric> metrics() {
+        return Collections.unmodifiableMap(scannerMetricGroup.metrics());
+    }
 }
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java
index f047572b..0ef225f7 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java
@@ -314,6 +314,9 @@ private synchronized void handleFetchLogResponse(
                                 projection);
                 logFetchBuffer.add(completedFetch);
             }
+            scannerMetricGroup.recordBucketLag(
+                    tb.getBucket(),
+                    fetchResultForBucket.getHighWatermark() - fetchOffset);
         }
     }
 }
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogScanner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogScanner.java
index 7d760817..b434f5f0 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogScanner.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogScanner.java
@@ -17,8 +17,10 @@
 package com.alibaba.fluss.client.scanner.log;
 
 import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.metrics.Metric;
 
 import java.time.Duration;
+import java.util.Map;
 
 /**
  * The scanner is used to scan log data of the specified table from Fluss.
@@ -120,4 +122,7 @@ default void subscribeFromBeginning(long partitionId, int bucket) {
      * #poll(Duration timeout)}.
      */
     void wakeup();
+
+    /** Get the metrics of the log scanner. */
+    Map<String, Metric> metrics();
 }
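The lag reported above is the distance from the next fetch offset to the
bucket's high watermark. A worked example with illustrative numbers:

    // Mirrors the computation in handleFetchLogResponse:
    long highWatermark = 1_000L; // highest readable offset for the bucket
    long fetchOffset = 400L;     // next offset the scanner will fetch
    long lag = highWatermark - fetchOffset; // 600 records still pending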
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java
index 3a14a781..59c23f91 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/groups/AbstractMetricGroup.java
@@ -371,4 +371,8 @@ protected enum ChildType {
         VALUE,
         GENERIC
     }
+
+    public Map<String, Metric> metrics() {
+        return this.metrics;
+    }
 }
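AbstractMetricGroup#metrics() hands out the group's live metric map;
FlussLogScanner is the layer that wraps it read-only before exposing it to
callers. A caller-side sketch, assuming a LogScanner instance named `scanner`:

    Map<String, Metric> metrics = scanner.metrics();
    metrics.forEach((name, metric) -> System.out.println(name));
    // metrics.put(...) would throw UnsupportedOperationException, since
    // FlussLogScanner wraps the map in Collections.unmodifiableMap.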
diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/metrics/FlinkSourceReaderMetrics.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/metrics/FlinkSourceReaderMetrics.java
index 42fb72d5..3d4c07c9 100644
--- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/metrics/FlinkSourceReaderMetrics.java
+++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/metrics/FlinkSourceReaderMetrics.java
@@ -18,6 +18,8 @@
 
 import com.alibaba.fluss.connector.flink.source.reader.FlinkSourceReader;
 import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metrics.Gauge;
+import com.alibaba.fluss.metrics.Metric;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
@@ -25,8 +27,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 
 /**
  * A collection class for handling metrics in {@link FlinkSourceReader} of Fluss.
@@ -64,9 +70,14 @@ public class FlinkSourceReaderMetrics {
     // Map for tracking current consuming offsets
     private final Map<TableBucket, Long> offsets = new HashMap<>();
 
+    // Map for tracking the records lag of each table bucket
+    @Nullable private Map<TableBucket, Metric> recordsLagMetrics;
+
     // For currentFetchEventTimeLag metric
    private volatile long currentFetchEventTimeLag = UNINITIALIZED;
 
+    public static final String RECORDS_LAG = ".records-lag";
+
     public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
         this.sourceReaderMetricGroup = sourceReaderMetricGroup;
         this.flussSourceReaderMetricGroup =
@@ -125,4 +136,51 @@ private void checkTableBucketTracked(TableBucket tableBucket) {
     public SourceReaderMetricGroup getSourceReaderMetricGroup() {
         return sourceReaderMetricGroup;
     }
+
+    public void maybeAddRecordsLagMetric(Map<String, Metric> metrics, TableBucket tb) {
+        // Lazily register the pendingRecords gauge when the first bucket is tracked
+        if (recordsLagMetrics == null) {
+            this.recordsLagMetrics = new ConcurrentHashMap<>();
+            this.sourceReaderMetricGroup.setPendingRecordsGauge(
+                    () -> {
+                        long pendingRecordsTotal = 0;
+                        for (Metric recordsLagMetric : this.recordsLagMetrics.values()) {
+                            pendingRecordsTotal +=
+                                    ((Gauge<Long>) recordsLagMetric)
+                                            .getValue();
+                        }
+                        return pendingRecordsTotal;
+                    });
+        }
+        recordsLagMetrics.computeIfAbsent(tb, (ignored) -> getRecordsLagMetric(metrics, tb));
+    }
+
+    private @Nullable Metric getRecordsLagMetric(
+            Map<String, Metric> metrics, TableBucket tb) {
+        try {
+            int bucket = tb.getBucket();
+            Predicate<Map.Entry<String, Metric>> filter =
+                    entry -> {
+                        final String metricName = entry.getKey();
+                        return metricName.equals(bucket + RECORDS_LAG);
+                    };
+            return metrics.entrySet().stream()
+                    .filter(filter)
+                    .map(Map.Entry::getValue)
+                    .findFirst()
+                    .orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "Cannot find fluss metric matching the current filter."));
+        } catch (IllegalStateException e) {
+            LOG.warn(
+                    String.format(
+                            "Error when getting fluss log scanner metric \"%s\" "
+                                    + "for bucket \"%s\". "
+                                    + "Metric \"%s\" may not be reported correctly.",
+                            RECORDS_LAG, tb, MetricNames.PENDING_RECORDS),
+                    e);
+            return null;
+        }
+    }
 }
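pendingRecords is defined here as the sum of all per-bucket records-lag
gauges: with two tracked buckets lagging 600 and 150 records, the gauge
reports 750. An equivalent stream form of the loop in maybeAddRecordsLagMetric,
assuming every tracked metric is a Gauge<Long> (sketch only):

    long pendingRecords =
            recordsLagMetrics.values().stream()
                    .mapToLong(m -> ((Gauge<Long>) m).getValue())
                    .sum();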
", + RECORDS_LAG, tb, MetricNames.PENDING_RECORDS), + e); + return null; + } + } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java index 96798352..276b08f9 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReader.java @@ -364,6 +364,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) { } } splitRecords.put(splitId, toRecordAndPos(bucketScanRecords.iterator())); + flinkSourceReaderMetrics.maybeAddRecordsLagMetric(logScanner.metrics(), scanBucket); } Iterator buckets = tableScanBuckets.iterator(); Iterator splitIterator = diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java index 355ea143..84a433d8 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -43,6 +43,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.apache.flink.table.api.ValidationException; import org.junit.jupiter.api.Test; @@ -207,6 +208,80 @@ void testHandleLogSplitChangesAndFetch() throws Exception { } } + @Test + void testPendingRecords() throws Exception { + final Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .build(); + final TableDescriptor tableDescriptor = + TableDescriptor.builder().schema(schema).distributedBy(1).build(); + TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table"); + long tableId = createTable(tablePath1, tableDescriptor); + MetricListener metricListener = new MetricListener(); + FlinkSourceReaderMetrics flinkSourceReaderMetrics = + new FlinkSourceReaderMetrics( + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + + try (FlinkSourceSplitReader splitReader = + createSplitReader(tablePath1, schema.toRowType(), flinkSourceReaderMetrics)) { + + // no any records + List logSplits = new ArrayList<>(); + Map> expectedRecords = new HashMap<>(); + assignSplitsAndFetchUntilRetrieveRecords( + splitReader, logSplits, expectedRecords, schema.toRowType()); + + assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS)).isNotPresent(); + + int rowCnt = 600; + // now, write some records into the table + List internalRows = appendRows(tablePath1, rowCnt); + List expected = new ArrayList<>(internalRows.size()); + for (int i = 0; i < internalRows.size(); i++) { + expected.add( + new RecordAndPos( + new ScanRecord(i, i, RowKind.APPEND_ONLY, internalRows.get(i)))); + } + + TableBucket tableBucket 
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java
index 355ea143..84a433d8 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -43,6 +43,7 @@
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
 import org.apache.flink.table.api.ValidationException;
 import org.junit.jupiter.api.Test;
@@ -207,6 +208,80 @@ void testHandleLogSplitChangesAndFetch() throws Exception {
         }
     }
 
+    @Test
+    void testPendingRecords() throws Exception {
+        final Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .build();
+        final TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1).build();
+        TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test-only-log-table");
+        long tableId = createTable(tablePath1, tableDescriptor);
+        MetricListener metricListener = new MetricListener();
+        FlinkSourceReaderMetrics flinkSourceReaderMetrics =
+                new FlinkSourceReaderMetrics(
+                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+
+        try (FlinkSourceSplitReader splitReader =
+                createSplitReader(tablePath1, schema.toRowType(), flinkSourceReaderMetrics)) {
+
+            // no records are written yet
+            List<SourceSplitBase> logSplits = new ArrayList<>();
+            Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();
+            assignSplitsAndFetchUntilRetrieveRecords(
+                    splitReader, logSplits, expectedRecords, schema.toRowType());
+
+            assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS)).isNotPresent();
+
+            int rowCnt = 600;
+            // now, write some records into the table
+            List<InternalRow> internalRows = appendRows(tablePath1, rowCnt);
+            List<RecordAndPos> expected = new ArrayList<>(internalRows.size());
+            for (int i = 0; i < internalRows.size(); i++) {
+                expected.add(
+                        new RecordAndPos(
+                                new ScanRecord(i, i, RowKind.APPEND_ONLY, internalRows.get(i))));
+            }
+
+            TableBucket tableBucket = new TableBucket(tableId, 0);
+            String splitId = toLogSplitId(tableBucket);
+            expectedRecords.put(splitId, expected);
+
+            logSplits.add(new LogSplit(tableBucket, null, 0L));
+            assignSplits(splitReader, logSplits);
+            Set<String> finishedSplits = new HashSet<>();
+
+            int cnt = 0;
+            while (finishedSplits.size() < logSplits.size()) {
+                RecordsWithSplitIds<RecordAndPos> recordsBySplitIds = splitReader.fetch();
+                splitId = recordsBySplitIds.nextSplit();
+                RecordAndPos record;
+                if (splitId != null) {
+                    while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
+                        cnt++;
+                    }
+                    // the metric is updated lazily and the client fetched all
+                    // records in one batch, so the full lag is still reported
+                    assertThat(
+                                    metricListener
+                                            .getGauge(MetricNames.PENDING_RECORDS)
+                                            .get()
+                                            .getValue())
+                            .isEqualTo(600L);
+                    if (cnt >= rowCnt) {
+                        finishedSplits.add(splitId);
+                    }
+                }
+            }
+            // the next fetch updates the metric; all records are consumed, so lag is 0
+            splitReader.fetch();
+            assertThat(metricListener.getGauge(MetricNames.PENDING_RECORDS).get().getValue())
+                    .isEqualTo(0L);
+        }
+    }
+
     @Test
     void testHandleMixSnapshotLogSplitChangesAndFetch() throws Exception {
         TablePath tablePath = TablePath.of(DEFAULT_DB, "test-mix-snapshot-log-table");
@@ -343,6 +418,14 @@ private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType ro
                 clientConf, tablePath, rowType, null, createMockSourceReaderMetrics());
     }
 
+    private FlinkSourceSplitReader createSplitReader(
+            TablePath tablePath,
+            RowType rowType,
+            FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
+        return new FlinkSourceSplitReader(
+                clientConf, tablePath, rowType, null, flinkSourceReaderMetrics);
+    }
+
     private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {
         MetricListener metricListener = new MetricListener();
         return new FlinkSourceReaderMetrics(
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/BucketMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/BucketMetricGroup.java
index 58988d63..c5c73028 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/BucketMetricGroup.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/BucketMetricGroup.java
@@ -29,7 +29,7 @@ public class BucketMetricGroup extends AbstractMetricGroup {
 
     private final int bucket;
 
-    public BucketMetricGroup(MetricRegistry registry, int bucket, PhysicalTableMetricGroup parent) {
+    public BucketMetricGroup(MetricRegistry registry, int bucket, AbstractMetricGroup parent) {
         super(registry, makeScope(parent, String.valueOf(bucket)), parent);
         this.bucket = bucket;
     }
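Widening the constructor's parent type from PhysicalTableMetricGroup to
AbstractMetricGroup is what lets the client-side ScannerMetricGroup own bucket
groups, as recordBucketLag above does. A hedged sketch of that call site
(inside ScannerMetricGroup, where `registry` is the inherited metric registry
and the bucket id is illustrative):

    // "this" is now an acceptable parent, because ScannerMetricGroup
    // also extends AbstractMetricGroup.
    BucketMetricGroup bucketGroup = new BucketMetricGroup(registry, 0, this);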