Skip to content

Commit e718c0b

Browse files
authored
[fix](nereids) fix sql cache bug and some tests (#46443)
1. Use retry to fix the unstable tests `colocate_union_numbers` and `prune_bucket_with_bucket_shuffle_join`. 2. Fix the failing test `explain`; this bug exists only in the master branch and was introduced by #40202. 3. Fix a SQL cache bug where a stale cache entry was used after a table was dropped and re-created (the table id changed); tested in `parse_sql_from_sql_cache`, introduced by #33262. 4. Add the `foreachFrontends`, `foreachBackends`, and `retry` functions to the regression test framework.
1 parent 4f3b9bb commit e718c0b

File tree

9 files changed

+898
-834
lines changed

9 files changed

+898
-834
lines changed

fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java

+45-17
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.catalog.OlapTable;
2424
import org.apache.doris.catalog.Partition;
2525
import org.apache.doris.catalog.TableIf;
26+
import org.apache.doris.catalog.TableIf.TableType;
2627
import org.apache.doris.catalog.View;
2728
import org.apache.doris.common.Config;
2829
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
@@ -39,6 +40,7 @@
3940
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
4041
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
4142
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
43+
import org.apache.doris.nereids.SqlCacheContext.TableVersion;
4244
import org.apache.doris.nereids.StatementContext;
4345
import org.apache.doris.nereids.analyzer.UnboundVariable;
4446
import org.apache.doris.nereids.parser.NereidsParser;
@@ -199,14 +201,14 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
199201
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));
200202

201203
if (sqlCacheContextWithVariable != null) {
202-
return tryParseSqlWithoutCheckVariable(
203-
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
204+
return tryParseSql(
205+
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity, true
204206
);
205207
} else {
206208
return Optional.empty();
207209
}
208210
} else {
209-
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
211+
return tryParseSql(connectContext, key, sqlCacheContext, currentUserIdentity, false);
210212
}
211213
}
212214

@@ -223,9 +225,9 @@ private String normalizeSql(String sql) {
223225
return NereidsParser.removeCommentAndTrimBlank(sql);
224226
}
225227

226-
private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
227-
ConnectContext connectContext, String key,
228-
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
228+
private Optional<LogicalSqlCache> tryParseSql(
229+
ConnectContext connectContext, String key, SqlCacheContext sqlCacheContext,
230+
UserIdentity currentUserIdentity, boolean checkUserVariable) {
229231
Env env = connectContext.getEnv();
230232

231233
if (!tryLockTables(connectContext, env, sqlCacheContext)) {
@@ -259,8 +261,12 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
259261
try {
260262
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
261263

262-
List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
263-
boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext);
264+
List<Variable> currentVariables = ImmutableList.of();
265+
if (checkUserVariable) {
266+
currentVariables = resolveUserVariables(sqlCacheContext);
267+
}
268+
boolean usedVariablesChanged
269+
= checkUserVariable && usedVariablesChanged(currentVariables, sqlCacheContext);
264270
if (resultSetInFe.isPresent() && !usedVariablesChanged) {
265271
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
266272

@@ -274,9 +280,15 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
274280
}
275281

276282
Status status = new Status();
277-
PUniqueId cacheKeyMd5 = usedVariablesChanged
278-
? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))
279-
: sqlCacheContext.getOrComputeCacheKeyMd5();
283+
284+
PUniqueId cacheKeyMd5;
285+
if (usedVariablesChanged) {
286+
invalidateCache(key);
287+
cacheKeyMd5 = sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables));
288+
} else {
289+
cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
290+
}
291+
280292
InternalService.PFetchCacheResult cacheData =
281293
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
282294
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
@@ -308,20 +320,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
308320
return true;
309321
}
310322

311-
for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
312-
FullTableName fullTableName = scanTable.fullTableName;
313-
TableIf tableIf = findTableIf(env, fullTableName);
314-
if (!(tableIf instanceof OlapTable)) {
323+
// the query may scan only empty partitions of a table; such a table does not appear in
324+
// sqlCacheContext.getScanTables(), so we must also check the versions of all used tables here.
325+
// check table type and version
326+
for (Entry<FullTableName, TableVersion> scanTable : sqlCacheContext.getUsedTables().entrySet()) {
327+
TableVersion tableVersion = scanTable.getValue();
328+
if (tableVersion.type != TableType.OLAP) {
329+
return true;
330+
}
331+
TableIf tableIf = findTableIf(env, scanTable.getKey());
332+
if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
315333
return true;
316334
}
335+
317336
OlapTable olapTable = (OlapTable) tableIf;
318337
long currentTableVersion = olapTable.getVisibleVersion();
319-
long cacheTableVersion = scanTable.latestVersion;
338+
long cacheTableVersion = tableVersion.version;
320339
// have some partitions been dropped, deleted, updated or replaced, or have rows been inserted into a new partition?
321340
if (currentTableVersion != cacheTableVersion) {
322341
return true;
323342
}
343+
}
324344

