Skip to content

Commit

Permalink
[server] FetchLogRequest support params: minFetchBytes and maxWaitMs …
Browse files Browse the repository at this point in the history
…to avoid frequently FetchLogRequest send to server
  • Loading branch information
swuferhong committed Dec 30, 2024
1 parent 2700ef5 commit 88105c4
Show file tree
Hide file tree
Showing 29 changed files with 1,037 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class LogFetcher implements Closeable {
private final RpcClient rpcClient;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final int minFetchBytes;
private final int maxFetchWaitMs;
private final boolean isCheckCrcs;
private final LogScannerStatus logScannerStatus;
private final LogFetchBuffer logFetchBuffer;
Expand Down Expand Up @@ -126,6 +128,9 @@ public LogFetcher(
this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes();
this.maxBucketFetchBytes =
(int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
this.minFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MIN_BYTES).getBytes();
this.maxFetchWaitMs = (int) conf.get(ConfigOptions.LOG_FETCH_WAIT_MAX_TIME).toMillis();

this.isCheckCrcs = conf.getBoolean(ConfigOptions.CLIENT_SCANNER_LOG_CHECK_CRC);
this.logFetchBuffer = new LogFetchBuffer();
this.nodesWithPendingFetchRequests = new HashSet<>();
Expand Down Expand Up @@ -419,7 +424,9 @@ private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
FetchLogRequest fetchLogRequest =
new FetchLogRequest()
.setFollowerServerId(-1)
.setMaxBytes(maxFetchBytes);
.setMaxBytes(maxFetchBytes)
.setMinBytes(minFetchBytes)
.setMaxWaitMs(maxFetchWaitMs);
PbFetchLogReqForTable reqForTable =
new PbFetchLogReqForTable().setTableId(finalTableId);
if (readContext.isProjectionPushDowned()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,14 @@ public class ConfigOptions {
"The purge number (in number of requests) of the write operation manager, "
+ "the default value is 1000.");

public static final ConfigOption<Integer> LOG_REPLICA_FETCH_LOG_OPERATION_PURGE_NUMBER =
key("log.replica.fetch-log-operation-purge-number")
.intType()
.defaultValue(1000)
.withDescription(
"The purge number (in number of requests) of the fetch log operation manager, "
+ "the default value is 1000.");

public static final ConfigOption<Integer> LOG_REPLICA_FETCHER_NUMBER =
key("log.replica.fetcher-number")
.intType()
Expand Down Expand Up @@ -423,6 +431,27 @@ public class ConfigOptions {
+ "Records are fetched in batches for consumer or follower, for one request batch, "
+ "the max bytes size is config by this option.");

public static final ConfigOption<Duration> LOG_FETCH_WAIT_MAX_TIME =
key("log.fetch.wait-max-time")
.durationType()
.defaultValue(Duration.ofMillis(500))
.withDescription(
"The maximum time to wait for enough bytes to be available for a fetch log response "
+ "(including fetch log request from the follower or client). "
+ "This value should always be less than the "
+ "'log.replica.max-lag-time' at all times to prevent frequent shrinking of ISR for "
+ "low throughput tables");

public static final ConfigOption<MemorySize> LOG_FETCH_MIN_BYTES =
key("log.fetch.min-bytes")
.memoryType()
.defaultValue(MemorySize.parse("1b"))
.withDescription(
"The minimum bytes expected for each fetch log response (including fetch "
+ "log request from the follower or client). If not enough bytes, wait up to "
+ LOG_FETCH_WAIT_MAX_TIME.key()
+ " time to return.");

public static final ConfigOption<Integer> LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER =
key("log.replica.min-in-sync-replicas-number")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public class MetricNames {
public static final String REPLICA_LEADER_COUNT = "leaderCount";
public static final String REPLICA_COUNT = "replicaCount";
public static final String WRITE_ID_COUNT = "writerIdCount";
public static final String DELAYED_OPERATIONS_SIZE = "delayedOperationsSize";
public static final String DELAYED_WRITE_SIZE = "delayedWriteSize";
public static final String DELAYED_WRITE_EXPIRATION_RATE = "delayedWriteExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_SIZE = "delayedFetchLogSize";
public static final String DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE =
"delayedFetchLogFromFollowerExpirationPerSecond";
public static final String DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE =
"delayedFetchLogFromClientExpirationPerSecond";

// --------------------------------------------------------------------------------------------
// metrics for table
Expand Down
2 changes: 2 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ message FetchLogRequest {
required int32 follower_server_id = 1; // value -1 indicate the request from client.
required int32 max_bytes = 2;
repeated PbFetchLogReqForTable tables_req = 3;
optional int32 max_wait_ms = 4;
optional int32 min_bytes = 5;
}

message FetchLogResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.record.FileLogProjection;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -29,6 +30,21 @@ public final class FetchParams {
/** Value -2L means we will fetch from log start offset. */
public static final long FETCH_FROM_EARLIEST_OFFSET = -2L;

/**
* Default min fetch bytes, which means the fetch request will be satisfied even if no bytes
* fetched.
*/
public static final int DEFAULT_MIN_FETCH_BYTES = -1;

/** Default max wait ms, which means the fetch request will be satisfied immediately. */
public static final long DEFAULT_MAX_WAIT_MS = -1L;

/**
* Default max wait ms when log fetch minBytes set in {@link FetchLogRequest} but maxWaitMs not
* set.
*/
public static final long DEFAULT_MAX_WAIT_MS_WHEN_MIN_BYTES_ENABLE = 100L;

private final int replicaId;
// Currently, FetchOnlyLeader can be set to false only for test,
// which indicate that the client can read log data from follower.
Expand All @@ -46,20 +62,33 @@ public final class FetchParams {
// the lazily initialized projection util to read and project file logs
@Nullable private FileLogProjection fileLogProjection;

// TODO: add more params like epoch, minBytes etc.
private final int minFetchBytes;
private final long maxWaitMs;
// TODO: add more params like epoch etc.

public FetchParams(int replicaId, int maxFetchBytes) {
this(replicaId, true, maxFetchBytes);
this(replicaId, true, maxFetchBytes, DEFAULT_MIN_FETCH_BYTES, DEFAULT_MAX_WAIT_MS);
}

public FetchParams(int replicaId, int maxFetchBytes, int minFetchBytes, long maxWaitMs) {
this(replicaId, true, maxFetchBytes, minFetchBytes, maxWaitMs);
}

@VisibleForTesting
public FetchParams(int replicaId, boolean fetchOnlyLeader, int maxFetchBytes) {
public FetchParams(
int replicaId,
boolean fetchOnlyLeader,
int maxFetchBytes,
int minFetchBytes,
long maxWaitMs) {
this.replicaId = replicaId;
this.fetchOnlyLeader = fetchOnlyLeader;
this.maxFetchBytes = maxFetchBytes;
this.fetchIsolation = FetchIsolation.of(replicaId >= 0);
this.minOneMessage = true;
this.fetchOffset = -1;
this.minFetchBytes = minFetchBytes;
this.maxWaitMs = maxWaitMs;
}

public void setCurrentFetch(
Expand Down Expand Up @@ -118,6 +147,14 @@ public int maxFetchBytes() {
return maxFetchBytes;
}

public int minFetchBytes() {
return minFetchBytes;
}

public long maxWaitMs() {
return maxWaitMs;
}

public int replicaId() {
return replicaId;
}
Expand All @@ -143,12 +180,15 @@ public boolean equals(Object o) {
return false;
}
FetchParams that = (FetchParams) o;
return replicaId == that.replicaId && maxFetchBytes == that.maxFetchBytes;
return replicaId == that.replicaId
&& maxFetchBytes == that.maxFetchBytes
&& minFetchBytes == that.minFetchBytes
&& maxWaitMs == that.maxWaitMs;
}

@Override
public int hashCode() {
return Objects.hash(replicaId, maxFetchBytes);
return Objects.hash(replicaId, maxFetchBytes, minFetchBytes, maxWaitMs);
}

@Override
Expand All @@ -158,6 +198,10 @@ public String toString() {
+ replicaId
+ ", maxFetchBytes="
+ maxFetchBytes
+ ", minFetchBytes="
+ minFetchBytes
+ ", maxWaitMs="
+ maxWaitMs
+ ')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ public int getRelativePositionInSegment() {
return relativePositionInSegment;
}

public int positionDiff(LogOffsetMetadata that) {
if (messageOffsetOnly()) {
throw new FlussRuntimeException(
this
+ " cannot compare its message offset with "
+ that
+ " since it only has message offset info.");
}

if (!onSameSegment(that)) {
throw new FlussRuntimeException(
this
+ " is not on the same segment with "
+ that
+ " since they are not on the same segment.");
}

return this.relativePositionInSegment - that.relativePositionInSegment;
}

// check if this offset is already on an older segment compared with the given offset.
public boolean onOlderSegment(LogOffsetMetadata that) {
if (messageOffsetOnly()) {
Expand All @@ -85,6 +105,11 @@ public boolean messageOffsetOnly() {
&& relativePositionInSegment == UNKNOWN_FILE_POSITION;
}

// check if this offset is on the same segment with the given offset
private boolean onSameSegment(LogOffsetMetadata that) {
return this.segmentBaseOffset == that.segmentBaseOffset;
}

@Override
public String toString() {
return "(offset="
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.server.log;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

/** A snapshot of the different log offsets. */
public class LogOffsetSnapshot {
public final long logStartOffset;
public final long localLogStartOffset;
public final LogOffsetMetadata logEndOffset;
public final LogOffsetMetadata highWatermark;

public LogOffsetSnapshot(
long logStartOffset,
long localLogStartOffset,
LogOffsetMetadata logEndOffset,
LogOffsetMetadata highWatermark) {
this.logStartOffset = logStartOffset;
this.localLogStartOffset = localLogStartOffset;
this.logEndOffset = logEndOffset;
this.highWatermark = highWatermark;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

LogOffsetSnapshot that = (LogOffsetSnapshot) o;
return logStartOffset == that.logStartOffset
&& localLogStartOffset == that.localLogStartOffset
&& logEndOffset.equals(that.logEndOffset)
&& highWatermark.equals(that.highWatermark);
}

@Override
public int hashCode() {
int result = (int) (logStartOffset ^ (logStartOffset >>> 32));
result = 31 * result + (int) (localLogStartOffset ^ (localLogStartOffset >>> 32));
result = 31 * result + logEndOffset.hashCode();
result = 31 * result + highWatermark.hashCode();
return result;
}

@Override
public String toString() {
return "LogOffsetSnapshot("
+ "logStartOffset="
+ logStartOffset
+ ", localLogStartOffset="
+ localLogStartOffset
+ ", logEndOffset="
+ logEndOffset
+ ", highWatermark="
+ highWatermark
+ ')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,20 @@ public void deleteSegmentsAlreadyExistsInRemote() {
deleteSegments(remoteLogEndOffset);
}

/**
* Fully materialize and return an offset snapshot including segment position info. This method
* will update the LogOffsetMetadata for the high watermark if they are message-only. Throws an
* offset out of range error if the segment info cannot be loaded.
*/
public LogOffsetSnapshot fetchOffsetSnapshot() throws IOException {
LogOffsetMetadata highWatermark = fetchHighWatermarkMetadata();
return new LogOffsetSnapshot(
logStartOffset(),
localLogStartOffset(),
localLog.getLocalLogEndOffsetMetadata(),
highWatermark);
}

private void deleteSegments(long cleanUpToOffset) {
// cache to local variables
long localLogStartOffset = localLog.getLocalLogStartOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class TabletServerMetricGroup extends AbstractMetricGroup {
// ---- metrics ----
private final Counter replicationBytesIn;
private final Counter replicationBytesOut;
private final Counter delayedWriteExpireCount;
private final Counter delayedFetchLogFromFollowerExpireCount;
private final Counter delayedFetchLogFromClientExpireCount;

public TabletServerMetricGroup(
MetricRegistry registry, String clusterId, String hostname, int serverId) {
Expand All @@ -55,6 +58,17 @@ public TabletServerMetricGroup(
meter(MetricNames.REPLICATION_IN_RATE, new MeterView(replicationBytesIn));
replicationBytesOut = new ThreadSafeSimpleCounter();
meter(MetricNames.REPLICATION_OUT_RATE, new MeterView(replicationBytesOut));

delayedWriteExpireCount = new ThreadSafeSimpleCounter();
meter(MetricNames.DELAYED_WRITE_EXPIRATION_RATE, new MeterView(delayedWriteExpireCount));
delayedFetchLogFromFollowerExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_FOLLOWER_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromFollowerExpireCount));
delayedFetchLogFromClientExpireCount = new ThreadSafeSimpleCounter();
meter(
MetricNames.DELAYED_FETCH_LOG_FROM_CLIENT_EXPIRATION_RATE,
new MeterView(delayedFetchLogFromClientExpireCount));
}

@Override
Expand All @@ -77,6 +91,18 @@ public Counter replicationBytesOut() {
return replicationBytesOut;
}

public Counter delayedWriteExpireCount() {
return delayedWriteExpireCount;
}

public Counter delayedFetchLogFromFollowerExpireCount() {
return delayedFetchLogFromFollowerExpireCount;
}

public Counter delayedFetchLogFromClientExpireCount() {
return delayedFetchLogFromClientExpireCount;
}

// ------------------------------------------------------------------------
// table buckets groups
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 88105c4

Please sign in to comment.