From 855190482afcd5c06b186dd563c6c25896b4a53e Mon Sep 17 00:00:00 2001
From: Gezi-lzq
Date: Mon, 3 Nov 2025 10:19:54 +0800
Subject: [PATCH 1/2] feat(timestamp): enhance RecordBinder to support TIMESTAMP and TIME types in union handling

---
 .../automq/table/binder/RecordBinder.java | 5 +-
 .../table/binder/AvroRecordBinderTest.java | 794 +++++++-----------
 2 files changed, 313 insertions(+), 486 deletions(-)

diff --git a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
index 0c1be6a3db..e6020390b4 100644
--- a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
+++ b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java
@@ -156,7 +156,10 @@ private FieldMapping createOptimizedMapping(String avroFieldName, int avroPositi
             nestedSchema = icebergType.asStructType().asSchema();
             nestedSchemaId = icebergType.toString();
         }
-        if (Type.TypeID.MAP.equals(icebergType.typeId()) || Type.TypeID.LIST.equals(icebergType.typeId())) {
+        if (Type.TypeID.TIMESTAMP.equals(icebergType.typeId())
+            || Type.TypeID.TIME.equals(icebergType.typeId())
+            || Type.TypeID.MAP.equals(icebergType.typeId())
+            || Type.TypeID.LIST.equals(icebergType.typeId())) {
             avroType = resolveUnionElement(avroType);
         }
         return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java
index 9051e27a6a..9b25c1e52d 100644
--- a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java
+++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java
@@ -20,10 +20,10 @@
 package kafka.automq.table.binder;

 import com.google.common.collect.ImmutableMap;
-
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -75,6 +75,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;

 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
@@ -84,10 +87,13 @@
 @Tag("S3Unit")
 public class AvroRecordBinderTest {

+    private static final String TEST_NAMESPACE = "kafka.automq.table.binder";
+
     private static Schema avroSchema;
     private InMemoryCatalog catalog;
     private Table table;
     private TaskWriter<Record> writer;
+    private int tableCounter;

     static {
         CodecSetup.setup();
@@ -99,10 +105,12 @@ void setUp() {
         catalog = new InMemoryCatalog();
         catalog.initialize("test", ImmutableMap.of());
         catalog.createNamespace(Namespace.of("default"));
+        tableCounter = 0;
     }

     private void testSendRecord(org.apache.iceberg.Schema schema, Record record) {
-        table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), "test"), schema);
+        String tableName = "test_" + tableCounter++;
+        table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), tableName), schema);
         writer = createTableWriter(table);
         try {
             writer.write(record);
@@ -206,6 +214,102 @@ private static Map<String, Object> normalizeMapValues(Object value) {
         return result;
     }

+    private static Schema createOptionalSchema(Schema nonNullSchema) {
+        return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), nonNullSchema));
+    }
+
+    private static Schema ensureNonNullBranch(Schema schema) {
+        if (schema.getType() != Schema.Type.UNION) {
+            return schema;
+        }
+        return schema.getTypes().stream()
+            .filter(type -> type.getType() != Schema.Type.NULL)
+            .findFirst()
+            .orElseThrow(() -> new IllegalArgumentException("Union schema lacks non-null branch: " + schema));
+    }
+
+    private void runRoundTrip(Schema recordSchema, Consumer<GenericRecord> avroPopulator, Consumer<Record> assertions) {
+        GenericRecord avroRecord = new GenericData.Record(recordSchema);
+        avroPopulator.accept(avroRecord);
+        GenericRecord roundTripRecord = serializeAndDeserialize(avroRecord, recordSchema);
+
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(recordSchema);
+        Record icebergRecord = new RecordBinder(icebergSchema, recordSchema).bind(roundTripRecord);
+
+        assertions.accept(icebergRecord);
+        testSendRecord(icebergSchema, icebergRecord);
+    }
+
+    // Helper method to test round-trip conversion for a single field
+    private void assertFieldRoundTrips(String recordPrefix,
+                                       String fieldName,
+                                       Supplier<Schema> fieldSchemaSupplier,
+                                       Function<Schema, Object> avroValueSupplier,
+                                       Consumer<Object> valueAssertion) {
+        Schema baseFieldSchema = fieldSchemaSupplier.get();
+        Schema baseRecordSchema = SchemaBuilder.builder()
+            .record(recordPrefix + "Base")
+            .namespace(TEST_NAMESPACE)
+            .fields()
+            .name(fieldName).type(baseFieldSchema).noDefault()
+            .endRecord();
+
+        // Direct field
+        runRoundTrip(baseRecordSchema,
+            record -> record.put(fieldName, avroValueSupplier.apply(baseFieldSchema)),
+            icebergRecord -> valueAssertion.accept(icebergRecord.getField(fieldName))
+        );
+
+        Schema optionalFieldSchema = createOptionalSchema(fieldSchemaSupplier.get());
+        Schema unionRecordSchema = SchemaBuilder.builder()
+            .record(recordPrefix + "Union")
+            .namespace(TEST_NAMESPACE)
+            .fields()
+            .name(fieldName).type(optionalFieldSchema).withDefault(null)
+            .endRecord();
+        Schema nonNullBranch = ensureNonNullBranch(optionalFieldSchema);
+
+        // Optional field with non-null value
+        runRoundTrip(unionRecordSchema,
+            record -> record.put(fieldName, avroValueSupplier.apply(nonNullBranch)),
+            icebergRecord -> valueAssertion.accept(icebergRecord.getField(fieldName))
+        );
+
+        // Optional field with null value
+        runRoundTrip(unionRecordSchema,
+            record -> record.put(fieldName, null),
+            icebergRecord -> assertNull(icebergRecord.getField(fieldName))
+        );
+    }
+
+    /**
+     * Writes a record whose map field may still contain Avro GenericRecord entries for non-string keys.
+     * The entries are rebound to Iceberg Records when necessary so the writer sees the expected structure.
+ */ + private void writeRecordWithConvertedMapEntries(Record icebergRecord, + org.apache.iceberg.Schema icebergSchema, + Schema entryAvroSchema) { + Object fieldValue = icebergRecord.getField("mapField"); + if (fieldValue instanceof List) { + List entries = (List) fieldValue; + if (!entries.isEmpty() && !(entries.get(0) instanceof Record)) { + org.apache.iceberg.types.Type elementType = + icebergSchema.findField("mapField").type().asListType().elementType(); + org.apache.iceberg.Schema entryIcebergSchema = elementType.asStructType().asSchema(); + RecordBinder entryBinder = new RecordBinder(entryIcebergSchema, entryAvroSchema); + List convertedEntries = new ArrayList<>(entries.size()); + for (Object entry : entries) { + convertedEntries.add(entryBinder.bind((GenericRecord) entry)); + } + org.apache.iceberg.data.GenericRecord copy = org.apache.iceberg.data.GenericRecord.create(icebergSchema.asStruct()); + copy.setField("mapField", convertedEntries); + testSendRecord(icebergSchema.asStruct().asSchema(), copy); + return; + } + } + testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord); + } + @Test public void testSchemaEvolution() { @@ -286,229 +390,92 @@ public void testWrapperReusability() { // Test method for converting a single string field @Test public void testStringConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"stringField\", \"type\": \"string\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("stringField", "test_string"); - - GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); - - // Verify the field value - assertEquals("test_string", icebergRecord.getField("stringField").toString()); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("String", "stringField", + () -> Schema.create(Schema.Type.STRING), + schema -> "test_string", + value -> assertEquals("test_string", value.toString()) + ); } // Test method for converting a single integer field @Test public void testIntegerConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"intField\", \"type\": \"int\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("intField", 42); - - GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); - - // Verify the field value - assertEquals(42, icebergRecord.getField("intField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Int", "intField", + () -> Schema.create(Schema.Type.INT), + schema -> 42, + value -> assertEquals(42, value) + ); } // Test method for 
converting a single long field @Test public void testLongConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"longField\", \"type\": \"long\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("longField", 123456789L); - - GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); - - // Verify the field value - assertEquals(123456789L, icebergRecord.getField("longField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Long", "longField", + () -> Schema.create(Schema.Type.LONG), + schema -> 123456789L, + value -> assertEquals(123456789L, value) + ); } // Test method for converting a single float field @Test public void testFloatConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"floatField\", \"type\": \"float\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("floatField", 3.14f); - - GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); - - // Verify the field value - assertEquals(3.14f, icebergRecord.getField("floatField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Float", "floatField", + () -> Schema.create(Schema.Type.FLOAT), + schema -> 3.14f, + value -> assertEquals(3.14f, (Float) value) + ); } // Test method for converting a single double field @Test public void testDoubleConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"doubleField\", \"type\": \"double\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("doubleField", 6.28); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals(6.28, icebergRecord.getField("doubleField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Double", "doubleField", + () -> Schema.create(Schema.Type.DOUBLE), + schema -> 6.28, + value -> assertEquals(6.28, value) + ); } // Test method for converting a single boolean field @Test public void testBooleanConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": 
\"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"booleanField\", \"type\": \"boolean\"}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("booleanField", true); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals(true, icebergRecord.getField("booleanField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Boolean", "booleanField", + () -> Schema.create(Schema.Type.BOOLEAN), + schema -> true, + value -> assertEquals(true, value) + ); } // Test method for converting a single date field (number of days from epoch) @Test public void testDateConversion() { - // Define Avro schema - String avroSchemaStr = "{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [ {\"name\": \"dateField\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}} ] }"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); LocalDate localDate = LocalDate.of(2020, 1, 1); int epochDays = (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), localDate); - avroRecord.put("dateField", epochDays); // Represents 2020-01-01 - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals(localDate, icebergRecord.getField("dateField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Date", "dateField", + () -> LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)), + schema -> epochDays, + value -> assertEquals(localDate, value) + ); } // Test method for converting a single time field (number of milliseconds from midnight) @Test public void testTimeConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"timeField\", \"type\": {\"type\": \"long\", \"logicalType\": \"time-micros\"}},\n" + - " {\"name\": \"timeField2\", \"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); LocalTime localTime = LocalTime.of(10, 0); long epochMicros = localTime.toNanoOfDay() / 1000; - avroRecord.put("timeField", epochMicros); // Represents 10:00 AM - int epochMillis = (int) (localTime.toNanoOfDay() / 1_000_000); - avroRecord.put("timeField2", epochMillis); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals(localTime, icebergRecord.getField("timeField")); - assertEquals(localTime, 
icebergRecord.getField("timeField2")); + assertFieldRoundTrips("TimeMicros", "timeField", + () -> LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)), + schema -> epochMicros, + value -> assertEquals(localTime, value) + ); - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("TimeMillis", "timeField2", + () -> LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)), + schema -> epochMillis, + value -> assertEquals(localTime, value) + ); } // Test method for converting a single timestamp field (number of milliseconds from epoch) @@ -516,337 +483,177 @@ public void testTimeConversion() { // timestamptz: Stores microseconds from 1970-01-01 00:00:00.000000 UTC. [1] @Test public void testTimestampConversion() { - // Define Avro schema - // Avro type annotation adjust-to-utc is an Iceberg convention; default value is false if not present. - // Avro logical type timestamp-nanos is an Iceberg convention; the Avro specification does not define this type. - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"timestampField1\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\", \"adjust-to-utc\": true}},\n" + - " {\"name\": \"timestampField2\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\", \"adjust-to-utc\": false}},\n" + - " {\"name\": \"timestampField3\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"adjust-to-utc\": true}},\n" + - " {\"name\": \"timestampField4\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"adjust-to-utc\": false}}\n" + - " ]\n" + - " }\n"; - Schema avroSchema = new Schema.Parser().parse(avroSchemaStr); - - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - Instant instant = Instant.now(); + Instant instant = Instant.parse("2020-01-01T12:34:56.123456Z"); long timestampMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000; long timestampMillis = instant.toEpochMilli(); - avroRecord.put("timestampField1", timestampMicros); - avroRecord.put("timestampField2", timestampMicros); - avroRecord.put("timestampField3", timestampMillis); - avroRecord.put("timestampField4", timestampMillis); - - // Serialize and deserialize - GenericRecord deserializedRecord = serializeAndDeserialize(avroRecord, avroSchema); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(deserializedRecord); - - // Verify the field value - OffsetDateTime timestampField1 = (OffsetDateTime) icebergRecord.getField("timestampField1"); - assertEquals(DateTimeUtil.timestamptzFromMicros(timestampMicros), timestampField1); - - LocalDateTime timestampField2 = (LocalDateTime) icebergRecord.getField("timestampField2"); - assertEquals(DateTimeUtil.timestampFromMicros(timestampMicros), timestampField2); + Supplier timestampMicrosTzSchema = () -> { + Schema schema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + schema.addProp("adjust-to-utc", true); + return schema; + }; + + Supplier timestampMicrosSchema = () -> { + Schema schema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + schema.addProp("adjust-to-utc", false); + return schema; + }; + + Supplier timestampMillisTzSchema = () -> { 
+            Schema schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+            schema.addProp("adjust-to-utc", true);
+            return schema;
+        };
+
+        Supplier<Schema> timestampMillisSchema = () -> {
+            Schema schema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+            schema.addProp("adjust-to-utc", false);
+            return schema;
+        };
+
+        OffsetDateTime expectedMicrosTz = DateTimeUtil.timestamptzFromMicros(timestampMicros);
+        LocalDateTime expectedMicros = DateTimeUtil.timestampFromMicros(timestampMicros);
+        OffsetDateTime expectedMillisTz = DateTimeUtil.timestamptzFromMicros(timestampMillis * 1000);
+        LocalDateTime expectedMillis = DateTimeUtil.timestampFromMicros(timestampMillis * 1000);
+
+        assertFieldRoundTrips("TimestampMicrosTz", "timestampField1",
+            timestampMicrosTzSchema,
+            schema -> timestampMicros,
+            value -> assertEquals(expectedMicrosTz, value)
+        );

