
Commit c35119a

#14270 - phase 5: Perform Java Client Core Migration
1 parent 78a7204 commit c35119a

File tree

28 files changed: +337 -125 lines

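Across the hunks below, the change follows one pattern: record-context and reader-context methods such as getValue, toBinaryRow, and mergeBootstrapReaders now take the engine-agnostic HoodieSchema wrapper instead of a raw Avro Schema. Call sites that still hold an Avro Schema wrap it with HoodieSchema.fromAvroSchema(...), and Avro-backed implementations unwrap it with toAvroSchema() where the underlying utilities still expect Avro. A minimal sketch of the two sides, using only the conversion calls visible in this diff (the sketch's own class and method names are illustrative):

import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;

class HoodieSchemaMigrationSketch {

  // Caller side (e.g. SecondaryIndexStreamingTracker below): an Avro schema
  // decoded from a schema id is wrapped before calling the migrated API.
  static HoodieSchema wrapForMigratedApi(Schema avroSchema) {
    return HoodieSchema.fromAvroSchema(avroSchema);
  }

  // Implementation side (e.g. FlinkRecordContext / BaseSparkInternalRecordContext
  // below): the HoodieSchema is unwrapped where an Avro-based utility is reused.
  static Schema unwrapForAvroUtility(HoodieSchema schema) {
    return schema.toAvroSchema();
  }
}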

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SecondaryIndexStreamingTracker.java

Lines changed: 3 additions & 2 deletions
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -255,7 +256,7 @@ static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, Option<BufferedRec

     if (hasOldValue) {
       Schema schema = recordContext.decodeAvroSchema(oldRecord.getSchemaId());
-      oldSecondaryKey = recordContext.getTypeConverter().castToString(recordContext.getValue(oldRecord.getRecord(), schema, secondaryIndexSourceField));
+      oldSecondaryKey = recordContext.getTypeConverter().castToString(recordContext.getValue(oldRecord.getRecord(), HoodieSchema.fromAvroSchema(schema), secondaryIndexSourceField));
     }

     // For new/combined record
@@ -264,7 +265,7 @@ static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey, Option<BufferedRec

     if (combinedRecordOpt.isPresent() && !isDelete) {
       Schema schema = recordContext.decodeAvroSchema(combinedRecordOpt.get().getSchemaId());
-      newSecondaryKey = recordContext.getTypeConverter().castToString(recordContext.getValue(combinedRecordOpt.get().getRecord(), schema, secondaryIndexSourceField));
+      newSecondaryKey = recordContext.getTypeConverter().castToString(recordContext.getValue(combinedRecordOpt.get().getRecord(), HoodieSchema.fromAvroSchema(schema), secondaryIndexSourceField));
       hasNewValue = true;
     }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 2 additions & 1 deletion
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -316,7 +317,7 @@ public boolean hasNext() {

       while (recordIterator.hasNext()) {
         T record = recordIterator.next();
-        Object secondaryKey = readerContext.getRecordContext().getValue(record, requestedSchema, secondaryKeyField);
+        Object secondaryKey = readerContext.getRecordContext().getValue(record, HoodieSchema.fromAvroSchema(requestedSchema), secondaryKeyField);
         nextValidRecord = Pair.of(
             readerContext.getRecordContext().getRecordKey(record, requestedSchema),
             secondaryKey == null ? null : secondaryKey.toString()

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java

Lines changed: 6 additions & 5 deletions
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.DefaultJavaTypeConverter;
@@ -77,9 +78,9 @@ public static FlinkRecordContext getDeleteCheckingInstance() {
   }

   @Override
-  public Object getValue(RowData record, Schema schema, String fieldName) {
+  public Object getValue(RowData record, HoodieSchema schema, String fieldName) {
     RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =
-        RowDataAvroQueryContexts.fromAvroSchema(schema, utcTimezone).getFieldQueryContext(fieldName);
+        RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(), utcTimezone).getFieldQueryContext(fieldName);
     if (fieldQueryContext == null) {
       return null;
     } else {
@@ -157,7 +158,7 @@ public RowData mergeWithEngineRecord(Schema schema,
       if (updateValues.containsKey(pos)) {
         genericRowData.setField(pos, updateValues.get(pos));
       } else {
-        genericRowData.setField(pos, getValue(baseRecord.getRecord(), schema, field.name()));
+        genericRowData.setField(pos, getValue(baseRecord.getRecord(), HoodieSchema.fromAvroSchema(schema), field.name()));
       }
     }
     return genericRowData;
@@ -172,11 +173,11 @@ public RowData seal(RowData rowData) {
   }

   @Override
-  public RowData toBinaryRow(Schema avroSchema, RowData record) {
+  public RowData toBinaryRow(HoodieSchema schema, RowData record) {
     if (record instanceof BinaryRowData) {
       return record;
     }
-    RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(avroSchema);
+    RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(schema.toAvroSchema());
     return rowDataSerializer.toBinaryRow(record);
   }

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java

Lines changed: 6 additions & 5 deletions
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.DefaultJavaTypeConverter;
@@ -104,8 +105,8 @@ private static Object sparkTypeToJavaType(Object value) {
   }

   @Override
-  public Object getValue(InternalRow row, Schema schema, String fieldName) {
-    return getFieldValueFromInternalRow(row, schema, fieldName);
+  public Object getValue(InternalRow row, HoodieSchema schema, String fieldName) {
+    return getFieldValueFromInternalRow(row, schema.toAvroSchema(), fieldName);
   }

   @Override
@@ -146,7 +147,7 @@ public InternalRow mergeWithEngineRecord(Schema schema,
       if (updateValues.containsKey(pos)) {
         values[pos] = updateValues.get(pos);
       } else {
-        values[pos] = getValue(baseRecord.getRecord(), schema, field.name());
+        values[pos] = getValue(baseRecord.getRecord(), HoodieSchema.fromAvroSchema(schema), field.name());
       }
     }
     return new GenericInternalRow(values);
@@ -210,11 +211,11 @@ public InternalRow seal(InternalRow internalRow) {
   }

   @Override
-  public InternalRow toBinaryRow(Schema schema, InternalRow internalRow) {
+  public InternalRow toBinaryRow(HoodieSchema schema, InternalRow internalRow) {
     if (internalRow instanceof UnsafeRow) {
       return internalRow;
     }
-    final UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
+    final UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema.toAvroSchema());
     return unsafeProjection.apply(internalRow);
   }

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala

Lines changed: 13 additions & 13 deletions
@@ -26,14 +26,14 @@ import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
-
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter
 import org.apache.spark.sql.HoodieInternalRowUtils
@@ -130,34 +130,34 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
    * @return iterator that concatenates the skeletonFileIterator and dataFileIterator
    */
   override def mergeBootstrapReaders(skeletonFileIterator: ClosableIterator[InternalRow],
-                                     skeletonRequiredSchema: Schema,
+                                     skeletonRequiredSchema: HoodieSchema,
                                      dataFileIterator: ClosableIterator[InternalRow],
-                                     dataRequiredSchema: Schema,
+                                     dataRequiredSchema: HoodieSchema,
                                      partitionFieldAndValues: java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
     doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
       dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema, partitionFieldAndValues)
   }

   private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
-                               skeletonRequiredSchema: Schema,
+                               skeletonRequiredSchema: HoodieSchema,
                                dataFileIterator: ClosableIterator[Any],
-                               dataRequiredSchema: Schema,
+                               dataRequiredSchema: HoodieSchema,
                                partitionFieldAndValues: java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
     if (getRecordContext.supportsParquetRowIndex()) {
-      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
-      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema.toAvroSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema.toAvroSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
       val rowIndexColumn = new java.util.HashSet[String]()
       rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
       //always remove the row index column from the skeleton because the data file will also have the same column
-      val skeletonProjection = recordContext.projectRecord(skeletonRequiredSchema,
-        HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))
+      val skeletonProjection = recordContext.projectRecord(skeletonRequiredSchema.toAvroSchema,
+        HoodieAvroUtils.removeFields(skeletonRequiredSchema.toAvroSchema, rowIndexColumn))

       //If we need to do position based merging with log files we will leave the row index column at the end
       val dataProjection = if (getShouldMergeUseRecordPosition) {
-        getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, partitionFieldAndValues)
+        getBootstrapProjection(dataRequiredSchema.toAvroSchema, dataRequiredSchema.toAvroSchema, partitionFieldAndValues)
       } else {
-        getBootstrapProjection(dataRequiredSchema,
-          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn), partitionFieldAndValues)
+        getBootstrapProjection(dataRequiredSchema.toAvroSchema,
+          HoodieAvroUtils.removeFields(dataRequiredSchema.toAvroSchema, rowIndexColumn), partitionFieldAndValues)
       }

       //row index will always be the last column
@@ -211,7 +211,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
         }
       }
     } else {
-      val dataProjection = getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, partitionFieldAndValues)
+      val dataProjection = getBootstrapProjection(dataRequiredSchema.toAvroSchema, dataRequiredSchema.toAvroSchema, partitionFieldAndValues)
       new ClosableIterator[Any] {
         val combinedRow = new JoinedRow()

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/TestBaseSparkInternalRowReaderContext.java

Lines changed: 5 additions & 4 deletions
@@ -20,6 +20,7 @@
 package org.apache.hudi;

 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.Option;
@@ -154,7 +155,7 @@ public GenericRecord convertToAvroRecord(InternalRow record, Schema schema) {
       }

       @Override
-      public Object getValue(InternalRow row, Schema schema, String fieldName) {
+      public Object getValue(InternalRow row, HoodieSchema schema, String fieldName) {
         if (fieldName.equals("id")) {
           if (row.isNullAt(0)) {
             return null;
@@ -174,7 +175,7 @@ public Object getValue(InternalRow row, Schema schema, String fieldName) {
       }

       @Override
-      public InternalRow toBinaryRow(Schema schema, InternalRow internalRow) {
+      public InternalRow toBinaryRow(HoodieSchema schema, InternalRow internalRow) {
         return internalRow;
       }
     });
@@ -192,9 +193,9 @@ public ClosableIterator<InternalRow> getFileRecordIterator(StoragePath filePath,

     @Override
     public ClosableIterator<InternalRow> mergeBootstrapReaders(ClosableIterator<InternalRow> skeletonFileIterator,
-                                                                Schema skeletonRequiredSchema,
+                                                                HoodieSchema skeletonRequiredSchema,
                                                                 ClosableIterator<InternalRow> dataFileIterator,
-                                                                Schema dataRequiredSchema,
+                                                                HoodieSchema dataRequiredSchema,
                                                                 List<Pair<String, Object>> requiredPartitionFieldAndValues) {
       return null;
     }

hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java

Lines changed: 3 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.AvroJavaTypeConverter;
@@ -91,7 +92,7 @@ public static Object getFieldValueFromIndexedRecord(
   }

   @Override
-  public Object getValue(IndexedRecord record, Schema schema, String fieldName) {
+  public Object getValue(IndexedRecord record, HoodieSchema schema, String fieldName) {
     return getFieldValueFromIndexedRecord(record, fieldName);
   }

@@ -177,7 +178,7 @@ public IndexedRecord seal(IndexedRecord record) {
   }

   @Override
-  public IndexedRecord toBinaryRow(Schema avroSchema, IndexedRecord record) {
+  public IndexedRecord toBinaryRow(HoodieSchema schema, IndexedRecord record) {
     return record;
   }

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java

Lines changed: 3 additions & 3 deletions
@@ -268,11 +268,11 @@ public CustomSerializer<BufferedRecord<IndexedRecord>> getRecordSerializer() {

   @Override
   public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
-                                                                Schema skeletonRequiredSchema,
+                                                                HoodieSchema skeletonRequiredSchema,
                                                                 ClosableIterator<IndexedRecord> dataFileIterator,
-                                                                Schema dataRequiredSchema,
+                                                                HoodieSchema dataRequiredSchema,
                                                                 List<Pair<String, Object>> partitionFieldAndValues) {
-    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, dataFileIterator, dataRequiredSchema, partitionFieldAndValues);
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema.toAvroSchema(), dataFileIterator, dataRequiredSchema.toAvroSchema(), partitionFieldAndValues);
   }

   /**
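For reference, a hedged sketch of how a caller that still produces Avro schemas adapts to the migrated mergeBootstrapReaders signature above. Only HoodieSchema.fromAvroSchema, the parameter order, and the parameter types come from this diff; the helper method, variable names, and the concrete Pair type are illustrative assumptions:

import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair; // assumed Pair type of the partition-field list

final class BootstrapMergeCallSketch {

  // Illustrative call site: wrap the Avro schemas once at the boundary, then
  // hand HoodieSchema instances to the migrated mergeBootstrapReaders API.
  static ClosableIterator<IndexedRecord> merge(HoodieAvroReaderContext readerContext,
                                               ClosableIterator<IndexedRecord> skeletonFileIterator,
                                               Schema skeletonAvroSchema,
                                               ClosableIterator<IndexedRecord> dataFileIterator,
                                               Schema dataAvroSchema,
                                               List<Pair<String, Object>> partitionFieldAndValues) {
    return readerContext.mergeBootstrapReaders(
        skeletonFileIterator,
        HoodieSchema.fromAvroSchema(skeletonAvroSchema),
        dataFileIterator,
        HoodieSchema.fromAvroSchema(dataAvroSchema),
        partitionFieldAndValues);
  }
}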
