Skip to content

Commit f85a117

Browse files
githubgxllguojn1
authored andcommitted
[fix][dingo-executor] Add back up safe point ignore heartbeat transaction
1 parent 3a9acbf commit f85a117

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

dingo-common/src/main/java/io/dingodb/common/mysql/scope/ScopeVariables.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,16 @@ public static int lookupBatchSize() {
244244
}
245245
}
246246

247+
// for DTS
248+
public static boolean ignoreHeartBeatTxn() {
249+
try {
250+
String autocommitSwitch = executorProp.getOrDefault("ignore_heartbeat_txn", "true").toString();
251+
return "true".equalsIgnoreCase(autocommitSwitch);
252+
} catch (Exception e) {
253+
return false;
254+
}
255+
}
256+
247257
public static synchronized void setExecutorProp(String key, String val) {
248258
if ("rpc_batch_size".equalsIgnoreCase(key)) {
249259
int rpcBatchSize = Integer.parseInt(val);

dingo-exec/src/main/java/io/dingodb/exec/transaction/impl/TransactionManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import lombok.extern.slf4j.Slf4j;
3232
import org.checkerframework.checker.nullness.qual.NonNull;
3333

34+
import java.util.List;
3435
import java.util.Map;
36+
import java.util.Set;
3537
import java.util.concurrent.ConcurrentHashMap;
3638
import java.util.concurrent.TimeUnit;
3739

@@ -206,4 +208,21 @@ public static Object getIndex(CommonId txnId, CommonId indexId) {
206208
public static long getMinTs() {
207209
return trans.keySet().stream().mapToLong(txnId -> txnId.seq).min().orElse(Long.MAX_VALUE);
208210
}
211+
212+
public static long getIgnoreHeartBeatMinTs() {
213+
long minTs = Long.MAX_VALUE;
214+
Set<CommonId> txnIds = trans.keySet();
215+
for (CommonId txnId : txnIds) {
216+
if (txnId.seq < minTs) {
217+
ITransaction iTransaction = trans.get(txnId);
218+
List<String> sqlList = iTransaction.getSqlList();
219+
if (!sqlList.isEmpty() && sqlList.stream().allMatch("select 1"::equals)) {
220+
LogUtils.info(log, "ignore heartbeat sql, txnId:{}", txnId);
221+
continue;
222+
}
223+
minTs = txnId.seq;
224+
}
225+
}
226+
return minTs;
227+
}
209228
}

dingo-store-proxy/src/main/java/io/dingodb/store/proxy/common/Gc.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.dingodb.common.config.DingoConfiguration;
2222
import io.dingodb.common.log.LogUtils;
2323
import io.dingodb.common.meta.Tenant;
24+
import io.dingodb.common.mysql.scope.ScopeVariables;
2425
import io.dingodb.common.session.Session;
2526
import io.dingodb.common.session.SessionUtil;
2627
import io.dingodb.common.tenant.TenantConstant;
@@ -110,7 +111,7 @@ public static Pair<String, Long> safePointUpdate() {
110111
LogUtils.info(log, "Run safe point update task.");
111112
Set<Location> coordinators = coordinatorSet();
112113
long reqTs = tso();
113-
long safeTs = safeTs(getTxnDurationSafeTs(reqTs));
114+
long safeTs = safeTs(getTxnDurationSafeTs(reqTs), false);
114115
List<Region> regions = getRegions(coordinators, reqTs);
115116
LogUtils.info(log, "Run safe point update task, current ts: {}, safe ts: {}", reqTs, safeTs);
116117
for (Region region : regions) {
@@ -228,7 +229,7 @@ public static GcObj startBackUpSafeByPoint(long point, long latestTso) {
228229
try {
229230
LogUtils.info(log, "Run back up safe point update task.");
230231
Set<Location> coordinators = coordinatorSet();
231-
long safeTs = safeTs(point);
232+
long safeTs = safeTs(point, ScopeVariables.ignoreHeartBeatTxn());
232233
List<Region> regions = getRegions(coordinators, latestTso);
233234
LogUtils.info(log, "Run back up safe point update task, current ts: {}, safe ts: {}",
234235
latestTso, safeTs);
@@ -450,7 +451,7 @@ private static long getTxnDurationSafeTs(long requestId) {
450451
return requestId - (TimeUnit.SECONDS.toMillis(duration) << GcApi.PHYSICAL_SHIFT);
451452
}
452453

453-
private static long safeTs(long safeTs) {
454+
private static long safeTs(long safeTs, boolean ignoreHeartBeat) {
454455
Integer retry = Optional.mapOrGet(
455456
DingoConfiguration.instance().find("retry", int.class),
456457
__ -> __,
@@ -469,7 +470,7 @@ private static long safeTs(long safeTs) {
469470
LogUtils.warn(log, "Cross node get remote min start ts failed, retry times: {}", retry, e);
470471
}
471472
}
472-
long localMinTs = TransactionManager.getMinTs();
473+
long localMinTs = ignoreHeartBeat ? TransactionManager.getIgnoreHeartBeatMinTs() : TransactionManager.getMinTs();
473474
long minTxnTs = Math.min(remoteMinStartTs, localMinTs);
474475

475476
return Math.min(minTxnTs, safeTs);

0 commit comments

Comments
 (0)