From 2c4ef5dac5c985d778a116a7d63b23c3bd314a67 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 30 Dec 2024 11:38:24 +0800 Subject: [PATCH] address jark's comments --- .../client/metadata/MetadataUpdater.java | 23 ++++++- .../fluss/client/scanner/log/LogFetcher.java | 13 ++-- .../fluss/client/utils/MetadataUtils.java | 12 ++-- .../alibaba/fluss/config/ConfigOptions.java | 69 ++++++++++++++----- .../alibaba/fluss/metrics/MetricNames.java | 14 ++-- .../fluss/server/log/LogOffsetMetadata.java | 4 ++ .../fluss/server/log/LogOffsetSnapshot.java | 4 +- .../group/TabletServerMetricGroup.java | 26 +++---- .../alibaba/fluss/server/replica/Replica.java | 10 ++- .../fluss/server/replica/ReplicaManager.java | 48 ++++++------- .../server/replica/delay/DelayedFetchLog.java | 12 ++-- .../replica/fetcher/RemoteLeaderEndpoint.java | 2 +- .../fluss/server/replica/AdjustIsrTest.java | 6 +- .../server/replica/ReplicaManagerTest.java | 2 +- .../fluss/server/replica/ReplicaTest.java | 6 +- .../replica/delay/DelayedFetchLogTest.java | 38 +++++----- .../replica/fetcher/ReplicaFetcherITCase.java | 29 ++++---- .../fetcher/TestingLeaderEndpoint.java | 2 +- .../server/tablet/TabletServiceITCase.java | 13 ++-- website/docs/maintenance/configuration.md | 12 ++-- website/docs/maintenance/monitor-metrics.md | 16 ++--- 21 files changed, 226 insertions(+), 135 deletions(-) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java index 6fb12669c..82d7bd407 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static 
com.alibaba.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster; @@ -59,6 +60,8 @@ public class MetadataUpdater { private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class); + private static final int MAX_RETRY_TIMES = 5; + private final RpcClient rpcClient; protected volatile Cluster cluster; @@ -119,8 +122,24 @@ public TableDescriptor getTableDescriptorOrElseThrow(long tableId) { public int leaderFor(TableBucket tableBucket) { ServerNode serverNode = cluster.leaderFor(tableBucket); if (serverNode == null) { - throw new FlussRuntimeException("Leader not found for table bucket: " + tableBucket); + for (int i = 0; i < MAX_RETRY_TIMES; i++) { + TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId()); + updateMetadata(Collections.singleton(tablePath), null, null); + serverNode = cluster.leaderFor(tableBucket); + if (serverNode != null) { + break; + } + } + + if (serverNode == null) { + throw new FlussRuntimeException( + "Leader not found after retry " + + MAX_RETRY_TIMES + + " times for table bucket: " + + tableBucket); + } } + return serverNode.id(); } @@ -239,7 +258,7 @@ private void updateMetadata( } } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); - if (t instanceof RetriableException) { + if (t instanceof RetriableException || t instanceof TimeoutException) { LOG.warn("Failed to update metadata, but the exception is re-triable.", t); } else { throw new FlussRuntimeException("Failed to update metadata", t); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index f8b46d737..665ff091c 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -125,11 +125,16 @@ public LogFetcher( this.projection = projection; this.rpcClient = 
rpcClient; this.logScannerStatus = logScannerStatus; - this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes(); + this.maxFetchBytes = + (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes(); this.maxBucketFetchBytes = - (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes(); - this.minFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MIN_BYTES).getBytes(); - this.maxFetchWaitMs = (int) conf.get(ConfigOptions.LOG_FETCH_WAIT_MAX_TIME).toMillis(); + (int) + conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET) + .getBytes(); + this.minFetchBytes = + (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MIN_BYTES).getBytes(); + this.maxFetchWaitMs = + (int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME).toMillis(); this.isCheckCrcs = conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_CHECK_CRC); this.logFetchBuffer = new LogFetchBuffer(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java index 8a51faf9b..b4d011980 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java @@ -47,6 +47,8 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Utils for metadata for client. 
*/ public class MetadataUtils { @@ -60,7 +62,7 @@ public class MetadataUtils { */ public static Cluster sendMetadataRequestAndRebuildCluster( AdminReadOnlyGateway gateway, Set tablePaths) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { return sendMetadataRequestAndRebuildCluster(gateway, false, null, tablePaths, null, null); } @@ -76,7 +78,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster( @Nullable Set tablePaths, @Nullable Collection tablePartitionNames, @Nullable Collection tablePartitionIds) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { AdminReadOnlyGateway gateway = GatewayClientProxy.createGatewayProxy( () -> getOneAvailableTabletServerNode(cluster), @@ -94,7 +96,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster( @Nullable Set tablePaths, @Nullable Collection tablePartitions, @Nullable Collection tablePartitionIds) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException, TimeoutException { MetadataRequest metadataRequest = ClientRpcMessageUtils.makeMetadataRequest( tablePaths, tablePartitions, tablePartitionIds); @@ -151,7 +153,9 @@ public static Cluster sendMetadataRequestAndRebuildCluster( newPartitionIdByPath, newTablePathToTableInfo); }) - .get(); + .get(5, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in + // RpcClient, it will let the get() block forever. 
So we + // time out here } private static NewTableMetadata getTableMetadataToUpdate( diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index d8b1f30b1..a9f3b6723 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -410,35 +410,34 @@ public class ConfigOptions { .defaultValue(Duration.ofSeconds(1)) .withDescription("The amount of time to sleep when fetch bucket error occurs."); - public static final ConfigOption LOG_FETCH_MAX_BYTES = - key("log.fetch.max-bytes") + public static final ConfigOption LOG_REPLICA_FETCH_MAX_BYTES = + key("log.replica.fetch-max-bytes") .memoryType() .defaultValue(MemorySize.parse("16mb")) .withDescription( - "The maximum amount of data the server should return for a fetch request. " - + "Records are fetched in batches for log scanner or follower, for one request batch, " - + "and if the first record batch in the first non-empty bucket of the fetch is " - + "larger than this value, the record batch will still be returned to ensure that " - + "the fetch can make progress. As such, this is not a absolute maximum. Note that " - + "the fetcher performs multiple fetches in parallel."); + "The maximum amount of data the server should return for a fetch request from follower. " + + "Records are fetched in batches, and if the first record batch in the first " + + "non-empty bucket of the fetch is larger than this value, the record batch " + + "will still be returned to ensure that the fetch can make progress. As such, " + + "this is not a absolute maximum. 
Note that the fetcher performs multiple fetches " + + "in parallel."); public static final ConfigOption LOG_FETCH_MAX_BYTES_FOR_BUCKET = key("log.fetch.max-bytes-for-bucket") .memoryType() .defaultValue(MemorySize.parse("1mb")) .withDescription( - "The maximum amount of data the server should return for a table bucket in fetch request. " - + "Records are fetched in batches for consumer or follower, for one request batch, " - + "the max bytes size is config by this option."); + "The maximum amount of data the server should return for a table bucket in fetch request " + + "from follower. Records are fetched in batches, the max bytes size is " + + "config by this option."); public static final ConfigOption LOG_FETCH_WAIT_MAX_TIME = key("log.fetch.wait-max-time") .durationType() .defaultValue(Duration.ofMillis(500)) .withDescription( - "The maximum time to wait for enough bytes to be available for a fetch log response " - + "(including fetch log request from the follower or client). " - + "This value should always be less than the " + "The maximum time to wait for enough bytes to be available for a fetch log request " + + "from follower to response. This value should always be less than the " + "'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for " + "low throughput tables"); @@ -447,8 +446,8 @@ public class ConfigOptions { .memoryType() .defaultValue(MemorySize.parse("1b")) .withDescription( - "The minimum bytes expected for each fetch log response (including fetch " - + "log request from the follower or client). If not enough bytes, wait up to " + "The minimum bytes expected for each fetch log request from follower to response. 
" + + "If not enough bytes, wait up to " + LOG_FETCH_WAIT_MAX_TIME.key() + " time to return."); @@ -743,6 +742,44 @@ public class ConfigOptions { + "The Scanner will cache the records from each fetch request and returns " + "them incrementally from each poll."); + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MAX_BYTES = + key("client.scanner.log.fetch-max-bytes") + .memoryType() + .defaultValue(MemorySize.parse("16mb")) + .withDescription( + "The maximum amount of data the server should return for a fetch request from client. " + + "Records are fetched in batches, and if the first record batch in the first " + + "non-empty bucket of the fetch is larger than this value, the record batch " + + "will still be returned to ensure that the fetch can make progress. As such, " + + "this is not a absolute maximum."); + + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET = + key("client.scanner.log.fetch-max-bytes-for-bucket") + .memoryType() + .defaultValue(MemorySize.parse("1mb")) + .withDescription( + "The maximum amount of data the server should return for a table bucket in fetch request " + + "from client. Records are fetched in batches, the max bytes size is config by " + + "this option."); + + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME = + key("client.scanner.log.fetch-wait-max-time") + .durationType() + .defaultValue(Duration.ofMillis(500)) + .withDescription( + "The maximum time to wait for enough bytes to be available for a fetch log " + + "request from client to response."); + + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MIN_BYTES = + key("client.scanner.log.fetch-min-bytes") + .memoryType() + .defaultValue(MemorySize.parse("1b")) + .withDescription( + "The minimum bytes expected for each fetch log request from client to response. 
" + + "If not enough bytes, wait up to " + + CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key() + + " time to return."); + public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType() diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java index b3eae480d..c94ab2f20 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java @@ -48,13 +48,13 @@ public class MetricNames { public static final String REPLICA_LEADER_COUNT = "leaderCount"; public static final String REPLICA_COUNT = "replicaCount"; public static final String WRITE_ID_COUNT = "writerIdCount"; - public static final String DELAYED_WRITE_SIZE = "delayedWriteSize"; - public static final String DELAYED_WRITE_EXPIRATION_RATE = "delayedWriteExpirationPerSecond"; - public static final String DELAYED_FETCH_LOG_SIZE = "delayedFetchLogSize"; - public static final String DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE = - "delayedFetchLogFromFollowerExpirationPerSecond"; - public static final String DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE = - "delayedFetchLogFromClientExpirationPerSecond"; + public static final String DELAYED_WRITE_COUNT = "delayedWriteCount"; + public static final String DELAYED_WRITE_EXPIRES_RATE = "delayedWriteExpiresPerSecond"; + public static final String DELAYED_FETCH_COUNT = "delayedFetchCount"; + public static final String DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE = + "delayedFetchFromFollowerExpiresPerSecond"; + public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE = + "delayedFetchFromClientExpiresPerSecond"; // -------------------------------------------------------------------------------------------- // metrics for table diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java index a2092fb9d..55377e54b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java @@ -67,6 +67,10 @@ public int getRelativePositionInSegment() { return relativePositionInSegment; } + /** + * Compute the number of bytes between this offset and the given offset, if they are on the same + * segment and this offset precedes the given offset. + */ public int positionDiff(LogOffsetMetadata that) { if (messageOffsetOnly()) { throw new FlussRuntimeException( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java index 8e94f720c..010707724 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java @@ -56,8 +56,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = (int) (logStartOffset ^ (logStartOffset >>> 32)); - result = 31 * result + (int) (localLogStartOffset ^ (localLogStartOffset >>> 32)); + int result = Long.hashCode(logStartOffset); + result = 31 * result + Long.hashCode(localLogStartOffset); result = 31 * result + logEndOffset.hashCode(); result = 31 * result + highWatermark.hashCode(); return result; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java index 5f3d5e39b..eb6520794 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -44,8 +44,8 @@ public class TabletServerMetricGroup extends
AbstractMetricGroup { private final Counter replicationBytesIn; private final Counter replicationBytesOut; private final Counter delayedWriteExpireCount; - private final Counter delayedFetchLogFromFollowerExpireCount; - private final Counter delayedFetchLogFromClientExpireCount; + private final Counter delayedFetchFromFollowerExpireCount; + private final Counter delayedFetchFromClientExpireCount; public TabletServerMetricGroup( MetricRegistry registry, String clusterId, String hostname, int serverId) { @@ -60,15 +60,15 @@ public TabletServerMetricGroup( meter(MetricNames.REPLICATION_OUT_RATE, new MeterView(replicationBytesOut)); delayedWriteExpireCount = new ThreadSafeSimpleCounter(); - meter(MetricNames.DELAYED_WRITE_EXPIRATION_RATE, new MeterView(delayedWriteExpireCount)); - delayedFetchLogFromFollowerExpireCount = new ThreadSafeSimpleCounter(); + meter(MetricNames.DELAYED_WRITE_EXPIRES_RATE, new MeterView(delayedWriteExpireCount)); + delayedFetchFromFollowerExpireCount = new ThreadSafeSimpleCounter(); meter( - MetricNames.DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE, - new MeterView(delayedFetchLogFromFollowerExpireCount)); - delayedFetchLogFromClientExpireCount = new ThreadSafeSimpleCounter(); + MetricNames.DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE, + new MeterView(delayedFetchFromFollowerExpireCount)); + delayedFetchFromClientExpireCount = new ThreadSafeSimpleCounter(); meter( - MetricNames.DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE, - new MeterView(delayedFetchLogFromClientExpireCount)); + MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE, + new MeterView(delayedFetchFromClientExpireCount)); } @Override @@ -95,12 +95,12 @@ public Counter delayedWriteExpireCount() { return delayedWriteExpireCount; } - public Counter delayedFetchLogFromFollowerExpireCount() { - return delayedFetchLogFromFollowerExpireCount; + public Counter delayedFetchFromFollowerExpireCount() { + return delayedFetchFromFollowerExpireCount; } - public Counter 
delayedFetchLogFromClientExpireCount() { - return delayedFetchLogFromClientExpireCount; + public Counter delayedFetchFromClientExpireCount() { + return delayedFetchFromClientExpireCount; } // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 814121516..6d3c80e72 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -793,8 +793,14 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in // TODO WRITE a leader epoch. LogAppendInfo appendInfo = logTablet.appendAsLeader(memoryLogRecords); - // we may need to increment high watermark. - maybeIncrementLeaderHW(logTablet, System.currentTimeMillis()); + // we may need to increment high watermark if isr could be down to 1 or the + // replica count is 1. 
+ boolean hwIncreased = + maybeIncrementLeaderHW(logTablet, System.currentTimeMillis()); + + if (hwIncreased) { + tryCompleteDelayedOperations(); + } return appendInfo; }); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 8ad0b7ee9..cef200922 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -286,9 +286,9 @@ private void registerMetrics() { () -> onlineReplicas().filter(Replica::isLeader).count()); serverMetricGroup.gauge(MetricNames.REPLICA_COUNT, allReplicas::size); serverMetricGroup.gauge(MetricNames.WRITE_ID_COUNT, this::writerIdCount); - serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_SIZE, delayedWriteManager::numDelayed); + serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_COUNT, delayedWriteManager::numDelayed); serverMetricGroup.gauge( - MetricNames.DELAYED_FETCH_LOG_SIZE, delayedFetchLogManager::numDelayed); + MetricNames.DELAYED_FETCH_COUNT, delayedFetchLogManager::numDelayed); } private Stream onlineReplicas() { @@ -393,7 +393,7 @@ public void fetchLogRecords( Map bucketFetchInfo, Consumer> responseCallback) { long startTime = System.currentTimeMillis(); - Map logFetchResults = readFromLog(params, bucketFetchInfo); + Map logReadResults = readFromLog(params, bucketFetchInfo); if (LOG.isTraceEnabled()) { LOG.trace( "Fetch log records from local log in {} ms", @@ -401,7 +401,7 @@ public void fetchLogRecords( } // maybe do delay fetch log operation. 
- maybeAddDelayedFetchLog(params, bucketFetchInfo, logFetchResults, responseCallback); + maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults, responseCallback); } /** @@ -781,7 +781,7 @@ private void makeFollowers( .collect(Collectors.toSet())); replicasBecomeFollower.forEach( - replica -> completeDelayedWriteAndFetchLogOperations(replica.getTableBucket())); + replica -> completeDelayedOperations(replica.getTableBucket())); LOG.info( "Stopped fetchers as part of become follower request for {} replicas", @@ -950,9 +950,9 @@ public void limitScan( responseCallback.accept(limitScanResultForBucket); } - public Map readFromLog( + public Map readFromLog( FetchParams fetchParams, Map bucketFetchInfo) { - Map logFetchResult = new HashMap<>(); + Map logReadResult = new HashMap<>(); boolean isFromFollower = fetchParams.isFromFollower(); int limitBytes = fetchParams.maxFetchBytes(); for (Map.Entry entry : bucketFetchInfo.entrySet()) { @@ -988,9 +988,9 @@ public Map readFromLog( } limitBytes = Math.max(0, limitBytes - recordBatchSize); - logFetchResult.put( + logReadResult.put( tb, - new LogReadStatus( + new LogReadResult( new FetchLogResultForBucket( tb, fetchedData.getRecords(), readInfo.getHighWatermark()), fetchedData.getFetchOffsetMetadata())); @@ -1020,11 +1020,11 @@ public Map readFromLog( } else { result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e)); } - logFetchResult.put( - tb, new LogReadStatus(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)); + logReadResult.put( + tb, new LogReadResult(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)); } } - return logFetchResult; + return logReadResult; } private FetchLogResultForBucket handleFetchOutOfRangeException( @@ -1190,17 +1190,17 @@ private void maybeAddDelayedWrite( private void maybeAddDelayedFetchLog( FetchParams params, Map bucketFetchInfo, - Map logFetchResults, + Map logReadResults, Consumer> responseCallback) { long bytesReadable = 0; boolean errorReadingData = false; boolean 
hasFetchFromLocal = false; Map fetchBucketStatusMap = new HashMap<>(); - for (Map.Entry logFetchResult : logFetchResults.entrySet()) { - TableBucket tb = logFetchResult.getKey(); - LogReadStatus logReadStatus = logFetchResult.getValue(); + for (Map.Entry logReadResultEntry : logReadResults.entrySet()) { + TableBucket tb = logReadResultEntry.getKey(); + LogReadResult logReadResult = logReadResultEntry.getValue(); FetchLogResultForBucket fetchLogResultForBucket = - logReadStatus.getFetchLogResultForBucket(); + logReadResult.getFetchLogResultForBucket(); if (fetchLogResultForBucket.failed()) { errorReadingData = true; break; @@ -1215,7 +1215,7 @@ private void maybeAddDelayedFetchLog( tb, new FetchBucketStatus( bucketFetchInfo.get(tb), - logReadStatus.getLogOffsetMetadata(), + logReadResult.getLogOffsetMetadata(), fetchLogResultForBucket)); } @@ -1225,7 +1225,7 @@ private void maybeAddDelayedFetchLog( || bytesReadable >= params.minFetchBytes() || errorReadingData) { responseCallback.accept( - logFetchResults.entrySet().stream() + logReadResults.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, @@ -1253,7 +1253,7 @@ private void maybeAddDelayedFetchLog( } } - private void completeDelayedWriteAndFetchLogOperations(TableBucket tableBucket) { + private void completeDelayedOperations(TableBucket tableBucket) { DelayedTableBucketKey delayedTableBucketKey = new DelayedTableBucketKey(tableBucket); delayedWriteManager.checkAndComplete(delayedTableBucketKey); delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); @@ -1373,7 +1373,7 @@ private StopReplicaResultForBucket stopReplica( // If we were the leader, we may have some operations still waiting for completion. // We force completion to prevent them from timing out. 
- completeDelayedWriteAndFetchLogOperations(tb); + completeDelayedOperations(tb); return new StopReplicaResultForBucket(tb); } @@ -1563,12 +1563,12 @@ public void shutdown() throws InterruptedException { checkpointHighWatermarks(); } - /** The status of reading log. */ - public static final class LogReadStatus { + /** The result of reading log. */ + public static final class LogReadResult { private final FetchLogResultForBucket fetchLogResultForBucket; private final LogOffsetMetadata logOffsetMetadata; - public LogReadStatus( + public LogReadResult( FetchLogResultForBucket fetchLogResultForBucket, LogOffsetMetadata logOffsetMetadata) { this.fetchLogResultForBucket = fetchLogResultForBucket; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java index 4e3ec8319..80b145d5a 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java @@ -34,7 +34,7 @@ import com.alibaba.fluss.server.metrics.group.TabletServerMetricGroup; import com.alibaba.fluss.server.replica.Replica; import com.alibaba.fluss.server.replica.ReplicaManager; -import com.alibaba.fluss.server.replica.ReplicaManager.LogReadStatus; +import com.alibaba.fluss.server.replica.ReplicaManager.LogReadResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,9 +91,9 @@ public void onComplete() { } // re-fetch data. - Map reFetchResult = + Map reReadResult = replicaManager.readFromLog(params, reFetchBuckets); - reFetchResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket())); + reReadResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket())); responseCallback.accept(result); } @@ -103,7 +103,7 @@ public void onComplete() { *
    *
  • Case A: The server is no longer the leader for some buckets it tries to fetch *
  • Case B: The replica is no longer available on this server - *
  • Case C: This server doesn't know of some buckets ot tries to fetch + *
  • Case C: This server doesn't know of some buckets it tries to fetch *
  • Case D: The fetch offset locates not on the last segment of the log *
  • Case E: The accumulated bytes from all the fetching buckets exceeds the minimum bytes *
@@ -195,9 +195,9 @@ public boolean tryComplete() { @Override public void onExpiration() { if (params.isFromFollower()) { - serverMetricGroup.delayedFetchLogFromFollowerExpireCount().inc(); + serverMetricGroup.delayedFetchFromFollowerExpireCount().inc(); } else { - serverMetricGroup.delayedFetchLogFromClientExpireCount().inc(); + serverMetricGroup.delayedFetchFromClientExpireCount().inc(); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java index 1919485b3..c1e46222e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java @@ -57,7 +57,7 @@ final class RemoteLeaderEndpoint implements LeaderEndpoint { TabletServerGateway tabletServerGateway) { this.followerServerId = followerServerId; this.remoteNode = remoteNode; - this.maxFetchSize = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes(); + this.maxFetchSize = (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes(); this.maxFetchSizeForBucket = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes(); this.minFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MIN_BYTES).getBytes(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrTest.java index 46d333d96..eb2aa7029 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrTest.java @@ -68,7 +68,8 @@ void testExpandIsr() throws Exception { // mock follower 2 to fetch data from leader. fetch offset is 10 (which indicate the // follower catch up the leader, it will be added into isr list). 
replicaManager.fetchLogRecords( - new FetchParams(2, (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes()), + new FetchParams( + 2, (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()), Collections.singletonMap( tb, new FetchData(tb.getTableId(), 10L, Integer.MAX_VALUE)), result -> {}); @@ -82,7 +83,8 @@ tb, new FetchData(tb.getTableId(), 10L, Integer.MAX_VALUE)), // mock follower 3 to fetch data from leader. fetch offset is 10 (which indicate the // follower catch up the leader, it will be added into isr list). replicaManager.fetchLogRecords( - new FetchParams(3, (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes()), + new FetchParams( + 3, (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes()), Collections.singletonMap( tb, new FetchData(tb.getTableId(), 10L, Integer.MAX_VALUE)), result -> {}); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index 948d631eb..32990148d 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -173,7 +173,7 @@ void testFetchLog() throws Exception { FetchLogResultForBucket resultForBucket = result.get(tb); assertThat(resultForBucket.getTableBucket()).isEqualTo(tb); assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); - assertThat(resultForBucket.records()).isNull(); + assertThat(resultForBucket.records().sizeInBytes()).isEqualTo(0); // produce one batch to this bucket. 
CompletableFuture> future = new CompletableFuture<>(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java index d9cf963c4..1d3e0e472 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java @@ -113,7 +113,11 @@ void testAppendRecordsToLeader() throws Exception { assertThat(appendInfo.shallowCount()).isEqualTo(1); FetchParams fetchParams = - new FetchParams(-1, (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes()); + new FetchParams( + -1, + (int) + conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES) + .getBytes()); fetchParams.setCurrentFetch(DATA1_TABLE_ID, 0, Integer.MAX_VALUE, DATA1_ROW_TYPE, null); LogReadInfo logReadInfo = logReplica.fetchRecords(fetchParams); assertLogRecordsEquals(DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java index ffcb96e1f..782cd5921 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java @@ -54,6 +54,8 @@ void testCompleteDelayedFetchLog() throws Exception { FetchLogResultForBucket preFetchResultForBucket = new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L); + CompletableFuture> delayedResponse = + new CompletableFuture<>(); DelayedFetchLog delayedFetchLog = createDelayedFetchLogRequest( tb, @@ -63,13 +65,7 @@ void testCompleteDelayedFetchLog() throws Exception { new FetchData(150001L, 0L, Integer.MAX_VALUE), new LogOffsetMetadata(0L, 0L, 0), preFetchResultForBucket), - response -> { - 
assertThat(response.containsKey(tb)).isTrue(); - FetchLogResultForBucket resultForBucket = response.get(tb); - assertThat(resultForBucket.getHighWatermark()).isEqualTo(10L); - assertLogRecordsEquals( - DATA1_ROW_TYPE, resultForBucket.records(), DATA1); - }); + delayedResponse::complete); DelayedOperationManager delayedFetchLogManager = replicaManager.getDelayedFetchLogManager(); @@ -87,6 +83,7 @@ void testCompleteDelayedFetchLog() throws Exception { assertThat(delayedFetchLogManager.watched()).isEqualTo(1); // write data. + assertThat(delayedResponse.isDone()).isFalse(); CompletableFuture> future = new CompletableFuture<>(); replicaManager.appendRecordsToLog( 20000, @@ -95,12 +92,14 @@ void testCompleteDelayedFetchLog() throws Exception { future::complete); assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L)); - // manual trigger the complete delayed fetch log operation. In normal case, it will be - // triggered by the highWatermark increment. - numComplete = delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); - assertThat(numComplete).isEqualTo(1); assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0); assertThat(delayedFetchLogManager.watched()).isEqualTo(0); + + Map result = delayedResponse.get(); + assertThat(result.containsKey(tb)).isTrue(); + FetchLogResultForBucket resultForBucket = result.get(tb); + assertThat(resultForBucket.getHighWatermark()).isEqualTo(10L); + assertLogRecordsEquals(DATA1_ROW_TYPE, resultForBucket.records(), DATA1); } @Test @@ -110,6 +109,8 @@ void testDelayFetchLogTimeout() { FetchLogResultForBucket preFetchResultForBucket = new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L); + CompletableFuture> delayedResponse = + new CompletableFuture<>(); DelayedFetchLog delayedFetchLog = createDelayedFetchLogRequest( tb, @@ -119,13 +120,7 @@ void testDelayFetchLogTimeout() { new FetchData(150001L, 0L, Integer.MAX_VALUE), new LogOffsetMetadata(0L, 0L, 0), preFetchResultForBucket), - response -> 
{ - assertThat(response.containsKey(tb)).isTrue(); - FetchLogResultForBucket resultForBucket = response.get(tb); - assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); - assertThat(resultForBucket.recordsOrEmpty()) - .isEqualTo(MemoryLogRecords.EMPTY); - }); + delayedResponse::complete); DelayedOperationManager delayedFetchLogManager = replicaManager.getDelayedFetchLogManager(); @@ -140,6 +135,13 @@ void testDelayFetchLogTimeout() { delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0); assertThat(delayedFetchLogManager.watched()).isEqualTo(0); + + assertThat(delayedResponse.isDone()).isTrue(); + Map result = delayedResponse.get(); + assertThat(result.containsKey(tb)).isTrue(); + FetchLogResultForBucket resultForBucket = result.get(tb); + assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); + assertThat(resultForBucket.recordsOrEmpty()).isEqualTo(MemoryLogRecords.EMPTY); }); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java index 9ce6a94c9..395dce4ed 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java @@ -59,7 +59,9 @@ import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponseWithRowKind; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static 
com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newFetchLogRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest; @@ -121,7 +123,7 @@ void testProduceLogNeedAck() throws Exception { FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); // send one batch, which need ack. - RpcMessageTestUtils.assertProduceLogResponse( + assertProduceLogResponse( leaderGateWay .produceLog( RpcMessageTestUtils.newProduceLogRequest( @@ -134,7 +136,7 @@ void testProduceLogNeedAck() throws Exception { 0L); // check leader log data. - RpcMessageTestUtils.assertFetchLogResponse( + assertFetchLogResponse( leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, bucketId, 0L)).get(), tableId, bucketId, @@ -147,8 +149,20 @@ void testProduceLogNeedAck() throws Exception { leaderAndIsr.isr().stream() .filter(id -> id != leader) .collect(Collectors.toList())) { + ReplicaManager replicaManager = FLUSS_CLUSTER_EXTENSION.getTabletServerById(followId).getReplicaManager(); + // wait util follower highWaterMark equals leader. + retry( + Duration.ofMinutes(1), + () -> + assertThat( + replicaManager + .getReplicaOrException(tb) + .getLogTablet() + .getHighWatermark()) + .isEqualTo(10L)); + CompletableFuture> future = new CompletableFuture<>(); // mock client fetch from follower. @@ -163,17 +177,6 @@ void testProduceLogNeedAck() throws Exception { LogRecords records = resultForBucket.records(); assertThat(records).isNotNull(); assertLogRecordsEquals(DATA1_ROW_TYPE, records, DATA1); - - // wait util follower highWaterMark equals leader. 
- retry( - Duration.ofMinutes(1), - () -> - assertThat( - replicaManager - .getReplicaOrException(tb) - .getLogTablet() - .getHighWatermark()) - .isEqualTo(10L)); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java index cb6506100..6d26b1a77 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java @@ -53,7 +53,7 @@ public TestingLeaderEndpoint( Configuration conf, ReplicaManager replicaManager, ServerNode localNode) { this.replicaManager = replicaManager; this.localNode = localNode; - this.maxFetchSize = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes(); + this.maxFetchSize = (int) conf.get(ConfigOptions.LOG_REPLICA_FETCH_MAX_BYTES).getBytes(); this.maxFetchSizeForBucket = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes(); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index 96ed90138..b3d5afdcd 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -56,6 +56,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.record.TestData.ANOTHER_DATA1; import static com.alibaba.fluss.record.TestData.DATA1; @@ -299,8 +300,8 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { // third send a fetch request with minFetchSize and much bigger maxFetchWaitMs, the request // will return after we send a produce log request to this bucket. 
- leaderGateWay - .fetchLog( + CompletableFuture fetchResultFuture = + leaderGateWay.fetchLog( newFetchLogRequest( -1, tableId, @@ -309,19 +310,19 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { null, 1, Integer.MAX_VALUE, - (int) Duration.ofMinutes(5).toMillis())) - .whenComplete( - (res, throwable) -> assertFetchLogResponse(res, tableId, 0, 10L, DATA1)); + (int) Duration.ofMinutes(5).toMillis())); // send a produce log request to trigger delay fetch log finish. assertProduceLogResponse( leaderGateWay .produceLog( newProduceLogRequest( - tableId, 0, 1, genMemoryLogRecordsByObject(DATA1))) + tableId, 0, -1, genMemoryLogRecordsByObject(DATA1))) .get(), 0, 0L); + // the delay fetch will be completed. + assertFetchLogResponse(fetchResultFuture.get(), tableId, 0, 10L, DATA1); // return immediately. assertFetchLogResponse( diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index e3f854b70..700c5ee31 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -99,10 +99,10 @@ during the Fluss cluster working. | log.replica.fetch-log-operation-purge-number | Integer | 1000 | The purge number (in number of requests) of the fetch log operation manager, the default value is 1000. | | log.replica.fetcher-number | Integer | 1 | Number of fetcher threads used to replicate log records from each source tablet server. The total number of fetchers on each tablet server is bound by this parameter multiplied by the number of tablet servers in the cluster. Increasing this value can increase the degree of I/O parallelism in the follower and leader tablet server at the cost of higher CPU and memory utilization. | | log.replica.fetch-backoff-interval | Duration | 1s | The amount of time to sleep when fetch bucket error occurs. | -| log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request. 
Records are fetched in batches for log scanner or follower, for one request batch, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. Note that the fetcher performs multiple fetches in parallel. | -| log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in fetch request. Records are fetched in batches for consumer or follower, for one request batch, the max bytes size is config by this option. | -| log.fetch.min-bytes | MemorySize | 1b | The minimum bytes expected for each fetch log response (including fetch log request from the follower or client). If not enough bytes, wait up to log.fetch.wait-max-time time to return. | -| log.fetch.wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available for a fetch log response (including fetch log request from the follower or client). This value should always be less than the 'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for low throughput tables | +| log.replica.fetch-max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from follower. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. Note that the fetcher performs multiple fetches in parallel. | +| log.replica.fetch-max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in fetch request from follower. Records are fetched in batches, and the max bytes size is configured by this option.
| +| log.replica.fetch-min-bytes | MemorySize | 1b | The minimum bytes expected for each fetch log request from the follower to response. If not enough bytes, wait up to log.replica.fetch-wait-max-time time to return. | +| log.replica.fetch-wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available for a fetch log request from the follower to response. This value should always be less than the 'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for low throughput tables | | log.replica.min-in-sync-replicas-number | Integer | 1 | When a producer set acks to all (-1), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception(NotEnoughReplicas). when used together, this config and 'acks' allow you to enforce greater durability guarantees. A typical scenario would be to create a table with a replication factor of 3. set this conf to 2, and produce with acks = -1. This will ensure that the producer raises an exception if a majority of replicas don't receive a write. | ### Log Tiered Storage @@ -219,6 +219,10 @@ Currently, we don't support alter table configuration by Flink. This will be sup | client.request-timeout | Duration | 30s | The timeout for a request to complete. If user set the write ack to -1, this timeout is the max time that delayed write try to complete. The default setting is 30 seconds. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. 
Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. | +| client.scanner.log.fetch-max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. | +| client.scanner.log.fetch-max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in fetch request from client. Records are fetched in batches, and the max bytes size is configured by this option. | +| client.scanner.log.fetch-min-bytes | MemorySize | 1b | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. | +| client.scanner.log.fetch-wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. | | client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | | client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | | client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | diff --git a/website/docs/maintenance/monitor-metrics.md b/website/docs/maintenance/monitor-metrics.md index 26baecb71..795d02b2d 100644 --- a/website/docs/maintenance/monitor-metrics.md +++ b/website/docs/maintenance/monitor-metrics.md @@ -326,28 +326,28 @@ Some metrics might not be exposed when using other JVM implementations (e.g.
IBM Gauge - delayedWriteSize - The delayed write size in this TabletServer. + delayedWriteCount + The delayed write count in this TabletServer. Gauge - delayedFetchLogSize - The delayed fetch log operation size in this TabletServer. + delayedFetchCount + The delayed fetch log operation count in this TabletServer. Gauge - delayedWriteExpirationPerSecond + delayedWriteExpiresPerSecond The delayed write operation expire count per second in this TabletServer. Meter - delayedFetchLogFromFollowerExpirationPerSecond + delayedFetchFromFollowerExpiresPerSecond The delayed fetch log operation from follower expire count per second in this TabletServer. Meter - delayedFetchLogFromClientExpirationPerSecond - The delayed write operation from client expire count per second in this TabletServer. + delayedFetchFromClientExpiresPerSecond + The delayed fetch log operation from client expire count per second in this TabletServer. Meter