diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 240bf1c8a36e..61d423ba85d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2509,13 +2509,14 @@ public void testReader() throws Exception {
*
* Metadata Table should be automatically compacted as per config.
*/
- @Disabled
+ @Test
public void testCleaningArchivingAndCompaction() throws Exception {
init(HoodieTableType.COPY_ON_WRITE, false);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
- final int maxDeltaCommitsBeforeCompaction = 3;
+ final int maxDeltaCommitsBeforeCompaction = 5;
HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
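+ // Inline compaction is disabled on the data table; the MDT compaction cadence is governed by the metadata config below.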
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -2528,6 +2529,10 @@ public void testCleaningArchivingAndCompaction() throws Exception {
List<HoodieRecord> records;
String newCommitTime;
+ HoodieTableMetaClient metadataMetaClient = null;
+ HoodieTableMetaClient datasetMetaClient = null;
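+ // Running tally of the expected completed deltacommits in the metadata table as the test progresses.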
+ int totalExpectedDeltaCommitsInMdt = 2; // bootstrap deltacommits for the FILES and COL_STATS partitions
+
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
// Some initial commits so compaction is not triggered.
// 1 deltacommit will be from bootstrap. So we can perform maxDeltaCommitsBeforeCompaction - 2 more commits before
@@ -2537,16 +2542,16 @@ public void testCleaningArchivingAndCompaction() throws Exception {
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsInMdt += 1;
}
- HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath);
- HoodieTableMetaClient datasetMetaClient = createMetaClient(config.getBasePath());
-
+ metadataMetaClient = createMetaClient(metadataTableBasePath);
+ datasetMetaClient = createMetaClient(config.getBasePath());
// There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction
- // deltacommits (1 will be due to bootstrap)
+ // deltacommits (1 for the FILES partition bootstrap and a 2nd for the COL_STATS partition bootstrap)
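+ // (2 bootstrap deltacommits + (maxDeltaCommitsBeforeCompaction - 1) inserts = maxDeltaCommitsBeforeCompaction + 1 completed deltacommits)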
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0);
- assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1);
+ assertEquals(maxDeltaCommitsBeforeCompaction + 1, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0);
// Next commit will initiate a compaction
@@ -2554,46 +2559,73 @@ public void testCleaningArchivingAndCompaction() throws Exception {
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsInMdt += 1;
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1);
- assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1);
+ assertEquals(maxDeltaCommitsBeforeCompaction + 2, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0);
+ }
+
+ // Schedule a clustering instant now; it is executed later via client.cluster().
+ HoodieWriteConfig configForClustering = getWriteConfigBuilder(true, true, false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
+ .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(4, 5).build())
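+ // i.e., retain 4 to 5 instants on the active timeline and archive older ones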
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+ .build();
- // More than maxDeltaCommitsBeforeCompaction commits
- String inflightCommitTime = newCommitTime;
- for (int i = 0; i < maxDeltaCommitsBeforeCompaction + 1; ++i) {
+ Option<String> pendingClustering = Option.empty();
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, configForClustering)) {
+ pendingClustering = client.scheduleClustering(Option.empty());
+ }
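+ // The clustering instant stays pending on the data table timeline until client.cluster() is invoked below.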
+
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
+ for (int i = 0; i < maxDeltaCommitsBeforeCompaction; ++i) {
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
- client.startCommitWithTime(newCommitTime);
+ client.startCommitWithTime(newCommitTime); // once the i = 2 commit completes, the earliest commit gets archived
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
- if (i == 0) {
- // Mark this commit inflight so compactions don't take place
- FileCreateUtils.deleteCommit(basePath, newCommitTime);
- FileCreateUtils.createInflightCommit(basePath, newCommitTime);
- inflightCommitTime = newCommitTime;
+ totalExpectedDeltaCommitsInMdt += 1;
+ if (i == 2) {
+ totalExpectedDeltaCommitsInMdt -= 1; // archival in the MDT trims one deltacommit from its active timeline
}
}
+ }
- // Ensure no more compactions took place due to the leftover inflight commit
- metadataTimeline = metadataMetaClient.reloadActiveTimeline();
- assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1);
- assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
- ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */);
+ // We changed the compaction trigger for the MDT in 1.0. Compaction now finds the highest completed commit time less than the earliest inflight instant in the data table and subtracts 1 ms to derive the compaction instant time.
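+ // e.g., if the earliest inflight instant in the data table is t5 and the highest completed commit below it is t4, the MDT compaction instant time becomes (t4 - 1 ms).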
+ HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+ assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2);
- // Complete commit
- FileCreateUtils.createCommit(basePath, inflightCommitTime);
+ // Complete the previously scheduled clustering.
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, configForClustering)) {
+ client.cluster(pendingClustering.get());
+ totalExpectedDeltaCommitsInMdt += 1;
+ }
- // Next commit should lead to compaction
+ try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
+ // Add a couple more commits so the next compaction kicks in, then validate the compaction timestamp.
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsInMdt += 1;
+ newCommitTime = client.createNewInstantTime();
+ records = dataGen.generateInserts(newCommitTime, 5);
+ client.startCommitWithTime(newCommitTime);
+ client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+ totalExpectedDeltaCommitsInMdt += 2;
+ totalExpectedDeltaCommitsInMdt -= 4; // archival in the MDT trims 4 deltacommits
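+ // Net tally: inserts and clustering add deltacommits; MDT archival trims them.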
// Ensure compactions took place
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2);
- assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
- ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */));
+ assertEquals(totalExpectedDeltaCommitsInMdt, metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0);
validateMetadata(client);