diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
index 2667a83d43dc..f40e3d75c903 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -173,7 +173,7 @@ public void testLeakAfterHistoryException() throws Exception {
     // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM
     // increase the size of the configuration or the number of failed compactions.
     Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.",
-        diffMem < 250_000_000);
+        diffMem < 450_000_000);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 6126f150e3ae..b155e600dc89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -91,7 +91,7 @@ public void run() {
           for (TaskHandler cleanupHandler : cleanupHandlers) {
             try {
               CompactorUtil.checkInterrupt(CLASS_NAME);
-              List<Runnable> tasks = cleanupHandler.getTasks();
+              List<Runnable> tasks = cleanupHandler.getTasks(conf);
               List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
               for (Runnable task : tasks) {
                 CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
index d3c7e1d64ad8..8898ac46c369 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
@@ -29,7 +29,10 @@ public class MetadataCache {
 
   public MetadataCache(boolean isCacheEnabled) {
     if (isCacheEnabled) {
-      metaCache = Caffeine.newBuilder().softValues().build();
+      metaCache = Caffeine.newBuilder()
+          .softValues()
+          .maximumSize(10_000)
+          .build();
     }
   }
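Reviewer note (not part of the patch above): Caffeine's softValues() on its own lets the compactor metadata cache grow until the garbage collector is forced to reclaim values, which is the growth the relaxed assertion in TestCleanerWithSecureDFS now tolerates; maximumSize(10_000) additionally caps the number of entries up front. A minimal, self-contained sketch of such a bounded cache — the String/Object types and the computeIfAbsent wrapper are assumptions for illustration, not the actual MetadataCache code:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.function.Supplier;

    class BoundedCacheSketch {
      // Hypothetical stand-in for the metaCache field; key/value types are assumed.
      private final Cache<String, Object> cache = Caffeine.newBuilder()
          .softValues()        // values stay reclaimable under memory pressure
          .maximumSize(10_000) // and the entry count is capped before pressure builds
          .build();

      Object computeIfAbsent(String key, Supplier<Object> loader) {
        // Loads at most once per key; later calls return the cached value.
        return cache.get(key, k -> loader.get());
      }
    }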
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
index 2314ce4d2e4b..9ad9191e144c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -38,9 +38,10 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.isNull;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
 
 /**
  * Abort-cleanup based implementation of TaskHandler.
@@ -51,8 +52,7 @@ class AbortedTxnCleaner extends TaskHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
 
   public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
-      MetadataCache metadataCache, boolean metricsEnabled,
-      FSRemover fsRemover) {
+      MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
     super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
@@ -73,23 +73,23 @@ a. Find the list of entries which are suitable for cleanup (This is done in {@li
      e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
   **/
   @Override
-  public List<Runnable> getTasks() throws MetaException {
-    int abortedThreshold = HiveConf.getIntVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
-    long abortedTimeThreshold = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
-            TimeUnit.MILLISECONDS);
-    List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+  public List<Runnable> getTasks(HiveConf conf) throws MetaException {
+    int abortedThreshold = HiveConf.getIntVar(conf, HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+        TimeUnit.MILLISECONDS);
+    List<CompactionInfo> readyToCleanAborts =
+        txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
 
     if (!readyToCleanAborts.isEmpty()) {
-      return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
-              clean(info, info.minOpenWriteTxnId, metricsEnabled)))
-          .collect(Collectors.toList());
+      return readyToCleanAborts.stream()
+          .map(info -> ThrowingRunnable.unchecked(
+              () -> clean(info, metricsEnabled)))
+          .toList();
     }
     return Collections.emptyList();
   }
 
-  private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
+  private void clean(CompactionInfo info, boolean metricsEnabled) throws MetaException, InterruptedException {
     LOG.info("Starting cleaning for {}", info);
     PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
     String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
@@ -98,7 +98,8 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
         perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
       }
       Partition p = null;
-      Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+      Table t = resolveTable(info);
+
       if (isNull(t)) {
         // The table was dropped before we got around to cleaning it.
         LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
@@ -109,25 +110,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
         p = resolvePartition(info.dbname, info.tableName, info.partName);
         if (isNull(p)) {
           // The partition was dropped before we got around to cleaning it.
-          LOG.info("Unable to find partition {}, assuming it was dropped.",
-              info.getFullPartitionName());
+          LOG.info("Unable to find partition {}, assuming it was dropped.", info.getFullPartitionName());
           txnHandler.markCleaned(info);
           return;
         }
       }
 
-      String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
-      info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
-      abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
+      String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+      info.runAs = TxnUtils.findUserToRunAs(location, t, getConf());
+      abortCleanUsingAcidDir(info, t, location);
     } catch (InterruptedException e) {
       LOG.error("Caught an interrupted exception when cleaning, unable to complete cleaning of {} due to {}", info,
-              e.getMessage());
+          e.getMessage());
       handleCleanerAttemptFailure(info, e.getMessage());
       throw e;
     } catch (Exception e) {
       LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
-              e.getMessage());
+          e.getMessage());
       handleCleanerAttemptFailure(info, e.getMessage());
       throw new MetaException(e.getMessage());
     } finally {
      if (metricsEnabled) {
@@ -137,22 +137,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
     }
   }
 
-  private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
-    ValidTxnList validTxnList =
-        TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+  private void abortCleanUsingAcidDir(CompactionInfo info, Table table, String location) throws Exception {
+    ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
+        getOpenTxns(), info.minOpenWriteTxnId, true);
     //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
     // Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
-    // This is necessary for looking at the complete state of the table till the min open write Id
+    // This is necessary for looking at the complete state of the table till the min open writeId
     // (if there is an open txn on the table) or the highestWatermark.
     // This is used later on while deleting the records in TXN_COMPONENTS table.
-    info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
-        Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
-    Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+    info.highestWriteId = Math.min(
+        isNull(validWriteIdList.getMinOpenWriteId()) ?
+            Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1,
+        validWriteIdList.getHighWatermark());
 
     boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
     if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
@@ -160,6 +162,5 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m
     } else {
       LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
     }
-
   }
 }
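Reviewer note (not part of the patch above): the highestWriteId bound computed in abortCleanUsingAcidDir stops abort cleanup just below the oldest still-open write, or at the high watermark when nothing is open. A tiny standalone illustration of that expression (class and method names here are invented for the example):

    // Mirrors Math.min(isNull(minOpenWriteId) ? Long.MAX_VALUE : minOpenWriteId - 1, highWatermark).
    class HighestWriteIdSketch {
      static long highestWriteId(Long minOpenWriteId, long highWatermark) {
        long cap = (minOpenWriteId == null) ? Long.MAX_VALUE : minOpenWriteId - 1;
        return Math.min(cap, highWatermark);
      }

      public static void main(String[] args) {
        System.out.println(highestWriteId(7L, 12L));   // 6  -> stop below the oldest open write
        System.out.println(highestWriteId(null, 12L)); // 12 -> nothing open, clean up to the watermark
      }
    }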
"writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn); + PerfLogger perfLogger = PerfLogger.getPerfLogger(false); String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" + - (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null); + (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null); try { if (metricsEnabled) { perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric); @@ -111,11 +110,12 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t Partition p = null; if (isNull(location)) { - t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName)); + t = resolveTable(ci); + if (isNull(t)) { // The table was dropped before we got around to cleaning it. LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(), - idWatermark(ci)); + idWatermark(ci)); txnHandler.markCleaned(ci); return; } @@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t p = resolvePartition(ci.dbname, ci.tableName, ci.partName); if (isNull(p)) { // The partition was dropped before we got around to cleaning it. - LOG.info("Unable to find partition {}, assuming it was dropped. {}", - ci.getFullPartitionName(), idWatermark(ci)); + LOG.info("Unable to find partition {}, assuming it was dropped. {}", ci.getFullPartitionName(), + idWatermark(ci)); txnHandler.markCleaned(ci); return; } @@ -146,22 +146,23 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t if (!isNull(t) || !isNull(ci.partName)) { String path = isNull(location) - ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation() - : location; + ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation() + : location; boolean dropPartition = !isNull(ci.partName) && isNull(p); //check if partition wasn't re-created if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) { cleanUsingLocation(ci, path, true); } else { - cleanUsingAcidDir(ci, path, minOpenTxn); + long cleanerWaterMark = (ci.minOpenWriteId > 0) ? 
ci.nextTxnId + 1 : minOpenTxn; + cleanUsingAcidDir(ci, t, path, cleanerWaterMark); } } else { cleanUsingLocation(ci, location, false); } } catch (Exception e) { LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci, - e.getMessage()); + e.getMessage()); if (metricsEnabled) { Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc(); } @@ -203,19 +204,18 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires } } - private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception { - ValidTxnList validTxnList = - TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false); + private void cleanUsingAcidDir(CompactionInfo ci, Table table, String location, long minOpenTxn) throws Exception { + ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner( + getOpenTxns(), minOpenTxn, false); //save it so that getAcidState() sees it - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); /* - * {@code validTxnList} is capped by minOpenTxnGLB so if + * {@code validTxnList} is capped by global minOpenTxn so if * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta * produced by a compactor, that means every reader that could be active right now see it - * as well. That means if this base/delta shadows some earlier base/delta, it will be + * as well. That means if this base/delta shadows some earlier base/delta, it will be * used in favor of any files that it shadows. Thus, the shadowed files are safe to delete. * - * * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID. * See {@link TxnStore#markCleaned(CompactionInfo)} for details. @@ -241,7 +241,6 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT // Creating 'reader' list since we are interested in the set of 'obsolete' files ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList); - Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName)); LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList); boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table); @@ -249,12 +248,13 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT txnHandler.markCleaned(ci); } else { txnHandler.clearCleanerStart(ci); - LOG.warn("No files were removed. 
Leaving queue entry {} in ready for cleaning state.", ci); + LOG.warn("Leaving queue entry {} in ready for cleaning state.", ci); } } private LockRequest createLockRequest(CompactionInfo ci) { - return CompactorUtil.createLockRequest(conf, ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE); + return CompactorUtil.createLockRequest( + getConf(), ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE); } private static String idWatermark(CompactionInfo ci) { @@ -263,7 +263,7 @@ private static String idWatermark(CompactionInfo ci) { @Override protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList) - throws NoSuchTxnException, MetaException { + throws Exception { ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList); /* * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction @@ -279,11 +279,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) { String strIfPurge = ci.getProperty("ifPurge"); boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge")); - Path obsoletePath = new Path(location); + return new CleanupRequestBuilder() - .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName()) - .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath)) - .build(); + .setLocation(location).setDbName(ci.dbname) + .setFullPartitionName(ci.getFullPartitionName()) + .setRunAs(ci.runAs) + .setPurge(ifPurge) + .setObsoleteDirs(Collections.singletonList(obsoletePath)) + .build(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java index f4d0a5adc15f..d140eb4d9ac4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.txn.compactor.handler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidCleanerWriteIdList; @@ -24,13 +25,14 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.metrics.AcidMetricService; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -45,7 +47,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,57 +64,81 @@ public abstract class 
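Reviewer note (not part of the patch above): the watermark choice that used to live in getTasks() now happens inside clean(), so the log line and the cleanup decision use the same value. A standalone sketch of the selection (names are illustrative, and the non-positive default assumed for minOpenWriteId when min_history_write_id is not in use is an assumption):

    // Mirrors (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minOpenTxn.
    class CleanerWaterMarkSketch {
      static long cleanerWaterMark(long minOpenWriteId, long nextTxnId, long minTxnIdSeenOpen) {
        return (minOpenWriteId > 0) ? nextTxnId + 1 : minTxnIdSeenOpen;
      }

      public static void main(String[] args) {
        System.out.println(cleanerWaterMark(5, 100, 90));  // 101: writeId-based path
        System.out.println(cleanerWaterMark(-1, 100, 90)); // 90: fall back to the min txnId seen open
      }
    }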
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index f4d0a5adc15f..d140eb4d9ac4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCleanerWriteIdList;
@@ -24,13 +25,14 @@
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -45,7 +47,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -63,57 +64,81 @@ public abstract class TaskHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskHandler.class.getName());
+
   protected final TxnStore txnHandler;
-  protected final HiveConf conf;
+  private final ThreadLocal<HiveConf> threadLocalConf;
   protected final boolean metricsEnabled;
-  protected final MetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   protected final FSRemover fsRemover;
-  protected final long defaultRetention;
+  private final long defaultRetention;
 
   TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
-              boolean metricsEnabled, FSRemover fsRemover) {
-    this.conf = conf;
+      boolean metricsEnabled, FSRemover fsRemover) {
+    this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(conf));
     this.txnHandler = txnHandler;
     this.metadataCache = metadataCache;
     this.metricsEnabled = metricsEnabled;
     this.fsRemover = fsRemover;
-    this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+    this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public abstract List<Runnable> getTasks(HiveConf conf) throws MetaException;
+
+  protected HiveConf getConf() {
+    return threadLocalConf.get();
   }
 
-  public abstract List<Runnable> getTasks() throws MetaException;
+  protected GetOpenTxnsResponse getOpenTxns() throws Exception {
+    return metadataCache.computeIfAbsent("openTxns", txnHandler::getOpenTxns);
+  }
 
-  protected Table resolveTable(String dbName, String tableName) throws MetaException {
-    return CompactorUtil.resolveTable(conf, dbName, tableName);
+  protected Table resolveTable(CompactionInfo info) throws Exception {
+    return metadataCache.computeIfAbsent(info.getFullTableName(),
+        () -> resolveTable(info.dbname, info.tableName));
+  }
+
+  @VisibleForTesting
+  Table resolveTable(String dbName, String tableName) throws MetaException {
+    return CompactorUtil.resolveTable(getConf(), dbName, tableName);
   }
 
   protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
+    return CompactorUtil.resolvePartition(
+        getConf(), null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
   }
 
   protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo info, ValidTxnList validTxnList)
-      throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(info.dbname, info.tableName));
+      throws Exception {
+    List<String> tblNames = Collections.singletonList(
+        TxnUtils.getFullTableName(info.dbname, info.tableName));
+
     GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
     request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+    GetValidWriteIdsResponse rsp = metadataCache.computeIfAbsent(info.getFullTableName() + validTxnList.writeToString(),
+        () -> txnHandler.getValidWriteIds(request));
     // we could have no write IDs for a table if it was never written to but
     // since we are in the Cleaner phase of compactions, there must have
     // been some delta/base dirs
     assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
-    return new ValidCleanerWriteIdList(
-        TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)));
+    ValidReaderWriteIdList validWriteIdList = TxnCommonUtils.createValidReaderWriteIdList(
+        rsp.getTblValidWriteIds().getFirst());
+    return new ValidCleanerWriteIdList(validWriteIdList);
   }
 
   protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String location,
-      ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
+        ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
     Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
+    FileSystem fs = path.getFileSystem(getConf());
 
     // Collect all the files/dirs
     Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
-        dirSnapshots);
+
+    AcidDirectory dir = AcidUtils.getAcidState(
+        fs, path, getConf(), validWriteIdList, Ref.from(false), false,
+        dirSnapshots);
+
     boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, info.partName);
 
     List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
@@ -121,26 +146,35 @@ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String
       info.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
     }
 
-    List<Path> deleted = fsRemover.clean(new CleanupRequest.CleanupRequestBuilder().setLocation(location)
-        .setDbName(info.dbname).setFullPartitionName(info.getFullPartitionName())
-        .setRunAs(info.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+    List<Path> deleted = fsRemover.clean(
+        new CleanupRequest.CleanupRequestBuilder()
+            .setLocation(location)
+            .setDbName(info.dbname)
+            .setFullPartitionName(info.getFullPartitionName())
+            .setRunAs(info.runAs)
+            .setObsoleteDirs(obsoleteDirs).setPurge(true)
         .build());
 
     if (!deleted.isEmpty()) {
-      AcidMetricService.updateMetricsFromCleaner(info.dbname, info.tableName, info.partName, dir.getObsolete(), conf,
-          txnHandler);
+      AcidMetricService.updateMetricsFromCleaner(
+          info.dbname, info.tableName, info.partName, dir.getObsolete(), getConf(),
+          txnHandler);
     }
 
     // Make sure there are no leftovers below the compacted watermark
     boolean success = false;
-    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
-        Ref.from(false), false, dirSnapshots);
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+
+    dir = AcidUtils.getAcidState(
+        fs, path, getConf(),
+        new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
+        Ref.from(false), false,
+        dirSnapshots);
 
     List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
     if (!remained.isEmpty()) {
-      LOG.warn("Remained {} obsolete directories from {}. {}",
-          remained.size(), location, CompactorUtil.getDebugInfo(remained));
+      LOG.warn("Remained {} obsolete directories from {}. {}", remained.size(), location,
+          CompactorUtil.getDebugInfo(remained));
     } else {
       LOG.debug("All cleared below the watermark: {} from {}", info.highestWriteId, location);
       success = true;
@@ -160,7 +194,7 @@ protected void handleCleanerAttemptFailure(CompactionInfo info, String errorMess
     if (info.retryRetention > 0) {
       cleanAttempts = (int) (Math.log(info.retryRetention / defaultRetention) / Math.log(2)) + 1;
     }
-    if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+    if (cleanAttempts >= getIntVar(getConf(), HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
      //Mark it as failed if the max attempt threshold is reached.
       txnHandler.markFailed(info);
     } else {
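Reviewer note (not part of the patch above): replacing the shared conf field with a per-thread copy means keys written by one cleanup task through getConf().set(...) — for example ValidTxnList.VALID_TXNS_KEY before getAcidState() — cannot leak into a task running concurrently on another thread. A minimal sketch of the pattern (class and method names are illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    class ThreadLocalConfSketch {
      private final ThreadLocal<HiveConf> threadLocalConf;

      ThreadLocalConfSketch(HiveConf conf) {
        // Each worker thread lazily gets its own copy; writes stay thread-local.
        this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(conf));
      }

      HiveConf getConf() {
        return threadLocalConf.get();
      }
    }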
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890f..7e73d06c9ff1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -78,7 +78,7 @@ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exce
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, null);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -109,7 +109,7 @@ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exceptio
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, p);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -147,7 +147,7 @@ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Excep
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, p1);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -190,7 +190,7 @@ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, null);
     // All aborted directories below min open write ID are removed,
@@ -241,7 +241,7 @@ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     directories = getDirectories(conf, t, null);
     Assert.assertEquals(1, directories.size());
@@ -283,7 +283,7 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     directories = getDirectories(conf, t, null);
     // The table is already compacted, so we must see 1 base delta
@@ -367,7 +367,7 @@ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartit
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
     Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
index b1d60b8a8512..1279cd6cc180 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -72,7 +73,7 @@ public void testCompactionHandlerAndFsRemover() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
   }
 
   @Test
@@ -105,7 +106,7 @@ public void testMetaCache() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Mockito.verify(mockedMetadataCache, times(3)).computeIfAbsent(any(), any());
+    Mockito.verify(mockedMetadataCache, times(4)).computeIfAbsent(any(), any());
     Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any());
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095b..ae88852ebef9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -113,7 +113,7 @@ public List extractData(ResultSet rs) throws DataAccessException
       info.partName = rs.getString("PART");
       // In this case, this field contains min open write txn ID.
       long value = rs.getLong("MIN_OPEN_WRITE_TXNID");
-      info.minOpenWriteTxnId = value > 0 ? value : Long.MAX_VALUE;
+      info.minOpenWriteTxnId = !rs.wasNull() ? value : Long.MAX_VALUE;
       // The specific type, state assigned to abort cleanup.
       info.type = CompactionType.ABORT_TXN_CLEANUP;
       info.state = READY_FOR_CLEANING;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
index 0f22b00e1976..26bb2bf6d115 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
@@ -120,7 +120,8 @@ public List extractData(ResultSet rs) throws SQLException, DataA
       info.retryRetention = rs.getInt(9);
       info.nextTxnId = rs.getLong(10);
       if (TxnHandler.ConfVars.useMinHistoryWriteId()) {
-        info.minOpenWriteId = rs.getLong(11);
+        long value = rs.getLong(11);
+        info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE;
       }
       infos.add(info);
     }
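Reviewer note (not part of the patch above): both handler fixes rely on the JDBC convention that ResultSet.getLong() returns 0 for SQL NULL and that null-ness must be checked with wasNull() immediately afterwards; comparing the raw value against 0, as the old ReadyToCleanAbortHandler code did, cannot tell NULL apart from a genuine 0. A minimal sketch of the idiom (the column name is taken from the patch, the class and method are invented for illustration):

    import java.sql.ResultSet;
    import java.sql.SQLException;

    class NullableLongReadSketch {
      static long readMinOpenWriteTxnId(ResultSet rs) throws SQLException {
        long value = rs.getLong("MIN_OPEN_WRITE_TXNID"); // returns 0 when the column is NULL
        return !rs.wasNull() ? value : Long.MAX_VALUE;   // wasNull() distinguishes NULL from a real 0
      }
    }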