[HUDI-8248] Fixing Log Record reader to include rollback blocks with timestamps > maxInstant times #12033

Open · wants to merge 3 commits into base: master
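Summary (reconstructed from the diff below): AbstractHoodieLogRecordReader.scanInternalV1/V2 used to stop scanning (break) at the first log block whose INSTANT_TIME exceeded the reader's latestInstantTime. A rollback command block is stamped with the rollback's own instant time, which can be greater than the max instant the reader is asked to read up to, so such blocks were never reached and data from rolled-back commits could still be served. This PR makes the reader skip (continue) only data and delete blocks beyond latestInstantTime, while command blocks are always processed; a new HoodieLogBlockType.isDataOrDeleteBlock() helper backs the check.

A minimal, self-contained sketch of the corrected filtering decision, with simplified stand-in types (Block and BlockType here are hypothetical; the real logic lives in scanInternalV1/V2 below):

```java
import java.util.Arrays;
import java.util.List;

public class LogBlockFilterSketch {

  enum BlockType { DATA, DELETE, COMMAND, CORRUPT }

  static final class Block {
    final BlockType type;
    final String instantTime;

    Block(BlockType type, String instantTime) {
      this.type = type;
      this.instantTime = instantTime;
    }

    boolean isDataOrDeleteBlock() {
      // mirrors the HoodieLogBlockType#isDataOrDeleteBlock helper added in this PR
      return type != BlockType.COMMAND && type != BlockType.CORRUPT;
    }
  }

  public static void main(String[] args) {
    // The HUDI-8248 sequence: data from failed commit 400, a rollback block at
    // 500 targeting 400, then re-attempted data at 400, scanned with
    // latestInstantTime (the reader's max instant) = "400".
    List<Block> logBlocks = Arrays.asList(
        new Block(BlockType.DATA, "400"),
        new Block(BlockType.COMMAND, "500"),
        new Block(BlockType.DATA, "400"));
    String latestInstantTime = "400";

    for (Block logBlock : logBlocks) {
      // Old rule: break at the first block past latestInstantTime, so the
      // rollback at 500 was never seen and the failed commit's data survived:
      //   if (logBlock.instantTime.compareTo(latestInstantTime) > 0) break;

      // New rule: skip only data/delete blocks past latestInstantTime;
      // command blocks such as rollbacks are always processed.
      if (logBlock.isDataOrDeleteBlock()
          && logBlock.instantTime.compareTo(latestInstantTime) > 0) {
        continue;
      }
      System.out.println("process " + logBlock.type + "@" + logBlock.instantTime);
    }
  }
}
```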
@@ -241,7 +241,7 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
final Option<String> instantTime = hoodieTable
.getMetaClient()
-.getCommitsTimeline()
+.getActiveTimeline() // we need to include all actions and only completed instants
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
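Note: getCommitsTimeline() spans only commit-type actions, so the previous code could resolve a latest instant that lagged behind completed non-commit actions such as rollbacks; switching to the full active timeline, filtered to completed instants, keeps the max instant passed to the log record reader in step with all completed actions.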
@@ -252,13 +252,11 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
HoodieLogBlock logBlock = logFormatReaderWrapper.next();
final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
-if (logBlock.getBlockType() != CORRUPT_BLOCK
-    && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
-)) {
-  // hit a block with instant time greater than should be processed, stop processing further
-  break;
-}
-if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
+if (logBlock.isDataOrDeleteBlock()) {
+  if (HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+    // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+    continue;
+  }
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
// filter the log block by instant range
continue;
@@ -440,10 +438,10 @@ private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessin
totalCorruptBlocks.incrementAndGet();
continue;
}
-if (!HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
-    HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
-  // hit a block with instant time greater than should be processed, stop processing further
-  break;
+if (logBlock.isDataOrDeleteBlock()
+    && HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.GREATER_THAN, this.latestInstantTime)) {
+  // Skip processing a data or delete block with an instant time greater than the latest instant time used by this log record reader
+  continue;
}
if (logBlock.getBlockType() != COMMAND_BLOCK) {
if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
@@ -98,6 +98,10 @@ public byte[] getMagic() {

public abstract HoodieLogBlockType getBlockType();

public boolean isDataOrDeleteBlock() {
return getBlockType().isDataOrDeleteBlock();
}

public long getLogBlockLength() {
throw new HoodieException("No implementation was provided");
}
@@ -177,6 +181,13 @@ public enum HoodieLogBlockType {
public static HoodieLogBlockType fromId(String id) {
return ID_TO_ENUM_MAP.get(id);
}

/**
* @return true if the log block type refers to a data or delete block, false otherwise.
*/
public boolean isDataOrDeleteBlock() {
return this != HoodieLogBlockType.COMMAND_BLOCK && this != HoodieLogBlockType.CORRUPT_BLOCK;
}
}

/**
@@ -181,6 +181,20 @@ public void tearDown() throws IOException {
storage.deleteDirectory(new StoragePath(spillableBasePath));
}

@Test
public void testHoodieLogBlockTypeIsDataOrDeleteBlock() {
List<HoodieLogBlock.HoodieLogBlockType> dataOrDeleteBlocks = new ArrayList<>();
dataOrDeleteBlocks.add(HoodieLogBlockType.DELETE_BLOCK);
dataOrDeleteBlocks.add(HoodieLogBlockType.AVRO_DATA_BLOCK);
dataOrDeleteBlocks.add(HoodieLogBlockType.PARQUET_DATA_BLOCK);
dataOrDeleteBlocks.add(HoodieLogBlockType.HFILE_DATA_BLOCK);
dataOrDeleteBlocks.add(HoodieLogBlockType.CDC_DATA_BLOCK);

Arrays.stream(HoodieLogBlockType.values()).forEach(logBlockType -> {
assertEquals(dataOrDeleteBlocks.contains(logBlockType), logBlockType.isDataOrDeleteBlock());
});
}

@Test
public void testEmptyLog() throws IOException {
Writer writer =
@@ -673,23 +687,184 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
boolean enableOptimizedLogBlocksScan)
throws IOException, URISyntaxException, InterruptedException {

-// Generate 4 delta-log files w/ random records
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();
+appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan,
+"100");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLogRecordReaderWithMaxInstantTimeConfigured(boolean enableOptimizedLogScan) throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
SchemaTestUtil testUtil = new SchemaTestUtil();

Pair<List<IndexedRecord>, Set<HoodieLogFile>> firstBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
"100");

// trigger another batch of writes for next commit
Pair<List<IndexedRecord>, Set<HoodieLogFile>> secondBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
"200", firstBatch.getKey(), firstBatch.getValue());

List<IndexedRecord> firstAndSecondBatch = new ArrayList<>(firstBatch.getKey());
firstAndSecondBatch.addAll(secondBatch.getKey());

// collect all log files written so far across both batches
List<HoodieLogFile> allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue()));

// expect records only from first batch when max commit time is set to 100.
readAndValidate(schema, "100", allLogFiles, firstBatch.getKey());

// add another batch.
Pair<List<IndexedRecord>, Set<HoodieLogFile>> thirdBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
"300", firstAndSecondBatch, new HashSet<>(allLogFiles));

allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), secondBatch.getValue(), thirdBatch.getValue()));

// validate the expected records for max commit times 100, 200 and 300
readAndValidate(schema, "100", allLogFiles, firstBatch.getKey());
readAndValidate(schema, "200", allLogFiles, firstAndSecondBatch);
List<IndexedRecord> allBatches = new ArrayList<>(firstAndSecondBatch);
allBatches.addAll(thirdBatch.getKey());
readAndValidate(schema, "300", allLogFiles, allBatches);

// add rollback to commit 200
addRollbackBlock("400", "200");

// let's not remove commit 200 from the timeline; due to the presence of the rollback block, the 2nd batch should still be ignored.
List<IndexedRecord> firstAndThirdBatch = new ArrayList<>(firstBatch.getKey());
firstAndThirdBatch.addAll(thirdBatch.getKey());
readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch);

// if we set maxCommitTime to 200 (which was rolled back), expected records are just from batch 1
readAndValidate(schema, "200", allLogFiles, firstBatch.getKey());

// let's repeat the same after removing the commit from the timeline.
FileCreateUtils.deleteDeltaCommit(basePath, "200", storage);
readAndValidate(schema, "300", allLogFiles, firstAndThirdBatch);
// if we set maxCommitTime to 200 (the rolled-back commit), expected records are just from batch 1
readAndValidate(schema, "200", allLogFiles, firstBatch.getKey());

// let's test the rollback issue from HUDI-8248:
// add commit 400 (batch4), then a rollback block with commit time 500 which rolls back 400, then add log files with commit time 400 again (batch5).
// when we read all log files w/ max commit time 400, batch4 needs to be ignored and only batch5 should be read.
// trigger another batch of writes for the next commit
Pair<List<IndexedRecord>, Set<HoodieLogFile>> fourthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
"400", firstAndThirdBatch, new HashSet<>(allLogFiles));

// let's delete commit 400 from the timeline to simulate a crash.
FileCreateUtils.deleteDeltaCommit(basePath, "400", storage);

// set max commit time to 400 and validate that only the 1st and 3rd batches are read. The 2nd batch was rolled back completely, and the 4th batch is a partially failed commit.
allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue()));
readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch);

// let's add the rollback block
addRollbackBlock("500", "400");
// let's redo the read test
readAndValidate(schema, "400", allLogFiles, firstAndThirdBatch);

// and let's re-add new log files w/ commit time 400.
Pair<List<IndexedRecord>, Set<HoodieLogFile>> fifthBatch = appendAndValidate(schema, testUtil, ExternalSpillableMap.DiskMapType.BITCASK, false, enableOptimizedLogScan,
"400", firstBatch.getKey(), firstBatch.getValue());

// let's redo the read test. This time the 1st, 3rd and 5th batches are expected.
allLogFiles = getSortedLogFilesList(Arrays.asList(firstBatch.getValue(), thirdBatch.getValue(), fourthBatch.getValue(), fifthBatch.getValue()));
List<IndexedRecord> firstThirdFifthBatch = new ArrayList<>(firstAndThirdBatch);
firstThirdFifthBatch.addAll(fifthBatch.getKey());
readAndValidate(schema, "400", allLogFiles, firstThirdFifthBatch);

// even setting a very high value for max commit time should not matter.
readAndValidate(schema, "600", allLogFiles, firstThirdFifthBatch);
}

private void addRollbackBlock(String rollbackCommitTime, String commitToRollback) throws IOException, InterruptedException {
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withSizeThreshold(1024).withFileId("test-fileid1").withDeltaCommit("100").withStorage(storage).build();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();

// Write a rollback command block targeting commitToRollback.
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, rollbackCommitTime);
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commitToRollback);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
HoodieCommandBlock commandBlock = new HoodieCommandBlock(header);
writer.appendBlock(commandBlock);
writer.close();
}
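For context, a rollback command block names its target via the TARGET_INSTANT_TIME header, and the scanner reacts to it by discarding blocks it has already read for that instant. A simplified sketch of that replay, assuming a flat list of scanned blocks (ScannedBlock is hypothetical; the actual reader tracks blocks per log file and instant):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollbackReplaySketch {

  static final class ScannedBlock {
    final String type;          // "DATA" or "ROLLBACK"
    final String instantTime;   // INSTANT_TIME header
    final String targetInstant; // TARGET_INSTANT_TIME header, rollbacks only

    ScannedBlock(String type, String instantTime, String targetInstant) {
      this.type = type;
      this.instantTime = instantTime;
      this.targetInstant = targetInstant;
    }
  }

  public static void main(String[] args) {
    // Mirrors the test flow: batch@100, batch@200, then addRollbackBlock("400", "200").
    List<ScannedBlock> scanned = Arrays.asList(
        new ScannedBlock("DATA", "100", null),
        new ScannedBlock("DATA", "200", null),
        new ScannedBlock("ROLLBACK", "400", "200"));

    List<ScannedBlock> validBlocks = new ArrayList<>();
    for (ScannedBlock block : scanned) {
      if ("ROLLBACK".equals(block.type)) {
        // drop previously read blocks written by the rolled-back instant
        validBlocks.removeIf(b -> b.instantTime.equals(block.targetInstant));
      } else {
        validBlocks.add(block);
      }
    }
    validBlocks.forEach(b -> System.out.println("keep " + b.type + "@" + b.instantTime));
    // prints: keep DATA@100
  }
}
```

This is why the test can leave commit 200 on the timeline and still expect the 2nd batch to disappear: the rollback block alone is enough for the scanner.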

private List<HoodieLogFile> getSortedLogFilesList(List<Set<HoodieLogFile>> logFilesSets) {
Set<HoodieLogFile> allLogFiles = new HashSet<>();
logFilesSets.forEach(logfileSet -> allLogFiles.addAll(logfileSet));
List<HoodieLogFile> allLogFilesList = new ArrayList<>(allLogFiles);
Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator());
return allLogFilesList;
}

private void readAndValidate(Schema schema, String maxCommitTime, List<HoodieLogFile> logFiles, List<IndexedRecord> expectedRecords) throws IOException {
try (HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(storage).withBasePath(basePath)
.withLogFilePaths(
logFiles.stream()
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
.withReaderSchema(schema)
.withLatestInstantTime(maxCommitTime)
.withMaxMemorySizeInBytes(10240L)
.withReverseReader(false)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(ExternalSpillableMap.DiskMapType.BITCASK)
.withBitCaskDiskMapCompressionEnabled(false)
.withOptimizedLogBlocksScan(false)
.build()) {

List<IndexedRecord> scannedRecords = new ArrayList<>();
for (HoodieRecord record : scanner) {
scannedRecords.add((IndexedRecord)
((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
}

assertEquals(sort(expectedRecords), sort(scannedRecords),
"Scanner records count should be the same as appended records");
}
}

private Pair<List<IndexedRecord>, Set<HoodieLogFile>> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean enableOptimizedLogBlocksScan,
String commitTime) throws IOException, URISyntaxException, InterruptedException {
return appendAndValidate(schema, testUtil, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, commitTime,
Collections.emptyList(), Collections.emptySet());
}

private Pair<List<IndexedRecord>, Set<HoodieLogFile>> appendAndValidate(Schema schema, SchemaTestUtil testUtil, ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean enableOptimizedLogBlocksScan,
String commitTime,
List<IndexedRecord> prevGenRecords, Set<HoodieLogFile> prevLogFiles) throws IOException,
URISyntaxException, InterruptedException {

+// Generate 4 delta-log files w/ random records
List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
-Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4);
+Set<HoodieLogFile> logFiles = writeLogFiles(partitionPath, schema, genRecords, 4, commitTime);
+Set<HoodieLogFile> allLogFiles = new HashSet<>();
+allLogFiles.addAll(logFiles);
+allLogFiles.addAll(prevLogFiles);
+List<HoodieLogFile> allLogFilesList = new ArrayList<>(allLogFiles);
+Collections.sort(allLogFilesList, new HoodieLogFile.LogFileComparator());

-FileCreateUtils.createDeltaCommit(basePath, "100", storage);
+FileCreateUtils.createDeltaCommit(basePath, commitTime, storage);
// scan all log blocks (across multiple log files)
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withStorage(storage)
.withBasePath(basePath)
.withLogFilePaths(
-logFiles.stream()
+allLogFilesList.stream()
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
.withReaderSchema(schema)
.withLatestInstantTime("100")
.withLatestInstantTime(commitTime)
.withMaxMemorySizeInBytes(10240L)
.withReverseReader(false)
.withBufferSize(BUFFER_SIZE)
@@ -705,9 +880,13 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
((HoodieAvroRecord) record).getData().getInsertValue(schema).get());
}

-assertEquals(sort(genRecords), sort(scannedRecords),
+List<IndexedRecord> allGenRecords = new ArrayList<>(genRecords);
+allGenRecords.addAll(prevGenRecords);
+
+assertEquals(sort(allGenRecords), sort(scannedRecords),
"Scanner records count should be the same as appended records");
scanner.close();
+return Pair.of(genRecords, logFiles);
}

@ParameterizedTest
@@ -2816,16 +2995,15 @@ private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
List<IndexedRecord> records,
int numFiles)
throws IOException, InterruptedException {
-return writeLogFiles(partitionPath, schema, records, numFiles, false);
+return writeLogFiles(partitionPath, schema, records, numFiles, "100");
}

private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
Schema schema,
List<IndexedRecord> records,
int numFiles,
-boolean enableBlockSequenceNumbers)
+String commitTime)
throws IOException, InterruptedException {
-int blockSeqNo = 0;
Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -2837,7 +3015,7 @@ private static Set<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
(FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
}
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());

Set<HoodieLogFile> logFiles = new HashSet<>();