From 9cc9cec9b92a571cd173a5e1cbd8c0dace17cbe1 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 27 Sep 2024 15:30:53 -0700 Subject: [PATCH 1/3] [ENG-14372][HUDI-8248] Fixing Log Record reader to include rollback blocks with timestamps > maxInstant times (#930) Co-authored-by: Y Ethan Guo --- .../apache/hudi/index/HoodieIndexUtils.java | 2 +- .../log/AbstractHoodieLogRecordReader.java | 20 +- .../table/log/block/HoodieLogBlock.java | 8 + .../functional/TestHoodieLogFormat.java | 209 ++++++++++++++++-- ...TestGlobalIndexEnableUpdatePartitions.java | 99 ++++++++- 5 files changed, 305 insertions(+), 33 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index f46fadb0a7aa..918d4be7fcd3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -241,7 +241,7 @@ private static HoodieData> getExistingRecords( HoodieData> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) { final Option instantTime = hoodieTable .getMetaClient() - .getCommitsTimeline() + .getActiveTimeline() // we need to include all actions and completed .filterCompletedInstants() .lastInstant() .map(HoodieInstant::getTimestamp); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 4c38d11467a0..c013a5342f2b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -252,13 +252,11 @@ private void scanInternalV1(Option keySpecOpt) { HoodieLogBlock logBlock = logFormatReaderWrapper.next(); final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); totalLogBlocks.incrementAndGet(); - if (logBlock.getBlockType() != CORRUPT_BLOCK - && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime - )) { - // hit a block with instant time greater than should be processed, stop processing further - break; - } - if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) { + if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType())) { + if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { + // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader + continue; + } if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) { // filter the log block by instant range continue; @@ -440,10 +438,10 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin totalCorruptBlocks.incrementAndGet(); continue; } - if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), - HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) { - // hit a block with instant time greater than should be processed, stop processing further - break; + if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType()) && + 
HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { + // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader + continue; } if (logBlock.getBlockType() != COMMAND_BLOCK) { if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index f806d635fa9a..6a2427f40ddc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -177,6 +177,14 @@ public enum HoodieLogBlockType { public static HoodieLogBlockType fromId(String id) { return ID_TO_ENUM_MAP.get(id); } + + /** + * @param logBlockType log block type to be inspected. + * @returns true if the log block type refers to data or delete block. false otherwise. + */ + public static boolean isDataOrDeleteBlock(HoodieLogBlockType logBlockType) { + return logBlockType != HoodieLogBlockType.COMMAND_BLOCK && logBlockType != HoodieLogBlockType.CORRUPT_BLOCK; + } } /** diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 0e27b3e3f329..10568ada4778 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -181,6 +181,20 @@ public void tearDown() throws IOException { storage.deleteDirectory(new StoragePath(spillableBasePath)); } + @Test + public void testHoodieLogBlockTypeIsDataOrDeleteBlock() { + List dataOrDeleteBlocks = new ArrayList<>(); + dataOrDeleteBlocks.add(HoodieLogBlockType.DELETE_BLOCK); + dataOrDeleteBlocks.add(HoodieLogBlockType.AVRO_DATA_BLOCK); + dataOrDeleteBlocks.add(HoodieLogBlockType.PARQUET_DATA_BLOCK); + dataOrDeleteBlocks.add(HoodieLogBlockType.HFILE_DATA_BLOCK); + dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK); + + Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> { + assertEquals(HoodieLogBlockType.isDataOrDeleteBlock(logBlockType), dataOrDeleteBlocks.contains(logBlockType)); + }); + } + @Test public void testEmptyLog() throws IOException { Writer writer = @@ -673,15 +687,122 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { - // Generate 4 delta-log files w/ random records Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); SchemaTestUtil testUtil = new SchemaTestUtil(); - List genRecords = testUtil.generateHoodieTestRecords(0, 400); + appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan, + "100"); + } - Set logFiles = writeLogFiles(partitionPath, schema, genRecords, 4); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimizedLogScan) throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + SchemaTestUtil testUtil = new SchemaTestUtil(); 
- FileCreateUtils.createDeltaCommit(basePath, "100", storage); - // scan all log blocks (across multiple log files) + Pair, Set> firstBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + "100"); + + // trigger another batch of writes for next commit + Pair, Set> secondBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + "200", firstBatch.getKey(), firstBatch.getValue()); + + List firstAndSecondBatch = new ArrayList<>(firstBatch.getKey()); + firstAndSecondBatch.addAll(secondBatch.getKey()); + + // set max commit time as 200 and validate only first batch of records are returned + List allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue())); + + // expect records only from first batch when max commit time is set to 100. + readAndValidate(schema, "100", allLogFiles, firstBatch.getKey()); + + // add another batch. + Pair, Set> thirdBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + "300", firstAndSecondBatch, new HashSet<>(allLogFiles)); + + allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue(), thirdBatch.getValue())); + + // set max commit time as 100 and validate only first batch of records are returned + readAndValidate(schema, "100", allLogFiles, firstBatch.getKey()); + readAndValidate(schema, "200", allLogFiles, firstAndSecondBatch); + List allBatches = new ArrayList<>(firstAndSecondBatch); + allBatches.addAll(thirdBatch.getKey()); + readAndValidate(schema, "300", allLogFiles, allBatches); + + // add rollback to commit 200 + addRollbackBlock("400", "200"); + + // lets not remove commit 200 from timeline. but still due to presence of rollback block, 2nd batch should be ignored. + List firstAndThirdBatch = new ArrayList<>(firstBatch.getKey()); + firstAndThirdBatch.addAll(thirdBatch.getKey()); + readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch); + + // if we set maxCommitTime as 200 (which is rolled back), expected records are just from batch1 + readAndValidate(schema, "200", allLogFiles, firstBatch.getKey()); + + // lets repeat the same after removing the commit from timeline. + FileCreateUtils.deleteDeltaCommit(basePath, "200", fs); + readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch); + // if we set maxCommitTime as 200 (which is rolled back commit), expected records are just from batch1 + readAndValidate(schema, "200", allLogFiles, firstBatch.getKey()); + + // let's test rollback issue from HUDI-8248 + // lets add commit 400 (batch4). add a rollback block with commit time 500 which rollsback 400. again, add log files with commit time 400 (batch5) + // when we read all log files w/ max commit time as 400, batch4 needs to be ignored and only batch5 should be read. + // trigger another batch of writes for next commit + Pair, Set> fourthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + "400", firstAndThirdBatch, new HashSet<>(allLogFiles)); + + // lets delete commit 400 from timeline to simulate crash. + FileCreateUtils.deleteDeltaCommit(basePath, "400", fs); + + // set max commit time as 400 and validate only first and 3rd batch is read. 1st batch is rolled back completely. 4th batch is partially failed commit. 
+ allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue())); + readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch); + + // lets add the rollback block + addRollbackBlock("500", "400"); + // lets redo the read test + readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch); + + // and lets re-add new log files w/ commit time 400. + Pair, Set> fifthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + "400", firstBatch.getKey(), firstBatch.getValue()); + + // lets redo the read test. this time, first batch, 3rd batch and fifth batch should be expected. + allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue(), fifthBatch.getValue())); + List firstThirdFifthBatch = new ArrayList<>(firstAndThirdBatch); + firstThirdFifthBatch.addAll(fifthBatch.getKey()); + readAndValidate(schema, "400", allLogFiles, firstThirdFifthBatch); + + // even setting very high value for max commit time should not matter. + readAndValidate(schema, "600", allLogFiles, firstThirdFifthBatch); + } + + private void addRollbackBlock(String rollbackCommitTime, String commitToRollback) throws IOException, InterruptedException { + Writer writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + Map header = new HashMap<>(); + + // Rollback the 1st block i.e. a data block. + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackCommitTime); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commitToRollback); + header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal())); + HoodieCommandBlock commandBlock = new HoodieCommandBlock(header); + writer.appendBlock(commandBlock); + writer.close(); + } + + private List getSortedLogFilesList(List> logFilesSets) { + Set allLogFiles = new HashSet<>(); + logFilesSets.forEach(logfileSet -> allLogFiles.addAll(logfileSet)); + List allLogFilesList = new ArrayList<>(allLogFiles); + Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator()); + return allLogFilesList; + } + + private void readAndValidate(Schema schema, String maxCommitTime, List logFiles, List expectedRecords) throws IOException { HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() .withStorage(storage) .withBasePath(basePath) @@ -689,7 +810,64 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType logFiles.stream() .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) .withReaderSchema(schema) - .withLatestInstantTime("100") + .withLatestInstantTime(maxCommitTime) + .withMaxMemorySizeInBytes(10240L) + .withReverseReader(false) + .withBufferSize(BUFFER_SIZE) + .withSpillableMapBasePath(spillableBasePath) + .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK) + .withBitCaskDiskMapCompressionEnabled(false) + .withOptimizedLogBlocksScan(false) + .build(); + + List scannedRecords = new ArrayList<>(); + for (HoodieRecord record : scanner) { + scannedRecords.add((IndexedRecord) + ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); + } + + assertEquals(sort(expectedRecords), sort(scannedRecords), + "Scanner records count should 
be the same as appended records"); + scanner.close(); + } + + private Pair, Set> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily, + boolean enableOptimizedLogBlocksScan, + String commitTime) throws IOException, URISyntaxException, InterruptedException { + return appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan, commitTime, + Collections.emptyList(), Collections.emptySet()); + } + + private Pair, Set> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily, + boolean enableOptimizedLogBlocksScan, + String commitTime, + List prevGenRecords, Set prevLogFiles) throws IOException, + URISyntaxException, InterruptedException { + + // Generate 4 delta-log files w/ random records + List genRecords = testUtil.generateHoodieTestRecords(0, 400); + Set logFiles = writeLogFiles(partitionPath, schema, genRecords, 4, commitTime); + + Set allLogFiles = new HashSet<>(); + allLogFiles.addAll(logFiles); + allLogFiles.addAll(prevLogFiles); + List allLogFilesList = new ArrayList<>(allLogFiles); + Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator()); + + FileCreateUtils.createDeltaCommit(basePath, commitTime, fs); + // scan all log blocks (across multiple log files) + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(basePath) + .withLogFilePaths( + allLogFilesList.stream() + .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) + .withReaderSchema(schema) + .withLatestInstantTime(commitTime) .withMaxMemorySizeInBytes(10240L) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) @@ -705,9 +883,13 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); } - assertEquals(sort(genRecords), sort(scannedRecords), + List allGenRecords = new ArrayList<>(genRecords); + allGenRecords.addAll(prevGenRecords); + + assertEquals(sort(allGenRecords), sort(scannedRecords), "Scanner records count should be the same as appended records"); scanner.close(); + return Pair.of(genRecords, logFiles); } @ParameterizedTest @@ -2811,21 +2993,12 @@ private static Stream testArgumentsWithoutOptimizedScanArg() { ); } - private static Set writeLogFiles(StoragePath partitionPath, - Schema schema, - List records, - int numFiles) - throws IOException, InterruptedException { - return writeLogFiles(partitionPath, schema, records, numFiles, false); - } - private static Set writeLogFiles(StoragePath partitionPath, Schema schema, List records, int numFiles, - boolean enableBlockSequenceNumbers) + String commitTime) throws IOException, InterruptedException { - int blockSeqNo = 0; Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) .withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -2837,7 +3010,7 @@ private static Set writeLogFiles(StoragePath partitionPath, (FSDataOutputStream) storage.append(writer.getLogFile().getPath())); } Map header = new HashMap<>(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); Set logFiles = new HashSet<>(); diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java index 86619019c284..ac7154739046 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java @@ -25,10 +25,13 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.storage.StoragePath; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.spark.SparkConf; @@ -43,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; @@ -79,6 +83,13 @@ private static Stream getTableTypeAndIndexType() { ); } + private static Stream getTableTypeAndIndexTypeUpdateOrDelete() { + return Stream.of( + Arguments.of(MERGE_ON_READ, RECORD_INDEX, true), + Arguments.of(MERGE_ON_READ, RECORD_INDEX, false) + ); + } + @ParameterizedTest @MethodSource("getTableTypeAndIndexType") public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) throws IOException { @@ -135,6 +146,87 @@ public void testPartitionChanges(HoodieTableType tableType, IndexType indexType) } } + /** + * Tests getTableTypeAndIndexTypeUpdateOrDelete + * @throws IOException + */ + @ParameterizedTest + @MethodSource("getTableTypeAndIndexTypeUpdateOrDelete") + public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexType indexType, boolean isUpsert) throws IOException { + final Class payloadClass = DefaultHoodieRecordPayload.class; + HoodieWriteConfig writeConfig = getWriteConfig(payloadClass, indexType); + HoodieTableMetaClient metaClient = getHoodieMetaClient(tableType, writeConfig.getProps()); + final int totalRecords = 8; + final String p1 = "p1"; + final String p2 = "p2"; + final String p3 = "p3"; + List insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass); + List updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass); + try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { + + // 1st batch: inserts + String commitTimeAtEpoch0 = getCommitTimeAtUTC(0); + client.startCommitWithTime(commitTimeAtEpoch0); + assertNoWriteErrors(client.upsert(jsc().parallelize(insertsAtEpoch0, 2), commitTimeAtEpoch0).collect()); + + // 2nd batch: update 4 records from p1 to p2 + String commitTimeAtEpoch5 = getCommitTimeAtUTC(0); + client.startCommitWithTime(commitTimeAtEpoch5); + if (isUpsert) { + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect()); + readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0); + readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5); + } else { + 
assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch5).collect()); + readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0); + readTableAndValidate(metaClient, new int[] {}, p2, 0); + } + // simuate crash. delete latest completed dc. + String latestCompletedDeltaCommit = metaClient.reloadActiveTimeline().getCommitsAndCompactionTimeline().lastInstant().get().getFileName(); + metaClient.getStorage().deleteFile(new StoragePath(metaClient.getBasePath() + "/.hoodie/" + latestCompletedDeltaCommit)); + } + + try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { + // re-ingest same batch + String commitTimeAtEpoch10 = getCommitTimeAtUTC(0); + client.startCommitWithTime(commitTimeAtEpoch10); + if (isUpsert) { + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch10).collect()); + // this also tests snapshot query. We had a bug where MOR snapshot was ignoring rollbacks while determining last instant while reading log records. + readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0); + readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p2, 5); + } else { + assertNoWriteErrors(client.delete(jsc().parallelize(updatesAtEpoch5.stream().map(hoodieRecord -> hoodieRecord.getKey()).collect(Collectors.toList()), 2), commitTimeAtEpoch10).collect()); + readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0); + readTableAndValidate(metaClient, new int[] {}, p2, 0); + } + + // upsert test + // update 4 of them from p2 to p3. + // delete test: + // update 4 of them to p3. these are treated as new inserts since they are deleted. no changes should be seen wrt p2. + String commitTimeAtEpoch15 = getCommitTimeAtUTC(0); + List updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass); + client.startCommitWithTime(commitTimeAtEpoch15); + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect()); + // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle. + readTableAndValidate(metaClient, new int[] {4, 5, 6, 7}, p1, 0); + readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p3, 15); + + // lets move 2 of them back to p1 + String commitTimeAtEpoch20 = getCommitTimeAtUTC(0); + List updatesAtEpoch20 = getUpdates(updatesAtEpoch5.subList(0, 2), p1, 20, payloadClass); + client.startCommitWithTime(commitTimeAtEpoch20); + assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 1), commitTimeAtEpoch20).collect()); + // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle. 
+ Map expectedTsMap = new HashMap<>(); + Arrays.stream(new int[] {0, 1}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 20L)); + Arrays.stream(new int[] {4, 5, 6, 7}).forEach(entry -> expectedTsMap.put(String.valueOf(entry), 0L)); + readTableAndValidate(metaClient, new int[] {0, 1, 4, 5, 6, 7}, p1, expectedTsMap); + readTableAndValidate(metaClient, new int[] {2, 3}, p3, 15); + } + } + @ParameterizedTest @MethodSource("getTableTypeAndIndexType") public void testUpdatePartitionsThenDelete(HoodieTableType tableType, IndexType indexType) throws IOException { @@ -252,9 +344,8 @@ private void readTableAndValidate(HoodieTableMetaClient metaClient, int[] expect .select("_hoodie_record_key", "_hoodie_partition_path", "id", "pt", "ts") .cache(); int expectedCount = expectedIds.length; - assertEquals(expectedCount, df.count()); - assertEquals(expectedCount, df.filter(String.format("pt = '%s'", expectedPartition)).count()); - Row[] allRows = (Row[]) df.collect(); + Row[] allRows = (Row[]) df.filter(String.format("pt = '%s'", expectedPartition)).collect(); + assertEquals(expectedCount, allRows.length); for (int i = 0; i < expectedCount; i++) { int expectedId = expectedIds[i]; Row r = allRows[i]; @@ -289,6 +380,8 @@ private HoodieWriteConfig getWriteConfig(Class payloadClass, IndexType indexT .withGlobalBloomIndexUpdatePartitionPath(true) .withGlobalSimpleIndexUpdatePartitionPath(true) .withRecordIndexUpdatePartitionPath(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(4).build()) .withSchema(SCHEMA_STR) .withPayloadConfig(HoodiePayloadConfig.newBuilder() .fromProperties(getPayloadProps(payloadClass)).build()) From 5a220689475b6d98be1e252881939c11cb795b1e Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 30 Sep 2024 19:28:48 -0700 Subject: [PATCH 2/3] Fixing test failures --- .../log/AbstractHoodieLogRecordReader.java | 4 +-- .../functional/TestHoodieLogFormat.java | 34 +++++++++++-------- ...TestGlobalIndexEnableUpdatePartitions.java | 1 - 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index c013a5342f2b..346116fb4953 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -438,8 +438,8 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin totalCorruptBlocks.incrementAndGet(); continue; } - if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType()) && - HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { + if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType()) + && HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader continue; } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 10568ada4778..cdf46bc234bd 100755 --- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -689,7 +689,7 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); SchemaTestUtil testUtil = new SchemaTestUtil(); - appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan, + appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, "100"); } @@ -699,11 +699,11 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); SchemaTestUtil testUtil = new SchemaTestUtil(); - Pair, Set> firstBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + Pair, Set> firstBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan, "100"); // trigger another batch of writes for next commit - Pair, Set> secondBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + Pair, Set> secondBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan, "200", firstBatch.getKey(), firstBatch.getValue()); List firstAndSecondBatch = new ArrayList<>(firstBatch.getKey()); @@ -716,7 +716,7 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi readAndValidate(schema, "100", allLogFiles, firstBatch.getKey()); // add another batch. - Pair, Set> thirdBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + Pair, Set> thirdBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan, "300", firstAndSecondBatch, new HashSet<>(allLogFiles)); allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue(), thirdBatch.getValue())); @@ -740,7 +740,7 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi readAndValidate(schema, "200", allLogFiles, firstBatch.getKey()); // lets repeat the same after removing the commit from timeline. - FileCreateUtils.deleteDeltaCommit(basePath, "200", fs); + FileCreateUtils.deleteDeltaCommit(basePath, "200", storage); readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch); // if we set maxCommitTime as 200 (which is rolled back commit), expected records are just from batch1 readAndValidate(schema, "200", allLogFiles, firstBatch.getKey()); @@ -749,11 +749,11 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi // lets add commit 400 (batch4). add a rollback block with commit time 500 which rollsback 400. again, add log files with commit time 400 (batch5) // when we read all log files w/ max commit time as 400, batch4 needs to be ignored and only batch5 should be read. 
// trigger another batch of writes for next commit - Pair, Set> fourthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + Pair, Set> fourthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan, "400", firstAndThirdBatch, new HashSet<>(allLogFiles)); // lets delete commit 400 from timeline to simulate crash. - FileCreateUtils.deleteDeltaCommit(basePath, "400", fs); + FileCreateUtils.deleteDeltaCommit(basePath, "400", storage); // set max commit time as 400 and validate only first and 3rd batch is read. 1st batch is rolled back completely. 4th batch is partially failed commit. allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue())); @@ -765,7 +765,7 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch); // and lets re-add new log files w/ commit time 400. - Pair, Set> fifthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, false, enableOptimizedLogScan, + Pair, Set> fifthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan, "400", firstBatch.getKey(), firstBatch.getValue()); // lets redo the read test. this time, first batch, 3rd batch and fifth batch should be expected. @@ -781,7 +781,7 @@ public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimi private void addRollbackBlock(String rollbackCommitTime, String commitToRollback) throws IOException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) - .withSizeThreshold(1024).withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + .withSizeThreshold(1024).withFileId("test-fileid1").withDeltaCommit("100").withStorage(storage).build(); Map header = new HashMap<>(); // Rollback the 1st block i.e. a data block. 
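For readers following this series, the block-filtering rule that scanInternalV1 and scanInternalV2 converge on can be condensed as the sketch below. This is illustrative only and not code from the patch: the helper name shouldSkipBlock is hypothetical, and the real loops additionally handle corrupt blocks, instant-range filtering, and command-block (rollback) processing. The point of the change is that only data and delete blocks are compared against the reader's latestInstantTime; rollback command blocks carrying later timestamps are still read and applied, which is what HUDI-8248 requires.

  // Illustrative sketch, assuming the Hudi classes shown in this diff; shouldSkipBlock is a hypothetical name.
  // Only data/delete blocks newer than latestInstantTime are skipped; command blocks are never filtered by timestamp.
  private boolean shouldSkipBlock(HoodieLogBlock logBlock, String latestInstantTime) {
    return HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType())
        && HoodieTimeline.compareTimestamps(
            logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME),
            HoodieTimeline.GREATER_THAN, latestInstantTime);
  }

The third patch in this series folds the same predicate into an instance method, logBlock.isDataOrDeleteBlock(), without changing the behavior sketched here.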
@@ -833,16 +833,14 @@ private void readAndValidate(Schema schema, String maxCommitTime, List, Set> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan, String commitTime) throws IOException, URISyntaxException, InterruptedException { - return appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan, commitTime, + return appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, commitTime, Collections.emptyList(), Collections.emptySet()); } private Pair, Set> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan, String commitTime, List prevGenRecords, Set prevLogFiles) throws IOException, @@ -858,10 +856,10 @@ private Pair, Set> appendAndValidate(Schema s List allLogFilesList = new ArrayList<>(allLogFiles); Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator()); - FileCreateUtils.createDeltaCommit(basePath, commitTime, fs); + FileCreateUtils.createDeltaCommit(basePath, commitTime, storage); // scan all log blocks (across multiple log files) HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) + .withStorage(storage) .withBasePath(basePath) .withLogFilePaths( allLogFilesList.stream() @@ -2993,6 +2991,14 @@ private static Stream testArgumentsWithoutOptimizedScanArg() { ); } + private static Set writeLogFiles(StoragePath partitionPath, + Schema schema, + List records, + int numFiles) + throws IOException, InterruptedException { + return writeLogFiles(partitionPath, schema, records, numFiles, "100"); + } + private static Set writeLogFiles(StoragePath partitionPath, Schema schema, List records, diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java index ac7154739046..593509c4fc12 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; From 660939a38700862e2edc4259e40070e7891243bd Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 30 Sep 2024 15:08:44 -0700 Subject: [PATCH 3/3] [ENG-14372][HUDI-8248] Fixing Log Record reader to include rollback blocks with timestamps > maxInstant times (#942) --- .../log/AbstractHoodieLogRecordReader.java | 4 +-- .../table/log/block/HoodieLogBlock.java | 9 ++++--- .../functional/TestHoodieLogFormat.java | 25 +++++++++---------- ...TestGlobalIndexEnableUpdatePartitions.java | 2 +- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 346116fb4953..01353175912b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -252,7 +252,7 @@ private void scanInternalV1(Option keySpecOpt) { HoodieLogBlock logBlock = logFormatReaderWrapper.next(); final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME); totalLogBlocks.incrementAndGet(); - if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType())) { + if (logBlock.isDataOrDeleteBlock()) { if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader continue; @@ -438,7 +438,7 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin totalCorruptBlocks.incrementAndGet(); continue; } - if (HoodieLogBlock.HoodieLogBlockType.isDataOrDeleteBlock(logBlock.getBlockType()) + if (logBlock.isDataOrDeleteBlock() && HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) { // Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader continue; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index 6a2427f40ddc..73e199225a8c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -98,6 +98,10 @@ public byte[] getMagic() { public abstract HoodieLogBlockType getBlockType(); + public boolean isDataOrDeleteBlock() { + return getBlockType().isDataOrDeleteBlock(); + } + public long getLogBlockLength() { throw new HoodieException("No implementation was provided"); } @@ -179,11 +183,10 @@ public static HoodieLogBlockType fromId(String id) { } /** - * @param logBlockType log block type to be inspected. * @returns true if the log block type refers to data or delete block. false otherwise. 
*/ - public static boolean isDataOrDeleteBlock(HoodieLogBlockType logBlockType) { - return logBlockType != HoodieLogBlockType.COMMAND_BLOCK && logBlockType != HoodieLogBlockType.CORRUPT_BLOCK; + public boolean isDataOrDeleteBlock() { + return this != HoodieLogBlockType.COMMAND_BLOCK && this != HoodieLogBlockType.CORRUPT_BLOCK; } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index cdf46bc234bd..c56163e058f7 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -191,7 +191,7 @@ public void testHoodieLogBlockTypeIsDataOrDeleteBlock() { dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK); Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> { - assertEquals(HoodieLogBlockType.isDataOrDeleteBlock(logBlockType), dataOrDeleteBlocks.contains(logBlockType)); + assertEquals(dataOrDeleteBlocks.contains(logBlockType), logBlockType.isDataOrDeleteBlock()); }); } @@ -803,9 +803,8 @@ private List getSortedLogFilesList(List> logFi } private void readAndValidate(Schema schema, String maxCommitTime, List logFiles, List expectedRecords) throws IOException { - HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() - .withStorage(storage) - .withBasePath(basePath) + try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withStorage(storage).withBasePath(basePath) .withLogFilePaths( logFiles.stream() .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())) @@ -818,17 +817,17 @@ private void readAndValidate(Schema schema, String maxCommitTime, List scannedRecords = new ArrayList<>(); - for (HoodieRecord record : scanner) { - scannedRecords.add((IndexedRecord) - ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); - } + List scannedRecords = new ArrayList<>(); + for (HoodieRecord record : scanner) { + scannedRecords.add((IndexedRecord) + ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()); + } - assertEquals(sort(expectedRecords), sort(scannedRecords), - "Scanner records count should be the same as appended records"); - scanner.close(); + assertEquals(sort(expectedRecords), sort(scannedRecords), + "Scanner records count should be the same as appended records"); + } } private Pair, Set> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType, diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java index 593509c4fc12..0ca040b782f8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java @@ -161,8 +161,8 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp final String p3 = "p3"; List insertsAtEpoch0 = getInserts(totalRecords, p1, 0, payloadClass); List updatesAtEpoch5 = getUpdates(insertsAtEpoch0.subList(0, 4), p2, 5, payloadClass); - try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { + try 
(SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) { // 1st batch: inserts String commitTimeAtEpoch0 = getCommitTimeAtUTC(0); client.startCommitWithTime(commitTimeAtEpoch0);
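To see how callers pin the upper bound that the fixed reader honors, the scanner setup used by readAndValidate in the log-format test above can be reduced to the following sketch. The builder calls are the ones appearing in this patch; imports are omitted, and logFilePaths, bufferSize, and spillableBasePath stand in for values supplied by the test harness.

  // Minimal sketch, assuming the HoodieMergedLogRecordScanner builder methods used in the tests above.
  // withLatestInstantTime caps which data/delete blocks are merged; rollback command blocks with
  // later timestamps are still applied after this fix.
  HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
      .withStorage(storage)
      .withBasePath(basePath)
      .withLogFilePaths(logFilePaths)
      .withReaderSchema(schema)
      .withLatestInstantTime(maxCommitTime)   // e.g. "400" in the HUDI-8248 scenario
      .withMaxMemorySizeInBytes(10240L)
      .withReverseReader(false)
      .withBufferSize(bufferSize)
      .withSpillableMapBasePath(spillableBasePath)
      .withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
      .withBitCaskDiskMapCompressionEnabled(false)
      .withOptimizedLogBlocksScan(false)
      .build();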