Skip to content

Commit 18340f2

Browse files
authored
feat(avro): simplify union handling in AvroValueAdapter and add support for nested union types (#2916)
1 parent f41f5f0 commit 18340f2

File tree

3 files changed

+75
-87
lines changed

3 files changed

+75
-87
lines changed

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 2 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -118,14 +118,6 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
118118
@Override
119119
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
120120
Schema listSchema = sourceSchema;
121-
if (listSchema.getType() == Schema.Type.UNION) {
122-
listSchema = listSchema.getTypes().stream()
123-
.filter(s -> s.getType() == Schema.Type.ARRAY)
124-
.findFirst()
125-
.orElseThrow(() -> new IllegalStateException(
126-
"UNION schema does not contain an ARRAY type: " + sourceSchema));
127-
}
128-
129121
Schema elementSchema = listSchema.getElementType();
130122

131123
List<?> sourceList;
@@ -151,8 +143,7 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
151143
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
152144
Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
153145

154-
Schema kvSchema = resolveUnionElement(sourceSchema.getElementType(), Schema.Type.RECORD,
155-
"Map element UNION schema does not contain a RECORD type");
146+
Schema kvSchema = sourceSchema.getElementType();
156147

157148
Schema.Field keyField = kvSchema.getFields().get(0);
158149
Schema.Field valueField = kvSchema.getFields().get(1);
@@ -177,8 +168,7 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
177168
return recordMap;
178169
}
179170

180-
Schema mapSchema = resolveUnionElement(sourceSchema, Schema.Type.MAP,
181-
"UNION schema does not contain a MAP type");
171+
Schema mapSchema = sourceSchema;
182172

183173
Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
184174
Map<Object, Object> adaptedMap = new HashMap<>(sourceMap.size());
@@ -195,21 +185,4 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
195185
}
196186
return adaptedMap;
197187
}
198-
199-
private Schema resolveUnionElement(Schema schema, Schema.Type expectedType, String errorMessage) {
200-
Schema resolved = schema;
201-
if (schema.getType() == Schema.Type.UNION) {
202-
resolved = null;
203-
for (Schema unionMember : schema.getTypes()) {
204-
if (unionMember.getType() == expectedType) {
205-
resolved = unionMember;
206-
break;
207-
}
208-
}
209-
if (resolved == null) {
210-
throw new IllegalStateException(errorMessage + ": " + schema);
211-
}
212-
}
213-
return resolved;
214-
}
215188
}

core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,8 @@
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicLong;
3636

37+
import static org.apache.avro.Schema.Type.NULL;
38+
3739
/**
3840
* A factory that creates lazy-evaluation Record views of Avro GenericRecords.
3941
* Field values are converted only when accessed, avoiding upfront conversion overhead.
@@ -154,9 +156,26 @@ private FieldMapping createOptimizedMapping(String avroFieldName, int avroPositi
154156
nestedSchema = icebergType.asStructType().asSchema();
155157
nestedSchemaId = icebergType.toString();
156158
}
159+
if (Type.TypeID.MAP.equals(icebergType.typeId()) || Type.TypeID.LIST.equals(icebergType.typeId())) {
160+
avroType = resolveUnionElement(avroType);
161+
}
157162
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
158163
}
159164

165+
/**
 * Unwraps a UNION schema to its first non-NULL member; a non-UNION schema is returned unchanged.
 *
 * @param schema the Avro schema to unwrap
 * @return {@code schema} itself when its type is not UNION; otherwise the first union member
 *         whose type is not NULL, or {@code null} when the union has no such member
 */
// NOTE(review): the null result for a union without a non-NULL member is returned to the caller
// as-is — confirm that the caller tolerates a null Avro schema.
private Schema resolveUnionElement(Schema schema) {
166+
Schema resolved = schema;
167+
if (schema.getType() == Schema.Type.UNION) {
168+
resolved = null; // reset so the union itself is never returned
169+
for (Schema unionMember : schema.getTypes()) {
170+
if (unionMember.getType() != NULL) {
171+
resolved = unionMember;
172+
break; // only the first non-NULL member is used; later members are ignored
173+
}
174+
}
175+
}
176+
return resolved;
177+
}
178+
160179

