diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md index 9c0363c89e8..6e5ffc64f97 100644 --- a/docs/en/connector-v2/source/Kafka.md +++ b/docs/en/connector-v2/source/Kafka.md @@ -58,8 +58,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | | protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name | | protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition | +| strip_schema_registry_header | Boolean | No | false | Effective when the format is set to protobuf. Whether to strip the Confluent Schema Registry wire format header (magic byte, schema id and message indexes) before protobuf deserialization. This option is useful when consuming Protobuf messages that were encoded using Confluent Schema Registry. When enabled, the connector will try to detect and remove the Schema Registry header before parsing the Protobuf message. If the header is not detected, it will fall back to standard Protobuf deserialization. | | reader_cache_queue_size | Integer | No | 1024 | The reader shard cache queue is used to cache the data corresponding to the shards. The size of the shard cache depends on the number of shards obtained by each reader, rather than the amount of data in each shard. | -| is_native | Boolean | No | false | Supports retaining the source information of the record. +| is_native | Boolean | No | false | Supports retaining the source information of the record. ### debezium_record_table_filter @@ -406,6 +407,59 @@ source { } ``` +### Protobuf with Schema Registry wire format + +When consuming Protobuf messages that were encoded using Confluent Schema Registry, you need to set `strip_schema_registry_header` to `true`. The connector will automatically detect and remove the Schema Registry wire format header (magic byte, schema id, and message indexes) before deserializing the Protobuf message. + +Example: + +```hocon +source { + Kafka { + topic = "test_protobuf_schema_registry_topic" + format = protobuf + strip_schema_registry_header = true + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + plugin_output = "kafka_table" + } +} +``` + +**Note**: When `strip_schema_registry_header` is enabled, the connector can safely handle both Schema Registry encoded messages and plain Protobuf messages. If the Schema Registry header is not detected, it will automatically fall back to standard Protobuf deserialization. 
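+
+For reference, the sketch below is illustrative only (the `stripConfluentHeader` helper is hypothetical and not part of the connector); it shows the wire-format layout that this option tells the connector to detect and remove:
+
+```java
+// Confluent Schema Registry wire format for Protobuf values:
+//   byte 0       magic byte, always 0
+//   bytes 1..4   schema id, big-endian int
+//   bytes 5..    message indexes (varints; a single 0 byte for the common first-message case)
+//   remainder    the Protobuf-encoded message itself
+static byte[] stripConfluentHeader(byte[] value) {
+    // Covers the single-byte message-index case; longer index lists occupy additional bytes.
+    if (value != null && value.length > 6 && value[0] == 0) {
+        return java.util.Arrays.copyOfRange(value, 6, value.length);
+    }
+    return value; // no magic byte detected, treat as plain Protobuf
+}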
+``` + ### Ignore No Leader Partition When dealing with Kafka clusters that may have temporary leadership issues, you can configure the connector to ignore partitions without a leader: diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md index 0f16ef3cf4b..89c396baa96 100644 --- a/docs/zh/connector-v2/source/Kafka.md +++ b/docs/zh/connector-v2/source/Kafka.md @@ -58,6 +58,7 @@ import ChangeLog from '../changelog/connector-kafka.md'; | common-options | | 否 | - | 源插件的常见参数,详情请参考 [Source Common Options](../source-common-options.md)。 | | protobuf_message_name | String | 否 | - | 当格式设置为 protobuf 时有效,指定消息名称。 | | protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 | +| strip_schema_registry_header | Boolean | 否 | false | 当格式设置为 protobuf 时有效。是否在 Protobuf 反序列化之前去除 Confluent Schema Registry 线格式头部(magic byte、schema id 和 message indexes)。当消费使用 Confluent Schema Registry 编码的 Protobuf 消息时,此选项非常有用。启用后,连接器将尝试在解析 Protobuf 消息之前检测并删除 Schema Registry 头部。如果未检测到头部,它将回退到标准的 Protobuf 反序列化。 | | reader_cache_queue_size | Integer | 否 | 1024 | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 | | is_native | Boolean | No | false | 支持保留record的源信息。 | @@ -399,6 +400,59 @@ source { } ``` +### Protobuf with Schema Registry wire format + +当消费使用 Confluent Schema Registry 编码的 Protobuf 消息时,您需要将 `strip_schema_registry_header` 设置为 `true`。连接器将自动检测并删除 Schema Registry 格式头部(magic byte、schema id 和 message indexes),然后再反序列化 Protobuf 消息。 + +使用样例: + +```hocon +source { + Kafka { + topic = "test_protobuf_schema_registry_topic" + format = protobuf + strip_schema_registry_header = true + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + plugin_output = "kafka_table" + } +} +``` + +**注意**:当启用 `strip_schema_registry_header` 时,连接器可以安全地处理 Schema Registry 编码的消息和纯 Protobuf 消息。如果未检测到 Schema Registry 头部,它将自动回退到标准 Protobuf 反序列化。 +``` + ### 忽略无 Leader 分区 当处理可能存在临时 leader 问题的 Kafka 集群时,您可以配置连接器忽略没有 leader 的分区: diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java index ec1441320a5..33e8413cef2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java @@ -125,4 +125,13 @@ public class KafkaSourceOptions extends KafkaBaseOptions { .noDefaultValue() .withDescription( "The time required for consumption mode to be timestamp.The endTimestamp configuration specifies the end timestamp of the messages and is only applicable in batch mode"); + + public static final Option STRIP_SCHEMA_REGISTRY_HEADER = + Options.key("strip_schema_registry_header") + .booleanType() + .defaultValue(false) + 
.withDescription( + "Whether to strip the Confluent Schema Registry wire format header " + + "(magic byte, schema id and message indexes) before " + + "protobuf deserialization."); } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java index ce2c38cbb04..017325f057d 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java @@ -55,6 +55,7 @@ import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema; import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema; import org.apache.seatunnel.format.protobuf.ProtobufDeserializationSchema; +import org.apache.seatunnel.format.protobuf.SchemaRegistryAwareProtobufDeserializationSchema; import org.apache.seatunnel.format.text.TextDeserializationSchema; import org.apache.seatunnel.format.text.constant.TextFormatConstant; @@ -100,6 +101,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_END_TIMESTAMP; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.STRIP_SCHEMA_REGISTRY_HEADER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC; public class KafkaSourceConfig implements Serializable { @@ -408,7 +410,13 @@ private DeserializationSchema createDeserializationSchema( schema = new AvroDeserializationSchema(catalogTable); break; case PROTOBUF: - schema = new ProtobufDeserializationSchema(catalogTable); + boolean stripSchemaRegistryHeader = + readonlyConfig.get(STRIP_SCHEMA_REGISTRY_HEADER); + if (stripSchemaRegistryHeader) { + schema = new SchemaRegistryAwareProtobufDeserializationSchema(catalogTable); + } else { + schema = new ProtobufDeserializationSchema(catalogTable); + } break; default: throw new SeaTunnelJsonFormatException( diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java index 568622ce37c..e60afc3f0bf 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java @@ -59,7 +59,8 @@ public OptionRule optionRule() { KafkaSourceOptions.DEBEZIUM_RECORD_TABLE_FILTER, KafkaSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, KafkaSourceOptions.READER_CACHE_QUEUE_SIZE, - KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION) + KafkaSourceOptions.IGNORE_NO_LEADER_PARTITION, + KafkaSourceOptions.STRIP_SCHEMA_REGISTRY_HEADER) .conditional( KafkaSourceOptions.START_MODE, StartMode.TIMESTAMP, diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 38c9184139f..85961389f0c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -1558,6 +1558,53 @@ private void sendData(DefaultSeaTunnelRowSerializer serializer) { producer.flush(); } + private byte[] wrapWithSchemaRegistryHeader(byte[] protobufBytes) { + // Confluent Schema Registry Protobuf wire format: + // magic byte (0) + 4 bytes schema id + 1 byte message index (varint for value 1) + byte magic = 0; + int schemaId = 1; + byte[] header = new byte[6]; + header[0] = magic; + header[1] = (byte) ((schemaId >> 24) & 0xFF); + header[2] = (byte) ((schemaId >> 16) & 0xFF); + header[3] = (byte) ((schemaId >> 8) & 0xFF); + header[4] = (byte) (schemaId & 0xFF); + header[5] = 1; // single message index + + byte[] result = new byte[header.length + protobufBytes.length]; + System.arraycopy(header, 0, result, 0, header.length); + System.arraycopy(protobufBytes, 0, result, header.length, protobufBytes.length); + return result; + } + + private void sendSchemaRegistryHeaderData(DefaultSeaTunnelRowSerializer serializer) { + // Produce Schema Registry wire-format records to Kafka + IntStream.range(0, 20) + .forEach( + i -> { + try { + SeaTunnelRow originalRow = buildSeaTunnelRow(); + ProducerRecord originalRecord = + serializer.serializeRow(originalRow); + byte[] wrappedValue = + wrapWithSchemaRegistryHeader(originalRecord.value()); + ProducerRecord wrappedRecord = + new ProducerRecord<>( + originalRecord.topic(), + originalRecord.partition(), + originalRecord.key(), + wrappedValue); + producer.send(wrappedRecord).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException( + "Error sending Kafka message with Schema Registry header", + e); + } + }); + + producer.flush(); + } + @TestTemplate public void testKafkaProtobufForTransformToAssert(TestContainer container) throws IOException, InterruptedException, URISyntaxException { @@ -1608,6 +1655,59 @@ public void testKafkaProtobufForTransformToAssert(TestContainer container) } } + @TestTemplate + public void testKafkaProtobufSchemaRegistryHeaderForTransformToAssert(TestContainer container) + throws IOException, InterruptedException, URISyntaxException { + + String confFile = + "/protobuf/kafka_protobuf_schema_registry_header_transform_to_assert.conf"; + String path = getTestConfigFile(confFile); + Config config = ConfigFactory.parseFile(new File(path)); + Config sinkConfig = config.getConfigList("source").get(0); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig); + SeaTunnelRowType seaTunnelRowType = buildSeaTunnelRowType(); + + // Create serializer + DefaultSeaTunnelRowSerializer serializer = + getDefaultSeaTunnelRowSerializer( + "test_protobuf_schema_registry_topic_transform_fake_source", + seaTunnelRowType, + readonlyConfig); + + // Produce Schema Registry wire-format records to Kafka + sendSchemaRegistryHeaderData(serializer); + + // Execute the job and validate + Container.ExecResult execResult = container.executeJob(confFile); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + 
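+        // Read back what the sink wrote to "verify_protobuf_schema_registry_transform" and assert
+        // that each JSON record contains exactly the two projected fields:
+        // Address.city -> "city_value" and c_string -> "test data".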
try (KafkaConsumer consumer = + new KafkaConsumer<>(kafkaByteConsumerConfig())) { + consumer.subscribe(Arrays.asList("verify_protobuf_schema_registry_transform")); + Map offsets = + consumer.endOffsets( + Arrays.asList( + new TopicPartition( + "verify_protobuf_schema_registry_transform", 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + String data = new String(record.value(), "UTF-8"); + ObjectNode jsonNodes = JsonUtils.parseObject(data); + Assertions.assertEquals(jsonNodes.size(), 2); + Assertions.assertEquals(jsonNodes.get("city").asText(), "city_value"); + Assertions.assertEquals(jsonNodes.get("c_string").asText(), "test data"); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + } + public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { URL resource = KafkaIT.class.getResource(configFile); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_schema_registry_header_transform_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_schema_registry_header_transform_to_assert.conf new file mode 100644 index 00000000000..6154d0bd5f3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/protobuf/kafka_protobuf_schema_registry_header_transform_to_assert.conf @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Kafka { + topic = "test_protobuf_schema_registry_topic_transform_fake_source" + format = protobuf + strip_schema_registry_header = true + protobuf_message_name = Person + protobuf_schema = """ + syntax = "proto3"; + + package org.apache.seatunnel.format.protobuf; + + option java_outer_classname = "ProtobufE2E"; + + message Person { + int32 c_int32 = 1; + int64 c_int64 = 2; + float c_float = 3; + double c_double = 4; + bool c_bool = 5; + string c_string = 6; + bytes c_bytes = 7; + + message Address { + string street = 1; + string city = 2; + string state = 3; + string zip = 4; + } + + Address address = 8; + + map attributes = 9; + + repeated string phone_numbers = 10; + } + """ + schema = { + fields { + c_int32 = int + c_int64 = long + c_float = float + c_double = double + c_bool = boolean + c_string = string + c_bytes = bytes + + Address { + city = string + state = string + street = string + } + attributes = "map" + phone_numbers = "array" + } + } + bootstrap.servers = "kafkaCluster:9092" + start_mode = "earliest" + plugin_output = "kafka_table" + } +} + +transform { + Sql { + plugin_input = "kafka_table" + plugin_output = "kafka_table_transform" + query = "select Address.city,c_string from dual" + } +} + +sink { + kafka { + topic = "verify_protobuf_schema_registry_transform" + plugin_input = "kafka_table_transform" + bootstrap.servers = "kafkaCluster:9092" + kafka.request.timeout.ms = 60000 + kafka.config = { + acks = "all" + request.timeout.ms = 60000 + buffer.memory = 33554432 + } + + } +} + diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/SchemaRegistryAwareProtobufDeserializationSchema.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/SchemaRegistryAwareProtobufDeserializationSchema.java new file mode 100644 index 00000000000..12e872af59c --- /dev/null +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/SchemaRegistryAwareProtobufDeserializationSchema.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.format.protobuf; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A Protobuf deserialization schema that is aware of Confluent Schema Registry's wire format. + * + *
<p>
This schema will try to strip the Schema Registry header (magic byte, schema id and message + * indexes) before delegating to {@link ProtobufDeserializationSchema}. If stripping fails, it falls + * back to using the original payload, so it can safely be enabled for both plain and Schema + * Registry encoded messages. + */ +public class SchemaRegistryAwareProtobufDeserializationSchema + implements DeserializationSchema { + + private static final long serialVersionUID = -2134049729306615854L; + + /** + * Maximum number of additional header bytes (beyond the 5 bytes magic + schema id) to probe + * when trying to locate the actual Protobuf message. This covers the variable-length "message + * indexes" part used by Schema Registry for Protobuf. + */ + private static final int MAX_ADDITIONAL_HEADER_BYTES = 16; + + private static final Logger LOG = + LoggerFactory.getLogger(SchemaRegistryAwareProtobufDeserializationSchema.class); + + private final ProtobufDeserializationSchema inner; + private final SeaTunnelRowType rowType; + + public SchemaRegistryAwareProtobufDeserializationSchema(CatalogTable catalogTable) { + this.inner = new ProtobufDeserializationSchema(catalogTable); + this.rowType = catalogTable.getSeaTunnelRowType(); + } + + @Override + public SeaTunnelRow deserialize(byte[] message) throws IOException { + if (message == null || message.length == 0) { + return inner.deserialize(message); + } + + int length = message.length; + + // Confluent Schema Registry Protobuf wire format: + // 1 byte magic (0), 4 bytes schema id, N bytes message indexes (varints), then protobuf. + if (length >= 6 && message[0] == 0) { + int candidateStart = 6; + if (candidateStart < length) { + try { + return inner.deserialize(Arrays.copyOfRange(message, candidateStart, length)); + } catch (IOException | RuntimeException ignored) { + LOG.warn("Protobuf message not recognized at candidate offset 6, falling back"); + } + } + + int maxProbeStart = Math.min(5 + MAX_ADDITIONAL_HEADER_BYTES, length - 1); + for (int start = 5; start <= maxProbeStart; start++) { + if (start == candidateStart) { + continue; + } + try { + return inner.deserialize(Arrays.copyOfRange(message, start, length)); + } catch (IOException | RuntimeException ignored) { + LOG.warn( + "Protobuf message not recognized at candidate offset {}, falling back", + start); + } + } + } + + return inner.deserialize(message); + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowType; + } +}
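
For context, the offset probing above could in principle be replaced by decoding the message-index block explicitly. The following sketch is illustrative only and is not part of this change; the names (`ConfluentProtobufHeader`, `payloadOffset`, `readZigZagVarint`) are hypothetical, and it assumes the indexes are written as Kafka-style zigzag varints with a single 0 byte standing in for the common first-message case:

```java
import java.nio.ByteBuffer;

final class ConfluentProtobufHeader {

    /** Returns the offset of the Protobuf payload, or -1 if the value has no wire-format header. */
    static int payloadOffset(byte[] value) {
        if (value == null || value.length < 6 || value[0] != 0) {
            return -1; // no magic byte, treat as plain Protobuf
        }
        ByteBuffer buf = ByteBuffer.wrap(value, 5, value.length - 5);
        long count = readZigZagVarint(buf); // 0 covers the single-0-byte first-message shortcut
        for (long i = 0; i < count; i++) {
            readZigZagVarint(buf); // skip each message index
        }
        return buf.position();
    }

    private static long readZigZagVarint(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // zigzag decode
    }
}
```

Probing a handful of offsets, as the class above does, trades a deterministic parse for not having to commit to a particular index encoding; either way, the plain-Protobuf fallback keeps non-wire-format messages working.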