diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml index d27478d52c1..e8877b5f5d8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml @@ -115,6 +115,13 @@ ${testcontainer.version} test + + org.apache.seatunnel + connector-common + ${project.version} + test-jar + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 7d717056b00..d070d2e178f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -39,8 +39,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode; import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils; +import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.EngineType; @@ -1255,7 +1262,7 @@ public void testFakeSourceToKafkaProtobufFormat(TestContainer container) "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, @@ -2013,6 +2020,261 @@ private SeaTunnelRow buildSeaTunnelRow() { return seaTunnelRow; } + @TestTemplate + public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws Exception { + String topicName = "test_protobuf_case_sensitive_fieldnames_" + System.nanoTime(); + SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); + + Map schemaProperties = new HashMap<>(); + schemaProperties.put("protobuf_message_name", "TestMessage"); + schemaProperties.put( + "protobuf_schema", + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + " message NestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " NestedObject nestedObject = 4;\n" + + " map MyMapField = 5;\n" + + "}"); + + TableSchema schema = + TableSchema.builder() + .columns( + Arrays.asList( + IntStream.range(0, seaTunnelRowType.getTotalFields()) + .mapToObj( + i -> + PhysicalColumn.of( + seaTunnelRowType + .getFieldName(i), + seaTunnelRowType + .getFieldType(i), + 0, + true, + null, + null)) + .toArray(PhysicalColumn[]::new))) + .build(); + + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("", "", "", "test"), + schema, + schemaProperties, + Collections.emptyList(), + "Protobuf case-sensitive test"); + + Map config = new HashMap<>(); + config.put(KafkaBaseOptions.TOPIC.key(), topicName); + config.put(KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); + config.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); + config.put("protobuf_message_name", "TestMessage"); + config.put( + "protobuf_schema", + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + " message NestedObject {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " NestedObject nestedObject = 4;\n" + + " map MyMapField = 5;\n" + + "}"); + + List rows = createCaseSensitiveTestRows(); + + // Use SinkFlowTestUtils to write data to Kafka + SinkFlowTestUtils.runBatchWithCheckpointDisabled( + catalogTable, ReadonlyConfig.fromMap(config), new KafkaSinkFactory(), rows); + + // Verify data from Kafka + ProtobufDeserializationSchema deserializationSchema = + new ProtobufDeserializationSchema(catalogTable); + + List kafkaSTRow = + getKafkaSTRow( + topicName, + value -> { + try { + return deserializationSchema.deserialize(value); + } catch (IOException e) { + throw new RuntimeException("Error deserializing Kafka message", e); + } + }); + + Assertions.assertEquals(2, kafkaSTRow.size()); + + kafkaSTRow.forEach( + row -> { + Assertions.assertAll( + "Verify case-sensitive field values", + () -> Assertions.assertNotNull(row.getField(0)), // MyIntField + () -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString + () -> Assertions.assertNotNull(row.getField(2)), // snake_case_field + () -> { + SeaTunnelRow nestedRow = (SeaTunnelRow) row.getField(3); + if (nestedRow != null) { + Assertions.assertNotNull(nestedRow.getField(0)); // NestedField + Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField + } + }, + () -> { + @SuppressWarnings("unchecked") + Map mapField = + (Map) row.getField(4); + if (mapField != null) { + Assertions.assertNotNull(mapField); + } + }); + }); + } + + @TestTemplate + public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Exception { + String topicName = "test_protobuf_case_sensitive_toassert_" + System.nanoTime(); + SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType(); + + // Write test data to Kafka first - load schema from config but use dynamic topic + String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf"; + String path = getTestConfigFile(confFile); + Config config = ConfigFactory.parseFile(new File(path)); + Config sourceConfig = config.getConfigList("source").get(0); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig); + + DefaultSeaTunnelRowSerializer serializer = + getDefaultSeaTunnelRowSerializer(topicName, seaTunnelRowType, readonlyConfig); + + List testRows = createCaseSensitiveTestRows(); + + for (SeaTunnelRow row : testRows) { + ProducerRecord producerRecord = serializer.serializeRow(row); + try { + producer.send(producerRecord).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error sending Kafka message", e); + } + } + producer.flush(); + + // Use SourceFlowTestUtils to read data from Kafka + Map sourceOptions = new HashMap<>(); + sourceOptions.put(KafkaBaseOptions.TOPIC.key(), topicName); + sourceOptions.put( + KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers()); + sourceOptions.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF); + sourceOptions.put( + "protobuf_message_name", + readonlyConfig.get(KafkaBaseOptions.PROTOBUF_MESSAGE_NAME)); + sourceOptions.put("protobuf_schema", readonlyConfig.get(KafkaBaseOptions.PROTOBUF_SCHEMA)); + sourceOptions.put(KafkaSourceOptions.START_MODE.key(), StartMode.EARLIEST); + sourceOptions.put( + KafkaSourceOptions.CONSUMER_GROUP.key(), + "test_protobuf_case_sensitive_toassert_" + topicName); + + // Add schema configuration to match the Protobuf schema + Map schemaConfig = new HashMap<>(); + Map nestedObjectFields = new HashMap<>(); + nestedObjectFields.put("NestedField", "string"); + nestedObjectFields.put("AnotherField", "int"); + + List> fields = new ArrayList<>(); + fields.add(createFieldConfig("MyIntField", "int")); + fields.add(createFieldConfig("CamelCaseString", "string")); + fields.add(createFieldConfig("snake_case_field", "string")); + + Map nestedObjectConfig = new HashMap<>(); + nestedObjectConfig.put("name", "nestedObject"); + nestedObjectConfig.put("type", "row"); + nestedObjectConfig.put("fields", nestedObjectFields); + fields.add(nestedObjectConfig); + + fields.add(createFieldConfig("MyMapField", "map")); + + schemaConfig.put("fields", fields); + sourceOptions.put("schema", schemaConfig); + + List readRows = + SourceFlowTestUtils.runBatchWithCheckpointDisabled( + ReadonlyConfig.fromMap(sourceOptions), new KafkaSourceFactory()); + + Assertions.assertEquals(2, readRows.size()); + + readRows.forEach( + row -> { + Assertions.assertAll( + "Verify case-sensitive field values from source", + () -> Assertions.assertNotNull(row.getField(0)), // MyIntField + () -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString + () -> Assertions.assertNotNull(row.getField(2))); // snake_case_field + }); + } + + private Map createFieldConfig(String name, String type) { + Map fieldConfig = new HashMap<>(); + fieldConfig.put("name", name); + fieldConfig.put("type", type); + return fieldConfig; + } + + private SeaTunnelRowType buildCaseSensitiveSeaTunnelRowType() { + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + return new SeaTunnelRowType( + new String[] { + "MyIntField", + "CamelCaseString", + "snake_case_field", + "nestedObject", + "MyMapField" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + nestedType, + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + } + + private List createCaseSensitiveTestRows() { + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + Map mapData = new HashMap<>(); + mapData.put("key1", 100); + mapData.put("key2", 200); + + SeaTunnelRow row1 = new SeaTunnelRow(5); + row1.setField(0, 1); + row1.setField(1, "test_string_1"); + row1.setField(2, "snake_value_1"); + row1.setField(3, nestedRow); + row1.setField(4, mapData); + + SeaTunnelRow row2 = new SeaTunnelRow(5); + row2.setField(0, 2); + row2.setField(1, "test_string_2"); + row2.setField(2, "snake_value_2"); + row2.setField(3, nestedRow); + row2.setField(4, mapData); + + return Arrays.asList(row1, row2); + } + private SeaTunnelRowType buildSeaTunnelRowType() { SeaTunnelRowType addressType = new SeaTunnelRowType( @@ -2030,7 +2292,7 @@ private SeaTunnelRowType buildSeaTunnelRowType() { "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf index e489d222325..ddbc035a3fb 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf.conf @@ -41,7 +41,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -97,4 +97,4 @@ sink { } """ } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf new file mode 100644 index 00000000000..700d25ddd3f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/fake_to_kafka_protobuf_case_sensitive.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # spark config + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + +} +source { + FakeSource { + parallelism = 1 + plugin_output = "fake" + row.num = 16 + schema = { + fields { + MyIntField = int + CamelCaseString = string + snake_case_field = string + + nestedObject { + NestedField = string + AnotherField = int + } + MyMapField = "map" + } + } + } +} + +sink { + kafka { + topic = "test_protobuf_case_sensitive_topic" + bootstrap.servers = "kafkaCluster:9092" + format = protobuf + kafka.request.timeout.ms = 60000 + kafka.config = { + acks = "all" + request.timeout.ms = 60000 + buffer.memory = 33554432 + } + protobuf_message_name = TestCaseSensitive + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufCaseSensitiveE2E"; + + message TestCaseSensitive { + int32 MyIntField = 1; + string CamelCaseString = 2; + string snake_case_field = 3; + + message NestedObject { + string NestedField = 1; + int32 AnotherField = 2; + } + + NestedObject nestedObject = 4; + + map MyMapField = 5; + } + """ + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf new file mode 100644 index 00000000000..7dd43a5be73 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_case_sensitive_to_assert.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # spark config + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + topic = "test_protobuf_case_sensitive_topic" + format = protobuf + protobuf_message_name = TestCaseSensitive + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufCaseSensitiveE2E"; + + message TestCaseSensitive { + int32 MyIntField = 1; + string CamelCaseString = 2; + string snake_case_field = 3; + + message NestedObject { + string NestedField = 1; + int32 AnotherField = 2; + } + + NestedObject nestedObject = 4; + + map MyMapField = 5; + } + """ + schema = { + fields { + MyIntField = int + CamelCaseString = string + snake_case_field = string + + nestedObject { + NestedField = string + AnotherField = int + } + MyMapField = "map" + } + } + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + plugin_output = "kafka_table" + } +} + +sink { + Assert { + plugin_input = "kafka_table" + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 16 + }, + { + rule_type = MIN_ROW + rule_value = 16 + } + ], + field_rules = [ + { + field_name = MyIntField + field_type = int + }, + { + field_name = CamelCaseString + field_type = string + }, + { + field_name = snake_case_field + field_type = string + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf index 2570fcbdcf4..cbe4e7486a6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_to_assert.conf @@ -70,7 +70,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -174,4 +174,4 @@ sink { ] } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf index 86e9342d310..7252206f96d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_transform_to_assert.conf @@ -70,7 +70,7 @@ source { c_string = string c_bytes = bytes - Address { + address { city = string state = string street = string @@ -89,7 +89,7 @@ transform { Sql { plugin_input = "kafka_table" plugin_output = "kafka_table_transform" - query = "select Address.city,c_string from dual" + query = "select address.city,c_string from dual" } } diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java index 3f7d01b3ff6..459f93860fe 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java @@ -143,13 +143,19 @@ private Object convertField( return res; case ROW: - Descriptors.Descriptor nestedTypeByName = - descriptor.findNestedTypeByName(fieldName); - DynamicMessage s = - (DynamicMessage) - dynamicMessage.getField( - descriptor.findFieldByName(fieldName.toLowerCase())); - return converter(nestedTypeByName, s, (SeaTunnelRowType) dataType); + if (descriptor == null || dynamicMessage == null || fieldName == null) { + return null; + } + Descriptors.FieldDescriptor rowField = descriptor.findFieldByName(fieldName); + if (rowField == null) { + return null; + } + Descriptors.Descriptor rowDescriptor = rowField.getMessageType(); + if (rowDescriptor == null) { + return null; + } + DynamicMessage rowMessage = (DynamicMessage) dynamicMessage.getField(rowField); + return converter(rowDescriptor, rowMessage, (SeaTunnelRowType) dataType); case ARRAY: SeaTunnelDataType basicType = ((ArrayType) dataType).getElementType(); List list = (List) val; diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java index a6a75aaee3f..57413fec0cb 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/RowToProtobufConverter.java @@ -56,8 +56,15 @@ public byte[] convertRowToGenericRecord(SeaTunnelRow element) { if (resolvedValue instanceof byte[]) { resolvedValue = ByteString.copyFrom((byte[]) resolvedValue); } - builder.setField( - descriptor.findFieldByName(fieldName.toLowerCase()), resolvedValue); + Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName); + if (fieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in Protobuf schema. Available fields: %s", + fieldName, descriptor.getFields())); + } + builder.setField(fieldDescriptor, resolvedValue); } } @@ -136,15 +143,46 @@ private Object handleRowType( SeaTunnelRow seaTunnelRow = (SeaTunnelRow) data; SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType) seaTunnelDataType).getFieldTypes(); String[] fieldNames = ((SeaTunnelRowType) seaTunnelDataType).getFieldNames(); - Descriptors.Descriptor nestedTypeDescriptor = descriptor.findNestedTypeByName(fieldName); + + // Get the field descriptor for the nested message field + Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName); + if (fieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in Protobuf schema. Available fields: %s", + fieldName, descriptor.getFields())); + } + + // Get the message type descriptor from the field + Descriptors.Descriptor nestedTypeDescriptor = fieldDescriptor.getMessageType(); + if (nestedTypeDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' is not a message type in Protobuf schema", fieldName)); + } + DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedTypeDescriptor); for (int i = 0; i < fieldNames.length; i++) { Object resolvedValue = resolveObject( fieldNames[i], seaTunnelRow.getField(i), fieldTypes[i], nestedBuilder); - nestedBuilder.setField( - nestedTypeDescriptor.findFieldByName(fieldNames[i]), resolvedValue); + if (resolvedValue != null) { + Descriptors.FieldDescriptor nestedFieldDescriptor = + nestedTypeDescriptor.findFieldByName(fieldNames[i]); + if (nestedFieldDescriptor == null) { + throw new SeaTunnelProtobufFormatException( + ProtobufFormatErrorCode.PROTOBUF_SCHEMA_ILLEGAL, + String.format( + "Field '%s' not found in nested message type '%s'. Available fields: %s", + fieldNames[i], + nestedTypeDescriptor.getName(), + nestedTypeDescriptor.getFields())); + } + nestedBuilder.setField(nestedFieldDescriptor, resolvedValue); + } } return nestedBuilder.build(); diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java index e43125a41a1..39f86b492ec 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/exception/ProtobufFormatErrorCode.java @@ -21,7 +21,8 @@ public enum ProtobufFormatErrorCode implements SeaTunnelErrorCode { DESCRIPTOR_CONVERT_FAILED("PROTOBUF-01", "Protobuf descriptor conversion failed."), - UNSUPPORTED_DATA_TYPE("PROTOBUF-02", "Unsupported data type."); + UNSUPPORTED_DATA_TYPE("PROTOBUF-02", "Unsupported data type."), + PROTOBUF_SCHEMA_ILLEGAL("PROTOBUF-03", "Illegal Protobuf schema definition."); private final String code; private final String description; diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java index e3d6dd35a2d..8eb112d2b97 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/test/java/org/apache/seatunnel/format/protobuf/ProtobufConverterTest.java @@ -46,10 +46,11 @@ private SeaTunnelRow buildSeaTunnelRow() { String[] phoneNumbers = {"1", "2"}; byte[] byteVal = {1, 2, 3}; - SeaTunnelRow address = new SeaTunnelRow(3); - address.setField(0, "city_value"); - address.setField(1, "state_value"); - address.setField(2, "street_value"); + SeaTunnelRow address = new SeaTunnelRow(4); + address.setField(0, "street_value"); + address.setField(1, "city_value"); + address.setField(2, "state_value"); + address.setField(3, "zip_value"); seaTunnelRow.setField(0, 123); seaTunnelRow.setField(1, 123123123123L); @@ -68,9 +69,12 @@ private SeaTunnelRow buildSeaTunnelRow() { private SeaTunnelRowType buildSeaTunnelRowType() { SeaTunnelRowType addressType = new SeaTunnelRowType( - new String[] {"city", "state", "street"}, + new String[] {"street", "city", "state", "zip"}, new SeaTunnelDataType[] { - BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE }); return new SeaTunnelRowType( @@ -82,7 +86,7 @@ private SeaTunnelRowType buildSeaTunnelRowType() { "c_bool", "c_string", "c_bytes", - "Address", + "address", "attributes", "phone_numbers" }, @@ -152,4 +156,251 @@ public void testConverter() Assertions.assertEquals(originalRow, convertedRow); } + + @Test + public void testFieldNameCaseSensitive() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that field names are case-sensitive and not converted to lowercase + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyIntField", "CamelCaseString", "snake_case_field"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 100); + row.setField(1, "test"); + row.setField(2, "value"); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 MyIntField = 1;\n" + + " string CamelCaseString = 2;\n" + + " string snake_case_field = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 100, dynamicMessage.getField(descriptor.findFieldByName("MyIntField"))); + Assertions.assertEquals( + "test", dynamicMessage.getField(descriptor.findFieldByName("CamelCaseString"))); + Assertions.assertEquals( + "value", dynamicMessage.getField(descriptor.findFieldByName("snake_case_field"))); + } + + @Test + public void testNullFieldHandling() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test that null fields are handled correctly + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, 42); + row.setField(1, null); // null value + row.setField(2, true); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 field1 = 1;\n" + + " string field2 = 2;\n" + + " bool field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals(42, dynamicMessage.getField(descriptor.findFieldByName("field1"))); + // In proto3, null string fields default to empty string + Assertions.assertEquals("", dynamicMessage.getField(descriptor.findFieldByName("field2"))); + Assertions.assertEquals( + true, dynamicMessage.getField(descriptor.findFieldByName("field3"))); + } + + @Test + public void testNestedRowTypeWithCaseSensitiveFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test nested row type with case-sensitive field names + SeaTunnelRowType nestedType = + new SeaTunnelRowType( + new String[] {"NestedField", "AnotherField"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE}); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"TopLevelField", "MyNestedObject"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE, nestedType}); + + SeaTunnelRow nestedRow = new SeaTunnelRow(2); + nestedRow.setField(0, "nested_value"); + nestedRow.setField(1, 999); + + SeaTunnelRow row = new SeaTunnelRow(2); + row.setField(0, "top_value"); + row.setField(1, nestedRow); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string TopLevelField = 1;\n" + + " message MyNestedObjectType {\n" + + " string NestedField = 1;\n" + + " int32 AnotherField = 2;\n" + + " }\n" + + " MyNestedObjectType MyNestedObject = 2;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + "top_value", dynamicMessage.getField(descriptor.findFieldByName("TopLevelField"))); + + DynamicMessage nestedMessage = + (DynamicMessage) + dynamicMessage.getField(descriptor.findFieldByName("MyNestedObject")); + Descriptors.Descriptor nestedDescriptor = + descriptor.findNestedTypeByName("MyNestedObjectType"); + Assertions.assertEquals( + "nested_value", + nestedMessage.getField(nestedDescriptor.findFieldByName("NestedField"))); + Assertions.assertEquals( + 999, nestedMessage.getField(nestedDescriptor.findFieldByName("AnotherField"))); + } + + @Test + public void testMapTypeWithCaseSensitiveFieldName() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test map type with case-sensitive field name + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"MyMapField"}, + new SeaTunnelDataType[] { + new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE) + }); + + Map mapData = new HashMap<>(); + mapData.put("key1", 100); + mapData.put("key2", 200); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, mapData); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " map MyMapField = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + @SuppressWarnings("unchecked") + java.util.List mapEntries = + (java.util.List) + dynamicMessage.getField(descriptor.findFieldByName("MyMapField")); + Assertions.assertEquals(2, mapEntries.size()); + } + + @Test + public void testTinyIntUnsignedConversion() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test TINYINT to unsigned int conversion + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"tinyint_field"}, + new SeaTunnelDataType[] {BasicType.BYTE_TYPE}); + + SeaTunnelRow row = new SeaTunnelRow(1); + row.setField(0, (byte) -1); // -1 as signed byte should become 255 as unsigned + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " int32 tinyint_field = 1;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertEquals( + 255, dynamicMessage.getField(descriptor.findFieldByName("tinyint_field"))); + } + + @Test + public void testAllNullFields() + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Test when all fields are null + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"field1", "field2", "field3"}, + new SeaTunnelDataType[] { + BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.DOUBLE_TYPE + }); + + SeaTunnelRow row = new SeaTunnelRow(3); + row.setField(0, null); + row.setField(1, null); + row.setField(2, null); + + String protoContent = + "syntax = \"proto3\";\n" + + "package org.apache.seatunnel.format.protobuf;\n" + + "message TestMessage {\n" + + " string field1 = 1;\n" + + " int32 field2 = 2;\n" + + " double field3 = 3;\n" + + "}"; + + String messageName = "TestMessage"; + Descriptors.Descriptor descriptor = + CompileDescriptor.compileDescriptorTempFile(protoContent, messageName); + + RowToProtobufConverter converter = new RowToProtobufConverter(rowType, descriptor); + byte[] protobufMessage = converter.convertRowToGenericRecord(row); + + // Should not throw exception, and produce valid protobuf message with default values + Assertions.assertNotNull(protobufMessage); + DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, protobufMessage); + Assertions.assertNotNull(dynamicMessage); + } }