-        OffsetDateTime timestampField3 = (OffsetDateTime) icebergRecord.getField("timestampField3");
-        assertEquals(DateTimeUtil.timestamptzFromMicros(timestampMillis * 1000), timestampField3);
+        assertFieldRoundTrips("TimestampMicros", "timestampField2",
+            timestampMicrosSchema,
+            schema -> timestampMicros,
+            value -> assertEquals(expectedMicros, value)
+        );

-        LocalDateTime timestampField4 = (LocalDateTime) icebergRecord.getField("timestampField4");
-        assertEquals(DateTimeUtil.timestampFromMicros(timestampMillis * 1000), timestampField4);
+        assertFieldRoundTrips("TimestampMillisTz", "timestampField3",
+            timestampMillisTzSchema,
+            schema -> timestampMillis,
+            value -> assertEquals(expectedMillisTz, value)
+        );

-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        assertFieldRoundTrips("TimestampMillis", "timestampField4",
+            timestampMillisSchema,
+            schema -> timestampMillis,
+            value -> assertEquals(expectedMillis, value)
+        );
     }

     // Test method for converting a single binary field
     @Test
     public void testBinaryConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\"name\": \"binaryField\", \"type\": \"bytes\"}\n" +
-            " ]\n" +
-            " }\n";
-        avroSchema = new Schema.Parser().parse(avroSchemaStr);
-        // Create Avro record
-        GenericRecord avroRecord = new GenericData.Record(avroSchema);
         String randomAlphabetic = RandomStringUtils.randomAlphabetic(64);
