Skip to content

Commit 3df3cbf

Browse files
committed
HIVE-28578: Concurrency issue in updateTableColumnStatistics
1 parent 937d100 commit 3df3cbf

File tree

1 file changed

+26
-22
lines changed
  • standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore

1 file changed

+26
-22
lines changed

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9517,6 +9517,9 @@ public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats
95179517
// So let's not use them anywhere unless absolutely necessary.
95189518
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
95199519
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
9520+
lockForUpdate("TBLS", "TBL_ID", "\"TBL_ID\" = " + mTable.getId());
9521+
// Get the newest version of MTable that might have been updated
9522+
pm.refresh(mTable);
95209523
Table table = convertToTable(mTable);
95219524
List<String> colNames = new ArrayList<>();
95229525
for (ColumnStatisticsObj statsObj : statsObjs) {
@@ -9621,6 +9624,9 @@ public Map<String, String> updatePartitionColumnStatistics(Table table, MTable m
96219624
if (partition == null) {
96229625
throw new NoSuchObjectException("Partition for which stats is gathered doesn't exist.");
96239626
}
9627+
lockForUpdate("PARTITIONS", "PART_ID", "\"TBL_ID\" = " + mTable.getId()
9628+
+ " and \"PART_NAME\" = '" + statsDesc.getPartName() + "'");
9629+
pm.refresh(mPartition);
96249630

96259631
for (ColumnStatisticsObj statsObj : statsObjs) {
96269632
MPartitionColumnStatistics mStatsObj =
@@ -11448,36 +11454,34 @@ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, Stri
1144811454
return writeEventInfoList;
1144911455
}
1145011456

11451-
private void prepareQuotes() throws SQLException {
11452-
String s = dbType.getPrepareTxnStmt();
11453-
if (s != null) {
11454-
assert pm.currentTransaction().isActive();
11455-
JDOConnection jdoConn = pm.getDataStoreConnection();
11456-
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11457-
statement.execute(s);
11458-
} finally {
11459-
jdoConn.close();
11460-
}
11461-
}
11462-
}
11463-
11464-
private void lockNotificationSequenceForUpdate() throws MetaException {
11457+
private void lockForUpdate(String tableName, String column, String rowFilter)
11458+
throws MetaException {
1146511459
if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) {
1146611460
// Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache
1146711461
// .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's
1146811462
// only one row in the table, this shouldn't cause any performance degradation.
1146911463
new RetryingExecutor(conf, () -> {
11470-
directSql.lockDbTable("NOTIFICATION_SEQUENCE");
11464+
directSql.lockDbTable(tableName);
1147111465
}).run();
1147211466
} else {
11473-
String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\"";
11467+
String selectQuery = "select " + (column == null ? "*" : "\"" + column + "\"") + " from \"" + tableName + "\"" +
11468+
(rowFilter == null ? "" : " where " + rowFilter);
1147411469
String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
1147511470
new RetryingExecutor(conf, () -> {
11476-
prepareQuotes();
11477-
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) {
11478-
query.setUnique(true);
11479-
// only need to execute it to get db Lock
11480-
query.execute();
11471+
String txnStmt = dbType.getPrepareTxnStmt();
11472+
List<String> statements = new ArrayList<>();
11473+
if (txnStmt != null) {
11474+
statements.add(txnStmt);
11475+
}
11476+
statements.add(lockingQuery);
11477+
assert pm.currentTransaction().isActive();
11478+
JDOConnection jdoConn = pm.getDataStoreConnection();
11479+
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11480+
for (String s : statements) {
11481+
statement.execute(s);
11482+
}
11483+
} finally {
11484+
jdoConn.close();
1148111485
}
1148211486
}).run();
1148311487
}
@@ -11545,7 +11549,7 @@ public void addNotificationEvent(NotificationEvent entry) throws MetaException {
1154511549
try {
1154611550
pm.flush();
1154711551
openTransaction();
11548-
lockNotificationSequenceForUpdate();
11552+
lockForUpdate("NOTIFICATION_SEQUENCE", "NEXT_EVENT_ID", null);
1154911553
query = pm.newQuery(MNotificationNextId.class);
1155011554
Collection<MNotificationNextId> ids = (Collection) query.execute();
1155111555
MNotificationNextId mNotificationNextId = null;

0 commit comments

Comments
 (0)