diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index e92c2a5d7c2d..3eeedc3bcf96 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -207,6 +207,7 @@ public List compact(HoodieCompactionHandler compactionHandler,
         .withPartition(operation.getPartitionPath())
         .withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
         .withRecordMerger(config.getRecordMerger())
+        .withMergeMode(config.getRecordMergeMode())
         .withTableMetaClient(metaClient)
         .build();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index a81ee663fa90..9c63bfdef4c2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -98,6 +98,7 @@ private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withOptimizedLogBlocksScan(true)
         .withRecordMerger(writeConfig.getRecordMerger())
+        .withMergeMode(writeConfig.getRecordMergeMode())
         .withTableMetaClient(metaClient)
         .build();
     scanner.scan(true);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 10aa50efe3e1..633dc6709aae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -108,6 +109,8 @@ public abstract class AbstractHoodieLogRecordScanner {
   protected final String preCombineField;
   // Stateless component for merging records
   protected final HoodieRecordMerger recordMerger;
+  // Merge mode handed in through the constructor; exposed to subclasses that combine records sharing a key
+  protected final RecordMergeMode mergeMode;
   private final TypedProperties payloadProps;
   // Log File Paths
   protected final List logFilePaths;
@@ -154,16 +157,17 @@ public abstract class AbstractHoodieLogRecordScanner {
   private HoodieTimeline completedInstantsTimeline = null;
   private HoodieTimeline inflightInstantsTimeline = null;
 
   protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths,
                                            Schema readerSchema, String latestInstantTime,
                                            boolean reverseReader, int bufferSize, Option instantRange,
                                            boolean withOperationField, boolean forceFullScan,
                                            Option partitionNameOverride,
                                            InternalSchema internalSchema,
                                            Option keyFieldOverride,
                                            boolean enableOptimizedLogBlocksScan,
                                            HoodieRecordMerger recordMerger,
+                                           RecordMergeMode mergeMode,
                                            Option hoodieTableMetaClientOption) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(
@@ -182,6 +186,7 @@ protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath,
     this.tableVersion = tableConfig.getTableVersion();
     this.payloadProps = props;
     this.recordMerger = recordMerger;
+    this.mergeMode = mergeMode;
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
     this.reverseReader = reverseReader;
@@ -892,6 +897,10 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
       throw new UnsupportedOperationException();
     }
 
+    public Builder withMergeMode(RecordMergeMode mergeMode) {
+      throw new UnsupportedOperationException();
+    }
+
     public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
       throw new UnsupportedOperationException();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index fb1d621eb4ae..a95a3d87f946 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -101,11 +102,13 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
                                          Option partitionName,
                                          InternalSchema internalSchema,
                                          Option keyFieldOverride,
-                                         boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+                                         boolean enableOptimizedLogBlocksScan,
+                                         HoodieRecordMerger recordMerger,
+                                         RecordMergeMode mergeMode,
                                          Option hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
         instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
-        hoodieTableMetaClientOption);
+        mergeMode, hoodieTableMetaClientOption);
     try {
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
@@ -242,17 +245,28 @@ protected void processNextRecord(HoodieRecord newRecord) throws IOExcepti
     String key = newRecord.getRecordKey();
     HoodieRecord prevRecord = records.get(key);
     if (prevRecord != null) {
-      // Merge and store the combined record
-      HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(prevRecord, readerSchema,
-          newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
-      // If pre-combine returns existing record, no need to update it
-      if (combinedRecord.getData() != prevRecord.getData()) {
-        HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);
-
-        // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
-        //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
-        //       it since these records will be put into records(Map).
-        records.put(key, latestHoodieRecord.copy());
+      switch (mergeMode) {
+        case OVERWRITE_WITH_LATEST:
+          // The incoming record replaces the stored one without invoking the record merger.
+          // NOTE: The record is cloned for the same reason as in the merge branch below: it may hold a
+          //       payload pointing into a shared, mutable buffer and it is retained in records(Map).
+          records.put(key, newRecord.copy());
+          break;
+        case EVENT_TIME_ORDERING:
+        case CUSTOM:
+        default:
+          // Merge and store the combined record
+          HoodieRecord combinedRecord = (HoodieRecord) recordMerger.merge(prevRecord, readerSchema,
+              newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
+          // If pre-combine returns existing record, no need to update it
+          if (combinedRecord.getData() != prevRecord.getData()) {
+            HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);
+
+            // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+            //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+            //       it since these records will be put into records(Map).
+            records.put(key, latestHoodieRecord.copy());
+          }
       }
     } else {
       // Put the record as is
@@ -342,6 +356,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
+    private RecordMergeMode mergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
     protected HoodieTableMetaClient hoodieTableMetaClient;
 
     @Override
@@ -449,6 +464,12 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
       return this;
     }
 
+    @Override
+    public Builder withMergeMode(RecordMergeMode mergeMode) {
+      this.mergeMode = mergeMode;
+      return this;
+    }
+
     public Builder withKeyFieldOverride(String keyFieldOverride) {
       this.keyFieldOverride = requireNonNull(keyFieldOverride);
       return this;
@@ -478,7 +499,7 @@ public HoodieMergedLogRecordScanner build() {
           bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
           Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
-          Option.ofNullable(hoodieTableMetaClient));
+          mergeMode, Option.ofNullable(hoodieTableMetaClient));
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8b8d43449c67..2ee1875c153d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -47,9 +48,9 @@ private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, L
                                          String latestInstantTime, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, Option instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
-                                         Option hoodieTableMetaClientOption) {
+                                         RecordMergeMode mergeMode, Option hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
-        false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
+        false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, mergeMode,
         hoodieTableMetaClientOption);
     this.callback = callback;
   }
@@ -112,6 +113,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
     private LogRecordScannerCallback callback;
     private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
+    private RecordMergeMode mergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
     private HoodieTableMetaClient hoodieTableMetaClient;
 
     public Builder withStorage(HoodieStorage storage) {
@@ -185,6 +187,12 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
       return this;
     }
 
+    @Override
+    public Builder withMergeMode(RecordMergeMode mergeMode) {
+      this.mergeMode = mergeMode;
+      return this;
+    }
+
     @Override
     public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient(
         HoodieTableMetaClient hoodieTableMetaClient) {
@@ -198,7 +206,7 @@ public HoodieUnMergedLogRecordScanner build() {
 
       return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
           latestInstantTime, reverseReader, bufferSize, callback, instantRange,
-          internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
+          internalSchema, enableOptimizedLogBlocksScan, recordMerger, mergeMode, Option.ofNullable(hoodieTableMetaClient));
     }
   }
 }