-        avroRecord.put("binaryField", ByteBuffer.wrap(randomAlphabetic.getBytes(StandardCharsets.UTF_8)));
-
-        // Convert Avro record to Iceberg record using the wrapper
-        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        // Verify the field value
-        ByteBuffer binaryField = (ByteBuffer) icebergRecord.getField("binaryField");
-        assertEquals(randomAlphabetic, new String(binaryField.array(), StandardCharsets.UTF_8));
-
-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        assertFieldRoundTrips("Binary", "binaryField",
+            () -> Schema.create(Schema.Type.BYTES),
+            schema -> ByteBuffer.wrap(randomAlphabetic.getBytes(StandardCharsets.UTF_8)),
+            value -> {
+                ByteBuffer binaryField = (ByteBuffer) value;
+                assertEquals(randomAlphabetic, new String(binaryField.array(), StandardCharsets.UTF_8));
+            }
+        );
     }

     // Test method for converting a single fixed field
     @Test
    public void testFixedConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": 
\"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\n" + - " \"name\": \"fixedField\",\n" + - " \"type\": {\n" + - " \"type\": \"fixed\",\n" + - " \"name\": \"FixedField\",\n" + - " \"size\": 3\n" + - " }\n" + - " }\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - byte[] fixedBytes = "bar".getBytes(StandardCharsets.UTF_8); - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("fixedField", new GenericData.Fixed(avroSchema.getField("fixedField").schema(), fixedBytes)); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - byte[] result = (byte[]) icebergRecord.getField("fixedField"); - assertEquals("bar", new String(result, StandardCharsets.UTF_8)); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Fixed", "fixedField", + () -> Schema.createFixed("FixedField", null, null, 3), + schema -> new GenericData.Fixed(schema, "bar".getBytes(StandardCharsets.UTF_8)), + value -> assertEquals("bar", new String((byte[]) value, StandardCharsets.UTF_8)) + ); } // Test method for converting a single enum field @Test public void testEnumConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\n" + - " \"name\": \"enumField\",\n" + - " \"type\": {\n" + - " \"type\": \"enum\",\n" + - " \"name\": \"EnumField\",\n" + - " \"symbols\": [\"A\", \"B\", \"C\"]\n" + - " }\n" + - " }\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); - avroRecord.put("enumField", new GenericData.EnumSymbol(avroSchema.getField("enumField").schema(), "B")); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals("B", icebergRecord.getField("enumField")); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("Enum", "enumField", + () -> Schema.createEnum("EnumField", null, null, Arrays.asList("A", "B", "C")), + schema -> new GenericData.EnumSymbol(schema, "B"), + value -> assertEquals("B", value.toString()) + ); } // Test method for converting a single UUID field @Test public void testUUIDConversion() { - // Define Avro schema - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\"name\": \"uuidField\", \"type\": {\"type\": \"string\", \"logicalType\": \"uuid\"}}\n" + - " ]\n" + - " }\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); UUID uuid = UUID.randomUUID(); - CharSequence charSequence = new Conversions.UUIDConversion().toCharSequence(uuid, avroSchema, LogicalTypes.uuid()); - - avroRecord.put("uuidField", charSequence); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema 
icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        // Verify the field value
-        assertEquals(uuid, UUIDUtil.convert((byte[]) icebergRecord.getField("uuidField")));
-
-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        assertFieldRoundTrips("UUID", "uuidField",
+            () -> LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)),
+            schema -> new Conversions.UUIDConversion().toCharSequence(uuid, schema, LogicalTypes.uuid()),
+            value -> assertEquals(uuid, UUIDUtil.convert((byte[]) value))
+        );
     }

     // Test method for converting a single decimal field
     @Test
     public void testDecimalConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\"name\": \"decimalField\", \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 9, \"scale\": 2}}\n" +