345+
// check partition version
346+
for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
347+
FullTableName fullTableName = scanTable.fullTableName;
348+
TableIf tableIf = findTableIf(env, fullTableName);
349+
if (!(tableIf instanceof OlapTable)) {
350+
return true;
351+
}
352+
OlapTable olapTable = (OlapTable) tableIf;
325353
Collection<Long> partitionIds = scanTable.getScanPartitions();
326354
olapTable.getVersionInBatchForCloudMode(partitionIds);
327355

@@ -392,7 +420,7 @@ private boolean dataMaskPoliciesChanged(
392420
*/
393421
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
394422
StatementContext currentStatementContext = connectContext.getStatementContext();
395-
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
423+
for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) {
396424
TableIf tableIf = findTableIf(env, fullTableName);
397425
if (tableIf == null) {
398426
return false;

fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java

+23-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.doris.analysis.Expr;
2121
import org.apache.doris.analysis.UserIdentity;
2222
import org.apache.doris.catalog.DatabaseIf;
23+
import org.apache.doris.catalog.OlapTable;
2324
import org.apache.doris.catalog.TableIf;
25+
import org.apache.doris.catalog.TableIf.TableType;
2426
import org.apache.doris.common.Pair;
2527
import org.apache.doris.datasource.CatalogIf;
2628
import org.apache.doris.mysql.FieldInfo;
@@ -42,6 +44,7 @@
4244
import com.google.common.collect.Maps;
4345
import com.google.common.collect.Sets;
4446

47+
import java.util.Collections;
4548
import java.util.List;
4649
import java.util.Locale;
4750
import java.util.Map;
@@ -62,7 +65,8 @@ public class SqlCacheContext {
6265
private volatile long latestPartitionTime = -1;
6366
private volatile long latestPartitionVersion = -1;
6467
private volatile long sumOfPartitionNum = -1;
65-
private final Set<FullTableName> usedTables = Sets.newLinkedHashSet();
68+
// value: version of table
69+
private final Map<FullTableName, TableVersion> usedTables = Maps.newLinkedHashMap();
6670
// value: ddl sql
6771
private final Map<FullTableName, String> usedViews = Maps.newLinkedHashMap();
6872
// value: usedColumns
@@ -136,8 +140,13 @@ public synchronized void addUsedTable(TableIf tableIf) {
136140
return;
137141
}
138142

139-
usedTables.add(
140-
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName())
143+
usedTables.put(
144+
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()),
145+
new TableVersion(
146+
tableIf.getId(),
147+
tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L,
148+
tableIf.getType()
149+
)
141150
);
142151
}
143152

@@ -283,8 +292,8 @@ public void setCacheProxy(CacheProxy cacheProxy) {
283292
this.cacheProxy = cacheProxy;
284293
}
285294

286-
public Set<FullTableName> getUsedTables() {
287-
return ImmutableSet.copyOf(usedTables);
295+
public Map<FullTableName, TableVersion> getUsedTables() {
296+
return Collections.unmodifiableMap(usedTables);
288297
}
289298

290299
public Map<FullTableName, String> getUsedViews() {
@@ -459,6 +468,15 @@ public void addScanPartition(Long partitionId) {
459468
}
460469
}
461470

471+
/** Snapshot of a used table: its id, its visible version (0 when it is not an OlapTable) and its type. */
472+
@lombok.Data
473+
@lombok.AllArgsConstructor
474+
public static class TableVersion {
475+
public final long id;
476+
public final long version;
477+
public final TableType type;
478+
}
479+
462480
/** CacheKeyType */
463481
public enum CacheKeyType {
464482
// use `userIdentity`:`sql`.trim() as Cache key in FE

fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta
235235
this.sqlCacheContext = new SqlCacheContext(
236236
connectContext.getCurrentUserIdentity(), connectContext.queryId());
237237
if (originStatement != null) {
238-
this.sqlCacheContext.setOriginSql(originStatement.originStmt.trim());
238+
this.sqlCacheContext.setOriginSql(originStatement.originStmt);
239239
}
240240
} else {
241241
this.sqlCacheContext = null;

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ public List<Plan> getAncestors() {
228228
}
229229

230230
public void updateActualRowCount(long actualRowCount) {
231-
statistics.setActualRowCount(actualRowCount);
231+
if (statistics != null) {
232+
statistics.setActualRowCount(actualRowCount);
233+
}
232234
}
233235
}

regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy

+32-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps
2525
import com.google.common.util.concurrent.Futures
2626
import com.google.common.util.concurrent.ListenableFuture
2727
import com.google.common.util.concurrent.MoreExecutors
28+
import com.google.common.util.concurrent.Uninterruptibles
2829
import com.google.gson.Gson
2930
import groovy.json.JsonOutput
3031
import groovy.json.JsonSlurper
@@ -37,7 +38,6 @@ import org.apache.doris.regression.RegressionTest
3738
import org.apache.doris.regression.action.BenchmarkAction
3839
import org.apache.doris.regression.action.ProfileAction
3940
import org.apache.doris.regression.action.WaitForAction
40-
import org.apache.doris.regression.util.DataUtils
4141
import org.apache.doris.regression.util.OutputUtils
4242
import org.apache.doris.regression.action.CreateMVAction
4343
import org.apache.doris.regression.action.ExplainAction
@@ -59,13 +59,7 @@ import org.slf4j.Logger
5959
import org.slf4j.LoggerFactory
6060

6161
import java.sql.Connection
62-
import java.io.File
63-
import java.math.BigDecimal;
64-
import java.sql.PreparedStatement
65-
import java.sql.ResultSetMetaData
66-
import java.util.Map;
6762
import java.util.concurrent.Callable
68-
import java.util.concurrent.ExecutorService
6963
import java.util.concurrent.Executors
7064
import java.util.concurrent.Future
7165
import java.util.concurrent.ThreadFactory
@@ -701,6 +695,23 @@ class Suite implements GroovyInterceptable {
701695
return sql
702696
}
703697

/**
 * Runs {@code closure} until it completes without throwing, for at most {@code executeTimes} attempts.
 *
 * @param executeTimes maximum number of attempts; when it is less than 1 nothing runs and null is returned
 * @param intervalMillis pause in milliseconds between a failed attempt and the next one
 * @param closure the action to run; it is called with the 1-based attempt number
 * @return the value returned by the first attempt that does not throw
 * @throws Throwable the failure of the last attempt, when every attempt fails
 */
<T> T retry(int executeTimes = 3, int intervalMillis = 1000, Closure<T> closure) {
    Throwable lastFailure = null
    for (int attempt = 1; attempt <= executeTimes; ++attempt) {
        try {
            return closure(attempt) as T
        } catch (Throwable t) {
            logger.warn("Retry failed: $t", t)
            lastFailure = t
            // Wait only when another attempt follows; sleeping after the final
            // failure would just delay reporting the error to the caller.
            if (attempt < executeTimes) {
                Uninterruptibles.sleepUninterruptibly(intervalMillis, TimeUnit.MILLISECONDS)
            }
        }
    }
    if (lastFailure != null) {
        throw lastFailure
    }
    return null
}
714+
704715
void explain(Closure actionSupplier) {
705716
if (context.useArrowFlightSql()) {
706717
runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
@@ -1073,6 +1084,20 @@ class Suite implements GroovyInterceptable {
10731084
}
10741085
}
10751086

/**
 * Runs "show frontends" and invokes {@code action} once per returned row, in result order.
 *
 * @param action closure called with each row of the result
 */
void foreachFrontends(Closure action) {
    sql_return_maparray("show frontends").each { frontendRow ->
        action(frontendRow)
    }
}
1093+
/**
 * Runs "show backends" and invokes {@code action} once per returned row, in result order.
 *
 * @param action closure called with each row of the result
 */
void foreachBackends(Closure action) {
    sql_return_maparray("show backends").each { backendRow ->
        action(backendRow)
    }
}
1100+
10761101
List<String> getFrontendIpHttpPort() {
10771102
return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
10781103
}

0 commit comments

Comments
 (0)