From aae52848ec38f1d5e724d2dc34a8a474bf84a850 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Mon, 15 Dec 2025 14:55:58 +0800 Subject: [PATCH 01/14] [Fix][Protobuf] Ensure case-sensitive field names are preserved in Protobuf serialization and deserialization --- .../e2e/connector/kafka/KafkaIT.java | 171 ++++++++++++ ...fake_to_kafka_protobuf_case_sensitive.conf | 86 ++++++ ...fka_protobuf_case_sensitive_to_assert.conf | 106 ++++++++ .../protobuf/RowToProtobufConverter.java | 3 +- .../protobuf/ProtobufConverterTest.java | 246 ++++++++++++++++++ 5 files changed, 610 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 7d717056b00..50b34658735 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -1623,6 +1623,177 @@ public void testKafkaProtobufForTransformToAssert(TestContainer container) } } + @TestTemplate + public void testProtobufCaseSensitiveFieldNames(TestContainer container) + throws IOException, InterruptedException, URISyntaxException { + Container.ExecResult execResult = + container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String path = 
getTestConfigFile("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf"); + Config config = ConfigFactory.parseFile(new File(path)); + Config sinkConfig = config.getConfigList("sink").get(0); + + Map schemaProperties = new HashMap<>(); + schemaProperties.put( + "protobuf_message_name", sinkConfig.getString("protobuf_message_name")); + schemaProperties.put("protobuf_schema", sinkConfig.getString("protobuf_schema")); + + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + SeaTunnelRowType seaTunnelRowType = + new SeaTunnelRowType( + new String[] { + "MyIntField", + "CamelCaseString", + "snake_case_field", + "NestedObject", + "MyMapField" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + nestedType, + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + + TableSchema schema = + TableSchema.builder() + .columns( + Arrays.asList( + IntStream.range(0, seaTunnelRowType.getTotalFields()) + .mapToObj( + i -> + PhysicalColumn.of( + seaTunnelRowType + .getFieldName(i), + seaTunnelRowType + .getFieldType(i), + 0, + true, + null, + null)) + .toArray(PhysicalColumn[]::new))) + .build(); + + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("", "", "", "test"), + schema, + schemaProperties, + Collections.emptyList(), + "It is converted from RowType and only has column information."); + + ProtobufDeserializationSchema deserializationSchema = + new ProtobufDeserializationSchema(catalogTable); + + List kafkaSTRow = + getKafkaSTRow( + "test_protobuf_case_sensitive_topic", + value -> { + try { + return deserializationSchema.deserialize(value); + } catch (IOException e) { + throw new RuntimeException("Error deserializing Kafka message", e); + } + }); + + Assertions.assertEquals(16, kafkaSTRow.size()); + + kafkaSTRow.forEach( + row -> { + Assertions.assertAll( + "Verify 
case-sensitive field values", + () -> Assertions.assertNotNull(row.getField(0)), // MyIntField + () -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString + () -> Assertions.assertNotNull(row.getField(2)), // snake_case_field + () -> { + SeaTunnelRow nestedRow = (SeaTunnelRow) row.getField(3); + if (nestedRow != null) { + Assertions.assertNotNull(nestedRow.getField(0)); // NestedField + Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField + } + }, + () -> { + @SuppressWarnings("unchecked") + Map mapField = + (Map) row.getField(4); + if (mapField != null) { + Assertions.assertNotNull(mapField); + } + }); + }); + } + + @TestTemplate + public void testProtobufCaseSensitiveToAssert(TestContainer container) + throws IOException, InterruptedException, URISyntaxException { + + String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf"; + String path = getTestConfigFile(confFile); + Config config = ConfigFactory.parseFile(new File(path)); + Config sourceConfig = config.getConfigList("source").get(0); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig); + + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + SeaTunnelRowType seaTunnelRowType = + new SeaTunnelRowType( + new String[] { + "MyIntField", + "CamelCaseString", + "snake_case_field", + "NestedObject", + "MyMapField" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + nestedType, + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + + DefaultSeaTunnelRowSerializer serializer = + getDefaultSeaTunnelRowSerializer( + "test_protobuf_case_sensitive_topic", seaTunnelRowType, readonlyConfig); + + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + Map mapData = new HashMap<>(); + mapData.put("key1", 100); + 
mapData.put("key2", 200); + + for (int i = 0; i < 16; i++) { + SeaTunnelRow row = new SeaTunnelRow(5); + row.setField(0, i); + row.setField(1, "test_string_" + i); + row.setField(2, "snake_value_" + i); + row.setField(3, nestedRow); + row.setField(4, mapData); + + ProducerRecord producerRecord = serializer.serializeRow(row); + try { + producer.send(producerRecord).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error sending Kafka message", e); + } + } + producer.flush(); + + Container.ExecResult execResult = container.executeJob(confFile); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { URL resource = KafkaIT.class.getResource(configFile); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf new file mode 100644 index 00000000000..507f6f00e4f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # spark config + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + +} +source { + FakeSource { + parallelism = 1 + plugin_output = "fake" + row.num = 16 + schema = { + fields { + MyIntField = int + CamelCaseString = string + snake_case_field = string + + NestedObject { + NestedField = string + AnotherField = int + } + MyMapField = "map" + } + } + } +} + +sink { + kafka { + topic = "test_protobuf_case_sensitive_topic" + bootstrap.servers = "kafkaCluster:9092" + format = protobuf + kafka.request.timeout.ms = 60000 + kafka.config = { + acks = "all" + request.timeout.ms = 60000 + buffer.memory = 33554432 + } + protobuf_message_name = TestCaseSensitive + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufCaseSensitiveE2E"; + + message TestCaseSensitive { + int32 MyIntField = 1; + string CamelCaseString = 2; + string snake_case_field = 3; + + message NestedObject { + string NestedField = 1; + int32 AnotherField = 2; + } + + NestedObject nestedObject = 4; + + map MyMapField = 5; + } + """ + } +} + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf new file mode 100644 index 00000000000..eb423b31e2c --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf @@ -0,0 +1,106 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # spark config + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + topic = "test_protobuf_case_sensitive_topic" + format = protobuf + protobuf_message_name = TestCaseSensitive + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufCaseSensitiveE2E"; + + message TestCaseSensitive { + int32 MyIntField = 1; + string CamelCaseString = 2; + string snake_case_field = 3; + + message NestedObject { + string NestedField = 1; + int32 AnotherField = 2; + } + + NestedObject nestedObject = 4; + + map MyMapField = 5; + } + """ + schema = { + fields { + MyIntField = int + CamelCaseString = string + snake_case_field = string + + NestedObject { + NestedField = string + AnotherField = int + } + MyMapField = "map" + } + } + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + plugin_output = "kafka_table" + } +} + +sink 
{ + Assert { + plugin_input = "kafka_table" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 16 + }, + { + rule_type = MIN_ROW + rule_value = 16 + } + ], + field_rules = [ + { + field_name = MyIntField + field_type = int + }, + { + field_name = CamelCaseString + field_type = string + }, + { + field_name = snake_case_field + field_type = string + } + ] + } + } +} + diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java index a6a75aaee3f..3f0bf8206d9 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java @@ -56,8 +56,7 @@ public byte[] convertRowToGenericRecord(SeaTunnelRow element) { if (resolvedValue instanceof byte[]) { resolvedValue = ByteString.copyFrom((byte[]) resolvedValue); } - builder.setField( - descriptor.findFieldByName(fieldName.toLowerCase()), resolvedValue); + builder.setField(descriptor.findFieldByName(fieldName), resolvedValue); } } diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java index e3d6dd35a2d..d5448cb0627 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -152,4 +152,250 @@ public void testConverter() Assertions.assertEquals(originalRow, convertedRow); } + + @Test + public void 
testFieldNameCaseSensitive() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that field names are case-sensitive and not converted to lowercase + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyIntField", "CamelCaseString", "snake_case_field"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 100); + row.setField(1, "test"); + row.setField(2, "value"); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 100, dynamicMessage.getField(descriptor.findFieldByName("MyIntField"))); + Assertions.assertEquals( + "test", dynamicMessage.getField(descriptor.findFieldByName("CamelCaseString"))); + Assertions.assertEquals( + "value", dynamicMessage.getField(descriptor.findFieldByName("snake_case_field"))); + } + + @Test + public void testNullFieldHandling() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that null fields are handled correctly + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 42); + row.setField(1, null); 
// null value + row.setField(2, true); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 field1 = 1;\n" + + " string field2 = 2;\n" + + " bool field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals(42, dynamicMessage.getField(descriptor.findFieldByName("field1"))); + // In proto3, null string fields default to empty string + Assertions.assertEquals("", dynamicMessage.getField(descriptor.findFieldByName("field2"))); + Assertions.assertEquals( + true, dynamicMessage.getField(descriptor.findFieldByName("field3"))); + } + + @Test + public void testNestedRowTypeWithCaseSensitiveFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test nested row type with case-sensitive field names + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"TopLevelField", "MyNestedObject"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); + + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + SeaTunnelRow row = new SeaTunnelRow(2); + row.setField(0, "top_value"); + row.setField(1, nestedRow); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string TopLevelField = 1;\n" + + " message 
MyNestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " MyNestedObject myNestedObject = 2;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + "top_value", dynamicMessage.getField(descriptor.findFieldByName("TopLevelField"))); + + DynamicMessage nestedMessage = + (DynamicMessage) + dynamicMessage.getField(descriptor.findFieldByName("myNestedObject")); + Descriptors.Descriptor nestedDescriptor = descriptor.findNestedTypeByName("MyNestedObject"); + Assertions.assertEquals( + "nested_value", + nestedMessage.getField(nestedDescriptor.findFieldByName("NestedField"))); + Assertions.assertEquals( + 999, nestedMessage.getField(nestedDescriptor.findFieldByName("AnotherField"))); + } + + @Test + public void testMapTypeWithCaseSensitiveFieldName() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test map type with case-sensitive field name + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyMapField"}, + new SeaTunnelDataType[] { + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + + Map mapData = new HashMap<>(); + mapData.put("key1", 100); + mapData.put("key2", 200); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, mapData); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " map MyMapField = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + 
RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + @SuppressWarnings("unchecked") + java.util.List mapEntries = + (java.util.List) + dynamicMessage.getField(descriptor.findFieldByName("MyMapField")); + Assertions.assertEquals(2, mapEntries.size()); + } + + @Test + public void testTinyIntUnsignedConversion() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test TINYINT to unsigned int conversion + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"tinyint_field"}, + new SeaTunnelDataType[] {BasicType.BYTE_TYPE}); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, (byte) -1); // -1 as signed byte should become 255 as unsigned + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 tinyint_field = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 255, dynamicMessage.getField(descriptor.findFieldByName("tinyint_field"))); + } + + @Test + public void testAllNullFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test when all fields are null + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.DOUBLE_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); 
+ row.setField(0, null); + row.setField(1, null); + row.setField(2, null); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string field1 = 1;\n" + + " int32 field2 = 2;\n" + + " double field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + // Should not throw exception, and produce valid protobuf message with default values + Assertions.assertNotNull(protobufMessage); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertNotNull(dynamicMessage); + } } From 11097d11b28f13ef5323595e57926d3389eec035 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 16 Dec 2025 11:58:27 +0800 Subject: [PATCH 02/14] [Fix][Protobuf] Add tests to verify case-sensitive field names in Protobuf serialization and deserialization --- .../connector-kafka-e2e/pom.xml | 7 + .../e2e/connector/kafka/KafkaIT.java | 397 ++++++++++-------- 2 files changed, 233 insertions(+), 171 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml index d27478d52c1..e8877b5f5d8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml @@ -115,6 +115,13 @@ ${testcontainer.version} test + + org.apache.seatunnel + connector-common + ${project.version} + test-jar + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 50b34658735..4fe2b4104ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -39,8 +39,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils; +import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.EngineType; @@ -1623,177 +1628,6 @@ public void testKafkaProtobufForTransformToAssert(TestContainer container) } } - @TestTemplate - public void testProtobufCaseSensitiveFieldNames(TestContainer container) - throws IOException, InterruptedException, URISyntaxException { - Container.ExecResult execResult = - container.executeJob("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - - String path = getTestConfigFile("/protobuf/fake_to_kafka_protobuf_case_sensitive.conf"); - Config config = ConfigFactory.parseFile(new File(path)); - 
Config sinkConfig = config.getConfigList("sink").get(0); - - Map schemaProperties = new HashMap<>(); - schemaProperties.put( - "protobuf_message_name", sinkConfig.getString("protobuf_message_name")); - schemaProperties.put("protobuf_schema", sinkConfig.getString("protobuf_schema")); - - SeaTunnelRowType nestedType = - new SeaTunnelRowType( - new String[] {"NestedField", "AnotherField"}, - new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); - - SeaTunnelRowType seaTunnelRowType = - new SeaTunnelRowType( - new String[] { - "MyIntField", - "CamelCaseString", - "snake_case_field", - "NestedObject", - "MyMapField" - }, - new SeaTunnelDataType[] { - BasicType.INT_TYPE, - BasicType.STRING_TYPE, - BasicType.STRING_TYPE, - nestedType, - new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) - }); - - TableSchema schema = - TableSchema.builder() - .columns( - Arrays.asList( - IntStream.range(0, seaTunnelRowType.getTotalFields()) - .mapToObj( - i -> - PhysicalColumn.of( - seaTunnelRowType - .getFieldName(i), - seaTunnelRowType - .getFieldType(i), - 0, - true, - null, - null)) - .toArray(PhysicalColumn[]::new))) - .build(); - - CatalogTable catalogTable = - CatalogTable.of( - TableIdentifier.of("", "", "", "test"), - schema, - schemaProperties, - Collections.emptyList(), - "It is converted from RowType and only has column information."); - - ProtobufDeserializationSchema deserializationSchema = - new ProtobufDeserializationSchema(catalogTable); - - List kafkaSTRow = - getKafkaSTRow( - "test_protobuf_case_sensitive_topic", - value -> { - try { - return deserializationSchema.deserialize(value); - } catch (IOException e) { - throw new RuntimeException("Error deserializing Kafka message", e); - } - }); - - Assertions.assertEquals(16, kafkaSTRow.size()); - - kafkaSTRow.forEach( - row -> { - Assertions.assertAll( - "Verify case-sensitive field values", - () -> Assertions.assertNotNull(row.getField(0)), // MyIntField - () -> 
Assertions.assertNotNull(row.getField(1)), // CamelCaseString - () -> Assertions.assertNotNull(row.getField(2)), // snake_case_field - () -> { - SeaTunnelRow nestedRow = (SeaTunnelRow) row.getField(3); - if (nestedRow != null) { - Assertions.assertNotNull(nestedRow.getField(0)); // NestedField - Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField - } - }, - () -> { - @SuppressWarnings("unchecked") - Map mapField = - (Map) row.getField(4); - if (mapField != null) { - Assertions.assertNotNull(mapField); - } - }); - }); - } - - @TestTemplate - public void testProtobufCaseSensitiveToAssert(TestContainer container) - throws IOException, InterruptedException, URISyntaxException { - - String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf"; - String path = getTestConfigFile(confFile); - Config config = ConfigFactory.parseFile(new File(path)); - Config sourceConfig = config.getConfigList("source").get(0); - ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig); - - SeaTunnelRowType nestedType = - new SeaTunnelRowType( - new String[] {"NestedField", "AnotherField"}, - new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); - - SeaTunnelRowType seaTunnelRowType = - new SeaTunnelRowType( - new String[] { - "MyIntField", - "CamelCaseString", - "snake_case_field", - "NestedObject", - "MyMapField" - }, - new SeaTunnelDataType[] { - BasicType.INT_TYPE, - BasicType.STRING_TYPE, - BasicType.STRING_TYPE, - nestedType, - new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) - }); - - DefaultSeaTunnelRowSerializer serializer = - getDefaultSeaTunnelRowSerializer( - "test_protobuf_case_sensitive_topic", seaTunnelRowType, readonlyConfig); - - SeaTunnelRow nestedRow = new SeaTunnelRow(2); - nestedRow.setField(0, "nested_value"); - nestedRow.setField(1, 999); - - Map mapData = new HashMap<>(); - mapData.put("key1", 100); - mapData.put("key2", 200); - - for (int i = 0; i < 16; i++) { - SeaTunnelRow row = new SeaTunnelRow(5); 
- row.setField(0, i); - row.setField(1, "test_string_" + i); - row.setField(2, "snake_value_" + i); - row.setField(3, nestedRow); - row.setField(4, mapData); - - ProducerRecord producerRecord = serializer.serializeRow(row); - try { - producer.send(producerRecord).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Error sending Kafka message", e); - } - } - producer.flush(); - - Container.ExecResult execResult = container.executeJob(confFile); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); - } - public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { URL resource = KafkaIT.class.getResource(configFile); @@ -2184,6 +2018,227 @@ private SeaTunnelRow buildSeaTunnelRow() { return seaTunnelRow; } + @TestTemplate + public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws Exception { + SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); + + Map schemaProperties = new HashMap<>(); + schemaProperties.put("protobuf_message_name", "TestMessage"); + schemaProperties.put( + "protobuf_schema", + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + " message NestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " NestedObject nestedObject = 4;\n" + + " map MyMapField = 5;\n" + + "}"); + + TableSchema schema = + TableSchema.builder() + .columns( + Arrays.asList( + IntStream.range(0, seaTunnelRowType.getTotalFields()) + .mapToObj( + i -> + PhysicalColumn.of( + seaTunnelRowType + .getFieldName(i), + seaTunnelRowType + .getFieldType(i), + 0, + true, + null, + null)) + .toArray(PhysicalColumn[]::new))) + .build(); + + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("", "", "", "test"), + 
schema, + schemaProperties, + Collections.emptyList(), + "Protobuf case-sensitive test"); + + Map config = new HashMap<>(); + config.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic"); + config.put(KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); + config.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); + config.put("protobuf_message_name", "TestMessage"); + config.put( + "protobuf_schema", + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + " message NestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " NestedObject nestedObject = 4;\n" + + " map MyMapField = 5;\n" + + "}"); + + List rows = createCaseSensitiveTestRows(); + + // Use SinkFlowTestUtils to write data to Kafka + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + catalogTable, ReadonlyConfig.fromMap(config), new KafkaSinkFactory(), rows); + + // Verify data from Kafka + ProtobufDeserializationSchema deserializationSchema = + new ProtobufDeserializationSchema(catalogTable); + + List kafkaSTRow = + getKafkaSTRow( + "test_protobuf_case_sensitive_topic", + value -> { + try { + return deserializationSchema.deserialize(value); + } catch (IOException e) { + throw new RuntimeException("Error deserializing Kafka message", e); + } + }); + + Assertions.assertEquals(2, kafkaSTRow.size()); + + kafkaSTRow.forEach( + row -> { + Assertions.assertAll( + "Verify case-sensitive field values", + () -> Assertions.assertNotNull(row.getField(0)), // MyIntField + () -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString + () -> Assertions.assertNotNull(row.getField(2)), // snake_case_field + () -> { + SeaTunnelRow nestedRow = (SeaTunnelRow) row.getField(3); + if (nestedRow != null) { + Assertions.assertNotNull(nestedRow.getField(0)); // 
NestedField + Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField + } + }, + () -> { + @SuppressWarnings("unchecked") + Map mapField = + (Map) row.getField(4); + if (mapField != null) { + Assertions.assertNotNull(mapField); + } + }); + }); + } + + @TestTemplate + public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Exception { + SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); + + // Write test data to Kafka first + String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf"; + String path = getTestConfigFile(confFile); + Config config = ConfigFactory.parseFile(new File(path)); + Config sourceConfig = config.getConfigList("source").get(0); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig); + + DefaultSeaTunnelRowSerializer serializer = + getDefaultSeaTunnelRowSerializer( + "test_protobuf_case_sensitive_topic", seaTunnelRowType, readonlyConfig); + + List testRows = createCaseSensitiveTestRows(); + + for (SeaTunnelRow row : testRows) { + ProducerRecord producerRecord = serializer.serializeRow(row); + try { + producer.send(producerRecord).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error sending Kafka message", e); + } + } + producer.flush(); + + // Use SourceFlowTestUtils to read data from Kafka + Map sourceOptions = new HashMap<>(); + sourceOptions.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic"); + sourceOptions.put( + KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); + sourceOptions.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); + sourceOptions.put( + "protobuf_message_name", + readonlyConfig.get(KafkaBaseOptions.PROTOBUF_MESSAGE_NAME)); + sourceOptions.put("protobuf_schema", readonlyConfig.get(KafkaBaseOptions.PROTOBUF_SCHEMA)); + + List readRows = + SourceFlowTestUtils.runBatchWithCheckpointDisabled( + ReadonlyConfig.fromMap(sourceOptions), new 
KafkaSourceFactory()); + + Assertions.assertEquals(2, readRows.size()); + + readRows.forEach( + row -> { + Assertions.assertAll( + "Verify case-sensitive field values from source", + () -> Assertions.assertNotNull(row.getField(0)), // MyIntField + () -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString + () -> Assertions.assertNotNull(row.getField(2))); // snake_case_field + }); + } + + private SeaTunnelRowType buildCaseSensitiveSeaTunnelRowType() { + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + return new SeaTunnelRowType( + new String[] { + "MyIntField", + "CamelCaseString", + "snake_case_field", + "NestedObject", + "MyMapField" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + nestedType, + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + } + + private List createCaseSensitiveTestRows() { + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + Map mapData = new HashMap<>(); + mapData.put("key1", 100); + mapData.put("key2", 200); + + SeaTunnelRow row1 = new SeaTunnelRow(5); + row1.setField(0, 1); + row1.setField(1, "test_string_1"); + row1.setField(2, "snake_value_1"); + row1.setField(3, nestedRow); + row1.setField(4, mapData); + + SeaTunnelRow row2 = new SeaTunnelRow(5); + row2.setField(0, 2); + row2.setField(1, "test_string_2"); + row2.setField(2, "snake_value_2"); + row2.setField(3, nestedRow); + row2.setField(4, mapData); + + return Arrays.asList(row1, row2); + } + private SeaTunnelRowType buildSeaTunnelRowType() { SeaTunnelRowType addressType = new SeaTunnelRowType( From b1a9666c659b7e509976b1ae3a6d6d80ba395c2a Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 16 Dec 2025 12:48:23 +0800 Subject: [PATCH 03/14] [Fix][Protobuf] Update address fields and types in Protobuf 
serialization tests to ensure correct order and inclusion --- .../format/protobuf/ProtobufConverterTest.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java index d5448cb0627..d475bb293b1 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -46,10 +46,11 @@ private SeaTunnelRow buildSeaTunnelRow() { String[] phoneNumbers = {"1", "2"}; byte[] byteVal = {1, 2, 3}; - SeaTunnelRow address = new SeaTunnelRow(3); - address.setField(0, "city_value"); - address.setField(1, "state_value"); - address.setField(2, "street_value"); + SeaTunnelRow address = new SeaTunnelRow(4); + address.setField(0, "street_value"); + address.setField(1, "city_value"); + address.setField(2, "state_value"); + address.setField(3, "zip_value"); seaTunnelRow.setField(0, 123); seaTunnelRow.setField(1, 123123123123L); @@ -68,9 +69,12 @@ private SeaTunnelRow buildSeaTunnelRow() { private SeaTunnelRowType buildSeaTunnelRowType() { SeaTunnelRowType addressType = new SeaTunnelRowType( - new String[] {"city", "state", "street"}, + new String[] {"street", "city", "state", "zip"}, new SeaTunnelDataType[] { - BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE }); return new SeaTunnelRowType( @@ -245,7 +249,7 @@ public void testNestedRowTypeWithCaseSensitiveFields() SeaTunnelRowType rowType = new SeaTunnelRowType( - new String[] {"TopLevelField", "MyNestedObject"}, + new String[] {"TopLevelField", 
"myNestedObject"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); SeaTunnelRow nestedRow = new SeaTunnelRow(2); From ba58da95c2bdd0c7be0ad4b2f63187d5c07fab1e Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 16 Dec 2025 16:23:59 +0800 Subject: [PATCH 04/14] [Fix][Protobuf] Update field names to be case-sensitive in Protobuf serialization tests --- .../seatunnel/format/protobuf/ProtobufConverterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java index d475bb293b1..845689094a5 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -133,7 +133,7 @@ public void testConverter() + " string zip = 4;\n" + " }\n" + "\n" - + " Address address = 8;\n" + + " Address Address = 8;\n" + "\n" + " map attributes = 9;\n" + "\n" @@ -249,7 +249,7 @@ public void testNestedRowTypeWithCaseSensitiveFields() SeaTunnelRowType rowType = new SeaTunnelRowType( - new String[] {"TopLevelField", "myNestedObject"}, + new String[] {"TopLevelField", "MyNestedObject"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); SeaTunnelRow nestedRow = new SeaTunnelRow(2); @@ -269,7 +269,7 @@ public void testNestedRowTypeWithCaseSensitiveFields() + " string NestedField = 1;\n" + " int32 AnotherField = 2;\n" + " }\n" - + " MyNestedObject myNestedObject = 2;\n" + + " MyNestedObject MyNestedObject = 2;\n" + "}"; String messageName = "TestMessage"; @@ -285,7 +285,7 @@ public void testNestedRowTypeWithCaseSensitiveFields() DynamicMessage nestedMessage = (DynamicMessage) - 
dynamicMessage.getField(descriptor.findFieldByName("myNestedObject")); + dynamicMessage.getField(descriptor.findFieldByName("MyNestedObject")); Descriptors.Descriptor nestedDescriptor = descriptor.findNestedTypeByName("MyNestedObject"); Assertions.assertEquals( "nested_value", From baf09174cb2327c685ea90e1ea8bd445bfaa5ea1 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 16 Dec 2025 17:38:31 +0800 Subject: [PATCH 05/14] [Fix][Protobuf] Update field names to be case-sensitive in Protobuf serialization tests --- .../protobuf/ProtobufConverterTest.java | 405 ------------------ 1 file changed, 405 deletions(-) delete mode 100644 seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java deleted file mode 100644 index 845689094a5..00000000000 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seatunnel.format.protobuf; - -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.DynamicMessage; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -class ProtobufConverterTest { - - private SeaTunnelRow buildSeaTunnelRow() { - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(10); - - Map attributesMap = new HashMap<>(); - attributesMap.put("k1", 0.1F); - attributesMap.put("k2", 2.3F); - - String[] phoneNumbers = {"1", "2"}; - byte[] byteVal = {1, 2, 3}; - - SeaTunnelRow address = new SeaTunnelRow(4); - address.setField(0, "street_value"); - address.setField(1, "city_value"); - address.setField(2, "state_value"); - address.setField(3, "zip_value"); - - seaTunnelRow.setField(0, 123); - seaTunnelRow.setField(1, 123123123123L); - seaTunnelRow.setField(2, 0.123f); - seaTunnelRow.setField(3, 0.123d); - seaTunnelRow.setField(4, false); - seaTunnelRow.setField(5, "test data"); - seaTunnelRow.setField(6, byteVal); - seaTunnelRow.setField(7, address); - seaTunnelRow.setField(8, attributesMap); - seaTunnelRow.setField(9, phoneNumbers); - - return seaTunnelRow; - } - - private SeaTunnelRowType buildSeaTunnelRowType() { - SeaTunnelRowType addressType = - new SeaTunnelRowType( - new String[] {"street", "city", "state", "zip"}, - new SeaTunnelDataType[] { - BasicType.STRING_TYPE, - BasicType.STRING_TYPE, - 
BasicType.STRING_TYPE, - BasicType.STRING_TYPE - }); - - return new SeaTunnelRowType( - new String[] { - "c_int32", - "c_int64", - "c_float", - "c_double", - "c_bool", - "c_string", - "c_bytes", - "Address", - "attributes", - "phone_numbers" - }, - new SeaTunnelDataType[] { - BasicType.INT_TYPE, - BasicType.LONG_TYPE, - BasicType.FLOAT_TYPE, - BasicType.DOUBLE_TYPE, - BasicType.BOOLEAN_TYPE, - BasicType.STRING_TYPE, - PrimitiveByteArrayType.INSTANCE, - addressType, - new MapType<>(BasicType.STRING_TYPE, BasicType.FLOAT_TYPE), - ArrayType.STRING_ARRAY_TYPE - }); - } - - @Test - public void testConverter() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - SeaTunnelRowType rowType = buildSeaTunnelRowType(); - SeaTunnelRow originalRow = buildSeaTunnelRow(); - - String protoContent = - "syntax = \"proto3\";\n" - + "\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "\n" - + "option java_outer_classname = \"ProtobufE2E\";\n" - + "\n" - + "message Person {\n" - + " int32 c_int32 = 1;\n" - + " int64 c_int64 = 2;\n" - + " float c_float = 3;\n" - + " double c_double = 4;\n" - + " bool c_bool = 5;\n" - + " string c_string = 6;\n" - + " bytes c_bytes = 7;\n" - + "\n" - + " message Address {\n" - + " string street = 1;\n" - + " string city = 2;\n" - + " string state = 3;\n" - + " string zip = 4;\n" - + " }\n" - + "\n" - + " Address Address = 8;\n" - + "\n" - + " map attributes = 9;\n" - + "\n" - + " repeated string phone_numbers = 10;\n" - + "}"; - - String messageName = "Person"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter rowToProtobufConverter = - new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = rowToProtobufConverter.convertRowToGenericRecord(originalRow); - - ProtobufToRowConverter protobufToRowConverter = - new ProtobufToRowConverter(protoContent, messageName); - DynamicMessage dynamicMessage = 
DynamicMessage.parseFrom(descriptor, protobufMessage); - SeaTunnelRow convertedRow = - protobufToRowConverter.converter(descriptor, dynamicMessage, rowType); - - Assertions.assertEquals(originalRow, convertedRow); - } - - @Test - public void testFieldNameCaseSensitive() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test that field names are case-sensitive and not converted to lowercase - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] {"MyIntField", "CamelCaseString", "snake_case_field"}, - new SeaTunnelDataType[] { - BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE - }); - - SeaTunnelRow row = new SeaTunnelRow(3); - row.setField(0, 100); - row.setField(1, "test"); - row.setField(2, "value"); - - String protoContent = - "syntax = \"proto3\";\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " int32 MyIntField = 1;\n" - + " string CamelCaseString = 2;\n" - + " string snake_case_field = 3;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - Assertions.assertEquals( - 100, dynamicMessage.getField(descriptor.findFieldByName("MyIntField"))); - Assertions.assertEquals( - "test", dynamicMessage.getField(descriptor.findFieldByName("CamelCaseString"))); - Assertions.assertEquals( - "value", dynamicMessage.getField(descriptor.findFieldByName("snake_case_field"))); - } - - @Test - public void testNullFieldHandling() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test that null fields are handled correctly - SeaTunnelRowType rowType = - new 
SeaTunnelRowType( - new String[] {"field1", "field2", "field3"}, - new SeaTunnelDataType[] { - BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE - }); - - SeaTunnelRow row = new SeaTunnelRow(3); - row.setField(0, 42); - row.setField(1, null); // null value - row.setField(2, true); - - String protoContent = - "syntax = \"proto3\";\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " int32 field1 = 1;\n" - + " string field2 = 2;\n" - + " bool field3 = 3;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - Assertions.assertEquals(42, dynamicMessage.getField(descriptor.findFieldByName("field1"))); - // In proto3, null string fields default to empty string - Assertions.assertEquals("", dynamicMessage.getField(descriptor.findFieldByName("field2"))); - Assertions.assertEquals( - true, dynamicMessage.getField(descriptor.findFieldByName("field3"))); - } - - @Test - public void testNestedRowTypeWithCaseSensitiveFields() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test nested row type with case-sensitive field names - SeaTunnelRowType nestedType = - new SeaTunnelRowType( - new String[] {"NestedField", "AnotherField"}, - new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); - - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] {"TopLevelField", "MyNestedObject"}, - new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); - - SeaTunnelRow nestedRow = new SeaTunnelRow(2); - nestedRow.setField(0, "nested_value"); - nestedRow.setField(1, 999); - - SeaTunnelRow row = new SeaTunnelRow(2); 
- row.setField(0, "top_value"); - row.setField(1, nestedRow); - - String protoContent = - "syntax = \"proto3\";\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " string TopLevelField = 1;\n" - + " message MyNestedObject {\n" - + " string NestedField = 1;\n" - + " int32 AnotherField = 2;\n" - + " }\n" - + " MyNestedObject MyNestedObject = 2;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - Assertions.assertEquals( - "top_value", dynamicMessage.getField(descriptor.findFieldByName("TopLevelField"))); - - DynamicMessage nestedMessage = - (DynamicMessage) - dynamicMessage.getField(descriptor.findFieldByName("MyNestedObject")); - Descriptors.Descriptor nestedDescriptor = descriptor.findNestedTypeByName("MyNestedObject"); - Assertions.assertEquals( - "nested_value", - nestedMessage.getField(nestedDescriptor.findFieldByName("NestedField"))); - Assertions.assertEquals( - 999, nestedMessage.getField(nestedDescriptor.findFieldByName("AnotherField"))); - } - - @Test - public void testMapTypeWithCaseSensitiveFieldName() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test map type with case-sensitive field name - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] {"MyMapField"}, - new SeaTunnelDataType[] { - new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) - }); - - Map mapData = new HashMap<>(); - mapData.put("key1", 100); - mapData.put("key2", 200); - - SeaTunnelRow row = new SeaTunnelRow(1); - row.setField(0, mapData); - - String protoContent = - "syntax = \"proto3\";\n" - + "package 
org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " map MyMapField = 1;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - @SuppressWarnings("unchecked") - java.util.List mapEntries = - (java.util.List) - dynamicMessage.getField(descriptor.findFieldByName("MyMapField")); - Assertions.assertEquals(2, mapEntries.size()); - } - - @Test - public void testTinyIntUnsignedConversion() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test TINYINT to unsigned int conversion - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] {"tinyint_field"}, - new SeaTunnelDataType[] {BasicType.BYTE_TYPE}); - - SeaTunnelRow row = new SeaTunnelRow(1); - row.setField(0, (byte) -1); // -1 as signed byte should become 255 as unsigned - - String protoContent = - "syntax = \"proto3\";\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " int32 tinyint_field = 1;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - Assertions.assertEquals( - 255, dynamicMessage.getField(descriptor.findFieldByName("tinyint_field"))); - } - - @Test - public void testAllNullFields() - throws Descriptors.DescriptorValidationException, IOException, InterruptedException { - // Test when 
all fields are null - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] {"field1", "field2", "field3"}, - new SeaTunnelDataType[] { - BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.DOUBLE_TYPE - }); - - SeaTunnelRow row = new SeaTunnelRow(3); - row.setField(0, null); - row.setField(1, null); - row.setField(2, null); - - String protoContent = - "syntax = \"proto3\";\n" - + "package org.apache.seatunnel.format.protobuf;\n" - + "message TestMessage {\n" - + " string field1 = 1;\n" - + " int32 field2 = 2;\n" - + " double field3 = 3;\n" - + "}"; - - String messageName = "TestMessage"; - Descriptors.Descriptor descriptor = - CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); - - RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); - byte[] protobufMessage = converter.convertRowToGenericRecord(row); - - // Should not throw exception, and produce valid protobuf message with default values - Assertions.assertNotNull(protobufMessage); - DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); - Assertions.assertNotNull(dynamicMessage); - } -} From 2011318effb14595a69ca835fcfb54e9612ca3fe Mon Sep 17 00:00:00 2001 From: Shenghang Date: Wed, 17 Dec 2025 13:58:22 +0800 Subject: [PATCH 06/14] [Fix][Protobuf] Add ProtobufConverterTest to validate case-sensitive field names and error handling --- .../protobuf/RowToProtobufConverter.java | 47 +- .../exception/ProtobufFormatErrorCode.java | 3 +- .../protobuf/ProtobufConverterTest.java | 405 ++++++++++++++++++ 3 files changed, 450 insertions(+), 5 deletions(-) create mode 100644 seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java 
b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java index 3f0bf8206d9..57413fec0cb 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java @@ -56,7 +56,15 @@ public byte[] convertRowToGenericRecord(SeaTunnelRow element) { if (resolvedValue instanceof byte[]) { resolvedValue = ByteString.copyFrom((byte[]) resolvedValue); } - builder.setField(descriptor.findFieldByName(fieldName), resolvedValue); + Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName); + if (fieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in Protobuf schema. Available fields: %s", + fieldName, descriptor.getFields())); + } + builder.setField(fieldDescriptor, resolvedValue); } } @@ -135,15 +143,46 @@ private Object handleRowType( SeaTunnelRow seaTunnelRow = (SeaTunnelRow) data; SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes(); String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames(); - Descriptors.Descriptor nestedTypeDescriptor = descriptor.findNestedTypeByName(fieldName); + + // Get the field descriptor for the nested message field + Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName); + if (fieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in Protobuf schema. 
Available fields: %s", + fieldName, descriptor.getFields())); + } + + // Get the message type descriptor from the field + Descriptors.Descriptor nestedTypeDescriptor = fieldDescriptor.getMessageType(); + if (nestedTypeDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' is not a message type in Protobuf schema", fieldName)); + } + DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedTypeDescriptor); for (int i = 0; i < fieldNames.length; i++) { Object resolvedValue = resolveObject( fieldNames[i], seaTunnelRow.getField(i), fieldTypes[i], nestedBuilder); - nestedBuilder.setField( - nestedTypeDescriptor.findFieldByName(fieldNames[i]), resolvedValue); + if (resolvedValue != null) { + Descriptors.FieldDescriptor nestedFieldDescriptor = + nestedTypeDescriptor.findFieldByName(fieldNames[i]); + if (nestedFieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in nested message type '%s'. 
Available fields: %s", + fieldNames[i], + nestedTypeDescriptor.getName(), + nestedTypeDescriptor.getFields())); + } + nestedBuilder.setField(nestedFieldDescriptor, resolvedValue); + } } return nestedBuilder.build(); diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java index e43125a41a1..39f86b492ec 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java @@ -21,7 +21,8 @@ public enum ProtobufFormatErrorCode implements SeaTunnelErrorCode { DESCRIPTOR_CONVERT_FAILED("PROTOBUF-01", "Protobuf descriptor conversion failed."), - UNSUPPORTED_DATA_TYPE("PROTOBUF-02", "Unsupported data type."); + UNSUPPORTED_DATA_TYPE("PROTOBUF-02", "Unsupported data type."), + PROTOBUF_SCHEMA_ILLEGAL("PROTOBUF-03", "Illegal Protobuf schema definition."); private final String code; private final String description; diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java new file mode 100644 index 00000000000..845689094a5 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.format.protobuf; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +class ProtobufConverterTest { + + private SeaTunnelRow buildSeaTunnelRow() { + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(10); + + Map attributesMap = new HashMap<>(); + attributesMap.put("k1", 0.1F); + attributesMap.put("k2", 2.3F); + + String[] phoneNumbers = {"1", "2"}; + byte[] byteVal = {1, 2, 3}; + + SeaTunnelRow address = new SeaTunnelRow(4); + address.setField(0, "street_value"); + address.setField(1, "city_value"); + address.setField(2, "state_value"); + address.setField(3, "zip_value"); + + seaTunnelRow.setField(0, 123); + seaTunnelRow.setField(1, 123123123123L); + seaTunnelRow.setField(2, 0.123f); + seaTunnelRow.setField(3, 0.123d); + seaTunnelRow.setField(4, false); + 
seaTunnelRow.setField(5, "test data"); + seaTunnelRow.setField(6, byteVal); + seaTunnelRow.setField(7, address); + seaTunnelRow.setField(8, attributesMap); + seaTunnelRow.setField(9, phoneNumbers); + + return seaTunnelRow; + } + + private SeaTunnelRowType buildSeaTunnelRowType() { + SeaTunnelRowType addressType = + new SeaTunnelRowType( + new String[] {"street", "city", "state", "zip"}, + new SeaTunnelDataType[] { + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE + }); + + return new SeaTunnelRowType( + new String[] { + "c_int32", + "c_int64", + "c_float", + "c_double", + "c_bool", + "c_string", + "c_bytes", + "Address", + "attributes", + "phone_numbers" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.BOOLEAN_TYPE, + BasicType.STRING_TYPE, + PrimitiveByteArrayType.INSTANCE, + addressType, + new MapType<>(BasicType.STRING_TYPE, BasicType.FLOAT_TYPE), + ArrayType.STRING_ARRAY_TYPE + }); + } + + @Test + public void testConverter() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + SeaTunnelRowType rowType = buildSeaTunnelRowType(); + SeaTunnelRow originalRow = buildSeaTunnelRow(); + + String protoContent = + "syntax = \"proto3\";\n" + + "\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "\n" + + "option java_outer_classname = \"ProtobufE2E\";\n" + + "\n" + + "message Person {\n" + + " int32 c_int32 = 1;\n" + + " int64 c_int64 = 2;\n" + + " float c_float = 3;\n" + + " double c_double = 4;\n" + + " bool c_bool = 5;\n" + + " string c_string = 6;\n" + + " bytes c_bytes = 7;\n" + + "\n" + + " message Address {\n" + + " string street = 1;\n" + + " string city = 2;\n" + + " string state = 3;\n" + + " string zip = 4;\n" + + " }\n" + + "\n" + + " Address Address = 8;\n" + + "\n" + + " map attributes = 9;\n" + + "\n" + + " repeated string phone_numbers = 10;\n" + + "}"; + + String 
messageName = "Person"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter rowToProtobufConverter = + new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = rowToProtobufConverter.convertRowToGenericRecord(originalRow); + + ProtobufToRowConverter protobufToRowConverter = + new ProtobufToRowConverter(protoContent, messageName); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + SeaTunnelRow convertedRow = + protobufToRowConverter.converter(descriptor, dynamicMessage, rowType); + + Assertions.assertEquals(originalRow, convertedRow); + } + + @Test + public void testFieldNameCaseSensitive() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that field names are case-sensitive and not converted to lowercase + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyIntField", "CamelCaseString", "snake_case_field"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 100); + row.setField(1, "test"); + row.setField(2, "value"); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 100, 
dynamicMessage.getField(descriptor.findFieldByName("MyIntField"))); + Assertions.assertEquals( + "test", dynamicMessage.getField(descriptor.findFieldByName("CamelCaseString"))); + Assertions.assertEquals( + "value", dynamicMessage.getField(descriptor.findFieldByName("snake_case_field"))); + } + + @Test + public void testNullFieldHandling() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that null fields are handled correctly + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 42); + row.setField(1, null); // null value + row.setField(2, true); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 field1 = 1;\n" + + " string field2 = 2;\n" + + " bool field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals(42, dynamicMessage.getField(descriptor.findFieldByName("field1"))); + // In proto3, null string fields default to empty string + Assertions.assertEquals("", dynamicMessage.getField(descriptor.findFieldByName("field2"))); + Assertions.assertEquals( + true, dynamicMessage.getField(descriptor.findFieldByName("field3"))); + } + + @Test + public void testNestedRowTypeWithCaseSensitiveFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test nested row type with case-sensitive field 
names + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"TopLevelField", "MyNestedObject"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); + + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + SeaTunnelRow row = new SeaTunnelRow(2); + row.setField(0, "top_value"); + row.setField(1, nestedRow); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string TopLevelField = 1;\n" + + " message MyNestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " MyNestedObject MyNestedObject = 2;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + "top_value", dynamicMessage.getField(descriptor.findFieldByName("TopLevelField"))); + + DynamicMessage nestedMessage = + (DynamicMessage) + dynamicMessage.getField(descriptor.findFieldByName("MyNestedObject")); + Descriptors.Descriptor nestedDescriptor = descriptor.findNestedTypeByName("MyNestedObject"); + Assertions.assertEquals( + "nested_value", + nestedMessage.getField(nestedDescriptor.findFieldByName("NestedField"))); + Assertions.assertEquals( + 999, nestedMessage.getField(nestedDescriptor.findFieldByName("AnotherField"))); + } + + @Test + public void testMapTypeWithCaseSensitiveFieldName() + throws 
Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test map type with case-sensitive field name + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyMapField"}, + new SeaTunnelDataType[] { + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + + Map<String, Integer> mapData = new HashMap<>(); + mapData.put("key1", 100); + mapData.put("key2", 200); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, mapData); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " map<string, int32> MyMapField = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + @SuppressWarnings("unchecked") + java.util.List mapEntries = + (java.util.List) + dynamicMessage.getField(descriptor.findFieldByName("MyMapField")); + Assertions.assertEquals(2, mapEntries.size()); + } + + @Test + public void testTinyIntUnsignedConversion() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test TINYINT to unsigned int conversion + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"tinyint_field"}, + new SeaTunnelDataType[] {BasicType.BYTE_TYPE}); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, (byte) -1); // -1 as signed byte should become 255 as unsigned + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 tinyint_field = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + 
CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 255, dynamicMessage.getField(descriptor.findFieldByName("tinyint_field"))); + } + + @Test + public void testAllNullFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test when all fields are null + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.DOUBLE_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, null); + row.setField(1, null); + row.setField(2, null); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string field1 = 1;\n" + + " int32 field2 = 2;\n" + + " double field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + // Should not throw exception, and produce valid protobuf message with default values + Assertions.assertNotNull(protobufMessage); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertNotNull(dynamicMessage); + } +} From 9dee76e662e71daadbeb4c669afcd8197c6c996e Mon Sep 17 00:00:00 2001 From: Shenghang Date: Wed, 17 Dec 2025 16:08:04 +0800 Subject: [PATCH 07/14] [Fix][Protobuf] Update Protobuf field names to be case-sensitive and improve nested message type naming --- 
.../format/protobuf/ProtobufConverterTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java index 845689094a5..8eb112d2b97 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -86,7 +86,7 @@ private SeaTunnelRowType buildSeaTunnelRowType() { "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, @@ -133,7 +133,7 @@ public void testConverter() + " string zip = 4;\n" + " }\n" + "\n" - + " Address Address = 8;\n" + + " Address address = 8;\n" + "\n" + " map<string, float> attributes = 9;\n" + "\n" @@ -265,11 +265,11 @@ public void testNestedRowTypeWithCaseSensitiveFields() + "package org.apache.seatunnel.format.protobuf;\n" + "message TestMessage {\n" + " string TopLevelField = 1;\n" - + " message MyNestedObject {\n" + + " message MyNestedObjectType {\n" + " string NestedField = 1;\n" + " int32 AnotherField = 2;\n" + " }\n" - + " MyNestedObject MyNestedObject = 2;\n" + + " MyNestedObjectType MyNestedObject = 2;\n" + "}"; String messageName = "TestMessage"; @@ -286,7 +286,8 @@ public void testNestedRowTypeWithCaseSensitiveFields() DynamicMessage nestedMessage = (DynamicMessage) dynamicMessage.getField(descriptor.findFieldByName("MyNestedObject")); - Descriptors.Descriptor nestedDescriptor = descriptor.findNestedTypeByName("MyNestedObject"); + Descriptors.Descriptor nestedDescriptor = + descriptor.findNestedTypeByName("MyNestedObjectType"); Assertions.assertEquals( "nested_value", nestedMessage.getField(nestedDescriptor.findFieldByName("NestedField"))); From 
74015ce64918937786900e251f4663e1f884ca9c Mon Sep 17 00:00:00 2001 From: Shenghang Date: Thu, 18 Dec 2025 09:42:45 +0800 Subject: [PATCH 08/14] [Fix][Protobuf] Enhance ProtobufToRowConverter to handle null checks and improve field retrieval logic --- .../protobuf/ProtobufToRowConverter.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java index 3f7d01b3ff6..459f93860fe 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java @@ -143,13 +143,19 @@ private Object convertField( return res; case ROW: - Descriptors.Descriptor nestedTypeByName = - descriptor.findNestedTypeByName(fieldName); - DynamicMessage s = - (DynamicMessage) - dynamicMessage.getField( - descriptor.findFieldByName(fieldName.toLowerCase())); - return converter(nestedTypeByName, s, (SeaTunnelRowType) dataType); + if (descriptor == null || dynamicMessage == null || fieldName == null) { + return null; + } + Descriptors.FieldDescriptor rowField = descriptor.findFieldByName(fieldName); + if (rowField == null) { + return null; + } + Descriptors.Descriptor rowDescriptor = rowField.getMessageType(); + if (rowDescriptor == null) { + return null; + } + DynamicMessage rowMessage = (DynamicMessage) dynamicMessage.getField(rowField); + return converter(rowDescriptor, rowMessage, (SeaTunnelRowType) dataType); case ARRAY: SeaTunnelDataType basicType = ((ArrayType) dataType).getElementType(); List list = (List) val; From c27a6c4d43be7efd3f3ddeb3c9025dc494549d43 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Thu, 18 Dec 2025 20:05:10 
+0800 Subject: [PATCH 09/14] [Fix][CI] Increase timeout for CI workflow to accommodate longer build times --- .github/workflows/backend.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 322707e8c8f..19d103d7bc8 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -1238,7 +1238,7 @@ jobs: matrix: java: [ '8', '11' ] os: [ 'ubuntu-latest' ] - timeout-minutes: 140 + timeout-minutes: 200 steps: - uses: actions/checkout@v2 - name: Set up JDK ${{ matrix.java }} From 76d4681f87333819f2c041a1b04c00bc02f6302f Mon Sep 17 00:00:00 2001 From: Shenghang Date: Mon, 22 Dec 2025 21:47:35 +0800 Subject: [PATCH 10/14] [Fix][Protobuf] Update Protobuf configuration to use lowercase field names for consistency --- .../src/test/resources/protobuf/fake_to_kafka_protobuf.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf index e489d222325..ddbc035a3fb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf @@ -41,7 +41,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -97,4 +97,4 @@ sink { } """ } -} \ No newline at end of file +} From ce96a4800c48b1ff15ca193cfc265232a873451b Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 23 Dec 2025 09:52:20 +0800 Subject: [PATCH 11/14] [Fix][Protobuf] Standardize Protobuf field names to lowercase for consistency --- .../org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java | 6 +++--- 
.../protobuf/fake_to_kafka_protobuf_case_sensitive.conf | 3 +-- .../protobuf/kafka_protobuf_case_sensitive_to_assert.conf | 3 +-- .../test/resources/protobuf/kafka_protobuf_to_assert.conf | 4 ++-- .../protobuf/kafka_protobuf_transform_to_assert.conf | 4 ++-- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 4fe2b4104ef..a1a1f00d4b3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -1260,7 +1260,7 @@ public void testFakeSourceToKafkaProtobufFormat(TestContainer container) "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, @@ -2201,7 +2201,7 @@ private SeaTunnelRowType buildCaseSensitiveSeaTunnelRowType() { "MyIntField", "CamelCaseString", "snake_case_field", - "NestedObject", + "nestedObject", "MyMapField" }, new SeaTunnelDataType[] { @@ -2256,7 +2256,7 @@ private SeaTunnelRowType buildSeaTunnelRowType() { "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf index 507f6f00e4f..700d25ddd3f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf @@ -37,7 +37,7 @@ source { CamelCaseString = string snake_case_field = string - NestedObject { + nestedObject { NestedField = string AnotherField = int } @@ -83,4 +83,3 @@ sink { """ } } - diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf index eb423b31e2c..7dd43a5be73 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf @@ -59,7 +59,7 @@ source { CamelCaseString = string snake_case_field = string - NestedObject { + nestedObject { NestedField = string AnotherField = int } @@ -103,4 +103,3 @@ sink { } } } - diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf index 2570fcbdcf4..cbe4e7486a6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf @@ -70,7 +70,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -174,4 +174,4 @@ sink { ] } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf index 86e9342d310..7252206f96d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf @@ -70,7 +70,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -89,7 +89,7 @@ transform { Sql { plugin_input = "kafka_table" plugin_output = "kafka_table_transform" - query = "select Address.city,c_string from dual" + query = "select address.city,c_string from dual" } } From 212d9221b3fb7a5e78cbabad3ace83cd4f6b4087 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Tue, 23 Dec 2025 17:17:16 +0800 Subject: [PATCH 12/14] [Fix][Protobuf] Use dynamic topic names in Protobuf case-sensitive tests for improved isolation --- .../seatunnel/e2e/connector/kafka/KafkaIT.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index a1a1f00d4b3..0ad1e8deb73 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -2020,6 +2020,7 @@ private SeaTunnelRow buildSeaTunnelRow() { @TestTemplate public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws Exception { + String topicName = "test_protobuf_case_sensitive_fieldnames_" + System.nanoTime(); SeaTunnelRowType 
seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); Map schemaProperties = new HashMap<>(); @@ -2068,7 +2069,7 @@ public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws "Protobuf case-sensitive test"); Map config = new HashMap<>(); - config.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic"); + config.put(KafkaBaseOptions.TOPIC.key(), topicName); config.put(KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); config.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); config.put("protobuf_message_name", "TestMessage"); @@ -2100,7 +2101,7 @@ public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws List kafkaSTRow = getKafkaSTRow( - "test_protobuf_case_sensitive_topic", + topicName, value -> { try { return deserializationSchema.deserialize(value); @@ -2138,9 +2139,10 @@ public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws @TestTemplate public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Exception { + String topicName = "test_protobuf_case_sensitive_toassert_" + System.nanoTime(); SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); - // Write test data to Kafka first + // Write test data to Kafka first - load schema from config but use dynamic topic String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf"; String path = getTestConfigFile(confFile); Config config = ConfigFactory.parseFile(new File(path)); @@ -2148,8 +2150,7 @@ public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Ex ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig); DefaultSeaTunnelRowSerializer serializer = - getDefaultSeaTunnelRowSerializer( - "test_protobuf_case_sensitive_topic", seaTunnelRowType, readonlyConfig); + getDefaultSeaTunnelRowSerializer(topicName, seaTunnelRowType, readonlyConfig); List testRows = createCaseSensitiveTestRows(); @@ -2165,7 
+2166,7 @@ public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Ex // Use SourceFlowTestUtils to read data from Kafka Map sourceOptions = new HashMap<>(); - sourceOptions.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic"); + sourceOptions.put(KafkaBaseOptions.TOPIC.key(), topicName); sourceOptions.put( KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); sourceOptions.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); From 6e6a860f4919cafea0db865f59c05f50cd82d18f Mon Sep 17 00:00:00 2001 From: Shenghang Date: Thu, 25 Dec 2025 10:26:33 +0800 Subject: [PATCH 13/14] [Fix][Protobuf] Configure Kafka source options to use earliest start mode and dynamic consumer group for Protobuf tests --- .../org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 0ad1e8deb73..0615865ff47 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -40,7 +40,9 @@ import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants; import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode; import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkFactory; import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceFactory; @@ -2174,6 +2176,10 @@ public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Ex "protobuf_message_name", readonlyConfig.get(KafkaBaseOptions.PROTOBUF_MESSAGE_NAME)); sourceOptions.put("protobuf_schema", readonlyConfig.get(KafkaBaseOptions.PROTOBUF_SCHEMA)); + sourceOptions.put(KafkaSourceOptions.START_MODE.key(), StartMode.EARLIEST); + sourceOptions.put( + KafkaSourceOptions.CONSUMER_GROUP.key(), + "test_protobuf_case_sensitive_toassert_" + topicName); List readRows = SourceFlowTestUtils.runBatchWithCheckpointDisabled( From 1614426f3727b13fac7ba0535cfa33db9a33a588 Mon Sep 17 00:00:00 2001 From: Shenghang Date: Fri, 26 Dec 2025 17:52:45 +0800 Subject: [PATCH 14/14] [Fix][Protobuf] Add schema configuration for Protobuf with nested object fields and field type definitions --- .../e2e/connector/kafka/KafkaIT.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 0615865ff47..d070d2e178f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -2181,6 +2181,28 @@ public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Ex KafkaSourceOptions.CONSUMER_GROUP.key(), "test_protobuf_case_sensitive_toassert_" + topicName); + // Add schema configuration to match the Protobuf schema + Map 
schemaConfig = new HashMap<>(); + Map nestedObjectFields = new HashMap<>(); + nestedObjectFields.put("NestedField", "string"); + nestedObjectFields.put("AnotherField", "int"); + + List<Map<String, Object>> fields = new ArrayList<>(); + fields.add(createFieldConfig("MyIntField", "int")); + fields.add(createFieldConfig("CamelCaseString", "string")); + fields.add(createFieldConfig("snake_case_field", "string")); + + Map<String, Object> nestedObjectConfig = new HashMap<>(); + nestedObjectConfig.put("name", "nestedObject"); + nestedObjectConfig.put("type", "row"); + nestedObjectConfig.put("fields", nestedObjectFields); + fields.add(nestedObjectConfig); + + fields.add(createFieldConfig("MyMapField", "map<string, int>")); + + schemaConfig.put("fields", fields); + sourceOptions.put("schema", schemaConfig); + List readRows = SourceFlowTestUtils.runBatchWithCheckpointDisabled( ReadonlyConfig.fromMap(sourceOptions), new KafkaSourceFactory()); @@ -2197,6 +2219,13 @@ public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Ex }); } + private Map<String, Object> createFieldConfig(String name, String type) { + Map<String, Object> fieldConfig = new HashMap<>(); + fieldConfig.put("name", name); + fieldConfig.put("type", type); + return fieldConfig; + } + private SeaTunnelRowType buildCaseSensitiveSeaTunnelRowType() { SeaTunnelRowType nestedType = new SeaTunnelRowType(