161180
/**
162181
* Pre-computes RecordBinders for nested STRUCT fields.

core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java

Lines changed: 54 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -938,64 +938,6 @@ public void testUnionStringMapConversion() {
938938
testSendRecord(icebergSchema, icebergRecord);
939939
}
940940

941-
@Test
942-
public void testUnionArrayMapConversion() {
943-
String avroSchemaStr = " {\n" +
944-
" \"type\": \"record\",\n" +
945-
" \"name\": \"TestRecord\",\n" +
946-
" \"fields\": [\n" +
947-
" {\n" +
948-
" \"name\": \"mapField\",\n" +
949-
" \"type\": {\n" +
950-
" \"type\": \"array\",\n" +
951-
" \"logicalType\": \"map\",\n" +
952-
" \"items\": [\n" +
953-
" \"null\",\n" +
954-
" {\n" +
955-
" \"type\": \"record\",\n" +
956-
" \"name\": \"UnionMapEntry\",\n" +
957-
" \"fields\": [\n" +
958-
" {\"name\": \"key\", \"type\": \"int\"},\n" +
959-
" {\"name\": \"value\", \"type\": \"string\"}\n" +
960-
" ]\n" +
961-
" }\n" +
962-
" ]\n" +
963-
" }\n" +
964-
" }\n" +
965-
" ]\n" +
966-
" }\n";
967-
968-
avroSchema = new Schema.Parser().parse(avroSchemaStr);
969-
970-
Map<Integer, String> expectedMap = new HashMap<>();
971-
expectedMap.put(10, "alpha");
972-
expectedMap.put(20, "beta");
973-
974-
Schema mapFieldSchema = avroSchema.getField("mapField").schema();
975-
Schema elementUnionSchema = mapFieldSchema.getElementType();
976-
Schema entrySchema = elementUnionSchema.getTypes().stream()
977-
.filter(s -> s.getType() == Schema.Type.RECORD)
978-
.findFirst()
979-
.orElseThrow(() -> new IllegalStateException("Array element UNION schema does not contain a RECORD type"));
980-
981-
GenericData.Array<Object> mapEntries = new GenericData.Array<>(expectedMap.size() + 1, mapFieldSchema);
982-
for (Map.Entry<Integer, String> entry : expectedMap.entrySet()) {
983-
GenericRecord mapEntry = new GenericData.Record(entrySchema);
984-
mapEntry.put("key", entry.getKey());
985-
mapEntry.put("value", entry.getValue());
986-
mapEntries.add(mapEntry);
987-
}
988-
mapEntries.add(null);
989-
990-
AvroValueAdapter adapter = new AvroValueAdapter();
991-
Types.MapType mapType = Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get());
992-
993-
@SuppressWarnings("unchecked")
994-
Map<Integer, Object> result = (Map<Integer, Object>) adapter.convert(mapEntries, mapFieldSchema, mapType);
995-
996-
assertEquals(expectedMap, result);
997-
}
998-
999941
// Test method for converting a record with nested fields
1000942
@Test
1001943
public void testNestedRecordConversion() {
@@ -1132,6 +1074,40 @@ public void testUnionFieldConversion() {
11321074
" {\n" +
11331075
" \"name\": \"unionField4\",\n" +
11341076
" \"type\": [\"null\", \"string\"]\n" +
1077+
" },\n" +
1078+
" {\n" +
1079+
" \"name\": \"unionListField\",\n" +
1080+
" \"type\": [\n" +
1081+
" \"null\",\n" +
1082+
" {\n" +
1083+
" \"type\": \"array\",\n" +
1084+
" \"items\": \"string\"\n" +
1085+
" }\n" +
1086+
" ]\n" +
1087+
" },\n" +
1088+
" {\n" +
1089+
" \"name\": \"unionMapField\",\n" +
1090+
" \"type\": [\n" +
1091+
" \"null\",\n" +
1092+
" {\n" +
1093+
" \"type\": \"map\",\n" +
1094+
" \"values\": \"int\"\n" +
1095+
" }\n" +
1096+
" ]\n" +
1097+
" },\n" +
1098+
" {\n" +
1099+
" \"name\": \"unionStructField\",\n" +
1100+
" \"type\": [\n" +
1101+
" \"null\",\n" +
1102+
" {\n" +
1103+
" \"type\": \"record\",\n" +
1104+
" \"name\": \"UnionStruct\",\n" +
1105+
" \"fields\": [\n" +
1106+
" {\"name\": \"innerString\", \"type\": \"string\"},\n" +
1107+
" {\"name\": \"innerInt\", \"type\": \"int\"}\n" +
1108+
" ]\n" +
1109+
" }\n" +
1110+
" ]\n" +
11351111
" }\n" +
11361112
" ]\n" +
11371113
" }\n";
@@ -1141,6 +1117,17 @@ public void testUnionFieldConversion() {
11411117
avroRecord.put("unionField1", "union_string");
11421118
avroRecord.put("unionField2", 42);
11431119
avroRecord.put("unionField3", true);
1120+
List<String> unionList = Arrays.asList("item1", "item2");
1121+
avroRecord.put("unionListField", unionList);
1122+
Map<String, Integer> unionMap = new HashMap<>();
1123+
unionMap.put("one", 1);
1124+
unionMap.put("two", 2);
1125+
avroRecord.put("unionMapField", unionMap);
1126+
Schema unionStructSchema = avroSchema.getField("unionStructField").schema().getTypes().get(1);
1127+
GenericRecord unionStruct = new GenericData.Record(unionStructSchema);
1128+
unionStruct.put("innerString", "nested");
1129+
unionStruct.put("innerInt", 99);
1130+
avroRecord.put("unionStructField", unionStruct);
11441131

11451132
// Convert Avro record to Iceberg record using the wrapper
11461133
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
@@ -1156,6 +1143,15 @@ public void testUnionFieldConversion() {
11561143
Object unionField3 = icebergRecord.getField("unionField3");
11571144
assertEquals(true, unionField3);
11581145

1146+
assertNull(icebergRecord.getField("unionField4"));
1147+
1148+
assertEquals(unionList, normalizeValue(icebergRecord.getField("unionListField")));
1149+
assertEquals(unionMap, normalizeValue(icebergRecord.getField("unionMapField")));
1150+
1151+
Record unionStructRecord = (Record) icebergRecord.getField("unionStructField");
1152+
assertEquals("nested", unionStructRecord.getField("innerString").toString());
1153+
assertEquals(99, unionStructRecord.getField("innerInt"));
1154+
11591155
// Send the record to the table
11601156
testSendRecord(icebergSchema, icebergRecord);
11611157
}

0 commit comments

Comments
 (0)