Skip to content

Commit 8898a33

Browse files
committed
Cache and reuse ValidTxnWriteIdList
1 parent 94874ff commit 8898a33

File tree

2 files changed

+48
-26
lines changed

2 files changed

+48
-26
lines changed

ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map.Entry;
3131
import java.util.Optional;
3232
import java.util.function.Function;
33+
import java.util.function.Predicate;
3334
import java.util.regex.Pattern;
3435
import org.antlr.runtime.ClassicToken;
3536
import org.antlr.runtime.CommonToken;
@@ -1700,6 +1701,11 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
17001701
materializationValidator.getAutomaticRewritingValidationResult());
17011702
perfLogger.perfLogEnd(this.getClass().getName(), PerfLogger.VALIDATE_QUERY_MATERIALIZATION);
17021703

1704+
// This step primes the cache containing the validTxnWriteIdList. It will fetch
1705+
// all the tables into the MetaStore Client cache with one HMS call. These are
1706+
// used within the partition prune rule.
1707+
callAndCacheValidTxnWriteIdList(calcitePlan);
1708+
17031709
// 2. Apply pre-join order optimizations
17041710
perfLogger.perfLogBegin(this.getClass().getName(), PerfLogger.PREJOIN_ORDERING);
17051711
calcitePlan = applyPreJoinOrderingTransforms(calcitePlan, mdProvider.getMetadataProvider(), executorProvider);
@@ -2454,15 +2460,20 @@ private RelNode applyPostJoinOrderingTransform(RelNode basePlan, RelMetadataProv
24542460
}
24552461

24562462
protected Set<TableName> getTablesUsed(RelNode plan) {
2463+
Predicate<Table> tableFilter = table -> AcidUtils.isTransactionalTable(table) ||
2464+
table.isNonNative() && table.getStorageHandler().areSnapshotsSupported();
2465+
return getTablesUsed(plan, tableFilter);
2466+
}
2467+
2468+
private Set<TableName> getTablesUsed(RelNode plan, Predicate<Table> tableFilter) {
24572469
Set<TableName> tablesUsed = new HashSet<>();
24582470
new RelVisitor() {
24592471
@Override
24602472
public void visit(RelNode node, int ordinal, RelNode parent) {
24612473
if (node instanceof TableScan) {
24622474
TableScan ts = (TableScan) node;
24632475
Table table = ((RelOptHiveTable) ts.getTable()).getHiveTableMD();
2464-
if (AcidUtils.isTransactionalTable(table) ||
2465-
table.isNonNative() && table.getStorageHandler().areSnapshotsSupported()) {
2476+
if (tableFilter.test(table)) {
24662477
tablesUsed.add(table.getFullTableName());
24672478
}
24682479
}
@@ -5112,9 +5123,21 @@ private ImmutableMap<String, Integer> buildHiveColNameToInputPosMap(
51125123
return hiveColNameToInputPosMapBuilder.build();
51135124
}
51145125

5115-
private QBParseInfo getQBParseInfo(QB qb) throws CalciteSemanticException {
5126+
private QBParseInfo getQBParseInfo(QB qb) {
51165127
return qb.getParseInfo();
51175128
}
5129+
5130+
private void callAndCacheValidTxnWriteIdList(RelNode relNode) {
5131+
var transactionalTables = getTablesUsed(relNode, AcidUtils::isTransactionalTable)
5132+
.stream()
5133+
.map(TableName::getNotEmptyDbTable)
5134+
.toList();
5135+
try {
5136+
getValidTxnWriteIdList(transactionalTables);
5137+
} catch (SemanticException e) {
5138+
throw new RuntimeException(e.getMessage(), e);
5139+
}
5140+
}
51185141
}
51195142

51205143
@Override

ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15166,30 +15166,29 @@ private String getQueryStringForCache(ASTNode ast) {
1516615166

1516715167
private ValidTxnWriteIdList getQueryValidTxnWriteIdList() throws SemanticException {
1516815168
// TODO: Once HIVE-18948 is in, should be able to retrieve writeIdList from the conf.
15169-
//cachedWriteIdList = AcidUtils.getValidTxnWriteIdList(conf);
15170-
//
15171-
List<String> transactionalTables = tablesFromReadEntities(inputs)
15172-
.stream()
15173-
.filter(AcidUtils::isTransactionalTable)
15174-
.map(Table::getFullyQualifiedName)
15175-
.collect(Collectors.toList());
15176-
15177-
if (transactionalTables.size() > 0) {
15178-
String txnString = queryState.getValidTxnList();
15179-
if (txnString == null) {
15180-
return null;
15181-
}
15182-
try {
15183-
return getTxnMgr().getValidWriteIds(transactionalTables, txnString);
15184-
} catch (Exception err) {
15185-
String msg = "Error while getting the txnWriteIdList for tables " + transactionalTables
15186-
+ " and validTxnList " + conf.get(ValidTxnList.VALID_TXNS_KEY);
15187-
throw new SemanticException(msg, err);
15188-
}
15189-
}
15169+
// cachedWriteIdList = AcidUtils.getValidTxnWriteIdList(conf);
15170+
var transactionalTables = tablesFromReadEntities(inputs)
15171+
.stream()
15172+
.filter(AcidUtils::isTransactionalTable)
15173+
.map(Table::getFullyQualifiedName)
15174+
.toList();
1519015175

15191-
// No transactional tables.
15192-
return null;
15176+
return getValidTxnWriteIdList(transactionalTables);
15177+
}
15178+
15179+
protected ValidTxnWriteIdList getValidTxnWriteIdList(List<String> transactionalTables)
15180+
throws SemanticException {
15181+
if (transactionalTables.isEmpty()) {
15182+
return null;
15183+
}
15184+
String txnString = queryState.getValidTxnList();
15185+
try {
15186+
return getTxnMgr().getValidWriteIds(transactionalTables, txnString);
15187+
} catch (Exception err) {
15188+
String msg = "Error while getting the txnWriteIdList for tables " + transactionalTables
15189+
+ " and validTxnList " + txnString;
15190+
throw new SemanticException(msg, err);
15191+
}
1519315192
}
1519415193

1519515194
private QueryResultsCache.LookupInfo createLookupInfoForQuery(ASTNode astNode) throws SemanticException {

0 commit comments

Comments
 (0)