Skip to content

Commit

Permalink
Fixing tests
Browse files — browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Oct 2, 2024
1 parent 660939a commit d5b6fc4
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.TimeGenerator;
import org.apache.hudi.common.table.timeline.TimeGenerators;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
Expand Down Expand Up @@ -154,6 +157,7 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType)
public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException {
final Class<?> payloadClass = DefaultHoodieRecordPayload.class;
HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType);
TimeGenerator timeGenerator = TimeGenerators.getTimeGenerator(writeConfig.getTimeGeneratorConfig(), storageConf());
HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps());
final int totalRecords = 8;
final String p1 = "p1";
Expand All @@ -164,12 +168,12 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp

try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
// 1st batch: inserts
String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
String commitTimeAtEpoch0 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
client.startCommitWithTime(commitTimeAtEpoch0);
assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect());

// 2nd batch: update 4 records from p1 to p2
String commitTimeAtEpoch5 = getCommitTimeAtUTC(0);
String commitTimeAtEpoch5 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
client.startCommitWithTime(commitTimeAtEpoch5);
if (isUpsert) {
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
Expand All @@ -187,7 +191,7 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp

try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
// re-ingest same batch
String commitTimeAtEpoch10 = getCommitTimeAtUTC(0);
String commitTimeAtEpoch10 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
client.startCommitWithTime(commitTimeAtEpoch10);
if (isUpsert) {
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect());
Expand All @@ -204,7 +208,7 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp
// update 4 of them from p2 to p3.
// delete test:
// update 4 of them to p3. these are treated as new inserts since they are deleted. no changes should be seen wrt p2.
String commitTimeAtEpoch15 = getCommitTimeAtUTC(0);
String commitTimeAtEpoch15 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch15);
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
Expand All @@ -213,7 +217,7 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp
readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15);

// lets move 2 of them back to p1
String commitTimeAtEpoch20 = getCommitTimeAtUTC(0);
String commitTimeAtEpoch20 = HoodieActiveTimeline.createNewInstantTime(false, timeGenerator);
List<HoodieRecord> updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass);
client.startCommitWithTime(commitTimeAtEpoch20);
assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect());
Expand Down

0 comments on commit d5b6fc4

Please sign in to comment.