-            " ]\n" +
-            " }\n";
-        avroSchema = new Schema.Parser().parse(avroSchemaStr);
-        // Create Avro record
         BigDecimal bigDecimal = BigDecimal.valueOf(1000.00).setScale(2);
-        LogicalTypes.Decimal decimalType = LogicalTypes.decimal(9, 2);
-        byte[] decimalBytes = new Conversions.DecimalConversion().toBytes(bigDecimal, avroSchema, decimalType).array();
-        GenericRecord avroRecord = new GenericData.Record(avroSchema);
-        avroRecord.put("decimalField", ByteBuffer.wrap(decimalBytes));
-
-        // Convert Avro record to Iceberg record using the wrapper
-        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        // Verify the field value
-        assertEquals(bigDecimal, icebergRecord.getField("decimalField"));
-
-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        assertFieldRoundTrips("Decimal", "decimalField",
+            () -> LogicalTypes.decimal(9, 2).addToSchema(Schema.create(Schema.Type.BYTES)),
+            schema -> {
+                LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+                return new Conversions.DecimalConversion().toBytes(bigDecimal, schema, decimalType);
+            },
+            value -> assertEquals(bigDecimal, value)
+        );
     }

     // Test method for converting a list field
     @Test
     public void testListConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\"name\": \"listField\", \"type\": {\"type\": \"array\", \"items\": \"string\"}}\n" +
