Commit 6d27465

Remove dangling usages of Avro.Schema in RecordContext
1 parent 2a64752 commit 6d27465

15 files changed: +305 -70 lines changed

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java

Lines changed: 3 additions & 3 deletions
@@ -194,9 +194,9 @@ public RowData toBinaryRow(HoodieSchema schema, RowData record) {
    * @return a function that takes in a record and returns the record with reordered columns
    */
   @Override
-  public UnaryOperator<RowData> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
-    RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from).getRowType().getLogicalType();
-    RowType toType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(to).getRowType().getLogicalType();
+  public UnaryOperator<RowData> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
+    RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from.toAvroSchema()).getRowType().getLogicalType();
+    RowType toType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(to.toAvroSchema()).getRowType().getLogicalType();
     RowProjection rowProjection = SchemaEvolvingRowDataProjection.instance(fromType, toType, renamedColumns);
     return rowProjection::project;
   }
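
For reference, a minimal sketch of the pattern the engine implementations above follow: accept HoodieSchema and convert back to Avro only where an engine utility still requires org.apache.avro.Schema. The class name and the identity rewrite below are illustrative, not part of this commit.

import java.util.Map;
import java.util.function.UnaryOperator;

import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;

// Illustrative only: unwrap HoodieSchema at the engine boundary.
final class EngineProjectionSketch<T> {

  UnaryOperator<T> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
    Schema avroFrom = from.toAvroSchema(); // convert only where Avro is still required
    Schema avroTo = to.toAvroSchema();
    return record -> rewrite(record, avroFrom, avroTo, renamedColumns);
  }

  // Stand-in for the engine-specific rewrite (e.g. a RowData projection); identity here.
  private T rewrite(T record, Schema avroFrom, Schema avroTo, Map<String, String> renamedColumns) {
    return record;
  }
}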

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java

Lines changed: 2 additions & 2 deletions
@@ -221,9 +221,9 @@ public InternalRow toBinaryRow(HoodieSchema schema, InternalRow internalRow) {
   }

   @Override
-  public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
+  public UnaryOperator<InternalRow> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), getCachedSchema(to), renamedColumns, Collections.emptyMap());
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()), getCachedSchema(to.toAvroSchema()), renamedColumns, Collections.emptyMap());
     return row -> (InternalRow) unsafeRowWriter.apply(row);
   }

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -147,8 +147,8 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
     val rowIndexColumn = new java.util.HashSet[String]()
     rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
     //always remove the row index column from the skeleton because the data file will also have the same column
-    val skeletonProjection = recordContext.projectRecord(skeletonRequiredSchema.toAvroSchema,
-      HoodieAvroUtils.removeFields(skeletonRequiredSchema.toAvroSchema, rowIndexColumn))
+    val skeletonProjection = recordContext.projectRecord(skeletonRequiredSchema,
+      HoodieSchemaUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))

     //If we need to do position based merging with log files we will leave the row index column at the end
     val dataProjection = if (getShouldMergeUseRecordPosition) {

hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java

Lines changed: 2 additions & 2 deletions
@@ -183,8 +183,8 @@ public IndexedRecord toBinaryRow(HoodieSchema schema, IndexedRecord record) {
   }

   @Override
-  public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
-    return record -> HoodieAvroUtils.rewriteRecordWithNewSchema(record, to, renamedColumns);
+  public UnaryOperator<IndexedRecord> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
+    return record -> HoodieAvroUtils.rewriteRecordWithNewSchema(record, to.toAvroSchema(), renamedColumns);
   }

   @Override

hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java

Lines changed: 8 additions & 30 deletions
@@ -35,7 +35,6 @@
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.keygen.KeyGenerator;

-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;

@@ -62,7 +61,7 @@ public abstract class RecordContext<T> implements Serializable {

   private static final long serialVersionUID = 1L;

-  private final SerializableBiFunction<T, Schema, String> recordKeyExtractor;
+  private final SerializableBiFunction<T, HoodieSchema, String> recordKeyExtractor;
   // for encoding and decoding schemas to the spillable map
   private final LocalHoodieSchemaCache localSchemaCache = LocalHoodieSchemaCache.getInstance();

@@ -183,7 +182,7 @@ public JavaTypeConverter getTypeConverter() {
    * @return The record key in String.
    */
   public String getRecordKey(T record, HoodieSchema schema) {
-    return recordKeyExtractor.apply(record, schema.toAvroSchema());
+    return recordKeyExtractor.apply(record, schema);
   }

   /**
@@ -331,27 +330,7 @@ private boolean isCustomDeleteRecord(T record, DeleteContext deleteContext) {
    *                       the value is the old name that exists in the file
    * @return a function that takes in a record and returns the record with reordered columns
    */
-  public abstract UnaryOperator<T> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns);
-
-  public final UnaryOperator<T> projectRecord(Schema from, Schema to) {
-    return projectRecord(from, to, Collections.emptyMap());
-  }
-
-  /**
-   * Creates a function that will reorder records of schema "from" to schema of "to" using HoodieSchema.
-   * This method uses HoodieSchema for in-memory processing while maintaining
-   * compatibility with existing Avro-based operations.
-   *
-   * @param from the HoodieSchema of records to be passed into UnaryOperator
-   * @param to the HoodieSchema of records produced by UnaryOperator
-   * @param renamedColumns map of renamed columns where the key is the new name from the query and
-   *                       the value is the old name that exists in the file
-   * @return a function that takes in a record and returns the record with reordered columns
-   * @since 1.2.0
-   */
-  public UnaryOperator<T> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
-    return projectRecord(from.toAvroSchema(), to.toAvroSchema(), renamedColumns);
-  }
+  public abstract UnaryOperator<T> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns);

   public final UnaryOperator<T> projectRecord(HoodieSchema from, HoodieSchema to) {
     return projectRecord(from, to, Collections.emptyMap());
@@ -452,27 +431,26 @@ public boolean supportsParquetRowIndex() {
     return false;
   }

-  private SerializableBiFunction<T, Schema, String> metadataKeyExtractor() {
-    return (record, schema) -> getValue(record, HoodieSchema.fromAvroSchema(schema), RECORD_KEY_METADATA_FIELD).toString();
+  private SerializableBiFunction<T, HoodieSchema, String> metadataKeyExtractor() {
+    return (record, schema) -> getValue(record, schema, RECORD_KEY_METADATA_FIELD).toString();
   }

-  private SerializableBiFunction<T, Schema, String> virtualKeyExtractor(String[] recordKeyFields) {
+  private SerializableBiFunction<T, HoodieSchema, String> virtualKeyExtractor(String[] recordKeyFields) {
     if (recordKeyFields.length == 1) {
       // there might be consistency for record key encoding when partition fields are multiple for cow merging,
       // currently the incoming records are using the keys from HoodieRecord which utilities the write config and by default encodes the field name with the value
       // while here the field names are ignored, this function would be used to extract record keys from old base file.
       return (record, schema) -> {
-        Object result = getValue(record, HoodieSchema.fromAvroSchema(schema), recordKeyFields[0]);
+        Object result = getValue(record, schema, recordKeyFields[0]);
         if (result == null) {
           throw new HoodieKeyException("recordKey cannot be null");
         }
         return result.toString();
       };
     }
     return (record, schema) -> {
-      HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema);
       BiFunction<String, Integer, String> valueFunction = (recordKeyField, index) -> {
-        Object result = getValue(record, hoodieSchema, recordKeyField);
+        Object result = getValue(record, schema, recordKeyField);
         return result != null ? result.toString() : null;
       };
       return KeyGenerator.constructRecordKey(recordKeyFields, valueFunction);
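
A minimal caller-side sketch of the consolidated projectRecord contract, assuming the caller still holds org.apache.avro.Schema instances and wraps them with HoodieSchema.fromAvroSchema; the helper class and method below are hypothetical, not part of this commit.

import java.util.Map;
import java.util.function.UnaryOperator;

import org.apache.avro.Schema;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.schema.HoodieSchema;

// Illustrative only: wrap Avro schemas at the call site and let the engine-specific
// RecordContext build the projection; the two-argument overload fills in an empty rename map.
final class ProjectionCallerSketch {

  static <T> T project(RecordContext<T> recordContext, T record, Schema avroFrom, Schema avroTo,
                       Map<String, String> renamedColumns) {
    HoodieSchema from = HoodieSchema.fromAvroSchema(avroFrom);
    HoodieSchema to = HoodieSchema.fromAvroSchema(avroTo);
    UnaryOperator<T> projection = renamedColumns.isEmpty()
        ? recordContext.projectRecord(from, to)
        : recordContext.projectRecord(from, to, renamedColumns);
    return projection.apply(record);
  }

  private ProjectionCallerSketch() {
  }
}

The schema conversion now happens once at the boundary instead of inside the Avro-typed overloads that this commit removes.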

hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java

Lines changed: 22 additions & 0 deletions
@@ -346,4 +346,26 @@ public static boolean areSchemasProjectionEquivalent(HoodieSchema schema1, Hoodi
     // Delegate to AvroSchemaUtils
     return AvroSchemaUtils.areSchemasProjectionEquivalent(schema1.toAvroSchema(), schema2.toAvroSchema());
   }
+
+  /**
+   * Removes specified fields from the schema.
+   * This is equivalent to HoodieAvroUtils.removeFields() but operates on HoodieSchema.
+   *
+   * <p>This method creates a new schema with only the fields that are not in the fieldsToRemove set.
+   * All other schema metadata (name, namespace, doc, properties) is preserved from the original schema.</p>
+   *
+   * @param schema the input schema
+   * @param fieldsToRemove set of field names to remove from the schema
+   * @return new HoodieSchema with specified fields removed
+   * @throws IllegalArgumentException if schema is null or fieldsToRemove is null
+   * @since 1.2.0
+   */
+  public static HoodieSchema removeFields(HoodieSchema schema, Set<String> fieldsToRemove) {
+    ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
+    ValidationUtils.checkArgument(fieldsToRemove != null, "Fields to remove cannot be null");
+
+    // Delegate to HoodieAvroUtils
+    Schema resultAvro = HoodieAvroUtils.removeFields(schema.toAvroSchema(), fieldsToRemove);
+    return HoodieSchema.fromAvroSchema(resultAvro);
+  }
 }
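
A minimal usage sketch of the new HoodieSchemaUtils.removeFields helper, mirroring the row-index-column removal in SparkFileFormatInternalRowReaderContext above; the wrapper class below is hypothetical.

import java.util.Collections;
import java.util.Set;

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

// Illustrative only: drop a single column from a HoodieSchema without touching Avro directly.
final class RemoveFieldsSketch {

  static HoodieSchema dropColumn(HoodieSchema schema, String columnName) {
    Set<String> fieldsToRemove = Collections.singleton(columnName);
    // Delegates to HoodieAvroUtils.removeFields internally and rewraps the result.
    return HoodieSchemaUtils.removeFields(schema, fieldsToRemove);
  }

  private RemoveFieldsSketch() {
  }
}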

hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@ public Option<InternalSchema> getInternalSchemaOpt() {

   public Option<UnaryOperator<T>> getOutputConverter() {
     if (!AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, requestedSchema)) {
-      return Option.of(readerContext.getRecordContext().projectRecord(requiredSchema, requestedSchema));
+      return Option.of(readerContext.getRecordContext().projectRecord(HoodieSchema.fromAvroSchema(requiredSchema), HoodieSchema.fromAvroSchema(requestedSchema)));
     }
     return Option.empty();
   }

hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java

Lines changed: 8 additions & 11 deletions
@@ -50,8 +50,6 @@
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;

-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -193,7 +191,7 @@ public void close() {
    * @param keySpecOpt
    * @return
    */
-  protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
+  protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
     ClosableIterator<T> blockRecordsIterator;
     try {
       if (keySpecOpt.isPresent()) {
@@ -202,7 +200,7 @@ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock d
       } else {
         blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
       }
-      Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
+      Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
       return Pair.of(new CloseableMappingIterator<>(
           blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight());
     } catch (IOException e) {
@@ -219,7 +217,7 @@ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock d
    * @param dataBlock current processed block
    * @return final read schema.
    */
-  protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer(
+  protected Option<Pair<Function<T, T>, HoodieSchema>> composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
     if (internalSchema.isEmptySchema()) {
       return Option.empty();
@@ -229,13 +227,13 @@ protected Option<Pair<Function<T, T>, Schema>> composeEvolvedSchemaTransformer(
     InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
     Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false, false).mergeSchemaGetRenamed();
-    Schema mergedAvroSchema = InternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName()).getAvroSchema();
+    HoodieSchema mergedAvroSchema = HoodieSchema.fromAvroSchema(InternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName()).getAvroSchema());
     // `mergedAvroSchema` maybe not equal with `readerSchema`, case: drop a column `f_x`, and then add a new column with same name `f_x`,
     // then the new added column in `mergedAvroSchema` will have a suffix: `f_xsuffix`, distinguished from the original column `f_x`, see
     // InternalSchemaMerger#buildRecordType() for details.
     // Delete and add a field with the same name, reads should not return previously inserted datum of dropped field of the same name,
     // so we use `mergedAvroSchema` as the target schema for record projecting.
-    return Option.of(Pair.of(readerContext.getRecordContext().projectRecord(dataBlock.getSchema().toAvroSchema(), mergedAvroSchema, mergedInternalSchema.getRight()), mergedAvroSchema));
+    return Option.of(Pair.of(readerContext.getRecordContext().projectRecord(dataBlock.getSchema(), mergedAvroSchema, mergedInternalSchema.getRight()), mergedAvroSchema));
   }

   protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInfo) throws IOException {
@@ -270,8 +268,8 @@ protected boolean hasNextLogRecord() {
     return false;
   }

-  protected Pair<Function<T, T>, Schema> getSchemaTransformerWithEvolvedSchema(HoodieDataBlock dataBlock) {
-    Option<Pair<Function<T, T>, Schema>> schemaEvolutionTransformerOpt =
+  protected Pair<Function<T, T>, HoodieSchema> getSchemaTransformerWithEvolvedSchema(HoodieDataBlock dataBlock) {
+    Option<Pair<Function<T, T>, HoodieSchema>> schemaEvolutionTransformerOpt =
         composeEvolvedSchemaTransformer(dataBlock);

     // In case when schema has been evolved original persisted records will have to be
@@ -280,8 +278,7 @@ protected Pair<Function<T, T>, Schema> getSchemaTransformerWithEvolvedSchema(Hoo
     schemaEvolutionTransformerOpt.map(Pair::getLeft)
         .orElse(Function.identity());

-    Schema evolvedSchema = schemaEvolutionTransformerOpt.map(Pair::getRight)
-        .orElseGet(() -> dataBlock.getSchema().toAvroSchema());
+    HoodieSchema evolvedSchema = schemaEvolutionTransformerOpt.map(Pair::getRight).orElseGet(dataBlock::getSchema);
     return Pair.of(transformer, evolvedSchema);
   }

hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/KeyBasedFileGroupRecordBuffer.java

Lines changed: 3 additions & 6 deletions
@@ -19,7 +19,6 @@

 package org.apache.hudi.common.table.read.buffer;

-import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -40,8 +39,6 @@
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;

-import org.apache.avro.Schema;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -73,7 +70,7 @@ public BufferType getBufferType() {

   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
-    Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
+    Pair<ClosableIterator<T>, HoodieSchema> recordsIteratorSchemaPair =
         getRecordsIterator(dataBlock, keySpecOpt);
     if (dataBlock.containsPartialUpdates() && !enablePartialMerging) {
       // When a data block contains partial updates, subsequent record merging must always use
@@ -90,8 +87,8 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
           partialUpdateModeOpt);
     }

-    // TODO: Add HoodieSchemaCache#intern after #14374 is merged
-    HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaCache.intern(recordsIteratorSchemaPair.getRight()));
+    // TODO: Add HoodieSchemaCache#intern after #14374 is merged (this schema is interned originally)
+    HoodieSchema schema = recordsIteratorSchemaPair.getRight();

     RecordContext<T> recordContext = readerContext.getRecordContext();
     try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {

hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java

Lines changed: 3 additions & 5 deletions
@@ -19,7 +19,6 @@

 package org.apache.hudi.common.table.read.buffer;

-import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -41,7 +40,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;

-import org.apache.avro.Schema;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -127,10 +125,10 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
           partialUpdateModeOpt);
     }

-    Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
+    Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);

-    // TODO: Add HoodieSchemaCache#intern after #14374 is merged
-    HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaCache.intern(schemaTransformerWithEvolvedSchema.getRight()));
+    // TODO: Add HoodieSchemaCache#intern after #14374 is merged (this schema is interned originally)
+    HoodieSchema schema = schemaTransformerWithEvolvedSchema.getRight();

     // TODO: Return an iterator that can generate sequence number with the record.
     // Then we can hide this logic into data block.