[HUDI-8685] Re-enabling flaky/buggy test (TestHoodieBackedTableMetadata.testCleaningArchivingAndCompaction) #12656

Open · wants to merge 1 commit into master
@@ -2509,13 +2509,14 @@ public void testReader() throws Exception {
* <p>
* Metadata Table should be automatically compacted as per config.
*/
@Disabled
@Test
public void testCleaningArchivingAndCompaction() throws Exception {
Contributor: Should we reduce the complexity of the test or break it into separate tests? Probably clustering-related behaviour can be verified in a separate test.
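One possible shape for such a standalone clustering test, reusing only helpers already present in this class (init, getWriteConfigBuilder, dataGen, validateMetadata); a rough sketch under those assumptions, not code from this PR:

  @Test
  public void testClusteringWithMetadataTable() throws Exception {
    init(HoodieTableType.COPY_ON_WRITE, false);
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
    HoodieWriteConfig config = getWriteConfigBuilder(true, true, false).build();
    HoodieWriteConfig configForClustering = getWriteConfigBuilder(true, true, false)
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withInlineClustering(true).withInlineClusteringNumCommits(1).build())
        .build();

    // A couple of small commits so there is something to cluster.
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
      for (int i = 0; i < 2; i++) {
        String commitTime = client.createNewInstantTime();
        List<HoodieRecord> records = dataGen.generateInserts(commitTime, 5);
        client.startCommitWithTime(commitTime);
        client.insert(jsc.parallelize(records, 1), commitTime).collect();
      }
    }

    // Schedule and execute clustering, then confirm the metadata table stays consistent.
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, configForClustering)) {
      Option<String> clusteringInstant = client.scheduleClustering(Option.empty());
      assertTrue(clusteringInstant.isPresent());
      client.cluster(clusteringInstant.get());
      validateMetadata(client);
    }
  }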

init(HoodieTableType.COPY_ON_WRITE, false);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

final int maxDeltaCommitsBeforeCompaction = 3;
final int maxDeltaCommitsBeforeCompaction = 5;
Contributor: NIT: Rename to maxDeltaCommitsBeforeCompactionInMDT

HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -2528,6 +2529,10 @@ public void testCleaningArchivingAndCompaction() throws Exception {

List<HoodieRecord> records;
String newCommitTime;
HoodieTableMetaClient metadataMetaClient = null;
HoodieTableMetaClient datasetMetaClient = null;
int totalExpectedDeltaCommitsinMdt = 2; // FILES and COL STATS

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
// Some initial commits so compaction is not triggered.
// 1 deltacommit will be from bootstrap. So we can perform maxDeltaCommitsBeforeCompaction - 2 more commits before
@@ -2537,63 +2542,90 @@ public void testCleaningArchivingAndCompaction() throws Exception {
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
totalExpectedDeltaCommitsinMdt += 1;
}

HoodieTableMetaClient metadataMetaClient = createMetaClient(metadataTableBasePath);
HoodieTableMetaClient datasetMetaClient = createMetaClient(config.getBasePath());

metadataMetaClient = createMetaClient(metadataTableBasePath);
datasetMetaClient = createMetaClient(config.getBasePath());
// There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction
// deltacommits (1 will be due to bootstrap)
// deltacommits (1 will be due to FILES bootstrap and a 2nd one for col stats bootstrap)
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1);
assertEquals(maxDeltaCommitsBeforeCompaction + 1, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0);

// Next commit will initiate a compaction
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
totalExpectedDeltaCommitsinMdt += 1;
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1);
assertEquals(maxDeltaCommitsBeforeCompaction + 2, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0);
}

// trigger async clustering, but do not cluster.
HoodieWriteConfig configForClustering = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
.retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false)
.build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(4, 5).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
.build();

// More than maxDeltaCommitsBeforeCompaction commits
String inflightCommitTime = newCommitTime;
for (int i = 0; i < maxDeltaCommitsBeforeCompaction + 1; ++i) {
Option<String> pendingClustering = Option.empty();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, configForClustering)) {
pendingClustering = client.scheduleClustering(Option.empty());
}

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
for (int i = 0; i < maxDeltaCommitsBeforeCompaction; ++i) {
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.startCommitWithTime(newCommitTime); // when i = 2 completes, 1st commit gets archived
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
if (i == 0) {
// Mark this commit inflight so compactions don't take place
FileCreateUtils.deleteCommit(basePath, newCommitTime);
FileCreateUtils.createInflightCommit(basePath, newCommitTime);
inflightCommitTime = newCommitTime;
totalExpectedDeltaCommitsinMdt += 1;
if (i == 2) {
Contributor: Would this only happen at i == 2 or also in subsequent iterations?

totalExpectedDeltaCommitsinMdt -= 1; // archival in mdt
}
}
}
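On the question above about the i == 2 decrement: whether MDT archival also fires in later iterations depends on how the archival window (archiveCommitsWith(4, 5)) plays out, so a more robust bookkeeping could derive the adjustment from the metadata table's archived timeline rather than a hard-coded iteration. A sketch only, with a hypothetical running counter:

  // Sketch (not the PR's code): adjust the expectation from what the MDT actually archived.
  // `deltaCommitsWrittenToMdt` is a hypothetical counter of every deltacommit written so far.
  int archivedDeltaCommitsInMdt =
      metadataMetaClient.getArchivedTimeline().reload().getDeltaCommitTimeline().countInstants();
  int expectedActiveDeltaCommitsInMdt = deltaCommitsWrittenToMdt - archivedDeltaCommitsInMdt;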

// Ensure no more compactions took place due to the leftover inflight commit
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1);
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */);
// We changed the compaction trigger for MDT in 1.0. So, compaction will find the highest commit time less than the earliest inflight in the data table and subtract 1 ms.
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2);
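A minimal, self-contained illustration of the "subtract 1 ms" arithmetic mentioned in the comment above; the 17-digit instant-time format and the standalone class are assumptions for illustration, not Hudi's own helper:

  import java.text.SimpleDateFormat;
  import java.util.Date;

  // Standalone illustration of the instant-time arithmetic described in the comment above.
  // The yyyyMMddHHmmssSSS format mirrors Hudi-style instant times; this is not the Hudi helper itself.
  public class MdtCompactionInstantSketch {
    public static void main(String[] args) throws Exception {
      SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS");
      // Highest completed commit time in the data table that is earlier than the earliest inflight instant.
      Date latestCompletedBeforeInflight = fmt.parse("20240101101500000");
      // The MDT compaction instant is that time minus 1 millisecond.
      String mdtCompactionInstant = fmt.format(new Date(latestCompletedBeforeInflight.getTime() - 1));
      System.out.println(mdtCompactionInstant); // prints 20240101101459999 (note the second-boundary borrow)
    }
  }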

// Complete commit
FileCreateUtils.createCommit(basePath, inflightCommitTime);
// complete the clustering.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, configForClustering)) {
client.cluster(pendingClustering.get());
totalExpectedDeltaCommitsinMdt += 1;
}

// Next commit should lead to compaction
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
// Add 1 more commit; the next compaction should kick in, then validate the compaction timestamp.
newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
totalExpectedDeltaCommitsinMdt += 1;

newCommitTime = client.createNewInstantTime();
records = dataGen.generateInserts(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
totalExpectedDeltaCommitsinMdt += 2;
totalExpectedDeltaCommitsinMdt -= 4; // archival in mdt.
// Ensure compactions took place
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2);
assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(),
((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */));
assertEquals(totalExpectedDeltaCommitsinMdt, metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants());
assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0);

validateMetadata(client);