Commit ae3be81

HIVE-29251: Hive ACID: HiveConf object shouldn't be shared between multiple cleanup tasks
1 parent: 329ce88
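
The patch replaces direct use of the shared `conf` field with `getConf()` in both cleanup handlers. The hazard being removed: cleanup tasks run concurrently, and calls such as `conf.set(ValidTxnList.VALID_TXNS_KEY, ...)` mutate the single HiveConf all tasks share, so one task's transaction snapshot can overwrite another's before it is read back by getAcidState(). A minimal sketch of that race, assuming only that the tasks run on separate threads (the snapshot strings are placeholders, not real writeToString() output):

    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Illustrative sketch, not part of the patch: two cleanup tasks sharing one
    // HiveConf both write their transaction snapshot under the same key.
    public class SharedConfRace {
      public static void main(String[] args) throws InterruptedException {
        HiveConf shared = new HiveConf();

        // Placeholder snapshot strings; real values come from writeToString().
        Thread taskA = new Thread(() -> shared.set(ValidTxnList.VALID_TXNS_KEY, "snapshot-A"));
        Thread taskB = new Thread(() -> shared.set(ValidTxnList.VALID_TXNS_KEY, "snapshot-B"));

        taskA.start();
        taskB.start();
        taskA.join();
        taskB.join();

        // Last writer wins: whichever task wrote second determines what *both*
        // tasks see when getAcidState() later reads VALID_TXNS_KEY back.
        System.out.println(shared.get(ValidTxnList.VALID_TXNS_KEY));
      }
    }

With a per-task configuration, each task's VALID_TXNS_KEY write is visible only to itself.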

3 files changed: +106 -75 lines changed


ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java

Lines changed: 29 additions & 24 deletions
@@ -41,6 +41,8 @@
 import java.util.stream.Collectors;
 
 import static java.util.Objects.isNull;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
 
 /**
  * Abort-cleanup based implementation of TaskHandler.
@@ -51,8 +53,7 @@ class AbortedTxnCleaner extends TaskHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
 
   public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
-      MetadataCache metadataCache, boolean metricsEnabled,
-      FSRemover fsRemover) {
+      MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
     super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
@@ -74,17 +75,17 @@ a. Find the list of entries which are suitable for cleanup (This is done in {@li
   **/
   @Override
   public List<Runnable> getTasks() throws MetaException {
-    int abortedThreshold = HiveConf.getIntVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
-    long abortedTimeThreshold = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
-            TimeUnit.MILLISECONDS);
-    List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+    int abortedThreshold = HiveConf.getIntVar(getConf(), HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf.getTimeVar(getConf(), HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+        TimeUnit.MILLISECONDS);
+    List<CompactionInfo> readyToCleanAborts =
+        txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
 
     if (!readyToCleanAborts.isEmpty()) {
-      return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
-              clean(info, info.minOpenWriteTxnId, metricsEnabled)))
-          .collect(Collectors.toList());
+      return readyToCleanAborts.stream()
+          .map(info -> ThrowingRunnable.unchecked(
+              () -> clean(info, info.minOpenWriteTxnId, metricsEnabled)))
+          .collect(Collectors.toList());
     }
     return Collections.emptyList();
   }
@@ -98,7 +99,9 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
        perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
      }
      Partition p = null;
-     Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+     Table t = metadataCache.computeIfAbsent(info.getFullTableName(),
+         () -> resolveTable(info.dbname, info.tableName));
+
      if (isNull(t)) {
        // The table was dropped before we got around to cleaning it.
        LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
@@ -109,25 +112,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
        p = resolvePartition(info.dbname, info.tableName, info.partName);
        if (isNull(p)) {
          // The partition was dropped before we got around to cleaning it.
-         LOG.info("Unable to find partition {}, assuming it was dropped.",
-             info.getFullPartitionName());
+         LOG.info("Unable to find partition {}, assuming it was dropped.", info.getFullPartitionName());
          txnHandler.markCleaned(info);
          return;
        }
      }
 
-     String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
-     info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+     String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+     info.runAs = TxnUtils.findUserToRunAs(location, t, getConf());
      abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
 
    } catch (InterruptedException e) {
      LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
-        e.getMessage());
+          e.getMessage());
      handleCleanerAttemptFailure(info, e.getMessage());
      throw e;
    } catch (Exception e) {
      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
-        e.getMessage());
+          e.getMessage());
      handleCleanerAttemptFailure(info, e.getMessage());
      throw new MetaException(e.getMessage());
    } finally {
@@ -139,19 +141,23 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
 
   private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
     ValidTxnList validTxnList =
-         TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
     //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
 
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
 
     // Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
     // This is necessary for looking at the complete state of the table till the min open write Id
     // (if there is an open txn on the table) or the highestWatermark.
     // This is used later on while deleting the records in TXN_COMPONENTS table.
-    info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
-        Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
-    Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+    info.highestWriteId = Math.min(
+        isNull(validWriteIdList.getMinOpenWriteId()) ?
+            Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1,
+        validWriteIdList.getHighWatermark());
+
+    Table table = metadataCache.computeIfAbsent(info.getFullTableName(),
+        () -> resolveTable(info.dbname, info.tableName));
     LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
     boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
@@ -160,6 +166,5 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m
     } else {
       LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
     }
-
   }
 }
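
getConf() is introduced in the third changed file, which this page does not show (presumably the shared TaskHandler base class that both handlers extend). A plausible shape for it, assuming the intent is a private per-task copy that persists across calls on the same worker thread, is a ThreadLocal copy of the handler's HiveConf; HiveConf(HiveConf other) is the standard copy constructor:

    import org.apache.hadoop.hive.conf.HiveConf;

    // Assumed sketch only; the actual TaskHandler change is in the file this
    // page truncates. Each executor thread lazily copies the handler's HiveConf,
    // so mutations such as getConf().set(VALID_TXNS_KEY, ...) stay local to the
    // task, while repeated getConf() calls on the same thread see one instance.
    abstract class TaskHandlerConfSketch {
      private final HiveConf conf;
      private final ThreadLocal<HiveConf> taskConf =
          ThreadLocal.withInitial(() -> new HiveConf(conf));

      TaskHandlerConfSketch(HiveConf conf) {
        this.conf = conf;
      }

      protected HiveConf getConf() {
        return taskConf.get(); // never hands out the shared instance
      }
    }

A per-call fresh copy would not work here: the handlers call getConf().set(...) and expect a later getAcidState() on the same task to read the value back, so the copy has to be stable for the duration of the task.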

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java

Lines changed: 30 additions & 25 deletions
@@ -67,17 +67,16 @@ class CompactionCleaner extends TaskHandler {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());
 
   public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
-      MetadataCache metadataCache, boolean metricsEnabled,
-      FSRemover fsRemover) {
+      MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
     super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
   @Override
   public List<Runnable> getTasks() throws MetaException {
     long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
-        ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
-        : 0;
+    long retentionTime = HiveConf.getBoolVar(getConf(), HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+        ? HiveConf.getTimeVar(getConf(), HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+        : 0;
     List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
     if (!readyToClean.isEmpty()) {
       long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
@@ -88,19 +87,18 @@ public List<Runnable> getTasks() throws MetaException {
       // when min_history_level is finally dropped, than every HMS will commit compaction the new way
       // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
       return readyToClean.stream().map(ci -> {
-        long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
-        LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+        long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
         return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
       }).collect(Collectors.toList());
     }
     return Collections.emptyList();
   }
 
   private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
-    LOG.info("Starting cleaning for {}", ci);
+    LOG.info("Starting cleaning for {} based on min open txn id: {}", ci, minOpenTxn);
     PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
     String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
-        (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+       (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
     try {
       if (metricsEnabled) {
         perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
@@ -111,11 +109,13 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
       Partition p = null;
 
       if (isNull(location)) {
-        t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        t = metadataCache.computeIfAbsent(ci.getFullTableName(),
+            () -> resolveTable(ci.dbname, ci.tableName));
+
        if (isNull(t)) {
          // The table was dropped before we got around to cleaning it.
          LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
-              idWatermark(ci));
+             idWatermark(ci));
          txnHandler.markCleaned(ci);
          return;
        }
@@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
        p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
        if (isNull(p)) {
          // The partition was dropped before we got around to cleaning it.
-         LOG.info("Unable to find partition {}, assuming it was dropped. {}",
-             ci.getFullPartitionName(), idWatermark(ci));
+         LOG.info("Unable to find partition {}, assuming it was dropped. {}", ci.getFullPartitionName(),
+             idWatermark(ci));
          txnHandler.markCleaned(ci);
          return;
        }
@@ -146,8 +146,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
 
      if (!isNull(t) || !isNull(ci.partName)) {
        String path = isNull(location)
-            ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
-            : location;
+           ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+           : location;
        boolean dropPartition = !isNull(ci.partName) && isNull(p);
 
        //check if partition wasn't re-created
@@ -161,7 +161,7 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
      }
    } catch (Exception e) {
      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
-          e.getMessage());
+         e.getMessage());
      if (metricsEnabled) {
        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
      }
@@ -205,9 +205,9 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
 
   private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
     ValidTxnList validTxnList =
-         TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
+        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
     //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     /*
      * {@code validTxnList} is capped by minOpenTxnGLB so if
      * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
@@ -241,7 +241,8 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
 
     // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
-    Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+    Table table = metadataCache.computeIfAbsent(ci.getFullTableName(),
+        () -> resolveTable(ci.dbname, ci.tableName));
     LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
     boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
@@ -254,7 +255,8 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
   }
 
   private LockRequest createLockRequest(CompactionInfo ci) {
-    return CompactorUtil.createLockRequest(conf, ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+    return CompactorUtil.createLockRequest(
+        getConf(), ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
   }
 
   private static String idWatermark(CompactionInfo ci) {
@@ -263,7 +265,7 @@ private static String idWatermark(CompactionInfo ci) {
 
   @Override
   protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
-      throws NoSuchTxnException, MetaException {
+     throws NoSuchTxnException, MetaException {
     ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
     /*
      * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
@@ -279,11 +281,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
   private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
     String strIfPurge = ci.getProperty("ifPurge");
     boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
     Path obsoletePath = new Path(location);
+
     return new CleanupRequestBuilder()
-        .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
-        .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
-        .build();
+        .setLocation(location).setDbName(ci.dbname)
+        .setFullPartitionName(ci.getFullPartitionName())
+        .setRunAs(ci.runAs)
+        .setPurge(ifPurge)
+        .setObsoleteDirs(Collections.singletonList(obsoletePath))
+        .build();
   }
 }
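
For context on why per-task confs matter here (assumed, not shown in this commit): the compactor's Cleaner executes the Runnables returned by getTasks() on a shared thread pool, so tasks from the same handler run in parallel. A hedged sketch of that call pattern; the class name, loop, and pool size are illustrative, not Hive's actual Cleaner code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Hypothetical driver loop for illustration only. Each Runnable wraps one
    // clean(...) call; because tasks run concurrently, any state they share,
    // including a common HiveConf, must be safe for concurrent mutation.
    class CleanerDriverSketch {
      void runOneCycle(Iterable<TaskHandler> handlers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
          for (TaskHandler handler : handlers) {
            handler.getTasks().forEach(pool::submit);
          }
        } finally {
          pool.shutdown();
          pool.awaitTermination(10, TimeUnit.MINUTES);
        }
      }
    }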
