[HUDI-8521] Use merge mode to enable processing time merge for Compactor #12285

Open · wants to merge 1 commit into base: master
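The functional change is small: the compaction and log-compaction code paths now forward the configured record merge mode into the log record scanner builders, next to the record merger they already pass. The sketch below shows that wiring at a call site, mirroring the first two hunks in the diff. It is only a sketch: the accessor names (getRecordMerger, getRecordMergeMode) and builder setters are taken from the diff, while the import paths, the public Builder constructor, and the omission of the storage/path/schema setters the real call sites also supply are assumptions.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.config.HoodieWriteConfig;

class MergeModeWiringSketch {
  // Sketch of the compactor call sites touched by this PR; the only new piece is withMergeMode(...).
  static HoodieMergedLogRecordScanner buildScanner(HoodieWriteConfig writeConfig,
                                                   HoodieTableMetaClient metaClient) {
    HoodieMergedLogRecordScanner.Builder builder = new HoodieMergedLogRecordScanner.Builder();
    builder.withOptimizedLogBlocksScan(true)
        .withRecordMerger(writeConfig.getRecordMerger())
        // New in this PR: forward the merge mode so the scanner can choose between
        // processing-time (OVERWRITE_WITH_LATEST) and event-time merging.
        .withMergeMode(writeConfig.getRecordMergeMode())
        .withTableMetaClient(metaClient);
    return builder.build();
  }
}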
@@ -207,6 +207,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withPartition(operation.getPartitionPath())
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
.withRecordMerger(config.getRecordMerger())
.withMergeMode(config.getRecordMergeMode())
.withTableMetaClient(metaClient)
.build();

@@ -98,6 +98,7 @@ private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
.withRecordMerger(writeConfig.getRecordMerger())
.withMergeMode(writeConfig.getRecordMergeMode())
.withTableMetaClient(metaClient)
.build();
scanner.scan(true);
@@ -18,6 +18,7 @@

package org.apache.hudi.common.table.log;

import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -108,6 +109,7 @@ public abstract class AbstractHoodieLogRecordScanner {
protected final String preCombineField;
// Stateless component for merging records
protected final HoodieRecordMerger recordMerger;
protected final RecordMergeMode mergeMode;
private final TypedProperties payloadProps;
// Log File Paths
protected final List<String> logFilePaths;
@@ -154,16 +156,17 @@ public abstract class AbstractHoodieLogRecordScanner {
private HoodieTimeline completedInstantsTimeline = null;
private HoodieTimeline inflightInstantsTimeline = null;

protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionNameOverride,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime,
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan,
Option<String> partitionNameOverride,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
RecordMergeMode mergeMode,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(
@@ -182,6 +185,7 @@ protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath,
this.tableVersion = tableConfig.getTableVersion();
this.payloadProps = props;
this.recordMerger = recordMerger;
this.mergeMode = mergeMode;
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
@@ -892,6 +896,10 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
throw new UnsupportedOperationException();
}

public Builder withMergeMode(RecordMergeMode mergeMode) {
throw new UnsupportedOperationException();
}

public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
throw new UnsupportedOperationException();
}
@@ -18,6 +18,7 @@

package org.apache.hudi.common.table.log;

import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -101,11 +102,13 @@ protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, L
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
RecordMergeMode mergeMode,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
hoodieTableMetaClientOption);
mergeMode, hoodieTableMetaClientOption);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
@@ -242,17 +245,25 @@ protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOExcepti
String key = newRecord.getRecordKey();
HoodieRecord<T> prevRecord = records.get(key);
if (prevRecord != null) {
// Merge and store the combined record
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != prevRecord.getData()) {
HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
records.put(key, latestHoodieRecord.copy());
switch (mergeMode) {
case OVERWRITE_WITH_LATEST:
[Contributor review comment on this line]
There is some responsibility overlap with the specific payload class OverwriteWithLatestAvroPayload; we should clarify how the merge mode, the merger, and the payload class cooperate here.
And should we call this PROC_TIME_ORDERING, akin to EVENT_TIME_ORDERING?

records.put(key, newRecord);
break;
case EVENT_TIME_ORDERING:
case CUSTOM:
default:
// Merge and store the combined record
HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != prevRecord.getData()) {
HoodieRecord latestHoodieRecord = getLatestHoodieRecord(newRecord, combinedRecord, key);

// NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
// it since these records will be put into records(Map).
records.put(key, latestHoodieRecord.copy());
}
}
} else {
// Put the record as is
@@ -342,6 +353,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
private RecordMergeMode mergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
protected HoodieTableMetaClient hoodieTableMetaClient;

@Override
@@ -449,6 +461,12 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
return this;
}

@Override
public Builder withMergeMode(RecordMergeMode mergeMode) {
this.mergeMode = mergeMode;
return this;
}

public Builder withKeyFieldOverride(String keyFieldOverride) {
this.keyFieldOverride = requireNonNull(keyFieldOverride);
return this;
@@ -478,7 +496,7 @@ public HoodieMergedLogRecordScanner build() {
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
Option.ofNullable(hoodieTableMetaClient));
mergeMode, Option.ofNullable(hoodieTableMetaClient));
}
}
}
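To make the new switch in processNextRecord easier to reason about, here is a self-contained, simplified illustration of the two merge behaviors, deliberately written without the Hudi types: under OVERWRITE_WITH_LATEST the record scanned later simply replaces the earlier one (processing-time order), while EVENT_TIME_ORDERING and CUSTOM keep delegating to a merger that compares ordering values. The class, record type, and merger function below are illustrative stand-ins, not the PR's code.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class MergeModeSemanticsSketch {
  enum RecordMergeMode { OVERWRITE_WITH_LATEST, EVENT_TIME_ORDERING, CUSTOM }

  // Simplified stand-in for a Hudi record: key, event-time ordering value, payload.
  record Rec(String key, long orderingVal, String payload) {}

  static void process(Map<String, Rec> records, Rec newRec, RecordMergeMode mode,
                      BinaryOperator<Rec> merger) {
    Rec prev = records.get(newRec.key());
    if (prev == null) {
      records.put(newRec.key(), newRec);   // first occurrence: store as is
      return;
    }
    switch (mode) {
      case OVERWRITE_WITH_LATEST:
        records.put(newRec.key(), newRec); // processing-time order: the later arrival wins
        break;
      case EVENT_TIME_ORDERING:
      case CUSTOM:
      default:
        records.put(newRec.key(), merger.apply(prev, newRec)); // merger decides which record survives
    }
  }

  public static void main(String[] args) {
    BinaryOperator<Rec> eventTimeMerger =
        (prev, next) -> next.orderingVal() >= prev.orderingVal() ? next : prev;

    Map<String, Rec> eventTime = new HashMap<>();
    process(eventTime, new Rec("k1", 10, "newer by event time"), RecordMergeMode.EVENT_TIME_ORDERING, eventTimeMerger);
    process(eventTime, new Rec("k1", 5, "older by event time"), RecordMergeMode.EVENT_TIME_ORDERING, eventTimeMerger);
    System.out.println(eventTime.get("k1").payload()); // newer by event time

    Map<String, Rec> procTime = new HashMap<>();
    process(procTime, new Rec("k1", 10, "arrived first"), RecordMergeMode.OVERWRITE_WITH_LATEST, eventTimeMerger);
    process(procTime, new Rec("k1", 5, "arrived last"), RecordMergeMode.OVERWRITE_WITH_LATEST, eventTimeMerger);
    System.out.println(procTime.get("k1").payload()); // arrived last
  }
}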
@@ -18,6 +18,7 @@

package org.apache.hudi.common.table.log;

import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,9 +48,9 @@ private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, L
String latestInstantTime, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
RecordMergeMode mergeMode, Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, mergeMode,
hoodieTableMetaClientOption);
this.callback = callback;
}
@@ -112,6 +113,7 @@ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
private LogRecordScannerCallback callback;
private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
private RecordMergeMode mergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
private HoodieTableMetaClient hoodieTableMetaClient;

public Builder withStorage(HoodieStorage storage) {
Expand Down Expand Up @@ -185,6 +187,12 @@ public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
return this;
}

@Override
public Builder withMergeMode(RecordMergeMode mergeMode) {
this.mergeMode = mergeMode;
return this;
}

@Override
public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient(
HoodieTableMetaClient hoodieTableMetaClient) {
@@ -198,7 +206,7 @@ public HoodieUnMergedLogRecordScanner build() {

return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
latestInstantTime, reverseReader, bufferSize, callback, instantRange,
internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
internalSchema, enableOptimizedLogBlocksScan, recordMerger, mergeMode, Option.ofNullable(hoodieTableMetaClient));
}
}
}
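One behavioral note on the defaults: both builders initialize mergeMode to RecordMergeMode.EVENT_TIME_ORDERING, so callers that never invoke withMergeMode keep the pre-PR merging behavior, and only callers that pass OVERWRITE_WITH_LATEST (as the compactor now does through the write config) opt into processing-time merging. A hedged usage sketch; the builder setter and enum values come from the diff, while the public Builder constructor and the omission of the other required setters are assumptions.

import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;

class MergeModeDefaultsSketch {
  static void sketch() {
    // Default: event-time ordering, identical to the behavior before this PR.
    HoodieUnMergedLogRecordScanner.Builder unchanged = new HoodieUnMergedLogRecordScanner.Builder();

    // Opt-in: processing-time merging, which the compactor now selects through the write config.
    HoodieUnMergedLogRecordScanner.Builder procTime = new HoodieUnMergedLogRecordScanner.Builder()
        .withMergeMode(RecordMergeMode.OVERWRITE_WITH_LATEST);
  }
}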