@@ -173,7 +173,7 @@ public void testLeakAfterHistoryException() throws Exception {
// Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM
// increase the size of the configuration or the number of failed compactions.
Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.",
diffMem < 250_000_000);
diffMem < 450_000_000);
Review comment (Member Author): the increase is due to thread-local use in a cleaner task thread (see the sketch below).

}

@Override
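
The threshold bump above follows from the review comment: cleanup tasks now run on a long-lived cleaner thread, and anything parked in a ThreadLocal on that thread stays reachable between tasks. A minimal, self-contained illustration of that retention effect (not Hive code; the class name and buffer size are made up):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: a ThreadLocal filled inside a task outlives the task,
// because the pooled worker thread that owns it keeps running.
public class ThreadLocalRetentionSketch {
  private static final ThreadLocal<byte[]> SCRATCH =
      ThreadLocal.withInitial(() -> new byte[64 * 1024 * 1024]); // 64 MB per worker thread

  public static void main(String[] args) {
    ExecutorService cleanerPool = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 3; i++) {
      cleanerPool.submit(() -> SCRATCH.get().length); // allocates once, then reuses
    }
    cleanerPool.shutdown();
    // Until the worker thread dies (or SCRATCH.remove() runs on it), the 64 MB
    // array stays reachable, which is why a heap-diff assertion needs extra headroom.
  }
}
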
@@ -91,7 +91,7 @@ public void run() {
for (TaskHandler cleanupHandler : cleanupHandlers) {
try {
CompactorUtil.checkInterrupt(CLASS_NAME);
List<Runnable> tasks = cleanupHandler.getTasks();
List<Runnable> tasks = cleanupHandler.getTasks(conf);
List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
for (Runnable task : tasks) {
CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
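
For context, the loop above fans each handler's Runnables out with CompletableFuture.runAsync and now passes the service HiveConf into getTasks. A rough sketch of that fan-out/join shape, assuming a generic task list and executor rather than the actual Cleaner fields:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: dispatch a handler's tasks asynchronously and wait for the batch.
class TaskDispatchSketch {
  static void runBatch(List<Runnable> tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<CompletableFuture<Void>> futures = tasks.stream()
        .map(task -> CompletableFuture.runAsync(task, pool)) // one future per cleanup task
        .toList();
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // surfaces the first failure
    pool.shutdown();
  }
}
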
@@ -29,7 +29,10 @@ public class MetadataCache {

public MetadataCache(boolean isCacheEnabled) {
if (isCacheEnabled) {
metaCache = Caffeine.newBuilder().softValues().build();
metaCache = Caffeine.newBuilder()
.softValues()
.maximumSize(10_000)
.build();
}
}

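
The change above adds an upper bound to the soft-values metadata cache, so entries are evicted by size as well as under GC pressure. A hedged usage sketch against the Caffeine API, assuming a String key and a generic value type rather than the real Hive metadata types:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.function.Supplier;

class BoundedMetadataCacheSketch<V> {
  // softValues() lets the GC drop entries when memory is tight;
  // maximumSize() caps growth even when the heap is not under pressure.
  private final Cache<String, V> cache = Caffeine.newBuilder()
      .softValues()
      .maximumSize(10_000)
      .build();

  V computeIfAbsent(String key, Supplier<V> loader) {
    return cache.get(key, k -> loader.get()); // loads at most once per missing key
  }
}
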
@@ -38,9 +38,10 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Objects.isNull;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;

/**
* Abort-cleanup based implementation of TaskHandler.
@@ -51,8 +52,7 @@ class AbortedTxnCleaner extends TaskHandler {
private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());

public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
MetadataCache metadataCache, boolean metricsEnabled,
FSRemover fsRemover) {
MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
}

@@ -73,23 +73,23 @@ a. Find the list of entries which are suitable for cleanup (This is done in {@li
e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
**/
@Override
public List<Runnable> getTasks() throws MetaException {
int abortedThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
long abortedTimeThreshold = HiveConf
.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
public List<Runnable> getTasks(HiveConf conf) throws MetaException {
int abortedThreshold = HiveConf.getIntVar(conf, HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
long abortedTimeThreshold = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
List<CompactionInfo> readyToCleanAborts =
txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);

if (!readyToCleanAborts.isEmpty()) {
return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
clean(info, info.minOpenWriteTxnId, metricsEnabled)))
.collect(Collectors.toList());
return readyToCleanAborts.stream()
.map(info -> ThrowingRunnable.unchecked(
() -> clean(info, metricsEnabled)))
.toList();
}
return Collections.emptyList();
}

private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
private void clean(CompactionInfo info, boolean metricsEnabled) throws MetaException, InterruptedException {
LOG.info("Starting cleaning for {}", info);
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
@@ -98,7 +98,8 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
}
Partition p = null;
Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
Table t = resolveTable(info);

if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
@@ -109,25 +110,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
p = resolvePartition(info.dbname, info.tableName, info.partName);
if (isNull(p)) {
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped.",
info.getFullPartitionName());
LOG.info("Unable to find partition {}, assuming it was dropped.", info.getFullPartitionName());
txnHandler.markCleaned(info);
return;
}
}

String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
info.runAs = TxnUtils.findUserToRunAs(location, t, getConf());
abortCleanUsingAcidDir(info, t, location);

} catch (InterruptedException e) {
LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
e.getMessage());
e.getMessage());
handleCleanerAttemptFailure(info, e.getMessage());
throw e;
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
e.getMessage());
e.getMessage());
handleCleanerAttemptFailure(info, e.getMessage());
throw new MetaException(e.getMessage());
} finally {
@@ -137,29 +137,30 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
}
}

private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
ValidTxnList validTxnList =
TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
private void abortCleanUsingAcidDir(CompactionInfo info, Table table, String location) throws Exception {
ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
getOpenTxns(), info.minOpenWriteTxnId, true);
//save it so that getAcidState() sees it
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());

// Creating 'reader' list since we are interested in the set of 'obsolete' files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);

// Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
// This is necessary for looking at the complete state of the table till the min open write Id
// This is necessary for looking at the complete state of the table till the min open writeId
// (if there is an open txn on the table) or the highestWatermark.
// This is used later on while deleting the records in TXN_COMPONENTS table.
info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
info.highestWriteId = Math.min(
isNull(validWriteIdList.getMinOpenWriteId()) ?
Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1,
validWriteIdList.getHighWatermark());

boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
txnHandler.markCleaned(info);
} else {
LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
}

}
}
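
One detail worth spelling out from the hunk above: the cleaner caps info.highestWriteId at min(minOpenWriteId - 1, highWatermark), so aborted metadata is never removed past the oldest open writer or past the snapshot's high watermark. A standalone worked sketch of that bound (made-up numbers, not Hive code):

import java.util.OptionalLong;

class HighestWriteIdSketch {
  // Mirrors info.highestWriteId = min(minOpenWriteId - 1, highWatermark):
  // never clean past the oldest open writer, and never past the snapshot's watermark.
  static long highestWriteId(OptionalLong minOpenWriteId, long highWatermark) {
    long openWriterCap = minOpenWriteId.isPresent()
        ? minOpenWriteId.getAsLong() - 1
        : Long.MAX_VALUE;
    return Math.min(openWriterCap, highWatermark);
  }

  public static void main(String[] args) {
    System.out.println(highestWriteId(OptionalLong.of(42), 100));  // 41: capped by the open writer
    System.out.println(highestWriteId(OptionalLong.empty(), 100)); // 100: no open writer, watermark wins
  }
}
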
@@ -52,7 +52,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -67,17 +66,16 @@ class CompactionCleaner extends TaskHandler {
private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());

public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
MetadataCache metadataCache, boolean metricsEnabled,
FSRemover fsRemover) {
MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
}

@Override
public List<Runnable> getTasks() throws MetaException {
public List<Runnable> getTasks(HiveConf conf) throws MetaException {
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
: 0;
? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
: 0;
List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
if (!readyToClean.isEmpty()) {
long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
@@ -87,20 +85,21 @@ public List<Runnable> getTasks() throws MetaException {
// to the clean method, to avoid cleaning up deltas needed for running queries
// when min_history_level is finally dropped, than every HMS will commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
return readyToClean.stream().map(ci -> {
long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
}).collect(Collectors.toList());
return readyToClean.stream()
.map(ci -> ThrowingRunnable.unchecked(
() -> clean(ci, minTxnIdSeenOpen, metricsEnabled)))
.toList();
}
return Collections.emptyList();
}

private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
LOG.info("Starting cleaning for {}", ci);
LOG.info("Starting cleaning for {}, based on min open {}", ci,
(ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn);

PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
(!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
(!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
try {
if (metricsEnabled) {
perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
@@ -111,11 +110,12 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
Partition p = null;

if (isNull(location)) {
t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
t = resolveTable(ci);

if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
idWatermark(ci));
idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
if (isNull(p)) {
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped. {}",
ci.getFullPartitionName(), idWatermark(ci));
LOG.info("Unable to find partition {}, assuming it was dropped. {}", ci.getFullPartitionName(),
idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -146,22 +146,23 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t

if (!isNull(t) || !isNull(ci.partName)) {
String path = isNull(location)
? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
: location;
? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
: location;
boolean dropPartition = !isNull(ci.partName) && isNull(p);

//check if partition wasn't re-created
if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
cleanUsingLocation(ci, path, true);
} else {
cleanUsingAcidDir(ci, path, minOpenTxn);
long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minOpenTxn;
cleanUsingAcidDir(ci, t, path, cleanerWaterMark);
}
} else {
cleanUsingLocation(ci, location, false);
}
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
e.getMessage());
e.getMessage());
if (metricsEnabled) {
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
}
@@ -203,19 +204,18 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
}
}

private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
ValidTxnList validTxnList =
TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
private void cleanUsingAcidDir(CompactionInfo ci, Table table, String location, long minOpenTxn) throws Exception {
ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
getOpenTxns(), minOpenTxn, false);
//save it so that getAcidState() sees it
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
/*
* {@code validTxnList} is capped by minOpenTxnGLB so if
* {@code validTxnList} is capped by global minOpenTxn so if
* {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
* produced by a compactor, that means every reader that could be active right now see it
* as well. That means if this base/delta shadows some earlier base/delta, it will be
* as well. That means if this base/delta shadows some earlier base/delta, it will be
* used in favor of any files that it shadows. Thus, the shadowed files are safe to delete.
*
*
* The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
* above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
* See {@link TxnStore#markCleaned(CompactionInfo)} for details.
@@ -241,20 +241,20 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT

// Creating 'reader' list since we are interested in the set of 'obsolete' files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);

boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
LOG.warn("Leaving queue entry {} in ready for cleaning state.", ci);
}
}

private LockRequest createLockRequest(CompactionInfo ci) {
return CompactorUtil.createLockRequest(conf, ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
return CompactorUtil.createLockRequest(
getConf(), ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
}

private static String idWatermark(CompactionInfo ci) {
@@ -263,7 +263,7 @@ private static String idWatermark(CompactionInfo ci) {

@Override
protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
throws NoSuchTxnException, MetaException {
throws Exception {
ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
/*
* We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
@@ -279,11 +279,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));

Path obsoletePath = new Path(location);

return new CleanupRequestBuilder()
.setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
.setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
.build();
.setLocation(location).setDbName(ci.dbname)
.setFullPartitionName(ci.getFullPartitionName())
.setRunAs(ci.runAs)
.setPurge(ifPurge)
.setObsoleteDirs(Collections.singletonList(obsoletePath))
.build();
}
}
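
The watermark logic that used to live in getTasks now sits next to the cleanup call: when the queue entry records an open min write id, the cleaner uses ci.nextTxnId + 1, otherwise it falls back to the lowest transaction id ever seen open. A small sketch of that selection, with field names taken from the diff but an illustrative class around them:

class CleanerWatermarkSketch {
  long minOpenWriteId; // from the compaction queue entry
  long nextTxnId;      // txn id recorded when the compaction committed

  // Mirrors: cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen
  long cleanerWatermark(long minTxnIdSeenOpen) {
    return (minOpenWriteId > 0) ? nextTxnId + 1 : minTxnIdSeenOpen;
  }
}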