-            " ]\n" +
-            " }\n";
-        avroSchema = new Schema.Parser().parse(avroSchemaStr);
-        // Create Avro record
-        GenericRecord avroRecord = new GenericData.Record(avroSchema);
-        avroRecord.put("listField", Arrays.asList("a", "b", "c"));
-
-        // Convert Avro record to Iceberg record using the wrapper
-        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        // Verify the field value
-        assertEquals(Arrays.asList("a", "b", "c"), normalizeValue(icebergRecord.getField("listField")));
-
-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        List<String> expected = Arrays.asList("a", "b", "c");
+        assertFieldRoundTrips("List", "listField",
+            () -> Schema.createArray(Schema.create(Schema.Type.STRING)),
+            schema -> new ArrayList<>(expected),
+            value -> assertEquals(expected, normalizeValue(value))
+        );
     }

     @Test
-    public void testUnionListConversion() {
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\n" +
-            " \"name\": \"listField\",\n" +
-            " \"type\": [\"null\", {\"type\": \"array\", \"items\": [\"null\", \"string\"]}],\n" +
-            " \"default\": null\n" +
-            " }\n" +
-            " ]\n" +
-            " }\n";
-
-        avroSchema = new Schema.Parser().parse(avroSchemaStr);
-
-        Schema listSchema = avroSchema.getField("listField").schema().getTypes().stream()
-            .filter(s -> s.getType() == Schema.Type.ARRAY)
-            .findFirst()
-            .orElseThrow(() -> new IllegalStateException("UNION schema does not contain an ARRAY type"));
-
-        GenericData.Array<Utf8> listValue = new GenericData.Array<>(3, listSchema);
-        listValue.add(new Utf8("a"));
-        listValue.add(null);
-        listValue.add(new Utf8("c"));
-
-        GenericRecord avroRecord = new GenericData.Record(avroSchema);
-        avroRecord.put("listField", listValue);
-
-        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        assertEquals(Arrays.asList("a", null, "c"), normalizeValue(icebergRecord.getField("listField")));
-
-        testSendRecord(icebergSchema, icebergRecord);
+    public void testListWithNullableElementsConversion() {
+        assertFieldRoundTrips("ListNullableElements", "listField",
+            () -> Schema.createArray(Schema.createUnion(Arrays.asList(
+                Schema.create(Schema.Type.NULL),
+                Schema.create(Schema.Type.STRING)
+            ))),
+            schema -> {
+                @SuppressWarnings("unchecked")
+                GenericData.Array<Utf8> listValue = new GenericData.Array<>(3, schema);
+                listValue.add(new Utf8("a"));
+                listValue.add(null);
+                listValue.add(new Utf8("c"));
+                return listValue;
+            },
+            value -> assertEquals(Arrays.asList("a", null, "c"), normalizeValue(value))
+        );
     }

     // Test method for converting a map field
     @Test
     public void testStringMapConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\"name\": \"mapField\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}\n" +
-            " ]\n" +
-            " }\n";
-        avroSchema = new Schema.Parser().parse(avroSchemaStr);
-        // Create Avro record
-        GenericRecord avroRecord = new GenericData.Record(avroSchema);
         Map<String, String> map = new HashMap<>();
         map.put("key1", "value1");
         map.put("key2", "value2");
-        avroRecord.put("mapField", map);
-
-        // Convert Avro record to Iceberg record using the wrapper
-        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
-        Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema));
-
-        // Verify the field value
-        assertEquals(map, normalizeValue(icebergRecord.getField("mapField")));
-
-        // Send the record to the table
-        testSendRecord(icebergSchema, icebergRecord);
+        assertFieldRoundTrips("StringMap", "mapField",
+            () -> Schema.createMap(Schema.create(Schema.Type.STRING)),
+            schema -> new HashMap<>(map),
+            value -> assertEquals(map, normalizeValue(value))
+        );
     }

     // Test method for converting a map field
     @Test
     public void testIntMapConversion() {
-        // Define Avro schema
-        String avroSchemaStr = " {\n" +
-            " \"type\": \"record\",\n" +
-            " \"name\": \"TestRecord\",\n" +
-            " \"fields\": [\n" +
-            " {\"name\": \"mapField\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}\n" +
-            " ]\n" +
-            " }\n";
}\n"; - avroSchema = new Schema.Parser().parse(avroSchemaStr); - // Create Avro record - GenericRecord avroRecord = new GenericData.Record(avroSchema); Map map = new HashMap<>(); map.put("key1", 1); map.put("key2", 2); - avroRecord.put("mapField", map); - - // Convert Avro record to Iceberg record using the wrapper - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); - - // Verify the field value - assertEquals(map, normalizeValue(icebergRecord.getField("mapField"))); - - // Send the record to the table - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("IntMap", "mapField", + () -> Schema.createMap(Schema.create(Schema.Type.INT)), + schema -> new HashMap<>(map), + value -> assertEquals(map, normalizeValue(value)) + ); } // Test method for converting a map field with non-string keys @@ -907,35 +714,19 @@ public void testMapWithNonStringKeysConversion() { } @Test - public void testUnionStringMapConversion() { - String avroSchemaStr = " {\n" + - " \"type\": \"record\",\n" + - " \"name\": \"TestRecord\",\n" + - " \"fields\": [\n" + - " {\n" + - " \"name\": \"mapField\",\n" + - " \"type\": [\"null\", {\"type\": \"map\", \"values\": [\"null\", \"string\"]}],\n" + - " \"default\": null\n" + - " }\n" + - " ]\n" + - " }\n"; - - avroSchema = new Schema.Parser().parse(avroSchemaStr); - - GenericRecord avroRecord = new GenericData.Record(avroSchema); + public void testMapWithNullableValuesConversion() { Map expectedMap = new HashMap<>(); expectedMap.put("key1", "value1"); expectedMap.put("key2", null); - avroRecord.put("mapField", expectedMap); - GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); - - org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); - Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); - - assertEquals(expectedMap, normalizeValue(icebergRecord.getField("mapField"))); - - testSendRecord(icebergSchema, icebergRecord); + assertFieldRoundTrips("NullableValueMap", "mapField", + () -> Schema.createMap(Schema.createUnion(Arrays.asList( + Schema.create(Schema.Type.NULL), + Schema.create(Schema.Type.STRING) + ))), + schema -> new HashMap<>(expectedMap), + value -> assertEquals(expectedMap, normalizeValue(value)) + ); } // Test method for converting a record with nested fields @@ -1225,7 +1016,8 @@ public void testFieldCountStatistics() { " {\"name\": \"smallString\", \"type\": \"string\"},\n" + " {\"name\": \"largeString\", \"type\": \"string\"},\n" + " {\"name\": \"intField\", \"type\": \"int\"},\n" + - " {\"name\": \"binaryField\", \"type\": \"bytes\"}\n" + + " {\"name\": \"binaryField\", \"type\": \"bytes\"},\n" + + " {\"name\": \"optionalStringField\", \"type\": [\"null\", \"string\"], \"default\": null}\n" + " ]\n" + "}"; @@ -1239,6 +1031,7 @@ public void testFieldCountStatistics() { avroRecord.put("largeString", "a".repeat(50)); // 50 chars = 3 + 50/32 = 4 avroRecord.put("intField", 42); // primitive = 1 field avroRecord.put("binaryField", ByteBuffer.wrap("test".repeat(10).getBytes())); // 5 + avroRecord.put("optionalStringField", "optional"); // Bind record - this should trigger field counting Record icebergRecord = recordBinder.bind(avroRecord); @@ -1248,15 +1041,16 @@ public void testFieldCountStatistics() { assertEquals("a".repeat(50), icebergRecord.getField("largeString")); assertEquals(42, icebergRecord.getField("intField")); 
assertEquals("test".repeat(10), new String(((ByteBuffer) icebergRecord.getField("binaryField")).array())); + assertEquals("optional", icebergRecord.getField("optionalStringField").toString()); long fieldCount = recordBinder.getAndResetFieldCount(); - assertEquals(13, fieldCount); + assertEquals(16, fieldCount); // Second call should return 0 (reset) assertEquals(0, recordBinder.getAndResetFieldCount()); testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord); - assertEquals(13, recordBinder.getAndResetFieldCount()); + assertEquals(16, recordBinder.getAndResetFieldCount()); } @Test @@ -1417,4 +1211,34 @@ public void testFieldCountWithNullValues() { testSendRecord(icebergSchema.asStruct().asSchema(), icebergRecord); assertEquals(3, recordBinder.getAndResetFieldCount()); } + + @Test + public void testFieldCountWithUnionFields() { + String avroSchemaStr = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"UnionCountRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"optionalString\", \"type\": [\"null\", \"string\"], \"default\": null}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaStr); + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema); + + GenericRecord nonNullRecord = new GenericData.Record(avroSchema); + nonNullRecord.put("optionalString", "value"); + + Record icebergRecord = recordBinder.bind(nonNullRecord); + assertEquals("value", icebergRecord.getField("optionalString").toString()); + + assertEquals(3, recordBinder.getAndResetFieldCount()); + + GenericRecord nullRecord = new GenericData.Record(avroSchema); + nullRecord.put("optionalString", null); + + Record nullIcebergRecord = recordBinder.bind(nullRecord); + assertNull(nullIcebergRecord.getField("optionalString")); + assertEquals(0, recordBinder.getAndResetFieldCount()); + } } From 31939b97a6893f06c3fad758da7b4f1763821e6e Mon Sep 17 00:00:00 2001 From: Gezi-lzq Date: Mon, 3 Nov 2025 10:45:43 +0800 Subject: [PATCH 2/2] test(avro): add missing import for AvroRecordBinderTest --- .../java/kafka/automq/table/binder/AvroRecordBinderTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java index 9b25c1e52d..ab589a68b1 100644 --- a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java +++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java @@ -20,6 +20,7 @@ package kafka.automq.table.binder; import com.google.common.collect.ImmutableMap; + import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema;