Commit fb1da7d

Address comments
1 parent: 8e39e79

File tree: 51 files changed (+329 / -325 lines)

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java

Lines changed: 2 additions & 1 deletion

@@ -21,6 +21,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -112,7 +113,7 @@ public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
           record.getPartitionPath(), idPrefix, taskContextSupplier);
       handles.put(partitionPath, handle);
     }
-    handle.write(record, genResult.schema, config.getProps());
+    handle.write(record, HoodieSchema.fromAvroSchema(genResult.schema), config.getProps());
   }
 
   @Override
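
Note: the change above wraps the Avro schema carried by the insert value generator into a HoodieSchema at the write-handle boundary. Below is a minimal, hypothetical sketch of that wrapping pattern, restricted to the HoodieSchema.fromAvroSchema and toAvroSchema calls that appear in this diff; the trip schema is a made-up stand-in for genResult.schema.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.common.schema.HoodieSchema;

public class SchemaWrapSketch {
  public static void main(String[] args) {
    // A plain Avro schema, standing in for genResult.schema in CopyOnWriteInsertHandler#consume.
    Schema avroSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredDouble("fare")
        .endRecord();

    // Wrap at the call boundary, mirroring:
    // handle.write(record, HoodieSchema.fromAvroSchema(genResult.schema), config.getProps());
    HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema);

    // Code that still speaks Avro unwraps it again at its own boundary.
    Schema unwrapped = hoodieSchema.toAvroSchema();
    System.out.println(unwrapped.getFullName());
  }
}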

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 16 additions & 16 deletions

@@ -19,9 +19,7 @@
 package org.apache.hudi.index;
 
 import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
@@ -42,6 +40,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -341,7 +340,7 @@ public static HoodieIndex createUserDefinedIndex(HoodieWriteConfig config) {
    * @return {@link HoodieRecord}s that have the current location being set.
    */
   private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
-      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory, Schema dataSchema) {
+      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory, HoodieSchema dataSchema) {
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     final Option<String> instantTime = metaClient
         .getActiveTimeline() // we need to include all actions and completed
@@ -368,8 +367,8 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(instantTime.get())
         .withFileSlice(fileSlice)
-        .withDataSchema(dataSchema)
-        .withRequestedSchema(dataSchema)
+        .withDataSchema(dataSchema.toAvroSchema())
+        .withRequestedSchema(dataSchema.toAvroSchema())
         .withInternalSchema(internalSchemaOption)
         .withProps(metaClient.getTableConfig().getProps())
         .withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan())
@@ -516,7 +515,7 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesAndDeletio
       HoodieWriteConfig config,
       HoodieTable hoodieTable,
       HoodieReaderContext<R> readerContext,
-      SerializableSchema writerSchema) {
+      HoodieSchema writerSchema) {
     boolean isExpressionPayload = config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload");
     Pair<HoodieWriteConfig, BaseKeyGenerator> keyGeneratorWriteConfigOpt =
         getKeygenAndUpdatedWriteConfig(config, hoodieTable.getMetaClient().getTableConfig(), isExpressionPayload);
@@ -543,25 +542,27 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesAndDeletio
         .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
     RecordContext<R> existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    SerializableSchema writerSchemaWithMetaFields = new SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(), updatedConfig.allowOperationMetadataField()));
-    AvroSchemaCache.intern(writerSchema.get());
-    AvroSchemaCache.intern(writerSchemaWithMetaFields.get());
+    HoodieSchema writerSchemaWithMetaFields = HoodieSchemaUtils.addMetadataFields(writerSchema, updatedConfig.allowOperationMetadataField());
+    // TODO: Add HoodieSchemaCache#intern after #14374 is merged
+    AvroSchemaCache.intern(writerSchema.toAvroSchema());
+    // TODO: Add HoodieSchemaCache#intern after #14374 is merged
+    AvroSchemaCache.intern(writerSchemaWithMetaFields.toAvroSchema());
     // Read the existing records with the meta fields and current writer schema as the output schema
     HoodieData<HoodieRecord<R>> existingRecords =
-        getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
+        getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactoryForExistingRecords, writerSchemaWithMetaFields);
     List<String> orderingFieldNames = getOrderingFieldNames(
         readerContext.getMergeMode(), hoodieTable.getMetaClient());
     BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
         readerContext,
         readerContext.getMergeMode(),
         false,
         readerContext.getRecordMerger(),
-        writerSchema.get(),
+        writerSchema.toAvroSchema(),
         Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(), hoodieTable.getConfig().getPayloadClass())),
         properties,
         hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());
     String[] orderingFieldsArray = orderingFieldNames.toArray(new String[0]);
-    DeleteContext deleteContext = DeleteContext.fromRecordSchema(properties, HoodieSchema.fromAvroSchema(writerSchema.get()));
+    DeleteContext deleteContext = DeleteContext.fromRecordSchema(properties, writerSchema);
     HoodieData<HoodieRecord<R>> taggedUpdatingRecords = untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
         .leftOuterJoin(existingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
         .values().flatMap(entry -> {
@@ -572,10 +573,9 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesAndDeletio
             return Collections.singletonList(incoming).iterator();
           }
           HoodieRecord<R> existing = existingOpt.get();
-          HoodieSchema writeSchema = HoodieSchema.fromAvroSchema(writerSchema.get());
 
           Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(
-              incoming, existing, writeSchema, HoodieSchema.fromAvroSchema(writerSchemaWithMetaFields.get()), updatedConfig,
+              incoming, existing, writerSchema, writerSchemaWithMetaFields, updatedConfig,
               recordMerger, keyGenerator, incomingRecordContext, existingRecordContext, orderingFieldsArray, properties, isExpressionPayload, deleteContext);
           if (!mergedOpt.isPresent()) {
             // merge resulted in delete: force tag the incoming to the old partition
@@ -632,12 +632,12 @@ public static <R> HoodieData<HoodieRecord<R>> tagGlobalLocationBackToRecords(
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     readerContext.initRecordMergerForIngestion(config.getProps());
     TypedProperties properties = readerContext.getMergeProps(config.getProps());
-    SerializableSchema writerSchema = new SerializableSchema(config.getWriteSchema());
+    HoodieSchema writerSchema = HoodieSchema.parse(config.getWriteSchema());
    boolean isCommitTimeOrdered = readerContext.getMergeMode() == RecordMergeMode.COMMIT_TIME_ORDERING;
     // if the index is not updating the partition of the record, and the table is COW, then we do not need to do merging at
     // this phase since the writer path will merge when rewriting the files as part of the upsert operation.
     boolean requiresMergingWithOlderRecordVersion = shouldUpdatePartitionPath || table.getMetaClient().getTableConfig().getTableType() == HoodieTableType.MERGE_ON_READ;
-    DeleteContext deleteContext = DeleteContext.fromRecordSchema(properties, HoodieSchema.fromAvroSchema(writerSchema.get()));
+    DeleteContext deleteContext = DeleteContext.fromRecordSchema(properties, writerSchema);
 
     // Pair of incoming record and the global location if meant for merged lookup in later stage
     HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations
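
Note: this file replaces SerializableSchema with HoodieSchema end to end. The writer schema is parsed with HoodieSchema.parse, metadata fields are added via HoodieSchemaUtils.addMetadataFields, and toAvroSchema() is used only where downstream APIs (AvroSchemaCache, the file group reader) still expect Avro. The sketch below is a hypothetical illustration of that flow, restricted to the calls visible in this diff; the schema JSON is a made-up stand-in for config.getWriteSchema().

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

public class WriterSchemaSketch {
  // Hypothetical writer schema JSON; in tagGlobalLocationBackToRecords it comes from config.getWriteSchema().
  private static final String WRITE_SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
          + "{\"name\":\"uuid\",\"type\":\"string\"},"
          + "{\"name\":\"fare\",\"type\":\"double\"}]}";

  public static void main(String[] args) {
    // Parse straight into HoodieSchema instead of wrapping the Avro schema in SerializableSchema.
    HoodieSchema writerSchema = HoodieSchema.parse(WRITE_SCHEMA_JSON);

    // Add the Hudi meta fields in the HoodieSchema domain (previously done via HoodieAvroUtils.addMetadataFields).
    HoodieSchema writerSchemaWithMetaFields =
        HoodieSchemaUtils.addMetadataFields(writerSchema, /* allowOperationMetadataField */ false);

    // Unwrap only where a consumer still takes an Avro Schema, e.g. the file group reader builder.
    System.out.println(writerSchemaWithMetaFields.toAvroSchema().getFields().size());
  }
}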

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java

Lines changed: 8 additions & 8 deletions

@@ -92,11 +92,11 @@ public boolean canWrite(HoodieRecord record) {
    * Perform the actual writing of the given record into the backing file.
    */
   @Override
-  protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
+  protected void doWrite(HoodieRecord record, HoodieSchema schema, TypedProperties props) {
     Option<Map<String, String>> recordMetadata = getRecordMetadata(record, schema, props);
     try {
       if (!HoodieOperation.isDelete(record.getOperation()) && !record.isDelete(deleteContext, config.getProps())) {
-        if (record.shouldIgnore(schema, config.getProps())) {
+        if (record.shouldIgnore(schema.toAvroSchema(), config.getProps())) {
           return;
         }
 
@@ -146,26 +146,26 @@ public void write() {
   /**
    * Write record to file using fileWriter
    */
-  protected void writeRecordToFile(HoodieRecord record, Schema schema) throws IOException {
+  protected void writeRecordToFile(HoodieRecord record, HoodieSchema schema) throws IOException {
     if (preserveMetadata) {
       HoodieRecord populatedRecord = updateFileName(record, schema, writeSchemaWithMetaFields, path.getName(), config.getProps());
       if (isSecondaryIndexStatsStreamingWritesEnabled) {
         SecondaryIndexStreamingTracker.trackSecondaryIndexStats(populatedRecord, writeStatus, writeSchemaWithMetaFields, secondaryIndexDefns, config);
       }
-      fileWriter.write(record.getRecordKey(), populatedRecord, HoodieSchema.fromAvroSchema(writeSchemaWithMetaFields), config.getProps());
+      fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields, config.getProps());
     } else {
       // rewrite the record to include metadata fields in schema, and the values will be set later.
-      record = record.prependMetaFields(schema, writeSchemaWithMetaFields, new MetadataValues(), config.getProps());
+      record = record.prependMetaFields(schema.toAvroSchema(), writeSchemaWithMetaFields.toAvroSchema(), new MetadataValues(), config.getProps());
       if (isSecondaryIndexStatsStreamingWritesEnabled) {
         SecondaryIndexStreamingTracker.trackSecondaryIndexStats(record, writeStatus, writeSchemaWithMetaFields, secondaryIndexDefns, config);
       }
-      fileWriter.writeWithMetadata(record.getKey(), record, HoodieSchema.fromAvroSchema(writeSchemaWithMetaFields), config.getProps());
+      fileWriter.writeWithMetadata(record.getKey(), record, writeSchemaWithMetaFields, config.getProps());
     }
   }
 
-  protected HoodieRecord<T> updateFileName(HoodieRecord<T> record, Schema schema, Schema targetSchema, String fileName, Properties prop) {
+  protected HoodieRecord<T> updateFileName(HoodieRecord<T> record, HoodieSchema schema, HoodieSchema targetSchema, String fileName, Properties prop) {
     MetadataValues metadataValues = new MetadataValues().setFileName(fileName);
-    return record.prependMetaFields(schema, targetSchema, metadataValues, prop);
+    return record.prependMetaFields(schema.toAvroSchema(), targetSchema.toAvroSchema(), metadataValues, prop);
   }
 
   @Override
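
Note: in BaseCreateHandle the handle-level signatures (doWrite, writeRecordToFile, updateFileName) now take HoodieSchema, and unwrapping to Avro happens only at the HoodieRecord calls that still expect an Avro Schema. The helper below is a sketch that mirrors the updated updateFileName to show that boundary rule; it adds no behavior beyond what the diff shows.

import java.util.Properties;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.schema.HoodieSchema;

public class HandleBoundarySketch {
  // Mirrors the updated BaseCreateHandle#updateFileName: the handle passes HoodieSchema around,
  // and HoodieRecord#prependMetaFields (still Avro-based in this commit) gets the unwrapped schemas.
  static <T> HoodieRecord<T> updateFileName(HoodieRecord<T> record,
                                            HoodieSchema schema,
                                            HoodieSchema targetSchema,
                                            String fileName,
                                            Properties props) {
    MetadataValues metadataValues = new MetadataValues().setFileName(fileName);
    return record.prependMetaFields(schema.toAvroSchema(), targetSchema.toAvroSchema(), metadataValues, props);
  }
}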

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java

Lines changed: 2 additions & 2 deletions

@@ -84,8 +84,8 @@ public void doAppend() {
         config.getBasePath(), operation.getPartitionPath()), logFileName)));
     // Initializes the record iterator, log compaction requires writing the deletes into the delete block of the resulting log file.
     try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        .withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields)
-        .withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
+        .withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields.toAvroSchema())
+        .withRequestedSchema(writeSchemaWithMetaFields.toAvroSchema()).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
         .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
         // instead of using config.enableOptimizedLogBlocksScan(), we set to true as log compaction blocks only supported in scanV2
         .withEnableOptimizedLogBlockScan(true).build()) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java

Lines changed: 4 additions & 6 deletions

@@ -57,7 +57,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -233,8 +232,7 @@ private void init(CompactionOperation operation, String partitionPath) {
 
       // Create the writer for writing the new version file
       fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
-          //TODO boundary to revisit in follow up to use HoodieSchema directly
-          config, HoodieSchema.fromAvroSchema(writeSchemaWithMetaFields), taskContextSupplier, recordType);
+          config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
     } catch (IOException io) {
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
@@ -259,7 +257,7 @@ public void doMerge() {
     }
     boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
     Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
-        .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, internalSchema,
+        .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields.toAvroSchema(), internalSchema,
            config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
     long maxMemoryPerCompaction = getMaxMemoryForMerge();
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
@@ -287,7 +285,7 @@ public void doMerge() {
       // For other updates, we only want to preserve the metadata if the record is not being modified by this update. If the record already exists in the base file and is not updated,
       // the operation will be null. Records that are being updated or records being added to the file group for the first time will have an operation set and must generate new metadata.
       boolean shouldPreserveRecordMetadata = preserveMetadata || record.getOperation() == null;
-      Schema recordSchema = shouldPreserveRecordMetadata ? writeSchemaWithMetaFields : writeSchema;
+      HoodieSchema recordSchema = shouldPreserveRecordMetadata ? writeSchemaWithMetaFields : writeSchema;
       writeToFile(record.getKey(), record, recordSchema, config.getPayloadConfig().getProps(), shouldPreserveRecordMetadata);
       writeStatus.markSuccess(record, recordMetadata);
       recordsWritten++;
@@ -319,7 +317,7 @@ private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<
                                                      Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
     HoodieFileGroupReader.Builder<T> fileGroupBuilder = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
         .withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
-        .withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
+        .withDataSchema(writeSchemaWithMetaFields.toAvroSchema()).withRequestedSchema(writeSchemaWithMetaFields.toAvroSchema())
         .withInternalSchema(internalSchemaOption).withProps(props)
         .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
         .withFileGroupUpdateCallback(createCallback());
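
Note: both FileGroupReaderBasedAppendHandle and FileGroupReaderBasedMergeHandle keep writeSchemaWithMetaFields as a HoodieSchema and unwrap it only when feeding HoodieFileGroupReader's builder, whose withDataSchema and withRequestedSchema still take Avro schemas in this commit. The following is a hypothetical, trimmed call-shape sketch (several builder options used by the real handles, such as log files, internal schema, and emit-delete, are omitted), using only builder methods that appear in the diff.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;

public class FileGroupReaderSchemaSketch {
  // Hypothetical helper showing where the HoodieSchema is unwrapped when building the reader.
  // Apart from the two toAvroSchema() calls, the builder chain follows the diff's usage.
  static <T> HoodieFileGroupReader<T> newReader(HoodieReaderContext<T> readerContext,
                                                HoodieTableMetaClient metaClient,
                                                String instantTime,
                                                String partitionPath,
                                                HoodieSchema writeSchemaWithMetaFields,
                                                TypedProperties props) {
    return HoodieFileGroupReader.<T>newBuilder()
        .withReaderContext(readerContext)
        .withHoodieTableMetaClient(metaClient)
        .withLatestCommitTime(instantTime)
        .withPartitionPath(partitionPath)
        .withBaseFileOption(Option.empty())
        // The reader builder still speaks Avro in this commit, so unwrap at the boundary.
        .withDataSchema(writeSchemaWithMetaFields.toAvroSchema())
        .withRequestedSchema(writeSchemaWithMetaFields.toAvroSchema())
        .withProps(props)
        .build();
  }
}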
