diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java index c4fbef1b..b6e8d82c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java @@ -48,13 +48,13 @@ public class MetricNames { public static final String REPLICA_LEADER_COUNT = "leaderCount"; public static final String REPLICA_COUNT = "replicaCount"; public static final String WRITE_ID_COUNT = "writerIdCount"; - public static final String DELAYED_WRITE_SIZE = "delayedWriteSize"; - public static final String DELAYED_WRITE_EXPIRATION_RATE = "delayedWriteExpirationPerSecond"; - public static final String DELAYED_FETCH_LOG_SIZE = "delayedFetchLogSize"; - public static final String DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE = - "delayedFetchLogFromFollowerExpirationPerSecond"; - public static final String DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE = - "delayedFetchLogFromClientExpirationPerSecond"; + public static final String DELAYED_WRITE_COUNT = "delayedWriteCount"; + public static final String DELAYED_WRITE_EXPIRES_RATE = "delayedWriteExpiresPerSecond"; + public static final String DELAYED_FETCH_COUNT = "delayedFetchCount"; + public static final String DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE = + "delayedFetchFromFollowerExpiresPerSecond"; + public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE = + "delayedFetchFromClientExpiresPerSecond"; // -------------------------------------------------------------------------------------------- // metrics for table diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java index a2092fb9..55377e54 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetMetadata.java @@ -67,6 +67,10 @@ public int getRelativePositionInSegment() { return relativePositionInSegment; } + /** + * Compute the number of bytes between this offset to the given offset, if they are on the same + * segment and this offset precedes the given offset. + */ public int positionDiff(LogOffsetMetadata that) { if (messageOffsetOnly()) { throw new FlussRuntimeException( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java index 8e94f720..01070772 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogOffsetSnapshot.java @@ -56,8 +56,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = (int) (logStartOffset ^ (logStartOffset >>> 32)); - result = 31 * result + (int) (localLogStartOffset ^ (localLogStartOffset >>> 32)); + int result = Long.hashCode(logStartOffset); + result = 31 * result + Long.hashCode(localLogStartOffset); result = 31 * result + logEndOffset.hashCode(); result = 31 * result + highWatermark.hashCode(); return result; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java index 5f3d5e39..eb652079 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -44,8 +44,8 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { private final Counter replicationBytesIn; private final Counter replicationBytesOut; private final Counter delayedWriteExpireCount; - private final Counter delayedFetchLogFromFollowerExpireCount; - private final Counter delayedFetchLogFromClientExpireCount; + private final Counter delayedFetchFromFollowerExpireCount; + private final Counter delayedFetchFromClientExpireCount; public TabletServerMetricGroup( MetricRegistry registry, String clusterId, String hostname, int serverId) { @@ -60,15 +60,15 @@ public TabletServerMetricGroup( meter(MetricNames.REPLICATION_OUT_RATE, new MeterView(replicationBytesOut)); delayedWriteExpireCount = new ThreadSafeSimpleCounter(); - meter(MetricNames.DELAYED_WRITE_EXPIRATION_RATE, new MeterView(delayedWriteExpireCount)); - delayedFetchLogFromFollowerExpireCount = new ThreadSafeSimpleCounter(); + meter(MetricNames.DELAYED_WRITE_EXPIRES_RATE, new MeterView(delayedWriteExpireCount)); + delayedFetchFromFollowerExpireCount = new ThreadSafeSimpleCounter(); meter( - MetricNames.DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE, - new MeterView(delayedFetchLogFromFollowerExpireCount)); - delayedFetchLogFromClientExpireCount = new ThreadSafeSimpleCounter(); + MetricNames.DELAYED_FETCH_FROM_FOLLOWER_EXPIRES_RATE, + new MeterView(delayedFetchFromFollowerExpireCount)); + delayedFetchFromClientExpireCount = new ThreadSafeSimpleCounter(); meter( - MetricNames.DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE, - new MeterView(delayedFetchLogFromClientExpireCount)); + MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE, + new MeterView(delayedFetchFromClientExpireCount)); } @Override @@ -95,12 +95,12 @@ public Counter delayedWriteExpireCount() { return delayedWriteExpireCount; } - public Counter delayedFetchLogFromFollowerExpireCount() { - return delayedFetchLogFromFollowerExpireCount; + public Counter delayedFetchFromFollowerExpireCount() { + return delayedFetchFromFollowerExpireCount; } - public Counter delayedFetchLogFromClientExpireCount() { - return delayedFetchLogFromClientExpireCount; + public Counter delayedFetchFromClientExpireCount() { + return delayedFetchFromClientExpireCount; } // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index bad629f5..56ef5174 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -793,8 +793,14 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in // TODO WRITE a leader epoch. LogAppendInfo appendInfo = logTablet.appendAsLeader(memoryLogRecords); - // we may need to increment high watermark. - maybeIncrementLeaderHW(logTablet, System.currentTimeMillis()); + // we may need to increment high watermark if isr could be down to 1 or the + // replica count is 1. + boolean hwIncreased = + maybeIncrementLeaderHW(logTablet, System.currentTimeMillis()); + + if (hwIncreased) { + tryCompleteDelayedOperations(); + } return appendInfo; }); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 7ea607a5..d3d9ec2d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -285,9 +285,9 @@ private void registerMetrics() { () -> onlineReplicas().filter(Replica::isLeader).count()); serverMetricGroup.gauge(MetricNames.REPLICA_COUNT, allReplicas::size); serverMetricGroup.gauge(MetricNames.WRITE_ID_COUNT, this::writerIdCount); - serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_SIZE, delayedWriteManager::numDelayed); + serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_COUNT, delayedWriteManager::numDelayed); serverMetricGroup.gauge( - MetricNames.DELAYED_FETCH_LOG_SIZE, delayedFetchLogManager::numDelayed); + MetricNames.DELAYED_FETCH_COUNT, delayedFetchLogManager::numDelayed); } private Stream onlineReplicas() { @@ -392,7 +392,7 @@ public void fetchLogRecords( Map bucketFetchInfo, Consumer> responseCallback) { long startTime = System.currentTimeMillis(); - Map logFetchResults = readFromLog(params, bucketFetchInfo); + Map logReadResults = readFromLog(params, bucketFetchInfo); if (LOG.isTraceEnabled()) { LOG.trace( "Fetch log records from local log in {} ms", @@ -400,7 +400,7 @@ public void fetchLogRecords( } // maybe do delay fetch log operation. - maybeAddDelayedFetchLog(params, bucketFetchInfo, logFetchResults, responseCallback); + maybeAddDelayedFetchLog(params, bucketFetchInfo, logReadResults, responseCallback); } /** @@ -749,7 +749,7 @@ private void makeFollowers( .collect(Collectors.toSet())); replicasBecomeFollower.forEach( - replica -> completeDelayedWriteAndFetchLogOperations(replica.getTableBucket())); + replica -> completeDelayedOperations(replica.getTableBucket())); LOG.info( "Stopped fetchers as part of become follower request for {} replicas", @@ -918,9 +918,9 @@ public void limitScan( responseCallback.accept(limitScanResultForBucket); } - public Map readFromLog( + public Map readFromLog( FetchParams fetchParams, Map bucketFetchInfo) { - Map logFetchResult = new HashMap<>(); + Map logReadResult = new HashMap<>(); boolean isFromFollower = fetchParams.isFromFollower(); int limitBytes = fetchParams.maxFetchBytes(); for (Map.Entry entry : bucketFetchInfo.entrySet()) { @@ -956,9 +956,9 @@ public Map readFromLog( } limitBytes = Math.max(0, limitBytes - recordBatchSize); - logFetchResult.put( + logReadResult.put( tb, - new LogReadStatus( + new LogReadResult( new FetchLogResultForBucket( tb, fetchedData.getRecords(), readInfo.getHighWatermark()), fetchedData.getFetchOffsetMetadata())); @@ -988,11 +988,11 @@ public Map readFromLog( } else { result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e)); } - logFetchResult.put( - tb, new LogReadStatus(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)); + logReadResult.put( + tb, new LogReadResult(result, LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)); } } - return logFetchResult; + return logReadResult; } private FetchLogResultForBucket handleFetchOutOfRangeException( @@ -1158,17 +1158,17 @@ private void maybeAddDelayedWrite( private void maybeAddDelayedFetchLog( FetchParams params, Map bucketFetchInfo, - Map logFetchResults, + Map logReadResults, Consumer> responseCallback) { long bytesReadable = 0; boolean errorReadingData = false; boolean hasFetchFromLocal = false; Map fetchBucketStatusMap = new HashMap<>(); - for (Map.Entry logFetchResult : logFetchResults.entrySet()) { - TableBucket tb = logFetchResult.getKey(); - LogReadStatus logReadStatus = logFetchResult.getValue(); + for (Map.Entry logReadResultEntry : logReadResults.entrySet()) { + TableBucket tb = logReadResultEntry.getKey(); + LogReadResult logReadResult = logReadResultEntry.getValue(); FetchLogResultForBucket fetchLogResultForBucket = - logReadStatus.getFetchLogResultForBucket(); + logReadResult.getFetchLogResultForBucket(); if (fetchLogResultForBucket.failed()) { errorReadingData = true; break; @@ -1183,7 +1183,7 @@ private void maybeAddDelayedFetchLog( tb, new FetchBucketStatus( bucketFetchInfo.get(tb), - logReadStatus.getLogOffsetMetadata(), + logReadResult.getLogOffsetMetadata(), fetchLogResultForBucket)); } @@ -1193,7 +1193,7 @@ private void maybeAddDelayedFetchLog( || bytesReadable >= params.minFetchBytes() || errorReadingData) { responseCallback.accept( - logFetchResults.entrySet().stream() + logReadResults.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getKey, @@ -1221,7 +1221,7 @@ private void maybeAddDelayedFetchLog( } } - private void completeDelayedWriteAndFetchLogOperations(TableBucket tableBucket) { + private void completeDelayedOperations(TableBucket tableBucket) { DelayedTableBucketKey delayedTableBucketKey = new DelayedTableBucketKey(tableBucket); delayedWriteManager.checkAndComplete(delayedTableBucketKey); delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); @@ -1337,7 +1337,7 @@ private StopReplicaResultForBucket stopReplica( // If we were the leader, we may have some operations still waiting for completion. // We force completion to prevent them from timing out. - completeDelayedWriteAndFetchLogOperations(tb); + completeDelayedOperations(tb); return new StopReplicaResultForBucket(tb); } @@ -1527,12 +1527,12 @@ public void shutdown() throws InterruptedException { checkpointHighWatermarks(); } - /** The status of reading log. */ - public static final class LogReadStatus { + /** The result of reading log. */ + public static final class LogReadResult { private final FetchLogResultForBucket fetchLogResultForBucket; private final LogOffsetMetadata logOffsetMetadata; - public LogReadStatus( + public LogReadResult( FetchLogResultForBucket fetchLogResultForBucket, LogOffsetMetadata logOffsetMetadata) { this.fetchLogResultForBucket = fetchLogResultForBucket; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java index 4e3ec831..80b145d5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLog.java @@ -34,7 +34,7 @@ import com.alibaba.fluss.server.metrics.group.TabletServerMetricGroup; import com.alibaba.fluss.server.replica.Replica; import com.alibaba.fluss.server.replica.ReplicaManager; -import com.alibaba.fluss.server.replica.ReplicaManager.LogReadStatus; +import com.alibaba.fluss.server.replica.ReplicaManager.LogReadResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,9 +91,9 @@ public void onComplete() { } // re-fetch data. - Map reFetchResult = + Map reReadResult = replicaManager.readFromLog(params, reFetchBuckets); - reFetchResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket())); + reReadResult.forEach((key, value) -> result.put(key, value.getFetchLogResultForBucket())); responseCallback.accept(result); } @@ -103,7 +103,7 @@ public void onComplete() { *
    *
  • Case A: The server is no longer the leader for some buckets it tries to fetch *
  • Case B: The replica is no longer available on this server - *
  • Case C: This server doesn't know of some buckets ot tries to fetch + *
  • Case C: This server doesn't know of some buckets it tries to fetch *
  • Case D: The fetch offset locates not on the last segment of the log *
  • Case E: The accumulated bytes from all the fetching buckets exceeds the minimum bytes *
@@ -195,9 +195,9 @@ public boolean tryComplete() { @Override public void onExpiration() { if (params.isFromFollower()) { - serverMetricGroup.delayedFetchLogFromFollowerExpireCount().inc(); + serverMetricGroup.delayedFetchFromFollowerExpireCount().inc(); } else { - serverMetricGroup.delayedFetchLogFromClientExpireCount().inc(); + serverMetricGroup.delayedFetchFromClientExpireCount().inc(); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index a1ea5cff..423c6c63 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -165,7 +165,7 @@ void testFetchLog() throws Exception { FetchLogResultForBucket resultForBucket = result.get(tb); assertThat(resultForBucket.getTableBucket()).isEqualTo(tb); assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); - assertThat(resultForBucket.records()).isNull(); + assertThat(resultForBucket.records().sizeInBytes()).isEqualTo(0); // produce one batch to this bucket. CompletableFuture> future = new CompletableFuture<>(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java index ffcb96e1..782cd592 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java @@ -54,6 +54,8 @@ void testCompleteDelayedFetchLog() throws Exception { FetchLogResultForBucket preFetchResultForBucket = new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L); + CompletableFuture> delayedResponse = + new CompletableFuture<>(); DelayedFetchLog delayedFetchLog = createDelayedFetchLogRequest( tb, @@ -63,13 +65,7 @@ void testCompleteDelayedFetchLog() throws Exception { new FetchData(150001L, 0L, Integer.MAX_VALUE), new LogOffsetMetadata(0L, 0L, 0), preFetchResultForBucket), - response -> { - assertThat(response.containsKey(tb)).isTrue(); - FetchLogResultForBucket resultForBucket = response.get(tb); - assertThat(resultForBucket.getHighWatermark()).isEqualTo(10L); - assertLogRecordsEquals( - DATA1_ROW_TYPE, resultForBucket.records(), DATA1); - }); + delayedResponse::complete); DelayedOperationManager delayedFetchLogManager = replicaManager.getDelayedFetchLogManager(); @@ -87,6 +83,7 @@ void testCompleteDelayedFetchLog() throws Exception { assertThat(delayedFetchLogManager.watched()).isEqualTo(1); // write data. + assertThat(delayedResponse.isDone()).isFalse(); CompletableFuture> future = new CompletableFuture<>(); replicaManager.appendRecordsToLog( 20000, @@ -95,12 +92,14 @@ void testCompleteDelayedFetchLog() throws Exception { future::complete); assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L)); - // manual trigger the complete delayed fetch log operation. In normal case, it will be - // triggered by the highWatermark increment. - numComplete = delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); - assertThat(numComplete).isEqualTo(1); assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0); assertThat(delayedFetchLogManager.watched()).isEqualTo(0); + + Map result = delayedResponse.get(); + assertThat(result.containsKey(tb)).isTrue(); + FetchLogResultForBucket resultForBucket = result.get(tb); + assertThat(resultForBucket.getHighWatermark()).isEqualTo(10L); + assertLogRecordsEquals(DATA1_ROW_TYPE, resultForBucket.records(), DATA1); } @Test @@ -110,6 +109,8 @@ void testDelayFetchLogTimeout() { FetchLogResultForBucket preFetchResultForBucket = new FetchLogResultForBucket(tb, MemoryLogRecords.EMPTY, 0L); + CompletableFuture> delayedResponse = + new CompletableFuture<>(); DelayedFetchLog delayedFetchLog = createDelayedFetchLogRequest( tb, @@ -119,13 +120,7 @@ void testDelayFetchLogTimeout() { new FetchData(150001L, 0L, Integer.MAX_VALUE), new LogOffsetMetadata(0L, 0L, 0), preFetchResultForBucket), - response -> { - assertThat(response.containsKey(tb)).isTrue(); - FetchLogResultForBucket resultForBucket = response.get(tb); - assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); - assertThat(resultForBucket.recordsOrEmpty()) - .isEqualTo(MemoryLogRecords.EMPTY); - }); + delayedResponse::complete); DelayedOperationManager delayedFetchLogManager = replicaManager.getDelayedFetchLogManager(); @@ -140,6 +135,13 @@ void testDelayFetchLogTimeout() { delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0); assertThat(delayedFetchLogManager.watched()).isEqualTo(0); + + assertThat(delayedResponse.isDone()).isTrue(); + Map result = delayedResponse.get(); + assertThat(result.containsKey(tb)).isTrue(); + FetchLogResultForBucket resultForBucket = result.get(tb); + assertThat(resultForBucket.getHighWatermark()).isEqualTo(0L); + assertThat(resultForBucket.recordsOrEmpty()).isEqualTo(MemoryLogRecords.EMPTY); }); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java index 9ce6a94c..395dce4e 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java @@ -59,7 +59,9 @@ import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponseWithRowKind; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newFetchLogRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest; @@ -121,7 +123,7 @@ void testProduceLogNeedAck() throws Exception { FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); // send one batch, which need ack. - RpcMessageTestUtils.assertProduceLogResponse( + assertProduceLogResponse( leaderGateWay .produceLog( RpcMessageTestUtils.newProduceLogRequest( @@ -134,7 +136,7 @@ void testProduceLogNeedAck() throws Exception { 0L); // check leader log data. - RpcMessageTestUtils.assertFetchLogResponse( + assertFetchLogResponse( leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, bucketId, 0L)).get(), tableId, bucketId, @@ -147,8 +149,20 @@ void testProduceLogNeedAck() throws Exception { leaderAndIsr.isr().stream() .filter(id -> id != leader) .collect(Collectors.toList())) { + ReplicaManager replicaManager = FLUSS_CLUSTER_EXTENSION.getTabletServerById(followId).getReplicaManager(); + // wait util follower highWaterMark equals leader. + retry( + Duration.ofMinutes(1), + () -> + assertThat( + replicaManager + .getReplicaOrException(tb) + .getLogTablet() + .getHighWatermark()) + .isEqualTo(10L)); + CompletableFuture> future = new CompletableFuture<>(); // mock client fetch from follower. @@ -163,17 +177,6 @@ void testProduceLogNeedAck() throws Exception { LogRecords records = resultForBucket.records(); assertThat(records).isNotNull(); assertLogRecordsEquals(DATA1_ROW_TYPE, records, DATA1); - - // wait util follower highWaterMark equals leader. - retry( - Duration.ofMinutes(1), - () -> - assertThat( - replicaManager - .getReplicaOrException(tb) - .getLogTablet() - .getHighWatermark()) - .isEqualTo(10L)); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index cb5e47f4..e0598752 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -48,6 +48,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.record.TestData.ANOTHER_DATA1; import static com.alibaba.fluss.record.TestData.DATA1; @@ -289,8 +290,8 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { // third send a fetch request with minFetchSize and much bigger maxFetchWaitMs, the request // will return after we send a produce log request to this bucket. - leaderGateWay - .fetchLog( + CompletableFuture fetchResultFuture = + leaderGateWay.fetchLog( newFetchLogRequest( -1, tableId, @@ -299,19 +300,19 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { null, 1, Integer.MAX_VALUE, - (int) Duration.ofMinutes(5).toMillis())) - .whenComplete( - (res, throwable) -> assertFetchLogResponse(res, tableId, 0, 10L, DATA1)); + (int) Duration.ofMinutes(5).toMillis())); // send a produce log request to trigger delay fetch log finish. assertProduceLogResponse( leaderGateWay .produceLog( newProduceLogRequest( - tableId, 0, 1, genMemoryLogRecordsByObject(DATA1))) + tableId, 0, -1, genMemoryLogRecordsByObject(DATA1))) .get(), 0, 0L); + // the delay fetch will be completed. + assertFetchLogResponse(fetchResultFuture.get(), tableId, 0, 10L, DATA1); // return immediately. assertFetchLogResponse( diff --git a/website/docs/maintenance/monitor-metrics.md b/website/docs/maintenance/monitor-metrics.md index cc167de5..87b4474f 100644 --- a/website/docs/maintenance/monitor-metrics.md +++ b/website/docs/maintenance/monitor-metrics.md @@ -326,28 +326,28 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM Gauge - delayedWriteSize - The delayed write size in this TabletServer. + delayedWriteCount + The delayed write count in this TabletServer. Gauge - delayedFetchLogSize - The delayed fetch log operation size in this TabletServer. + delayedFetchCount + The delayed fetch log operation count in this TabletServer. Gauge - delayedWriteExpirationPerSecond + delayedWriteExpiresPerSecond The delayed write operation expire count per second in this TabletServer. Meter - delayedFetchLogFromFollowerExpirationPerSecond + delayedFetchFromFollowerExpiresPerSecond The delayed fetch log operation from follower expire count per second in this TabletServer. Meter - delayedFetchLogFromClientExpirationPerSecond - The delayed write operation from client expire count per second in this TabletServer. + delayedFetchFromClientExpiresPerSecond + The delayed fetch log operation from client expire count per second in this TabletServer. Meter