diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
new file mode 100644
index 0000000000000..14c59d5789a85
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.ReflectionUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Converts between Flink's LogicalType/DataType and HoodieSchema.
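+ *
+ * <p>Illustrative usage (field names are arbitrary examples; see TestHoodieSchemaConverter):
+ * <pre>{@code
+ * RowType rowType = (RowType) DataTypes.ROW(
+ *     DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+ *     DataTypes.FIELD("name", DataTypes.STRING())).notNull().getLogicalType();
+ * HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType, "record");
+ * RowType roundTripped = HoodieSchemaConverter.convertToRowType(schema);
+ * }</pre>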
+ */
+public class HoodieSchemaConverter {
+
+ /**
+ * Converts a Flink LogicalType into a HoodieSchema.
+ *
+ * <p>Uses "record" as the default type name for record types.
+ *
+ * @param logicalType Flink logical type definition
+ * @return HoodieSchema matching the logical type
+ */
+ public static HoodieSchema convertToSchema(LogicalType logicalType) {
+ return convertToSchema(logicalType, "record");
+ }
+
+ /**
+ * Converts a Flink LogicalType into a HoodieSchema with specified record name.
+ *
+ * <p>The "{rowName}." is used as the nested row type name prefix in order to generate
+ * the right schema. Nested record types that only differ by type name are still compatible.
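+ * For example, a nested row field named "address" under record name "user" is converted with
+ * the record name "user.address".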
+ *
+ * @param logicalType Flink logical type
+ * @param rowName the record name
+ * @return HoodieSchema matching this logical type
+ */
+ public static HoodieSchema convertToSchema(LogicalType logicalType, String rowName) {
+ int precision;
+ boolean nullable = logicalType.isNullable();
+ HoodieSchema schema;
+
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return HoodieSchema.create(HoodieSchemaType.NULL);
+
+ case BOOLEAN:
+ schema = HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+ break;
+
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ schema = HoodieSchema.create(HoodieSchemaType.INT);
+ break;
+
+ case BIGINT:
+ schema = HoodieSchema.create(HoodieSchemaType.LONG);
+ break;
+
+ case FLOAT:
+ schema = HoodieSchema.create(HoodieSchemaType.FLOAT);
+ break;
+
+ case DOUBLE:
+ schema = HoodieSchema.create(HoodieSchemaType.DOUBLE);
+ break;
+
+ case CHAR:
+ case VARCHAR:
+ schema = HoodieSchema.create(HoodieSchemaType.STRING);
+ break;
+
+ case BINARY:
+ case VARBINARY:
+ schema = HoodieSchema.create(HoodieSchemaType.BYTES);
+ break;
+
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final TimestampType timestampType = (TimestampType) logicalType;
+ precision = timestampType.getPrecision();
+ if (precision <= 3) {
+ schema = HoodieSchema.createTimestampMillis();
+ } else if (precision <= 6) {
+ schema = HoodieSchema.createTimestampMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "HoodieSchema does not support TIMESTAMP type with precision: "
+ + precision
+ + ", it only supports precisions <= 6.");
+ }
+ break;
+
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
+ precision = localZonedTimestampType.getPrecision();
+ if (precision <= 3) {
+ schema = HoodieSchema.createLocalTimestampMillis();
+ } else if (precision <= 6) {
+ schema = HoodieSchema.createLocalTimestampMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "HoodieSchema does not support LOCAL TIMESTAMP type with precision: "
+ + precision
+ + ", it only supports precisions <= 6.");
+ }
+ break;
+
+ case DATE:
+ schema = HoodieSchema.createDate();
+ break;
+
+ case TIME_WITHOUT_TIME_ZONE:
+ precision = ((TimeType) logicalType).getPrecision();
+ if (precision <= 3) {
+ schema = HoodieSchema.createTimeMillis();
+ } else if (precision <= 6) {
+ schema = HoodieSchema.createTimeMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "HoodieSchema does not support TIME type with precision: "
+ + precision
+ + ", maximum precision is 6 (microseconds).");
+ }
+ break;
+
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ int fixedSize = computeMinBytesForDecimalPrecision(decimalType.getPrecision());
+ schema = HoodieSchema.createDecimal(
+ String.format("%s.fixed", rowName),
+ null,
+ null,
+ decimalType.getPrecision(),
+ decimalType.getScale(),
+ fixedSize
+ );
+ break;
+
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ List<String> fieldNames = rowType.getFieldNames();
+
+ List<HoodieSchemaField> hoodieFields = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = fieldNames.get(i);
+ LogicalType fieldType = rowType.getTypeAt(i);
+
+ // Recursive call for field schema
+ HoodieSchema fieldSchema = convertToSchema(fieldType, rowName + "." + fieldName);
+
+ // Create field with or without default value
+ HoodieSchemaField field;
+ if (fieldType.isNullable()) {
+ field = HoodieSchemaField.of(fieldName, fieldSchema, null, HoodieSchema.NULL_VALUE);
+ } else {
+ field = HoodieSchemaField.of(fieldName, fieldSchema);
+ }
+ hoodieFields.add(field);
+ }
+
+ schema = HoodieSchema.createRecord(rowName, null, null, hoodieFields);
+ break;
+
+ case MULTISET:
+ case MAP:
+ LogicalType valueType = extractValueTypeForMap(logicalType);
+ HoodieSchema valueSchema = convertToSchema(valueType, rowName);
+ schema = HoodieSchema.createMap(valueSchema);
+ break;
+
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ HoodieSchema elementSchema = convertToSchema(arrayType.getElementType(), rowName);
+ schema = HoodieSchema.createArray(elementSchema);
+ break;
+
+ case RAW:
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported type for HoodieSchema conversion: " + logicalType);
+ }
+
+ return nullable ? HoodieSchema.createNullable(schema) : schema;
+ }
+
+ /**
+ * Extracts value type for map conversion.
+ * Maps must have string keys for Avro/HoodieSchema compatibility.
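+ * <p>For MULTISET, the element type becomes the map key type and the value type is INT
+ * (the element count).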
+ */
+ private static LogicalType extractValueTypeForMap(LogicalType type) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ } else {
+ MultisetType multisetType = (MultisetType) type;
+ keyType = multisetType.getElementType();
+ valueType = new IntType();
+ }
+ if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "HoodieSchema doesn't support non-string as key type of map. "
+ + "The key type is: "
+ + keyType.asSummaryString());
+ }
+ return valueType;
+ }
+
+ /**
+ * Returns whether the given logical type belongs to the family.
+ */
+ private static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) {
+ return logicalType.getTypeRoot().getFamilies().contains(family);
+ }
+
+ /**
+ * Computes minimum bytes needed for decimal precision.
+ * This ensures compatibility with Avro fixed-size decimal representation.
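+ * <p>For example, precision 10 requires 5 bytes: the signed range of 4 bytes (2^31 - 1) cannot
+ * hold 10 decimal digits, while 5 bytes (2^39 - 1) can.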
+ */
+ private static int computeMinBytesForDecimalPrecision(int precision) {
+ int numBytes = 1;
+ while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
+ numBytes += 1;
+ }
+ return numBytes;
+ }
+
+ // ===== Conversion from HoodieSchema to Flink DataType =====
+
+ /**
+ * Converts a HoodieSchema into Flink's DataType.
+ *
+ * <p>This method provides native conversion from HoodieSchema to Flink DataType
+ * without going through Avro intermediate representation, future-proofing the
+ * implementation against changes in the Avro layer.
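+ *
+ * <p>Nullability is preserved: non-union schemas map to NOT NULL data types, while nullable
+ * unions of the form [null, T] map to the nullable Flink type of T, e.g.:
+ * <pre>{@code
+ * // STRING NOT NULL
+ * DataType notNull = HoodieSchemaConverter.convertToDataType(HoodieSchema.create(HoodieSchemaType.STRING));
+ * // STRING (nullable)
+ * DataType nullable = HoodieSchemaConverter.convertToDataType(
+ *     HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)));
+ * }</pre>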
+ *
+ * @param hoodieSchema the HoodieSchema to convert
+ * @return Flink DataType matching the schema
+ * @throws IllegalArgumentException if the schema contains unsupported types
+ */
+ public static DataType convertToDataType(HoodieSchema hoodieSchema) {
+ if (hoodieSchema == null) {
+ throw new IllegalArgumentException("HoodieSchema cannot be null");
+ }
+
+ HoodieSchemaType type = hoodieSchema.getType();
+
+ switch (type) {
+ case NULL:
+ return DataTypes.NULL();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN().notNull();
+ case INT:
+ return DataTypes.INT().notNull();
+ case LONG:
+ return DataTypes.BIGINT().notNull();
+ case FLOAT:
+ return DataTypes.FLOAT().notNull();
+ case DOUBLE:
+ return DataTypes.DOUBLE().notNull();
+ case BYTES:
+ return DataTypes.BYTES().notNull();
+ case STRING:
+ return DataTypes.STRING().notNull();
+ case ENUM:
+ // Flink doesn't have native enum type, convert to STRING
+ return DataTypes.STRING().notNull();
+ case FIXED:
+ return DataTypes.VARBINARY(hoodieSchema.getFixedSize()).notNull();
+ case DECIMAL:
+ return convertDecimal(hoodieSchema);
+ case DATE:
+ return DataTypes.DATE().notNull();
+ case TIME:
+ return convertTime(hoodieSchema);
+ case TIMESTAMP:
+ return convertTimestamp(hoodieSchema);
+ case UUID:
+ return DataTypes.STRING().notNull();
+ case ARRAY:
+ return convertArray(hoodieSchema);
+ case MAP:
+ return convertMap(hoodieSchema);
+ case RECORD:
+ return convertRecord(hoodieSchema);
+ case UNION:
+ return convertUnion(hoodieSchema);
+ default:
+ throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type);
+ }
+ }
+
+ /**
+ * Converts a HoodieSchema (RECORD type) into a Flink RowType.
+ *
+ * @param schema HoodieSchema to convert (must be a RECORD type)
+ * @return RowType matching the HoodieSchema structure
+ * @throws IllegalArgumentException if schema is null or not a RECORD type
+ */
+ public static RowType convertToRowType(HoodieSchema schema) {
+ if (schema == null) {
+ throw new IllegalArgumentException("HoodieSchema cannot be null");
+ }
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ throw new IllegalArgumentException(
+ "Only RECORD type schemas can be converted to RowType, got: " + schema.getType());
+ }
+
+ DataType dataType = convertToDataType(schema);
+ return (RowType) dataType.getLogicalType();
+ }
+
+ private static DataType convertDecimal(HoodieSchema schema) {
+ if (!(schema instanceof HoodieSchema.Decimal)) {
+ throw new IllegalStateException("Expected HoodieSchema.Decimal but got: " + schema.getClass());
+ }
+ HoodieSchema.Decimal decimalSchema = (HoodieSchema.Decimal) schema;
+ return DataTypes.DECIMAL(decimalSchema.getPrecision(), decimalSchema.getScale()).notNull();
+ }
+
+ private static DataType convertTimestamp(HoodieSchema schema) {
+ if (!(schema.getType() == HoodieSchemaType.TIMESTAMP)) {
+ throw new IllegalStateException("Expected HoodieSchema.Timestamp but got: " + schema.getClass());
+ }
+ HoodieSchema.Timestamp timestampSchema = (HoodieSchema.Timestamp) schema;
+ int flinkPrecision = (timestampSchema.getPrecision() == HoodieSchema.TimePrecision.MILLIS) ? 3 : 6;
+
+ if (timestampSchema.isUtcAdjusted()) {
+ return DataTypes.TIMESTAMP(flinkPrecision).notNull();
+ } else {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(flinkPrecision).notNull();
+ }
+ }
+
+ private static DataType convertTime(HoodieSchema schema) {
+ if (!(schema.getType() == HoodieSchemaType.TIME)) {
+ throw new IllegalStateException("Expected HoodieSchema.Time but got: " + schema.getClass());
+ }
+ HoodieSchema.Time timeSchema = (HoodieSchema.Time) schema;
+ int flinkPrecision = (timeSchema.getPrecision() == HoodieSchema.TimePrecision.MILLIS) ? 3 : 6;
+ return DataTypes.TIME(flinkPrecision).notNull();
+ }
+
+ private static DataType convertRecord(HoodieSchema schema) {
+ List<HoodieSchemaField> fields = schema.getFields();
+ DataTypes.Field[] flinkFields = new DataTypes.Field[fields.size()];
+
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField field = fields.get(i);
+ DataType fieldType = convertToDataType(field.schema());
+ flinkFields[i] = DataTypes.FIELD(field.name(), fieldType);
+ }
+
+ return DataTypes.ROW(flinkFields).notNull();
+ }
+
+ private static DataType convertArray(HoodieSchema schema) {
+ HoodieSchema elementSchema = schema.getElementType();
+ DataType elementType = convertToDataType(elementSchema);
+ return DataTypes.ARRAY(elementType).notNull();
+ }
+
+ private static DataType convertMap(HoodieSchema schema) {
+ HoodieSchema valueSchema = schema.getValueType();
+ DataType valueType = convertToDataType(valueSchema);
+ return DataTypes.MAP(DataTypes.STRING().notNull(), valueType).notNull();
+ }
+
+ private static DataType convertUnion(HoodieSchema schema) {
+ List<HoodieSchema> unionTypes = schema.getTypes();
+
+ // Simple nullable union [null, T]
+ if (schema.isNullable() && unionTypes.size() == 2) {
+ HoodieSchema nonNullType = schema.getNonNullType();
+ DataType converted = convertToDataType(nonNullType);
+ return converted.nullable();
+ }
+
+ // Single-type union
+ if (unionTypes.size() == 1) {
+ return convertToDataType(unionTypes.get(0));
+ }
+
+ // Complex multi-type unions - use RAW type (matches AvroSchemaConverter logic)
+ List<HoodieSchema> nonNullTypes = unionTypes.stream()
+ .filter(t -> t.getType() != HoodieSchemaType.NULL)
+ .collect(Collectors.toList());
+
+ boolean nullable = unionTypes.size() > nonNullTypes.size();
+
+ // Use RAW type for complex unions
+ DataType rawDataType = (DataType) ReflectionUtils.invokeStaticMethod(
+ "org.apache.hudi.utils.DataTypeUtils",
+ "createAtomicRawType",
+ new Object[] {false, Types.GENERIC(Object.class)},
+ Boolean.class,
+ TypeInformation.class);
+
+ if (recordTypesOfSameNumFields(nonNullTypes)) {
+ DataType converted = DataTypes.ROW(
+ DataTypes.FIELD("wrapper", rawDataType))
+ .notNull();
+ return nullable ? converted.nullable() : converted;
+ }
+
+ return nullable ? rawDataType.nullable() : rawDataType;
+ }
+
+ /**
+ * Returns true if all the types are RECORD type with same number of fields.
+ */
+ private static boolean recordTypesOfSameNumFields(List<HoodieSchema> types) {
+ if (types == null || types.isEmpty()) {
+ return false;
+ }
+ if (types.stream().anyMatch(s -> s.getType() != HoodieSchemaType.RECORD)) {
+ return false;
+ }
+ int numFields = types.get(0).getFields().size();
+ return types.stream().allMatch(s -> s.getFields().size() == numFields);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
new file mode 100644
index 0000000000000..c4892c48e6968
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSchemaConverter}.
+ */
+public class TestHoodieSchemaConverter {
+
+ @Test
+ public void testPrimitiveTypes() {
+ // String
+ HoodieSchema stringSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.STRING().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.STRING, stringSchema.getType());
+
+ // Int
+ HoodieSchema intSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.INT().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.INT, intSchema.getType());
+
+ // Long
+ HoodieSchema longSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.BIGINT().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.LONG, longSchema.getType());
+
+ // Float
+ HoodieSchema floatSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.FLOAT().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.FLOAT, floatSchema.getType());
+
+ // Double
+ HoodieSchema doubleSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.DOUBLE().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.DOUBLE, doubleSchema.getType());
+
+ // Boolean
+ HoodieSchema boolSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.BOOLEAN().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.BOOLEAN, boolSchema.getType());
+
+ // Bytes
+ HoodieSchema bytesSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.BYTES().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.BYTES, bytesSchema.getType());
+ }
+
+ @Test
+ public void testNullableTypes() {
+ HoodieSchema nullableString = HoodieSchemaConverter.convertToSchema(
+ DataTypes.STRING().nullable().getLogicalType());
+ assertEquals(HoodieSchemaType.UNION, nullableString.getType());
+ assertTrue(nullableString.isNullable());
+
+ HoodieSchema nullableInt = HoodieSchemaConverter.convertToSchema(
+ DataTypes.INT().nullable().getLogicalType());
+ assertEquals(HoodieSchemaType.UNION, nullableInt.getType());
+ assertTrue(nullableInt.isNullable());
+ }
+
+ @Test
+ public void testTemporalTypes() {
+ // Date
+ HoodieSchema dateSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.DATE().notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.DATE, dateSchema.getType());
+
+ // Time
+ HoodieSchema timeSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIME(3).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIME, timeSchema.getType());
+ assertEquals(HoodieSchema.TimePrecision.MILLIS, ((HoodieSchema.Time) timeSchema).getPrecision());
+
+ // Time micros
+ HoodieSchema timeMicrosSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIME(6).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIME, timeMicrosSchema.getType());
+ assertEquals(HoodieSchema.TimePrecision.MICROS, ((HoodieSchema.Time) timeMicrosSchema).getPrecision());
+
+ // Timestamp millis
+ HoodieSchema timestampMillisSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIMESTAMP(3).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIMESTAMP, timestampMillisSchema.getType());
+ assertTrue(timestampMillisSchema instanceof HoodieSchema.Timestamp);
+ assertEquals(HoodieSchema.TimePrecision.MILLIS,
+ ((HoodieSchema.Timestamp) timestampMillisSchema).getPrecision());
+
+ // Timestamp micros
+ HoodieSchema timestampMicrosSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIMESTAMP(6).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIMESTAMP, timestampMicrosSchema.getType());
+ assertTrue(timestampMicrosSchema instanceof HoodieSchema.Timestamp);
+ assertEquals(HoodieSchema.TimePrecision.MICROS,
+ ((HoodieSchema.Timestamp) timestampMicrosSchema).getPrecision());
+
+ // Local timestamp millis
+ HoodieSchema localTimestampMillisSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMillisSchema.getType());
+
+ // Local timestamp micros
+ HoodieSchema localTimestampMicrosSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull().getLogicalType());
+ assertEquals(HoodieSchemaType.TIMESTAMP, localTimestampMicrosSchema.getType());
+ }
+
+ @Test
+ public void testDecimalType() {
+ HoodieSchema decimalSchema = HoodieSchemaConverter.convertToSchema(
+ DataTypes.DECIMAL(10, 2).notNull().getLogicalType(), "test");
+ assertEquals(HoodieSchemaType.DECIMAL, decimalSchema.getType());
+ assertTrue(decimalSchema instanceof HoodieSchema.Decimal);
+
+ HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) decimalSchema;
+ assertEquals(10, decimal.getPrecision());
+ assertEquals(2, decimal.getScale());
+ }
+
+ @Test
+ public void testArrayType() {
+ LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull().getLogicalType();
+ HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType);
+
+ assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
+ assertEquals(HoodieSchemaType.STRING, arraySchema.getElementType().getType());
+ }
+
+ @Test
+ public void testArrayTypeWithNullableElements() {
+ LogicalType arrayType = DataTypes.ARRAY(DataTypes.STRING().nullable()).notNull().getLogicalType();
+ HoodieSchema arraySchema = HoodieSchemaConverter.convertToSchema(arrayType);
+
+ assertEquals(HoodieSchemaType.ARRAY, arraySchema.getType());
+
+ HoodieSchema elementSchema = arraySchema.getElementType();
+ assertEquals(HoodieSchemaType.UNION, elementSchema.getType());
+ assertTrue(elementSchema.isNullable());
+
+ HoodieSchema actualElementType = elementSchema.getNonNullType();
+ assertEquals(HoodieSchemaType.STRING, actualElementType.getType());
+ }
+
+ @Test
+ public void testMapType() {
+ LogicalType mapType = DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ DataTypes.INT().notNull()).notNull().getLogicalType();
+ HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType);
+
+ assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
+ assertEquals(HoodieSchemaType.INT, mapSchema.getValueType().getType());
+ }
+
+ @Test
+ public void testMapTypeWithNullableValues() {
+ LogicalType mapType = DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ DataTypes.INT().nullable()).notNull().getLogicalType();
+ HoodieSchema mapSchema = HoodieSchemaConverter.convertToSchema(mapType);
+
+ assertEquals(HoodieSchemaType.MAP, mapSchema.getType());
+
+ HoodieSchema valueSchema = mapSchema.getValueType();
+ assertEquals(HoodieSchemaType.UNION, valueSchema.getType());
+ assertTrue(valueSchema.isNullable());
+
+ HoodieSchema actualValueType = valueSchema.getNonNullType();
+ assertEquals(HoodieSchemaType.INT, actualValueType.getType());
+ }
+
+ @Test
+ public void testRecordType() {
+ LogicalType recordType = DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT().notNull()),
+ DataTypes.FIELD("name", DataTypes.STRING().notNull()),
+ DataTypes.FIELD("age", DataTypes.INT().nullable())
+ ).notNull().getLogicalType();
+
+ HoodieSchema recordSchema = HoodieSchemaConverter.convertToSchema(recordType, "Person");
+
+ assertEquals(HoodieSchemaType.RECORD, recordSchema.getType());
+ assertEquals(3, recordSchema.getFields().size());
+ assertEquals("id", recordSchema.getFields().get(0).name());
+ assertEquals("name", recordSchema.getFields().get(1).name());
+ assertEquals("age", recordSchema.getFields().get(2).name());
+
+ // Verify nullable field
+ assertTrue(recordSchema.getFields().get(2).schema().isNullable());
+ }
+
+ @Test
+ public void testNestedRecordType() {
+ LogicalType nestedRecordType = DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.INT().notNull()),
+ DataTypes.FIELD("address", DataTypes.ROW(
+ DataTypes.FIELD("street", DataTypes.STRING().notNull()),
+ DataTypes.FIELD("city", DataTypes.STRING().notNull())
+ ).notNull())
+ ).notNull().getLogicalType();
+
+ HoodieSchema nestedSchema = HoodieSchemaConverter.convertToSchema(nestedRecordType, "User");
+
+ assertEquals(HoodieSchemaType.RECORD, nestedSchema.getType());
+ assertEquals(2, nestedSchema.getFields().size());
+
+ HoodieSchema addressSchema = nestedSchema.getFields().get(1).schema();
+ assertEquals(HoodieSchemaType.RECORD, addressSchema.getType());
+ assertEquals(2, addressSchema.getFields().size());
+ }
+
+ @Test
+ public void testCompareWithAvroConversion() {
+ // Test that HoodieSchemaConverter produces the same result as
+ // AvroSchemaConverter + HoodieSchema.fromAvroSchema()
+
+ RowType flinkRowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+ DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()),
+ DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull())
+ ).notNull().getLogicalType();
+
+ // Method 1: Direct HoodieSchema conversion
+ HoodieSchema directSchema = HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord");
+
+ // Method 2: Via Avro conversion
+ HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema(
+ AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord"));
+
+ // Both should produce equivalent schemas
+ assertNotNull(directSchema);
+ assertNotNull(viaAvroSchema);
+ assertEquals(HoodieSchemaType.RECORD, directSchema.getType());
+ assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType());
+ assertEquals(4, directSchema.getFields().size());
+ assertEquals(4, viaAvroSchema.getFields().size());
+
+ // Verify field types match
+ for (int i = 0; i < 4; i++) {
+ assertEquals(
+ viaAvroSchema.getFields().get(i).schema().getType(),
+ directSchema.getFields().get(i).schema().getType(),
+ "Field " + i + " type mismatch"
+ );
+ }
+ }
+
+ @Test
+ public void testComplexNestedStructure() {
+ LogicalType complexType = DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.STRING().notNull()),
+ DataTypes.FIELD("tags", DataTypes.ARRAY(DataTypes.STRING().notNull()).notNull()),
+ DataTypes.FIELD("metadata", DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ DataTypes.STRING().notNull()).notNull()),
+ DataTypes.FIELD("nested", DataTypes.ROW(
+ DataTypes.FIELD("value", DataTypes.DOUBLE().notNull()),
+ DataTypes.FIELD("items", DataTypes.ARRAY(DataTypes.INT().notNull()).notNull())
+ ).notNull())
+ ).notNull().getLogicalType();
+
+ HoodieSchema complexSchema = HoodieSchemaConverter.convertToSchema(complexType, "ComplexRecord");
+
+ assertNotNull(complexSchema);
+ assertEquals(HoodieSchemaType.RECORD, complexSchema.getType());
+ assertEquals(4, complexSchema.getFields().size());
+
+ // Verify array field
+ assertEquals(HoodieSchemaType.ARRAY, complexSchema.getFields().get(1).schema().getType());
+
+ // Verify map field
+ assertEquals(HoodieSchemaType.MAP, complexSchema.getFields().get(2).schema().getType());
+
+ // Verify nested record
+ HoodieSchema nestedRecord = complexSchema.getFields().get(3).schema();
+ assertEquals(HoodieSchemaType.RECORD, nestedRecord.getType());
+ assertEquals(2, nestedRecord.getFields().size());
+ }
+
+ @Test
+ public void testNativeConversionMatchesAvroPath() {
+ // Verify native conversion produces same result as Avro path
+ RowType originalRowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+ DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("age", DataTypes.INT().notNull())
+ ).notNull().getLogicalType();
+
+ HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
+
+ // Native conversion
+ DataType nativeResult = HoodieSchemaConverter.convertToDataType(hoodieSchema);
+
+ // Avro path (for comparison)
+ DataType avroResult = AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema());
+
+ assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType());
+ }
+
+ @Test
+ public void testRoundTripConversion() {
+ RowType originalRowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
+ DataTypes.FIELD("name", DataTypes.STRING().nullable()),
+ DataTypes.FIELD("age", DataTypes.INT().notNull())
+ ).notNull().getLogicalType();
+
+ HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
+ RowType convertedRowType = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ assertEquals(originalRowType, convertedRowType);
+ }
+
+ @Test
+ public void testConvertPrimitiveTypesToDataType() {
+ HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("string_col", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("int_col", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("long_col", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("float_col", HoodieSchema.create(HoodieSchemaType.FLOAT)),
+ HoodieSchemaField.of("double_col", HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+ HoodieSchemaField.of("boolean_col", HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+ HoodieSchemaField.of("bytes_col", HoodieSchema.create(HoodieSchemaType.BYTES))
+ )
+ );
+
+ RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ assertEquals(7, result.getFieldCount());
+ // Verify each field name
+ assertEquals("string_col", result.getFieldNames().get(0));
+ assertEquals("int_col", result.getFieldNames().get(1));
+ assertEquals("long_col", result.getFieldNames().get(2));
+ assertEquals("float_col", result.getFieldNames().get(3));
+ assertEquals("double_col", result.getFieldNames().get(4));
+ assertEquals("boolean_col", result.getFieldNames().get(5));
+ assertEquals("bytes_col", result.getFieldNames().get(6));
+ }
+
+ @Test
+ public void testConvertNullableTypesToDataType() {
+ HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("nullable_string",
+ HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING))),
+ HoodieSchemaField.of("nullable_int",
+ HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT)))
+ )
+ );
+
+ RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ assertTrue(result.getTypeAt(0).isNullable());
+ assertTrue(result.getTypeAt(1).isNullable());
+ assertEquals("nullable_string", result.getFieldNames().get(0));
+ assertEquals("nullable_int", result.getFieldNames().get(1));
+ }
+
+ @Test
+ public void testConvertTemporalTypesToDataType() {
+ HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("date_col", HoodieSchema.createDate()),
+ HoodieSchemaField.of("time_col", HoodieSchema.createTimeMillis()),
+ HoodieSchemaField.of("timestamp_millis", HoodieSchema.createTimestampMillis()),
+ HoodieSchemaField.of("timestamp_micros", HoodieSchema.createTimestampMicros()),
+ HoodieSchemaField.of("local_timestamp_millis", HoodieSchema.createLocalTimestampMillis()),
+ HoodieSchemaField.of("local_timestamp_micros", HoodieSchema.createLocalTimestampMicros())
+ )
+ );
+
+ RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ assertEquals(6, result.getFieldCount());
+ assertEquals(3, ((TimestampType) result.getTypeAt(2)).getPrecision());
+ assertEquals(6, ((TimestampType) result.getTypeAt(3)).getPrecision());
+ }
+
+ @Test
+ public void testConvertDecimalTypeToDataType() {
+ // Create a FIXED-backed decimal HoodieSchema directly
+ HoodieSchema decimalSchema = HoodieSchema.createDecimal(
+ "decimal_col.fixed", // name
+ null, // namespace
+ null, // doc
+ 10, // precision
+ 2, // scale
+ 5 // fixedSize (5 bytes for precision 10)
+ );
+
+ // Wrap in a record structure
+ HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(HoodieSchemaField.of("decimal_col", decimalSchema))
+ );
+
+ // Verify the decimal is FIXED-backed (backed by fixed-size byte array)
+ HoodieSchema decimalField = hoodieSchema.getFields().get(0).schema();
+ assertTrue(decimalField instanceof HoodieSchema.Decimal);
+ HoodieSchema.Decimal decimalSchemaTyped = (HoodieSchema.Decimal) decimalField;
+ assertTrue(decimalSchemaTyped.isFixed()); // Verify it's FIXED-backed
+ assertEquals(5, decimalSchemaTyped.getFixedSize()); // For precision 10, fixed size is 5 bytes
+ assertEquals(10, decimalSchemaTyped.getPrecision());
+ assertEquals(2, decimalSchemaTyped.getScale());
+
+ // Convert to Flink RowType
+ RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ // Verify conversion
+ assertTrue(result.getTypeAt(0) instanceof DecimalType);
+ DecimalType decimal = (DecimalType) result.getTypeAt(0);
+ assertEquals(10, decimal.getPrecision());
+ assertEquals(2, decimal.getScale());
+ }
+
+ @Test
+ public void testConvertComplexTypesToDataType() {
+ HoodieSchema nestedRecord = HoodieSchema.createRecord(
+ "nested_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("nested_id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("nested_name", HoodieSchema.create(HoodieSchemaType.STRING))
+ )
+ );
+
+ HoodieSchema hoodieSchema = HoodieSchema.createRecord(
+ "test_record",
+ null,
+ null,
+ Arrays.asList(
+ HoodieSchemaField.of("array_col",
+ HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
+ HoodieSchemaField.of("map_col",
+ HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))),
+ HoodieSchemaField.of("nested_record", nestedRecord)
+ )
+ );
+
+ RowType result = HoodieSchemaConverter.convertToRowType(hoodieSchema);
+
+ assertEquals(3, result.getFieldCount());
+ assertTrue(result.getTypeAt(0) instanceof ArrayType);
+ assertTrue(result.getTypeAt(1) instanceof MapType);
+ assertTrue(result.getTypeAt(2) instanceof RowType);
+ }
+
+ @Test
+ public void testConvertNullSchemaThrowsException() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieSchemaConverter.convertToRowType(null);
+ });
+ }
+
+ @Test
+ public void testConvertNonRecordSchemaThrowsException() {
+ HoodieSchema stringSchema = HoodieSchema.create(HoodieSchemaType.STRING);
+ assertThrows(IllegalArgumentException.class, () -> {
+ HoodieSchemaConverter.convertToRowType(stringSchema);
+ });
+ }
+
+ @Test
+ public void testEnumToStringConversion() {
+ HoodieSchema enumSchema = HoodieSchema.createEnum(
+ "Color", null, null, Arrays.asList("RED", "GREEN", "BLUE"));
+
+ DataType dataType = HoodieSchemaConverter.convertToDataType(enumSchema);
+ assertTrue(dataType.getLogicalType() instanceof VarCharType);
+ }
+
+ @Test
+ public void testFixedConversion() {
+ HoodieSchema fixedSchema = HoodieSchema.createFixed("MD5", null, null, 16);
+ DataType dataType = HoodieSchemaConverter.convertToDataType(fixedSchema);
+ assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
+ }
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index ebdca4903d428..a6d1026815918 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -136,6 +136,17 @@ public static HoodieSchema parse(String jsonSchema) {
return new HoodieSchema.Parser().parse(jsonSchema);
}
+ /**
+ * Parses an InputStream and returns the corresponding HoodieSchema.
+ *
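+ * <p>Typical usage, mirroring FilebasedSchemaProvider in this change ({@code fs} and
+ * {@code schemaFile} are placeholders):
+ * <pre>{@code
+ * try (InputStream stream = fs.open(new Path(schemaFile))) {
+ *   HoodieSchema schema = HoodieSchema.parse(stream);
+ * }
+ * }</pre>
+ *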
+ * @param inputStream the InputStream to parse
+ * @return parsed HoodieSchema
+ * @throws HoodieAvroSchemaException if the schema string is invalid
+ */
+ public static HoodieSchema parse(InputStream inputStream) {
+ return new HoodieSchema.Parser().parse(inputStream);
+ }
+
/**
* Creates a schema for the specified primitive type.
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java
new file mode 100644
index 0000000000000..75ddcf0e1662f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
+/**
+ * A global cache for HoodieSchema instances, ensuring that only one instance of the same
+ * schema exists within an entire JVM lifetime.
+ *
+ * <p>The cache lives for the whole JVM lifecycle and maintains a collection of schema instances.
+ *
+ * <p>NOTE: Schemas that are used frequently should be interned through this cache.
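+ *
+ * <p>Illustrative usage:
+ * <pre>{@code
+ * HoodieSchema canonical = HoodieSchemaCache.intern(schema);
+ * }</pre>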
+ */
+public class HoodieSchemaCache {
+
+ // Ensures that there is only one instance of the same schema within an entire JVM lifetime
+ private static final LoadingCache<HoodieSchema, HoodieSchema> SCHEMA_CACHE =
+ Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
+
+ /**
+ * Gets the canonical schema instance from the global cache; if absent, the given schema is
+ * cached and returned.
+ *
+ * @param schema the schema to intern
+ * @return the cached schema instance if present, otherwise the given schema itself
+ */
+ public static HoodieSchema intern(HoodieSchema schema) {
+ return SCHEMA_CACHE.get(schema);
+ }
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 0ddcddc3e6444..cee738df6627e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -18,8 +18,10 @@
package org.apache.hudi.common.schema;
+import org.apache.avro.JsonProperties;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +48,9 @@
*/
public final class HoodieSchemaUtils {
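+ /**
+ * Nullable string schema shared by the Hudi metadata fields.
+ */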
+ public static final HoodieSchema METADATA_FIELD_SCHEMA = HoodieSchema.createNullable(HoodieSchemaType.STRING);
+ public static final HoodieSchema RECORD_KEY_SCHEMA = initRecordKeySchema();
+
// Private constructor to prevent instantiation
private HoodieSchemaUtils() {
throw new UnsupportedOperationException("Utility class cannot be instantiated");
@@ -140,6 +145,21 @@ public static HoodieSchema createNullableSchema(HoodieSchema schema) {
return HoodieSchema.fromAvroSchema(nullableAvro);
}
+ /**
+ * Creates a new schema with all fields forced to be nullable.
+ * This is equivalent to AvroSchemaUtils.asNullable() but operates on HoodieSchema.
+ *
+ * @param schema the schema to convert
+ * @return a new schema with all the fields updated as nullable
+ * @throws IllegalArgumentException if schema is null
+ */
+ public static HoodieSchema asNullable(HoodieSchema schema) {
+ ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
+
+ // Delegate to AvroSchemaUtils
+ Schema nullableAvro = AvroSchemaUtils.asNullable(schema.toAvroSchema());
+ return HoodieSchema.fromAvroSchema(nullableAvro);
+ }
+
/**
* Extracts the non-null type from a union schema.
* This is equivalent to AvroSchemaUtils.getNonNullTypeFromUnion() but operates on HoodieSchema.
@@ -293,4 +313,21 @@ private static Option> getNestedFieldInternal(Ho
));
}
}
+
+ private static HoodieSchema initRecordKeySchema() {
+ HoodieSchemaField recordKeyField =
+ createNewSchemaField(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);
+ return HoodieSchema.createRecord(
+ "HoodieRecordKey",
+ "",
+ "",
+ false,
+ Collections.singletonList(recordKeyField)
+ );
+ }
+
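+ /**
+ * Returns the schema containing only the record key metadata field.
+ */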
+ public static HoodieSchema getRecordKeySchema() {
+ return RECORD_KEY_SCHEMA;
+ }
+
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
index 25ea4c16f7e18..f68de07cf4ab1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java
@@ -120,7 +120,8 @@ public void testSchemaParsingWithInvalidJson() {
}, "Should throw exception for invalid JSON schema");
assertThrows(IllegalArgumentException.class, () -> {
- HoodieSchema.parse(null);
+ String invalid = null;
+ HoodieSchema.parse(invalid);
}, "Should throw exception for null schema string");
assertThrows(IllegalArgumentException.class, () -> {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
index 6a089c76310d8..9181666172a92 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
@@ -20,17 +20,18 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collections;
import static org.apache.hudi.common.util.ConfigUtils.OLD_SCHEMAPROVIDER_CONFIG_PREFIX;
@@ -61,9 +62,9 @@ public static class Config {
.withDocumentation("The schema of the target you are writing to");
}
- private final Schema sourceSchema;
+ private final HoodieSchema sourceSchema;
- private Schema targetSchema;
+ private HoodieSchema targetSchema;
@Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
@@ -71,10 +72,13 @@ public FilebasedSchemaProvider(TypedProperties props) {
String sourceSchemaFile = getStringWithAltKeys(props, Config.SOURCE_SCHEMA_FILE);
FileSystem fs = HadoopFSUtils.getFs(sourceSchemaFile, HadoopConfigurations.getHadoopConf(new Configuration()));
try {
- this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaFile)));
+ try (InputStream stream = fs.open(new Path(sourceSchemaFile))) {
+ this.sourceSchema = HoodieSchema.parse(stream);
+ }
if (containsConfigProperty(props, Config.TARGET_SCHEMA_FILE)) {
- this.targetSchema =
- new Schema.Parser().parse(fs.open(new Path(getStringWithAltKeys(props, Config.TARGET_SCHEMA_FILE))));
+ try (InputStream stream = fs.open(new Path(getStringWithAltKeys(props, Config.TARGET_SCHEMA_FILE)))) {
+ this.targetSchema = new HoodieSchema.Parser().parse(stream);
+ }
}
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
@@ -85,19 +89,21 @@ public FilebasedSchemaProvider(Configuration conf) {
final String sourceSchemaPath = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = HadoopFSUtils.getFs(sourceSchemaPath, HadoopConfigurations.getHadoopConf(conf));
try {
- this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
+ try (InputStream stream = fs.open(new Path(sourceSchemaPath))) {
+ this.sourceSchema = new HoodieSchema.Parser().parse(stream);
+ }
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}
}
@Override
- public Schema getSourceSchema() {
+ public HoodieSchema getSourceSchema() {
return sourceSchema;
}
@Override
- public Schema getTargetSchema() {
+ public HoodieSchema getTargetSchema() {
if (targetSchema != null) {
return targetSchema;
} else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
index eba4e51861dc0..9484e68b3e917 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
@@ -18,7 +18,7 @@
package org.apache.hudi.schema;
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
import java.io.Serializable;
@@ -29,9 +29,9 @@ public abstract class SchemaProvider implements Serializable {
private static final long serialVersionUID = 1L;
- public abstract Schema getSourceSchema();
+ public abstract HoodieSchema getSourceSchema();
- public Schema getTargetSchema() {
+ public HoodieSchema getTargetSchema() {
// by default, use source schema as target for hoodie table as well
return getSourceSchema();
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java
index d8e67fb7217af..c70127953e732 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java
@@ -20,11 +20,11 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.exception.HoodieIOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
import java.io.IOException;
import java.io.InputStream;
@@ -110,12 +110,12 @@ public SchemaRegistryProvider(TypedProperties props) {
checkRequiredConfigProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL));
}
- private Schema getSchema(String registryUrl) throws IOException {
- return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+ private HoodieSchema getSchema(String registryUrl) throws IOException {
+ return HoodieSchema.parse(fetchSchemaFromRegistry(registryUrl));
}
@Override
- public Schema getSourceSchema() {
+ public HoodieSchema getSourceSchema() {
String registryUrl = getStringWithAltKeys(config, Config.SRC_SCHEMA_REGISTRY_URL);
try {
return getSchema(registryUrl);
@@ -125,7 +125,7 @@ public Schema getSourceSchema() {
}
@Override
- public Schema getTargetSchema() {
+ public HoodieSchema getTargetSchema() {
String registryUrl = getStringWithAltKeys(config, Config.SRC_SCHEMA_REGISTRY_URL);
String targetRegistryUrl = getStringWithAltKeys(
config, Config.TARGET_SCHEMA_REGISTRY_URL, registryUrl);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index be8ececd82e66..52b6e49620d62 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -22,6 +22,7 @@
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -44,7 +45,6 @@
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.RuntimeContextUtils;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -217,7 +217,8 @@ protected void loadRecords(String partitionPath) throws Exception {
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedAndCompactionInstants().lastInstant();
if (latestCommitTime.isPresent()) {
- Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
+ HoodieSchema schema =
+ new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableSchema();
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().requestedTime())
@@ -250,7 +251,7 @@ protected void loadRecords(String partitionPath) throws Exception {
*
* @return A record key iterator for the file slice.
*/
- private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, Schema tableSchema) throws IOException {
+ private ClosableIterator<String> getRecordKeyIterator(FileSlice fileSlice, HoodieSchema tableSchema) throws IOException {
FileSlice scanFileSlice = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
// filter out crushed base file
fileSlice.getBaseFile().map(f -> isValidFile(f.getPathInfo()) ? f : null).ifPresent(scanFileSlice::setBaseFile);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 001617f22a2e5..8dbc59475300d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,7 +20,6 @@
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
-import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.CloseableConcatenatingIterator;
@@ -32,6 +31,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
@@ -53,13 +53,12 @@
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.HoodieRowDataParquetReader;
import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkTaskContextSupplier;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.utils.RuntimeContextUtils;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
@@ -107,8 +106,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent>
private transient HoodieFlinkTable<?> table;
- private transient Schema schema;
- private transient Schema readerSchema;
+ private transient HoodieSchema schema;
+ private transient HoodieSchema readerSchema;
private transient HoodieFlinkWriteClient writeClient;
private transient StreamRecordCollector collector;
private transient BinaryRowDataSerializer binarySerializer;
@@ -170,11 +169,11 @@ public void open() throws Exception {
this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
this.table = writeClient.getHoodieTable();
- this.schema = AvroSchemaConverter.convertToSchema(rowType);
+ this.schema = HoodieSchemaConverter.convertToSchema(rowType);
// Since there exists discrepancies between flink and spark dealing with nullability of primary key field,
// and there may be some files written by spark, force update schema as nullable to make sure clustering
// scan successfully without schema validating exception.
- this.readerSchema = AvroSchemaUtils.asNullable(schema);
+ this.readerSchema = HoodieSchemaUtils.asNullable(schema);
this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
@@ -312,8 +311,7 @@ private Iterator readRecordsForGroupBaseFiles(List
HoodieRowDataParquetReader fileReader = (HoodieRowDataParquetReader) fileReaderFactory.getFileReader(
table.getConfig(), new StoragePath(clusteringOp.getDataFilePath()));
- //TODO boundary to revisit in later pr to use HoodieSchema directly
- return new CloseableMappingIterator<>(fileReader.getRecordIterator(HoodieSchema.fromAvroSchema(readerSchema)), HoodieRecord::getData);
+ return new CloseableMappingIterator<>(fileReader.getRecordIterator(readerSchema), HoodieRecord::getData);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index b5c955e6d8dce..a9099b74c4071 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -21,6 +21,7 @@
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -32,14 +33,13 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.JCommander;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.deployment.application.ApplicationExecutionException;
@@ -319,8 +319,8 @@ private void cluster() throws Exception {
// Mark instant as clustering inflight
ClusteringUtils.transitionClusteringOrReplaceRequestedToInflight(instant, Option.empty(), table.getActiveTimeline());
- final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = StreamerUtil.getTableSchema(table.getMetaClient(), false);
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 5f19cf8096993..721b3f94e470b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -27,7 +27,7 @@
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.StreamerUtils;
@@ -73,7 +73,7 @@ public static void main(String[] args) throws Exception {
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
// Read from kafka source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 5369e47bb115f..37e5bbbfece0c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -21,13 +21,14 @@
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
import org.apache.hudi.adapter.TableFunctionProviderAdapter;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -70,15 +71,14 @@
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.table.lookup.HoodieLookupFunction;
import org.apache.hudi.table.lookup.HoodieLookupTableReader;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.ExpressionUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -439,8 +439,8 @@ private List buildInputSplits() {
}
private InputFormat<RowData, ?> getBatchInputFormat() {
- final Schema tableAvroSchema = getTableAvroSchema();
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = getTableSchema();
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
@@ -456,7 +456,7 @@ private List buildInputSplits() {
LOG.info("No input splits generate for MERGE_ON_READ input format. Returning empty collection");
return InputFormats.EMPTY_INPUT_FORMAT;
}
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
rowDataType, inputSplits, false);
case COPY_ON_WRITE:
return baseFileOnlyInputFormat();
@@ -480,9 +480,9 @@ private List buildInputSplits() {
LOG.info("No input splits generated for incremental read. Returning empty collection");
return InputFormats.EMPTY_INPUT_FORMAT;
} else if (cdcEnabled) {
- return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, result.getInputSplits());
+ return cdcInputFormat(rowType, requiredRowType, tableSchema, rowDataType, result.getInputSplits());
} else {
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
rowDataType, result.getInputSplits(), false);
}
default:
@@ -494,8 +494,8 @@ private List buildInputSplits() {
private InputFormat<RowData, ?> getStreamInputFormat() {
// if table does not exist or table data does not exist, use schema from the DDL
- Schema tableAvroSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema();
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ HoodieSchema tableSchema = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableSchema();
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
@@ -506,9 +506,9 @@ private List buildInputSplits() {
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.get(FlinkOptions.TABLE_TYPE));
boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
if (this.conf.get(FlinkOptions.CDC_ENABLED)) {
- return cdcInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList());
+ return cdcInputFormat(rowType, requiredRowType, tableSchema, rowDataType, Collections.emptyList());
} else {
- return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableSchema,
rowDataType, Collections.emptyList(), emitDelete);
}
default:
@@ -530,14 +530,14 @@ private boolean tableDataExists() {
private MergeOnReadInputFormat cdcInputFormat(
RowType rowType,
RowType requiredRowType,
- Schema tableAvroSchema,
+ HoodieSchema tableSchema,
DataType rowDataType,
List<MergeOnReadInputSplit> inputSplits) {
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
requiredRowType,
- tableAvroSchema.toString(),
- AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+ tableSchema.toString(),
+ HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
inputSplits,
conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
return CdcInputFormat.builder()
@@ -555,7 +555,7 @@ private MergeOnReadInputFormat cdcInputFormat(
private MergeOnReadInputFormat mergeOnReadInputFormat(
RowType rowType,
RowType requiredRowType,
- Schema tableAvroSchema,
+ HoodieSchema tableAvroSchema,
DataType rowDataType,
List<MergeOnReadInputSplit> inputSplits,
boolean emitDelete) {
@@ -563,7 +563,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
rowType,
requiredRowType,
tableAvroSchema.toString(),
- AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+ HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
inputSplits,
conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","));
return MergeOnReadInputFormat.builder()
@@ -608,9 +608,9 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
);
}
- private Schema inferSchemaFromDdl() {
- Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType);
- return HoodieAvroUtils.addMetadataFields(schema, conf.get(FlinkOptions.CHANGELOG_ENABLED));
+ private HoodieSchema inferSchemaFromDdl() {
+ HoodieSchema schema = HoodieSchemaConverter.convertToSchema(this.tableRowType);
+ return HoodieSchemaUtils.addMetadataFields(schema, conf.get(FlinkOptions.CHANGELOG_ENABLED));
}
private FileIndex getOrBuildFileIndex() {
@@ -648,10 +648,10 @@ private boolean isPartitioned() {
}
@VisibleForTesting
- public Schema getTableAvroSchema() {
+ public HoodieSchema getTableSchema() {
try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- return schemaResolver.getTableAvroSchema();
+ return schemaResolver.getTableSchema();
} catch (Throwable e) {
// table exists but has no written data
LOG.warn("Unable to resolve schema from table, using schema from the DDL", e);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 871836dd48054..d69323f22636a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -22,6 +22,7 @@
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ConfigUtils;
@@ -40,12 +41,11 @@
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.table.HoodieTableFactory;
import org.apache.hudi.table.format.FilePathUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.CatalogUtils;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
@@ -422,7 +422,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
- Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+ HoodieSchema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
org.apache.flink.table.api.Schema schema;
if (latestTableSchema != null) {
String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
@@ -430,7 +430,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
? null : StringUtils.split(pkColumnsStr, ",");
// if the table is initialized from spark, the write schema is nullable for pk columns.
DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
- AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
+ HoodieSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(tableDataType);
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
@@ -496,7 +496,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
private HoodieTableMetaClient initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
- final String avroSchema = AvroSchemaConverter.convertToSchema(
+ final String avroSchema = HoodieSchemaConverter.convertToSchema(
DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
flinkConf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 5e1f646104b75..96d3a45575dc2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -30,6 +30,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
@@ -107,7 +108,8 @@ public ClosableIterator getFileRecordIterator(
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty());
DataType rowType = RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
- return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, getSafePredicates(requiredSchema));
+ //TODO revisit once HoodieEngineContext takes in HoodieSchema
+ return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, HoodieSchema.fromAvroSchema(requiredSchema), getSafePredicates(requiredSchema));
}
@Override
@@ -137,6 +139,7 @@ public ClosableIterator mergeBootstrapReaders(
ClosableIterator<RowData> dataFileIterator,
Schema dataRequiredSchema,
List<Pair<String, Object>> partitionFieldAndValues) {
+ //TODO revisit once HoodieEngineContext takes in HoodieSchema
Map<Integer, Object> partitionOrdinalToValues = partitionFieldAndValues.stream()
.collect(Collectors.toMap(
pair -> dataRequiredSchema.getField(pair.getKey()).pos(),
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index c2f9ca0177b72..8741e66eeac92 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -22,6 +22,8 @@
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
@@ -34,7 +36,6 @@
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.util.FlinkClientUtil;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
@@ -55,13 +56,13 @@ private FormatUtils() {
public static GenericRecord buildAvroRecordBySchema(
IndexedRecord record,
- Schema requiredSchema,
+ HoodieSchema requiredSchema,
int[] requiredPos,
GenericRecordBuilder recordBuilder) {
- List<Schema.Field> requiredFields = requiredSchema.getFields();
+ List<HoodieSchemaField> requiredFields = requiredSchema.getFields();
assert (requiredFields.size() == requiredPos.length);
Iterator<Integer> positionIterator = Arrays.stream(requiredPos).iterator();
- requiredFields.forEach(f -> recordBuilder.set(f, getVal(record, positionIterator.next())));
+ requiredFields.forEach(f -> recordBuilder.set(f.getAvroField(), getVal(record, positionIterator.next())));
return recordBuilder.build();
}
@@ -111,8 +112,8 @@ public static HoodieFileGroupReader createFileGroupReader(
HoodieWriteConfig writeConfig,
InternalSchemaManager internalSchemaManager,
FileSlice fileSlice,
- Schema tableSchema,
- Schema requiredSchema,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
String latestInstant,
String mergeType,
boolean emitDelete,
@@ -134,8 +135,8 @@ public static HoodieFileGroupReader createFileGroupReader(
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(latestInstant)
.withFileSlice(fileSlice)
- .withDataSchema(tableSchema)
- .withRequestedSchema(requiredSchema)
+ .withDataSchema(tableSchema.getAvroSchema())
+ .withRequestedSchema(requiredSchema.getAvroSchema())
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
.withProps(typedProps)
.withShouldUseRecordPosition(false)
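The buildAvroRecordBySchema change above keys the Avro GenericRecordBuilder by each HoodieSchemaField's underlying Avro field. A standalone sketch of that projection, assuming only the getFields()/getAvroField()/getAvroSchema() accessors visible in this diff; plain IndexedRecord#get stands in for FormatUtils.getVal:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;

import java.util.List;

final class ProjectedRecordExample {

  // Copies the values at requiredPos from the source record into a new record shaped by requiredSchema.
  static GenericRecord project(IndexedRecord record, HoodieSchema requiredSchema, int[] requiredPos) {
    GenericRecordBuilder builder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
    List<HoodieSchemaField> fields = requiredSchema.getFields();
    for (int i = 0; i < fields.size(); i++) {
      builder.set(fields.get(i).getAvroField(), record.get(requiredPos[i]));
    }
    return builder.build();
  }
}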
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 54c985699de9c..4465124c9a891 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -18,12 +18,12 @@
package org.apache.hudi.table.format;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.model.HoodieFlinkRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -34,13 +34,12 @@
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.avro.Schema;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
@@ -57,7 +56,7 @@ public class HoodieRowDataParquetReader implements HoodieFileReader {
private final HoodieStorage storage;
private final ParquetUtils parquetUtils;
private final StoragePath path;
- private Schema fileSchema;
+ private HoodieSchema fileSchema;
private DataType fileRowType;
private final List<ClosableIterator<RowData>> readerIterators = new ArrayList<>();
@@ -84,15 +83,14 @@ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
@Override
public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException {
- //TODO boundary to follow up in later pr
- ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema.getAvroSchema(), Collections.emptyList());
+ ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), requestedSchema, Collections.emptyList());
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
- Schema schema = HoodieAvroUtils.getRecordKeySchema();
+ HoodieSchema schema = HoodieSchemaUtils.getRecordKeySchema();
ClosableIterator<RowData> rowDataItr = getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema, Collections.emptyList());
return new CloseableMappingIterator<>(rowDataItr, rowData -> Objects.toString(rowData.getString(0)));
}
@@ -100,7 +98,7 @@ public ClosableIterator getRecordKeyIterator() throws IOException {
public ClosableIterator<RowData> getRowDataIterator(
InternalSchemaManager internalSchemaManager,
DataType dataType,
- Schema requestedSchema,
+ HoodieSchema requestedSchema,
List predicates) throws IOException {
return RecordIterators.getParquetRecordIterator(storage.getConf(), internalSchemaManager, dataType, requestedSchema, path, predicates);
}
@@ -108,10 +106,9 @@ public ClosableIterator getRowDataIterator(
@Override
public HoodieSchema getSchema() {
if (fileSchema == null) {
- fileSchema = AvroSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
+ fileSchema = HoodieSchemaConverter.convertToSchema(getRowType().notNull().getLogicalType());
}
- //TODO to revisit in later pr to use HoodieSchema directly
- return HoodieSchema.fromAvroSchema(fileSchema);
+ return fileSchema;
}
public DataType getRowType() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 120e50e5388f5..6a4ed021c2bcc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.configuration.FlinkOptions;
@@ -29,7 +31,6 @@
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.util.RowDataProjection;
-import org.apache.avro.Schema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -62,12 +63,12 @@ public static ClosableIterator getParquetRecordIterator(
StorageConfiguration> conf,
InternalSchemaManager internalSchemaManager,
DataType dataType,
- Schema requestedSchema,
+ HoodieSchema requestedSchema,
StoragePath path,
List predicates) throws IOException {
List<String> fieldNames = ((RowType) dataType.getLogicalType()).getFieldNames();
List<DataType> fieldTypes = dataType.getChildren();
- int[] selectedFields = requestedSchema.getFields().stream().map(Schema.Field::name)
+ int[] selectedFields = requestedSchema.getFields().stream().map(HoodieSchemaField::name)
.map(fieldNames::indexOf)
.mapToInt(i -> i)
.toArray();
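The selected-field computation above maps each requested field name onto its index in the Flink row type. The same mapping in isolation, assuming HoodieSchemaField#name() as used in this diff; a -1 entry would indicate a column missing from the row type:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;

import org.apache.flink.table.types.logical.RowType;

import java.util.List;

final class SelectedFieldsExample {

  // Positions of the requested columns inside the full physical row type.
  static int[] selectedFields(HoodieSchema requestedSchema, RowType rowType) {
    List<String> fieldNames = rowType.getFieldNames();
    return requestedSchema.getFields().stream()
        .map(HoodieSchemaField::name)
        .mapToInt(fieldNames::indexOf)
        .toArray();
  }
}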
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 30c580d27c028..cad0277b7770f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -18,8 +18,8 @@
package org.apache.hudi.table.format.cdc;
-import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.model.HoodieFlinkRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -29,6 +29,9 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -67,7 +70,6 @@
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.configuration.Configuration;
@@ -139,9 +141,10 @@ public static Builder builder() {
private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit split) {
try {
// get full schema iterator.
- final Schema tableSchema = AvroSchemaCache.intern(new Schema.Parser().parse(tableState.getAvroSchema()));
+ final HoodieSchema schema = HoodieSchemaCache.intern(
+ HoodieSchema.parse(tableState.getAvroSchema()));
// before/after images have assumption of snapshot scan, so `emitDelete` is set as false
- return getSplitRowIterator(split, tableSchema, tableSchema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
+ return getSplitRowIterator(split, schema, schema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
} catch (IOException e) {
throw new HoodieException("Failed to create iterator for split: " + split, e);
}
@@ -179,8 +182,8 @@ private ClosableIterator getRecordIterator(
MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes);
return new RemoveBaseFileIterator(tableState, getFileSliceIterator(inputSplit));
case AS_IS:
- Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableState.getAvroSchema()));
- Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
+ HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getAvroSchema()));
+ Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema.getAvroSchema());
switch (mode) {
case DATA_BEFORE_AFTER:
return new BeforeAfterImageIterator(tablePath, tableState, hadoopConf, cdcSchema, fileSplit);
@@ -213,9 +216,10 @@ private ClosableIterator getRecordIterator(
* @return {@link RowData} iterator for the given split.
*/
private ClosableIterator<HoodieRecord<RowData>> getSplitRecordIterator(MergeOnReadInputSplit split) throws IOException {
- final Schema tableSchema = AvroSchemaCache.intern(new Schema.Parser().parse(tableState.getAvroSchema()));
+ final HoodieSchema schema = HoodieSchemaCache.intern(
+ HoodieSchema.parse(tableState.getAvroSchema()));
HoodieFileGroupReader<RowData> fileGroupReader =
- createFileGroupReader(split, tableSchema, tableSchema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
+ createFileGroupReader(split, schema, schema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
return fileGroupReader.getClosableHoodieRecordIterator();
}
@@ -360,7 +364,7 @@ static class DataLogFileIterator implements ClosableIterator {
MergeOnReadTableState tableState,
ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
HoodieTableMetaClient metaClient) throws IOException {
- this.tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
+ this.tableSchema = HoodieSchema.parse(tableState.getAvroSchema()).getAvroSchema();
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.imageManager = imageManager;
this.projection = tableState.getRequiredRowType().equals(tableState.getRowType())
@@ -472,7 +476,7 @@ private boolean isDelete(HoodieRecord record) {
}
abstract static class BaseImageIterator implements ClosableIterator<RowData> {
- private final Schema requiredSchema;
+ private final HoodieSchema requiredSchema;
private final int[] requiredPos;
private final GenericRecordBuilder recordBuilder;
private final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
@@ -492,9 +496,9 @@ abstract static class BaseImageIterator implements ClosableIterator {
MergeOnReadTableState tableState,
Schema cdcSchema,
HoodieCDCFileSplit fileSplit) {
- this.requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
+ this.requiredSchema = HoodieSchema.parse(tableState.getRequiredAvroSchema());
this.requiredPos = getRequiredPos(tableState.getAvroSchema(), this.requiredSchema);
- this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+ this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema());
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
StoragePath hadoopTablePath = new StoragePath(tablePath);
HoodieStorage storage = new HoodieHadoopStorage(tablePath, hadoopConf);
@@ -509,9 +513,9 @@ abstract static class BaseImageIterator implements ClosableIterator {
this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
}
- private int[] getRequiredPos(String tableSchema, Schema required) {
- Schema dataSchema = HoodieAvroUtils.removeMetadataFields(new Schema.Parser().parse(tableSchema));
- List<String> fields = dataSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+ private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
+ HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableSchema));
+ List<String> fields = dataSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
return required.getFields().stream()
.map(f -> fields.indexOf(f.name()))
.mapToInt(i -> i)
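Both CDC paths above rebuild the table schema from the serialized string carried in MergeOnReadTableState, interning the parsed result so repeated splits share a single instance. A sketch under the HoodieSchema.parse/HoodieSchemaCache.intern signatures shown here; the helper name is illustrative:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

final class CdcSchemaExample {

  // Parse the schema string once, share it via the cache, and strip the _hoodie_* metadata
  // columns to obtain the data image schema used for before/after records.
  static HoodieSchema dataImageSchema(String serializedTableSchema) {
    HoodieSchema tableSchema = HoodieSchemaCache.intern(HoodieSchema.parse(serializedTableSchema));
    return HoodieSchemaUtils.removeMetadataFields(tableSchema);
  }
}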
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 01b718b0a97a4..8a1e8b279822f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -18,11 +18,12 @@
package org.apache.hudi.table.format.mor;
-import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.ValidationUtils;
@@ -39,7 +40,6 @@
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
@@ -187,8 +187,10 @@ protected ClosableIterator initIterator(MergeOnReadInputSplit split) th
+ "hoodie table path: " + split.getTablePath()
+ "flink partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType());
- final Schema tableSchema = AvroSchemaCache.intern(new Schema.Parser().parse(tableState.getAvroSchema()));
- final Schema requiredSchema = AvroSchemaCache.intern(new Schema.Parser().parse(tableState.getRequiredAvroSchema()));
+ final HoodieSchema tableSchema = HoodieSchemaCache.intern(
+ HoodieSchema.parse(tableState.getAvroSchema()));
+ final HoodieSchema requiredSchema = HoodieSchemaCache.intern(
+ HoodieSchema.parse(tableState.getRequiredAvroSchema()));
return getSplitRowIterator(split, tableSchema, requiredSchema, mergeType, emitDelete);
}
@@ -301,8 +303,8 @@ protected ClosableIterator getBaseFileIterator(String path) throws IOEx
*/
protected ClosableIterator getSplitRowIterator(
MergeOnReadInputSplit split,
- Schema tableSchema,
- Schema requiredSchema,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
String mergeType,
boolean emitDelete) throws IOException {
HoodieFileGroupReader<RowData> fileGroupReader = createFileGroupReader(split, tableSchema, requiredSchema, mergeType, emitDelete);
@@ -322,8 +324,8 @@ protected ClosableIterator getSplitRowIterator(
*/
protected HoodieFileGroupReader<RowData> createFileGroupReader(
MergeOnReadInputSplit split,
- Schema tableSchema,
- Schema requiredSchema,
+ HoodieSchema tableSchema,
+ HoodieSchema requiredSchema,
String mergeType,
boolean emitDelete) {
FileSlice fileSlice = new FileSlice(
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 446533f5dbc50..73c7a336909d3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -33,7 +33,7 @@
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +75,7 @@ public static void scheduleCompaction(
*/
public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
- Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableAvroSchema = tableSchemaResolver.getTableSchema(false);
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
}
@@ -87,7 +87,7 @@ public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaC
*/
public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
- Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+ HoodieSchema tableAvroSchema = tableSchemaResolver.getTableSchema(false);
writeConfig.setSchema(tableAvroSchema.toString());
}
@@ -131,8 +131,8 @@ public static void setPartitionField(Configuration conf, HoodieTableMetaClient m
*/
public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
- Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
- if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
+ HoodieSchema tableSchema = HoodieSchema.fromAvroSchema(tableSchemaResolver.getTableAvroSchemaFromDataFile());
+ if (tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) {
conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
}
}
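Note the API shift above: HoodieSchema#getField returns an optional handle rather than a nullable Avro field, so presence checks read as isPresent(). A tiny sketch of the changelog inference under that assumption:

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;

final class ChangelogModeExample {

  // A table written in changelog mode carries the _hoodie_operation metadata column.
  static boolean isChangelogTable(HoodieSchema tableSchema) {
    return tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent();
  }
}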
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
index 4c66a13a13020..6b8e3e48786fa 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/JsonDeserializationFunction.java
@@ -41,7 +41,7 @@ public final class JsonDeserializationFunction
public static JsonDeserializationFunction getInstance(Configuration conf) {
// Read from file source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
return getInstance(rowType);
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3beb05e350c43..388d5de7e4cd9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -70,7 +70,7 @@
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.streamer.FlinkStreamerConfig;
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -136,12 +136,12 @@ public static TypedProperties buildProperties(List props) {
return properties;
}
- public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
+ public static HoodieSchema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceSchema();
} else if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
final String schemaStr = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
- return new Schema.Parser().parse(schemaStr);
+ return HoodieSchema.parse(schemaStr);
} else {
final String errorMsg = String.format("Either option '%s' or '%s' "
+ "should be specified for avro schema deserialization",
@@ -608,19 +608,19 @@ public static long getMaxCompactionMemoryInBytes(Configuration conf) {
return (long) conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
}
- public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
+ public static HoodieSchema getTableSchema(HoodieTableMetaClient metaClient, boolean includeMetadataFields) throws Exception {
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- return schemaUtil.getTableAvroSchema(includeMetadataFields);
+ return schemaUtil.getTableSchema(includeMetadataFields);
}
- public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
+ public static HoodieSchema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
if (StringUtils.isNullOrEmpty(path) || !StreamerUtil.tableExists(path, hadoopConf)) {
return null;
}
try {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
- return getTableAvroSchema(metaClient, false);
+ return getTableSchema(metaClient, false);
} catch (Exception e) {
LOG.error("Failed to resolve the latest table schema", e);
}
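getSourceSchema now yields a HoodieSchema, so callers (as in the wrapper and test changes below) derive the Flink RowType through HoodieSchemaConverter. A usage sketch assuming the convertToDataType signature used throughout this diff:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;

final class SourceRowTypeExample {

  // Resolve the source schema from SOURCE_AVRO_SCHEMA / SOURCE_AVRO_SCHEMA_PATH, then map it to a Flink row type.
  static RowType sourceRowType(Configuration conf) {
    HoodieSchema sourceSchema = StreamerUtil.getSourceSchema(conf);
    return (RowType) HoodieSchemaConverter.convertToDataType(sourceSchema).getLogicalType();
  }
}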
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
index 350ae7f33f6d7..28e513e74da7f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamV2Write.java
@@ -22,7 +22,7 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.v2.utils.PipelinesV2;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
@@ -114,7 +114,7 @@ private void writeAndCheckExpected(
// Read from file source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 938f1f7d4026e..d5bd4f895bc1e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -38,8 +38,8 @@
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.catalog.HoodieCatalog;
import org.apache.hudi.table.catalog.TableOptionProperties;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
@@ -283,7 +283,7 @@ private void writeAndCheckExpected(
// Read from file source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
String sourcePath = Objects.requireNonNull(Thread.currentThread()
@@ -333,7 +333,7 @@ private void writeWithClusterAndCheckExpected(
// Read from file source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index b36155550cd3c..d8e67f636d06f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -27,7 +27,7 @@
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.JsonDeserializationFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
@@ -147,7 +147,7 @@ private void testWriteToHoodie(
// Read from file source
RowType rowType =
- (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
String sourcePath = Objects.requireNonNull(Thread.currentThread()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 6289f9af27b1b..4fbf90e2e421a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -24,6 +24,7 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -38,16 +39,15 @@
import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
import org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob;
import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
-import org.apache.avro.Schema;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@@ -180,8 +180,8 @@ public void testHoodieFlinkClustering() throws Exception {
HoodieInstant instant = INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
table.getActiveTimeline().transitionClusterRequestedToInflight(instant, Option.empty());
- final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = StreamerUtil.getTableSchema(table.getMetaClient(), false);
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
@@ -383,8 +383,8 @@ public void testHoodieFlinkClusteringScheduleAfterArchive() throws Exception {
HoodieInstant instant = INSTANT_GENERATOR.getClusteringCommitRequestedInstant(firstClusteringInstant.get());
table.getActiveTimeline().transitionClusterRequestedToInflight(instant, Option.empty());
- final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = StreamerUtil.getTableSchema(table.getMetaClient(), false);
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream dataStream =
@@ -752,8 +752,8 @@ private void runOfflineCluster(TableEnvironment tableEnv, Configuration conf) th
HoodieInstant instant = INSTANT_GENERATOR.getClusteringCommitRequestedInstant(clusteringInstantTime.get());
table.getActiveTimeline().transitionClusterRequestedToInflight(instant, Option.empty());
- final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final HoodieSchema tableSchema = StreamerUtil.getTableSchema(table.getMetaClient(), false);
+ final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
DataStream dataStream = env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime.get(), clusteringPlan, conf))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
index 4b5ce9dc0816f..70f8cacfd9929 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCustomSerDe.java
@@ -22,13 +22,12 @@
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.collection.BitCaskDiskMap;
import org.apache.hudi.common.util.collection.RocksDbDiskMap;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.junit.jupiter.api.BeforeEach;
@@ -72,11 +71,16 @@ public void testRocksDbDiskMapPutDecimal() throws IOException {
}
private static HoodieRecord createAvroRecordWithDecimalOrderingField() {
- Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
- .addToSchema(Schema.createFixed("fixed", null, "record.precombineField", 9));
+ HoodieSchema decimalSchema = HoodieSchema.createDecimal(
+ "fixed", // name
+ "record.precombineField", // namespace
+ null, // doc
+ 20, // precision
+ 0, // scale
+ 9 // fixedSize in bytes
+ );
byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, -32};
- GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, decimalFieldBytes);
-
+ GenericFixed genericFixed = new GenericData.Fixed(decimalSchema.getAvroSchema(), decimalFieldBytes);
// nullifying the record attribute in EventTimeAvroPayload here as it is not required in the test
return new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
new EventTimeAvroPayload(null, (Comparable) genericFixed));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index b899a90787d7d..56b0017a8bcef 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -28,7 +28,7 @@
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -103,7 +103,7 @@ public BucketStreamWriteFunctionWrapper(String tablePath, Configuration conf) th
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
- this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ this.rowType = (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 73fd955d3579b..51d2a4561d1f1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -31,7 +31,7 @@
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.ExecutionConfig;
@@ -93,7 +93,7 @@ public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Ex
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
- this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ this.rowType = (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
this.rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index ea63d58d55085..62d24d346d064 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -26,7 +26,7 @@
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.api.common.ExecutionConfig;
@@ -84,7 +84,7 @@ public InsertFunctionWrapper(String tablePath, Configuration conf) throws Except
this.gateway = new MockOperatorEventGateway();
this.subtaskGateway = new MockSubtaskGateway();
this.conf = conf;
- this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ this.rowType = (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index fc2d3f9fdf5e4..d5f1a31385fd6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -32,7 +32,7 @@
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -122,7 +122,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
- this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+ this.rowType = (RowType) HoodieSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index c644d70e2163a..1f5b41bc0f91b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -31,7 +32,6 @@
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
-import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -247,18 +247,18 @@ private OneInputStreamOperatorTestHarness create
// This input format is used to opening the emitted split.
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
- final Schema tableAvroSchema;
+ final HoodieSchema tableSchema;
try {
- tableAvroSchema = schemaResolver.getTableAvroSchema();
+ tableSchema = schemaResolver.getTableSchema();
} catch (Exception e) {
throw new HoodieException("Get table avro schema error", e);
}
- final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
final RowType rowType = (RowType) rowDataType.getLogicalType();
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
TestConfigurations.ROW_TYPE,
- tableAvroSchema.toString(),
+ tableSchema.toString(),
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList(),
new String[0]);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 601625242991d..e217bb5ab9bce 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -22,6 +22,9 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CompactionUtils;
@@ -41,8 +44,6 @@
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.FlinkMiniCluster;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -63,6 +64,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
@@ -245,35 +247,44 @@ private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactB
doCompact(conf);
}
- Schema intType = SchemaBuilder.unionOf().nullType().and().intType().endUnion();
- Schema longType = SchemaBuilder.unionOf().nullType().and().longType().endUnion();
- Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
- Schema stringType = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
- Schema structType = SchemaBuilder.builder().record("new_row_col").fields()
- .name("f0").type(longType).noDefault()
- .name("f1").type(stringType).noDefault().endRecord();
- Schema arrayType = Schema.createUnion(SchemaBuilder.builder().array().items(stringType), SchemaBuilder.builder().nullType());
- Schema mapType = Schema.createUnion(SchemaBuilder.builder().map().values(stringType), SchemaBuilder.builder().nullType());
-
- writeClient.addColumn("salary", doubleType, null, "name", AFTER);
+ // Create nullable primitive types using HoodieSchema
+ HoodieSchema intType = HoodieSchema.createNullable(HoodieSchemaType.INT);
+ HoodieSchema longType = HoodieSchema.createNullable(HoodieSchemaType.LONG);
+ HoodieSchema doubleType = HoodieSchema.createNullable(HoodieSchemaType.DOUBLE);
+ HoodieSchema stringType = HoodieSchema.createNullable(HoodieSchemaType.STRING);
+
+ // Create struct type with fields
+ List<HoodieSchemaField> structFields = Arrays.asList(
+ HoodieSchemaField.of("f0", longType, null, HoodieSchema.NULL_VALUE),
+ HoodieSchemaField.of("f1", stringType, null, HoodieSchema.NULL_VALUE)
+ );
+ HoodieSchema structType = HoodieSchema.createRecord("new_row_col", null, null, structFields);
+
+ // Create nullable array type
+ HoodieSchema arrayType = HoodieSchema.createNullable(HoodieSchema.createArray(stringType));
+
+ // Create nullable map type
+ HoodieSchema mapType = HoodieSchema.createNullable(HoodieSchema.createMap(stringType));
+
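+ // Note: the schema-evolution write client APIs below still take Avro schemas, so each HoodieSchema is unwrapped via getAvroSchema() at the call site.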
+ writeClient.addColumn("salary", doubleType.getAvroSchema(), null, "name", AFTER);
writeClient.deleteColumns("gender");
writeClient.renameColumn("name", "first_name");
writeClient.updateColumnType("age", Types.StringType.get());
- writeClient.addColumn("last_name", stringType, "empty allowed", "salary", BEFORE);
+ writeClient.addColumn("last_name", stringType.getAvroSchema(), "empty allowed", "salary", BEFORE);
writeClient.reOrderColPosition("age", "first_name", BEFORE);
// add a field in the middle of the `f_struct` and `f_row_map` columns
- writeClient.addColumn("f_struct.f2", intType, "add field in middle of struct", "f_struct.f0", AFTER);
- writeClient.addColumn("f_row_map.value.f2", intType, "add field in middle of struct", "f_row_map.value.f0", AFTER);
+ writeClient.addColumn("f_struct.f2", intType.getAvroSchema(), "add field in middle of struct", "f_struct.f0", AFTER);
+ writeClient.addColumn("f_row_map.value.f2", intType.getAvroSchema(), "add field in middle of struct", "f_row_map.value.f0", AFTER);
// add a field at the end of `f_struct` and `f_row_map` column
- writeClient.addColumn("f_struct.f3", stringType);
- writeClient.addColumn("f_row_map.value.f3", stringType);
+ writeClient.addColumn("f_struct.f3", stringType.getAvroSchema());
+ writeClient.addColumn("f_row_map.value.f3", stringType.getAvroSchema());
// delete and add a field with the same name
// reads should not return previously inserted datum of dropped field of the same name
writeClient.deleteColumns("f_struct.drop_add");
- writeClient.addColumn("f_struct.drop_add", doubleType);
+ writeClient.addColumn("f_struct.drop_add", doubleType.getAvroSchema());
writeClient.deleteColumns("f_row_map.value.drop_add");
- writeClient.addColumn("f_row_map.value.drop_add", doubleType);
+ writeClient.addColumn("f_row_map.value.drop_add", doubleType.getAvroSchema());
// perform comprehensive evolution on complex types (struct, array, map) by promoting their primitive types
writeClient.updateColumnType("f_struct.change_type", Types.LongType.get());
@@ -284,9 +295,9 @@ private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactB
writeClient.updateColumnType("f_map.value", Types.DoubleType.get());
// perform comprehensive schema evolution on table by adding complex typed columns
- writeClient.addColumn("new_row_col", structType);
- writeClient.addColumn("new_array_col", arrayType);
- writeClient.addColumn("new_map_col", mapType);
+ writeClient.addColumn("new_row_col", structType.getAvroSchema());
+ writeClient.addColumn("new_array_col", arrayType.getAvroSchema());
+ writeClient.addColumn("new_map_col", mapType.getAvroSchema());
writeClient.reOrderColPosition("partition", "new_map_col", AFTER);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 681442274893f..108043503cd43 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -19,11 +19,15 @@
package org.apache.hudi.table;
+import org.apache.avro.Schema;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
@@ -44,13 +48,11 @@
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.utils.TestData;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -64,6 +66,7 @@
import org.mockito.Mockito;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -143,13 +146,13 @@ public void commitToTable(List recordList, String operation, boole
writeConfigs.forEach((key, value) -> conf.setString(key, value));
conf.set(FlinkOptions.ORDERING_FIELDS, ConfigUtils.getOrderingFieldsStrDuringWrite(writeConfigs));
conf.set(FlinkOptions.OPERATION, operation);
- Schema localSchema = getRecordAvroSchema(schemaStr);
+ HoodieSchema localSchema = getRecordSchema(schemaStr);
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
- RowDataAvroQueryContexts.fromAvroSchema(localSchema).getAvroToRowDataConverter();
+ RowDataAvroQueryContexts.fromAvroSchema(localSchema.getAvroSchema()).getAvroToRowDataConverter();
List<RowData> rowDataList = recordList.stream().map(record -> {
try {
- return (RowData) avroConverter.convert(record.toIndexedRecord(localSchema, CollectionUtils.emptyProps()).get().getData());
+ return (RowData) avroConverter.convert(record.toIndexedRecord(localSchema.getAvroSchema(), CollectionUtils.emptyProps()).get().getData());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -204,16 +207,15 @@ public void testGetOrderingValue() {
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
- Schema schema = SchemaBuilder.builder()
- .record("test")
- .fields()
- .requiredString("field1")
- .optionalString("field2")
- .optionalLong("ts")
- .endRecord();
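+ // HoodieSchema equivalent of the removed Avro record: required "field1", nullable "field2" and "ts"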
+ List<HoodieSchemaField> fields = Arrays.asList(
+ HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("field2", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING))),
+ HoodieSchemaField.of("ts", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)))
+ );
+ HoodieSchema schema = HoodieSchema.createRecord("test", null, null, fields);
GenericRowData rowData = GenericRowData.of(StringData.fromString("f1"), StringData.fromString("f2"), 1000L);
- assertEquals(1000L, readerContext.getRecordContext().getOrderingValue(rowData, schema, Collections.singletonList("ts")));
- assertEquals(OrderingValues.getDefault(), readerContext.getRecordContext().getOrderingValue(rowData, schema, Collections.singletonList("non_existent_col")));
+ assertEquals(1000L, readerContext.getRecordContext().getOrderingValue(rowData, schema.getAvroSchema(), Collections.singletonList("ts")));
+ assertEquals(OrderingValues.getDefault(), readerContext.getRecordContext().getOrderingValue(rowData, schema.getAvroSchema(), Collections.singletonList("non_existent_col")));
}
@Test
@@ -222,15 +224,14 @@ public void getRecordKeyFromMetadataFields() {
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
- Schema schema = SchemaBuilder.builder()
- .record("test")
- .fields()
- .requiredString(HoodieRecord.RECORD_KEY_METADATA_FIELD)
- .optionalString("field2")
- .endRecord();
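+ // HoodieSchema record containing the record-key metadata field plus a nullable "field2"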
+ List<HoodieSchemaField> fields = Arrays.asList(
+ HoodieSchemaField.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("field2", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)))
+ );
+ HoodieSchema schema = HoodieSchema.createRecord("test", null, null, fields);
String key = "my_key";
GenericRowData rowData = GenericRowData.of(StringData.fromString(key), StringData.fromString("field2_val"));
- assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema));
+ assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema.getAvroSchema()));
}
@Test
@@ -240,15 +241,15 @@ public void getRecordKeySingleKey() {
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[] {"field1"}));
FlinkRowDataReaderContext readerContext =
new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
- Schema schema = SchemaBuilder.builder()
- .record("test")
- .fields()
- .requiredString("field1")
- .optionalString("field2")
- .endRecord();
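+ // HoodieSchema record with the required key field "field1" and a nullable "field2"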
+ List<HoodieSchemaField> fields = Arrays.asList(
+ HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("field2", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)))
+ );
+ HoodieSchema schema = HoodieSchema.createRecord("test", null, null, fields);
+
String key = "key";
GenericRowData rowData = GenericRowData.of(StringData.fromString(key), StringData.fromString("other"));
- assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema));
+ assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema.getAvroSchema()));
}
@Test
@@ -259,16 +260,16 @@ public void getRecordKeyWithMultipleKeys() {
FlinkRowDataReaderContext readerContext =
new FlinkRowDataReaderContext(getStorageConf(), () -> InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig, Option.empty());
- Schema schema = SchemaBuilder.builder()
- .record("test")
- .fields()
- .requiredString("field1")
- .requiredString("field2")
- .requiredString("field3")
- .endRecord();
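+ // HoodieSchema record with three required string fields forming the composite key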
+ List<HoodieSchemaField> fields = Arrays.asList(
+ HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("field2", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("field3", HoodieSchema.create(HoodieSchemaType.STRING))
+ );
+ HoodieSchema schema = HoodieSchema.createRecord("test", null, null, fields);
+
String key = "field1:va1,field2:__empty__";
GenericRowData rowData = GenericRowData.of(StringData.fromString("va1"), StringData.fromString(""), StringData.fromString("other"));
- assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema));
+ assertEquals(key, readerContext.getRecordContext().getRecordKey(rowData, schema.getAvroSchema()));
}
@ParameterizedTest
@@ -325,8 +326,8 @@ public void testFilterFileWithInstantRange(WriteOperationType firstCommitOperati
}
}
- private static Schema getRecordAvroSchema(String schemaStr) {
- Schema recordSchema = new Schema.Parser().parse(schemaStr);
- return AvroSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema).getRowType().getLogicalType());
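+ // Parses the schema string with HoodieSchema.Parser and normalizes it through the Flink RowType round-trip.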
+ private static HoodieSchema getRecordSchema(String schemaStr) {
+ HoodieSchema recordSchema = new HoodieSchema.Parser().parse(schemaStr);
+ return HoodieSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.getAvroSchema()).getRowType().getLogicalType());
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index c2c36bec4fd11..87515ac6d2674 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -21,6 +21,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.ExpressionPredicates;
@@ -32,7 +33,6 @@
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
-import org.apache.avro.Schema;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
@@ -145,8 +145,8 @@ void testGetInputFormat() throws Exception {
void testGetTableAvroSchema() {
HoodieTableSource tableSource = getEmptyStreamingSource();
assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
- final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
- .map(Schema.Field::name)
+ final String schemaFields = tableSource.getTableSchema().getFields().stream()
+ .map(HoodieSchemaField::name)
.collect(Collectors.joining(","));
final String expected = "_hoodie_commit_time,"
+ "_hoodie_commit_seqno,"
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
index 037a07a9b425e..03aa1fa9f9ea5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestFlinkRowDataReaderContext.java
@@ -20,14 +20,15 @@
package org.apache.hudi.table.format;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -35,6 +36,7 @@
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,11 +50,13 @@
class TestFlinkRowDataReaderContext {
private final StorageConfiguration<?> storageConfig = mock(StorageConfiguration.class);
private final HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
- private static final Schema AVRO_SCHEMA = SchemaBuilder.record("TestRecord").fields()
- .requiredInt("id")
- .requiredString("name")
- .requiredBoolean("active")
- .endRecord();
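+ // HoodieSchema counterpart of the former Avro record: required "id", "name" and "active" fields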
+ private static final List<HoodieSchemaField> FIELDS = Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+ HoodieSchemaField.of("name", HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("active", HoodieSchema.create(HoodieSchemaType.BOOLEAN))
+ );
+ private static final HoodieSchema HOODIE_SCHEMA = HoodieSchema.createRecord("TestRecord", null, null, FIELDS);
+
private FlinkRowDataReaderContext readerContext;
@BeforeEach
@@ -68,7 +72,7 @@ void setUp() {
@Test
void testConstructEngineRecordWithFieldValues() {
Object[] fieldVals = new Object[] {1, StringData.fromString("Alice"), true};
- RowData row = readerContext.getRecordContext().constructEngineRecord(AVRO_SCHEMA, fieldVals);
+ RowData row = readerContext.getRecordContext().constructEngineRecord(HOODIE_SCHEMA.getAvroSchema(), fieldVals);
assertEquals(fieldVals[0], row.getInt(0));
assertEquals(fieldVals[1], row.getString(1));
assertEquals(fieldVals[2], row.getBoolean(2));
@@ -79,7 +83,7 @@ void testConstructEngineRecordWithNoUpdates() {
RowData base = createBaseRow(1, "Alice", true);
BufferedRecord<RowData> record = new BufferedRecord<>("anyKey", 1, base, 1, null);
Map<Integer, Object> updates = new HashMap<>();
- RowData result = readerContext.getRecordContext().mergeWithEngineRecord(AVRO_SCHEMA, updates, record);
+ RowData result = readerContext.getRecordContext().mergeWithEngineRecord(HOODIE_SCHEMA.getAvroSchema(), updates, record);
assertEquals(1, result.getInt(0));
assertEquals("Alice", result.getString(1).toString());
@@ -93,7 +97,7 @@ void testConstructEngineRecordWithUpdateOneField() {
Map<Integer, Object> updates = new HashMap<>();
updates.put(1, StringData.fromString("Bob"));
- RowData result = readerContext.getRecordContext().mergeWithEngineRecord(AVRO_SCHEMA, updates, record);
+ RowData result = readerContext.getRecordContext().mergeWithEngineRecord(HOODIE_SCHEMA.getAvroSchema(), updates, record);
assertEquals(1, result.getInt(0)); // unchanged
assertEquals("Bob", result.getString(1).toString()); // updated
@@ -108,7 +112,7 @@ void testConstructEngineRecordWithUpdateAllFields() {
updates.put(0, 42);
updates.put(1, StringData.fromString("Zoe"));
updates.put(2, false);
- RowData result = readerContext.getRecordContext().mergeWithEngineRecord(AVRO_SCHEMA, updates, record);
+ RowData result = readerContext.getRecordContext().mergeWithEngineRecord(HOODIE_SCHEMA.getAvroSchema(), updates, record);
assertEquals(42, result.getInt(0));
assertEquals("Zoe", result.getString(1).toString());
@@ -121,7 +125,7 @@ void testConstructEngineRecordWithNullUpdate() {
BufferedRecord<RowData> record = new BufferedRecord<>("anyKey", 1, base, 1, null);
Map<Integer, Object> updates = new HashMap<>();
updates.put(1, null);
- RowData result = readerContext.getRecordContext().mergeWithEngineRecord(AVRO_SCHEMA, updates, record);
+ RowData result = readerContext.getRecordContext().mergeWithEngineRecord(HOODIE_SCHEMA.getAvroSchema(), updates, record);
assertEquals(5, result.getInt(0));
assertTrue(result.isNullAt(1));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
index 74b82eee1846e..afe75e0615661 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
@@ -19,10 +19,10 @@
package org.apache.hudi.utils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.avro.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;
@@ -37,8 +37,8 @@ public class TestAvroSchemaConverter {
@Test
void testUnionSchemaWithMultipleRecordTypes() {
- Schema schema = HoodieMetadataRecord.SCHEMA$;
- DataType dataType = AvroSchemaConverter.convertToDataType(schema);
+ HoodieSchema schema = HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
+ DataType dataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
final String expected = "ROW<"
+ "`fileName` STRING, "
@@ -62,7 +62,7 @@ void testLocalTimestampType() {
DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
);
// convert to avro schema
- Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
+ HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(dataType.getLogicalType()));
final String expectedSchema = ""
+ "[ \"null\", {\n"
+ " \"type\" : \"record\",\n"
@@ -85,7 +85,7 @@ void testLocalTimestampType() {
+ "} ]";
assertThat(schema.toString(true), is(expectedSchema));
// convert it back
- DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
+ DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
final String expectedDataType = "ROW<"
+ "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+ "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 6b5f79c6872b7..3ed961b8d5bf9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -969,6 +970,7 @@ public static void checkWrittenDataMOR(
HoodieTableMetaClient metaClient = createMetaClient(basePath);
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
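+ // Wrap the resolved Avro schema so the HoodieSchema-based record iterator below can consume it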
+ HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema);
String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
.lastInstant().map(HoodieInstant::requestedTime).orElse(null);
@@ -983,7 +985,7 @@ public static void checkWrittenDataMOR(
List<String> readBuffer = new ArrayList<>();
List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
for (FileSlice fileSlice : fileSlices) {
- try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, schema, metaClient, config)) {
+ try (ClosableIterator<RowData> rowIterator = getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) {
while (rowIterator.hasNext()) {
RowData rowData = rowIterator.next();
readBuffer.add(filterOutVariables(schema, rowData));
@@ -1014,7 +1016,7 @@ public static String filterOutVariablesWithoutHudiMetadata(GenericRecord generic
private static ClosableIterator getRecordIterator(
FileSlice fileSlice,
- Schema tableSchema,
+ HoodieSchema tableSchema,
HoodieTableMetaClient metaClient,
HoodieWriteConfig writeConfig) throws IOException {
HoodieFileGroupReader<RowData> fileGroupReader =
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index b64f90940dc11..5e2bf96060305 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -66,7 +66,7 @@ public AbstractConnectWriter(KafkaConnectConfigs connectConfigs,
@Override
public void writeRecord(SinkRecord record) throws IOException {
- AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema());
+ AvroConvertor convertor = new AvroConvertor(schemaProvider.getSourceSchema().getAvroSchema());
Option<GenericRecord> avroRecord;
switch (connectConfigs.getKafkaValueConverter()) {
case KAFKA_AVRO_CONVERTER:
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index 5b266e1d4fcaf..1b44f11785450 100644
--- a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.writers.AbstractConnectWriter;
@@ -33,7 +34,6 @@
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -75,7 +75,7 @@ public void setUp() throws Exception {
@ParameterizedTest
@EnumSource(value = TestInputFormats.class)
public void testAbstractWriterForAllFormats(TestInputFormats inputFormats) throws Exception {
- Schema schema = schemaProvider.getSourceSchema();
+ HoodieSchema schema = schemaProvider.getSourceSchema();
List<?> inputRecords;
List<HoodieRecord> expectedRecords;
@@ -83,12 +83,12 @@ public void testAbstractWriterForAllFormats(TestInputFormats inputFormats) throw
switch (inputFormats) {
case JSON_STRING:
formatConverter = AbstractConnectWriter.KAFKA_STRING_CONVERTER;
- GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(schema, schema);
+ GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(schema.getAvroSchema(), schema.getAvroSchema());
inputRecords = SchemaTestUtil.generateTestJsonRecords(0, NUM_RECORDS);
expectedRecords = ((List<String>) inputRecords).stream().map(s -> {
try {
- return HoodieAvroUtils.rewriteRecord((GenericRecord) reader.read(null, DecoderFactory.get().jsonDecoder(schema, s)),
- schema);
+ return HoodieAvroUtils.rewriteRecord((GenericRecord) reader.read(null, DecoderFactory.get().jsonDecoder(schema.getAvroSchema(), s)),
+ schema.getAvroSchema());
} catch (IOException exception) {
throw new HoodieException("Error converting JSON records to AVRO");
}
@@ -97,7 +97,7 @@ public void testAbstractWriterForAllFormats(TestInputFormats inputFormats) throw
case AVRO:
formatConverter = AbstractConnectWriter.KAFKA_AVRO_CONVERTER;
inputRecords = SchemaTestUtil.generateTestRecords(0, NUM_RECORDS);
- expectedRecords = inputRecords.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema))
+ expectedRecords = inputRecords.stream().map(s -> HoodieAvroUtils.rewriteRecord((GenericRecord) s, schema.getAvroSchema()))
.map(p -> convertToHoodieRecords(p, p.get(RECORD_KEY_INDEX).toString(), "000/00/00")).collect(Collectors.toList());
break;
default:
@@ -194,9 +194,9 @@ public HoodieKey getKey(GenericRecord record) {
static class TestSchemaProvider extends SchemaProvider {
@Override
- public Schema getSourceSchema() {
+ public HoodieSchema getSourceSchema() {
try {
- return SchemaTestUtil.getSimpleSchema();
+ return HoodieSchema.fromAvroSchema(SchemaTestUtil.getSimpleSchema());
} catch (IOException exception) {
throw new HoodieException("Fatal error parsing schema", exception);
}